1use tokio::sync::mpsc;
6
7use crate::error::Result;
8use crate::media::{AacData, FlvTag, H264Data};
9use crate::protocol::message::RtmpMessage;
10
11use super::config::ClientConfig;
12use super::connector::RtmpConnector;
13
14#[derive(Debug)]
16pub enum ClientEvent {
17 Connected,
19
20 Metadata(std::collections::HashMap<String, crate::amf::AmfValue>),
22
23 VideoFrame { timestamp: u32, data: H264Data },
25
26 AudioFrame { timestamp: u32, data: AacData },
28
29 VideoTag(FlvTag),
31
32 AudioTag(FlvTag),
34
35 StreamEnd,
37
38 Error(String),
40
41 Disconnected,
43}
44
45pub struct RtmpPuller {
47 config: ClientConfig,
48 event_tx: mpsc::Sender<ClientEvent>,
49 event_rx: Option<mpsc::Receiver<ClientEvent>>,
50}
51
52impl RtmpPuller {
53 pub fn new(config: ClientConfig) -> (Self, mpsc::Receiver<ClientEvent>) {
57 let (tx, rx) = mpsc::channel(1024);
58
59 let puller = Self {
60 config,
61 event_tx: tx,
62 event_rx: None, };
64
65 (puller, rx)
66 }
67
68 pub fn take_events(&mut self) -> Option<mpsc::Receiver<ClientEvent>> {
70 self.event_rx.take()
71 }
72
73 pub async fn start(&self) -> Result<()> {
75 let tx = self.event_tx.clone();
76
77 let mut connector = RtmpConnector::connect(self.config.clone()).await?;
79 let _ = tx.send(ClientEvent::Connected).await;
80
81 let stream_name = self
83 .config
84 .parse_url()
85 .and_then(|u| u.stream_key)
86 .unwrap_or_default();
87
88 connector.play(&stream_name).await?;
90
91 loop {
93 match connector.read_message().await {
94 Ok(msg) => {
95 if !self.handle_message(msg, &tx).await {
96 break;
97 }
98 }
99 Err(e) => {
100 let _ = tx.send(ClientEvent::Error(e.to_string())).await;
101 break;
102 }
103 }
104 }
105
106 let _ = tx.send(ClientEvent::Disconnected).await;
107 Ok(())
108 }
109
110 async fn handle_message(&self, msg: RtmpMessage, tx: &mpsc::Sender<ClientEvent>) -> bool {
112 match msg {
113 RtmpMessage::Video { timestamp, data } => {
114 let tag = FlvTag::video(timestamp, data.clone());
116 let _ = tx.send(ClientEvent::VideoTag(tag)).await;
117
118 if data.len() >= 2 && (data[0] & 0x0F) == 7 {
120 if let Ok(h264) = H264Data::parse(data.slice(1..)) {
121 let _ = tx
122 .send(ClientEvent::VideoFrame {
123 timestamp,
124 data: h264,
125 })
126 .await;
127 }
128 }
129 }
130
131 RtmpMessage::Audio { timestamp, data } => {
132 let tag = FlvTag::audio(timestamp, data.clone());
134 let _ = tx.send(ClientEvent::AudioTag(tag)).await;
135
136 if data.len() >= 2 && (data[0] >> 4) == 10 {
138 if let Ok(aac) = AacData::parse(data.slice(1..)) {
139 let _ = tx
140 .send(ClientEvent::AudioFrame {
141 timestamp,
142 data: aac,
143 })
144 .await;
145 }
146 }
147 }
148
149 RtmpMessage::Data(data) | RtmpMessage::DataAmf3(data) => {
150 if data.name == "onMetaData" || data.name == "@setDataFrame" {
151 if let Some(metadata) = data.values.first().and_then(|v| v.as_object()) {
152 let _ = tx.send(ClientEvent::Metadata(metadata.clone())).await;
153 }
154 }
155 }
156
157 RtmpMessage::Command(cmd) if cmd.name == "onStatus" => {
158 if let Some(info) = cmd.arguments.first().and_then(|v| v.as_object()) {
159 if let Some(code) = info.get("code").and_then(|v| v.as_str()) {
160 if code == crate::protocol::constants::NS_PLAY_STOP {
161 let _ = tx.send(ClientEvent::StreamEnd).await;
162 return false;
163 }
164 }
165 }
166 }
167
168 RtmpMessage::UserControl(crate::protocol::message::UserControlEvent::StreamEof(_)) => {
169 let _ = tx.send(ClientEvent::StreamEnd).await;
170 return false;
171 }
172
173 _ => {}
174 }
175
176 true
177 }
178}