rtmp_rs/client/
puller.rs

1//! RTMP stream puller
2//!
3//! High-level API for pulling streams from RTMP servers.
4
5use tokio::sync::mpsc;
6
7use crate::error::Result;
8use crate::media::{AacData, FlvTag, H264Data};
9use crate::protocol::message::RtmpMessage;
10
11use super::config::ClientConfig;
12use super::connector::RtmpConnector;
13
14/// Events from the RTMP puller
15#[derive(Debug)]
16pub enum ClientEvent {
17    /// Connected to server
18    Connected,
19
20    /// Stream metadata received
21    Metadata(std::collections::HashMap<String, crate::amf::AmfValue>),
22
23    /// Video frame received
24    VideoFrame { timestamp: u32, data: H264Data },
25
26    /// Audio frame received
27    AudioFrame { timestamp: u32, data: AacData },
28
29    /// Raw video tag (if configured)
30    VideoTag(FlvTag),
31
32    /// Raw audio tag (if configured)
33    AudioTag(FlvTag),
34
35    /// Stream ended
36    StreamEnd,
37
38    /// Error occurred
39    Error(String),
40
41    /// Disconnected
42    Disconnected,
43}
44
45/// RTMP stream puller
46pub struct RtmpPuller {
47    config: ClientConfig,
48    event_tx: mpsc::Sender<ClientEvent>,
49    event_rx: Option<mpsc::Receiver<ClientEvent>>,
50}
51
52impl RtmpPuller {
53    /// Create a new puller
54    ///
55    /// Returns the puller and a receiver for events.
56    pub fn new(config: ClientConfig) -> (Self, mpsc::Receiver<ClientEvent>) {
57        let (tx, rx) = mpsc::channel(1024);
58
59        let puller = Self {
60            config,
61            event_tx: tx,
62            event_rx: None, // Receiver is returned to caller
63        };
64
65        (puller, rx)
66    }
67
68    /// Take the event receiver (if not already taken)
69    pub fn take_events(&mut self) -> Option<mpsc::Receiver<ClientEvent>> {
70        self.event_rx.take()
71    }
72
73    /// Start pulling the stream
74    pub async fn start(&self) -> Result<()> {
75        let tx = self.event_tx.clone();
76
77        // Connect
78        let mut connector = RtmpConnector::connect(self.config.clone()).await?;
79        let _ = tx.send(ClientEvent::Connected).await;
80
81        // Get stream name from URL
82        let stream_name = self
83            .config
84            .parse_url()
85            .and_then(|u| u.stream_key)
86            .unwrap_or_default();
87
88        // Start playing
89        connector.play(&stream_name).await?;
90
91        // Read messages
92        loop {
93            match connector.read_message().await {
94                Ok(msg) => {
95                    if !self.handle_message(msg, &tx).await {
96                        break;
97                    }
98                }
99                Err(e) => {
100                    let _ = tx.send(ClientEvent::Error(e.to_string())).await;
101                    break;
102                }
103            }
104        }
105
106        let _ = tx.send(ClientEvent::Disconnected).await;
107        Ok(())
108    }
109
110    /// Handle a received message
111    async fn handle_message(&self, msg: RtmpMessage, tx: &mpsc::Sender<ClientEvent>) -> bool {
112        match msg {
113            RtmpMessage::Video { timestamp, data } => {
114                // Send raw tag
115                let tag = FlvTag::video(timestamp, data.clone());
116                let _ = tx.send(ClientEvent::VideoTag(tag)).await;
117
118                // Parse and send frame
119                if data.len() >= 2 && (data[0] & 0x0F) == 7 {
120                    if let Ok(h264) = H264Data::parse(data.slice(1..)) {
121                        let _ = tx
122                            .send(ClientEvent::VideoFrame {
123                                timestamp,
124                                data: h264,
125                            })
126                            .await;
127                    }
128                }
129            }
130
131            RtmpMessage::Audio { timestamp, data } => {
132                // Send raw tag
133                let tag = FlvTag::audio(timestamp, data.clone());
134                let _ = tx.send(ClientEvent::AudioTag(tag)).await;
135
136                // Parse and send frame
137                if data.len() >= 2 && (data[0] >> 4) == 10 {
138                    if let Ok(aac) = AacData::parse(data.slice(1..)) {
139                        let _ = tx
140                            .send(ClientEvent::AudioFrame {
141                                timestamp,
142                                data: aac,
143                            })
144                            .await;
145                    }
146                }
147            }
148
149            RtmpMessage::Data(data) | RtmpMessage::DataAmf3(data) => {
150                if data.name == "onMetaData" || data.name == "@setDataFrame" {
151                    if let Some(metadata) = data.values.first().and_then(|v| v.as_object()) {
152                        let _ = tx.send(ClientEvent::Metadata(metadata.clone())).await;
153                    }
154                }
155            }
156
157            RtmpMessage::Command(cmd) if cmd.name == "onStatus" => {
158                if let Some(info) = cmd.arguments.first().and_then(|v| v.as_object()) {
159                    if let Some(code) = info.get("code").and_then(|v| v.as_str()) {
160                        if code == crate::protocol::constants::NS_PLAY_STOP {
161                            let _ = tx.send(ClientEvent::StreamEnd).await;
162                            return false;
163                        }
164                    }
165                }
166            }
167
168            RtmpMessage::UserControl(crate::protocol::message::UserControlEvent::StreamEof(_)) => {
169                let _ = tx.send(ClientEvent::StreamEnd).await;
170                return false;
171            }
172
173            _ => {}
174        }
175
176        true
177    }
178}