rtmp_rs/client/
connector.rs

1//! RTMP client connector
2//!
3//! Low-level client for connecting to RTMP servers.
4
5use std::collections::HashMap;
6
7use bytes::{Buf, Bytes, BytesMut};
8use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
9use tokio::net::TcpStream;
10use tokio::time::timeout;
11
12use crate::amf::AmfValue;
13use crate::error::{Error, Result};
14use crate::protocol::chunk::{ChunkDecoder, ChunkEncoder, RtmpChunk};
15use crate::protocol::constants::*;
16use crate::protocol::handshake::{Handshake, HandshakeRole};
17use crate::protocol::message::{Command, RtmpMessage};
18
19use super::config::{ClientConfig, ParsedUrl};
20
21/// RTMP client connector
22pub struct RtmpConnector {
23    config: ClientConfig,
24    parsed_url: ParsedUrl,
25    reader: BufReader<tokio::io::ReadHalf<TcpStream>>,
26    writer: BufWriter<tokio::io::WriteHalf<TcpStream>>,
27    read_buf: BytesMut,
28    write_buf: BytesMut,
29    chunk_decoder: ChunkDecoder,
30    chunk_encoder: ChunkEncoder,
31    stream_id: u32,
32}
33
34impl RtmpConnector {
35    /// Connect to an RTMP server
36    pub async fn connect(config: ClientConfig) -> Result<Self> {
37        let parsed_url = config
38            .parse_url()
39            .ok_or_else(|| Error::Config("Invalid RTMP URL".into()))?;
40
41        let addr = format!("{}:{}", parsed_url.host, parsed_url.port);
42
43        let socket = timeout(config.connect_timeout, TcpStream::connect(&addr))
44            .await
45            .map_err(|_| Error::Timeout)?
46            .map_err(Error::Io)?;
47
48        if config.tcp_nodelay {
49            socket.set_nodelay(true)?;
50        }
51
52        let (read_half, write_half) = tokio::io::split(socket);
53
54        let mut connector = Self {
55            config,
56            parsed_url,
57            reader: BufReader::with_capacity(64 * 1024, read_half),
58            writer: BufWriter::with_capacity(64 * 1024, write_half),
59            read_buf: BytesMut::with_capacity(64 * 1024),
60            write_buf: BytesMut::with_capacity(64 * 1024),
61            chunk_decoder: ChunkDecoder::new(),
62            chunk_encoder: ChunkEncoder::new(),
63            stream_id: 0,
64        };
65
66        connector.do_handshake().await?;
67        connector.do_connect().await?;
68
69        Ok(connector)
70    }
71
72    /// Perform handshake
73    async fn do_handshake(&mut self) -> Result<()> {
74        let mut handshake = Handshake::new(HandshakeRole::Client);
75
76        // Send C0C1
77        let c0c1 = handshake.generate_initial().ok_or(Error::Protocol(
78            crate::error::ProtocolError::InvalidChunkHeader,
79        ))?;
80        self.writer.write_all(&c0c1).await?;
81        self.writer.flush().await?;
82
83        // Wait for S0S1S2 and send C2
84        let timeout_duration = self.config.connect_timeout;
85        timeout(timeout_duration, async {
86            loop {
87                let n = self.reader.read_buf(&mut self.read_buf).await?;
88                if n == 0 {
89                    return Err(Error::ConnectionClosed);
90                }
91
92                let mut buf = Bytes::copy_from_slice(&self.read_buf);
93                if let Some(response) = handshake.process(&mut buf)? {
94                    let consumed = self.read_buf.len() - buf.len();
95                    self.read_buf.advance(consumed);
96
97                    self.writer.write_all(&response).await?;
98                    self.writer.flush().await?;
99                }
100
101                if handshake.is_done() {
102                    break;
103                }
104            }
105            Ok::<_, Error>(())
106        })
107        .await
108        .map_err(|_| Error::Timeout)??;
109
110        Ok(())
111    }
112
113    /// Send connect command
114    async fn do_connect(&mut self) -> Result<()> {
115        let mut obj = HashMap::new();
116        obj.insert(
117            "app".to_string(),
118            AmfValue::String(self.parsed_url.app.clone()),
119        );
120        obj.insert("type".to_string(), AmfValue::String("nonprivate".into()));
121        obj.insert(
122            "flashVer".to_string(),
123            AmfValue::String(self.config.flash_ver.clone()),
124        );
125        obj.insert(
126            "tcUrl".to_string(),
127            AmfValue::String(self.config.url.clone()),
128        );
129        obj.insert("fpad".to_string(), AmfValue::Boolean(false));
130        obj.insert("capabilities".to_string(), AmfValue::Number(15.0));
131        obj.insert("audioCodecs".to_string(), AmfValue::Number(3191.0));
132        obj.insert("videoCodecs".to_string(), AmfValue::Number(252.0));
133        obj.insert("videoFunction".to_string(), AmfValue::Number(1.0));
134
135        let cmd = Command {
136            name: CMD_CONNECT.to_string(),
137            transaction_id: 1.0,
138            command_object: AmfValue::Object(obj),
139            arguments: vec![],
140            stream_id: 0,
141        };
142
143        self.send_command(&cmd).await?;
144
145        // Wait for connect result
146        loop {
147            let msg = self.read_message().await?;
148            match msg {
149                RtmpMessage::Command(cmd) if cmd.name == CMD_RESULT => {
150                    // Connect successful
151                    break;
152                }
153                RtmpMessage::Command(cmd) if cmd.name == CMD_ERROR => {
154                    return Err(Error::Rejected("Connect rejected".into()));
155                }
156                RtmpMessage::SetChunkSize(size) => {
157                    self.chunk_decoder.set_chunk_size(size);
158                }
159                RtmpMessage::WindowAckSize(_) | RtmpMessage::SetPeerBandwidth { .. } => {
160                    // Ignore these during connect
161                }
162                _ => {}
163            }
164        }
165
166        // Set our chunk size
167        self.chunk_encoder.set_chunk_size(RECOMMENDED_CHUNK_SIZE);
168        self.send_message(&RtmpMessage::SetChunkSize(RECOMMENDED_CHUNK_SIZE))
169            .await?;
170
171        Ok(())
172    }
173
174    /// Create a stream for publishing or playing
175    pub async fn create_stream(&mut self) -> Result<u32> {
176        let cmd = Command {
177            name: CMD_CREATE_STREAM.to_string(),
178            transaction_id: 2.0,
179            command_object: AmfValue::Null,
180            arguments: vec![],
181            stream_id: 0,
182        };
183
184        self.send_command(&cmd).await?;
185
186        // Wait for result
187        loop {
188            let msg = self.read_message().await?;
189            if let RtmpMessage::Command(result) = msg {
190                if result.name == CMD_RESULT && result.transaction_id == 2.0 {
191                    if let Some(id) = result.arguments.first().and_then(|v| v.as_number()) {
192                        self.stream_id = id as u32;
193                        return Ok(self.stream_id);
194                    }
195                }
196            }
197        }
198    }
199
200    /// Start playing a stream
201    pub async fn play(&mut self, stream_name: &str) -> Result<()> {
202        if self.stream_id == 0 {
203            self.create_stream().await?;
204        }
205
206        // Set buffer length
207        self.send_message(&RtmpMessage::UserControl(
208            crate::protocol::message::UserControlEvent::SetBufferLength {
209                stream_id: self.stream_id,
210                buffer_ms: self.config.buffer_length,
211            },
212        ))
213        .await?;
214
215        // Send play command
216        let cmd = Command {
217            name: CMD_PLAY.to_string(),
218            transaction_id: 0.0,
219            command_object: AmfValue::Null,
220            arguments: vec![
221                AmfValue::String(stream_name.to_string()),
222                AmfValue::Number(-2.0),  // Start: live or recorded
223                AmfValue::Number(-1.0),  // Duration: play until end
224                AmfValue::Boolean(true), // Reset
225            ],
226            stream_id: self.stream_id,
227        };
228
229        self.send_command(&cmd).await?;
230
231        // Wait for onStatus
232        loop {
233            let msg = self.read_message().await?;
234            if let RtmpMessage::Command(status) = msg {
235                if status.name == CMD_ON_STATUS {
236                    if let Some(info) = status.arguments.first().and_then(|v| v.as_object()) {
237                        if let Some(code) = info.get("code").and_then(|v| v.as_str()) {
238                            if code == NS_PLAY_START {
239                                return Ok(());
240                            } else if code.contains("Failed") || code.contains("Error") {
241                                return Err(Error::Rejected(code.to_string()));
242                            }
243                        }
244                    }
245                }
246            }
247        }
248    }
249
250    /// Read the next RTMP message
251    pub async fn read_message(&mut self) -> Result<RtmpMessage> {
252        loop {
253            // Try to decode from buffer
254            while let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
255                return RtmpMessage::from_chunk(&chunk);
256            }
257
258            // Need more data
259            let n = self.reader.read_buf(&mut self.read_buf).await?;
260            if n == 0 {
261                return Err(Error::ConnectionClosed);
262            }
263        }
264    }
265
266    /// Send an RTMP message
267    async fn send_message(&mut self, msg: &RtmpMessage) -> Result<()> {
268        let (msg_type, payload) = msg.encode();
269
270        let csid = match msg {
271            RtmpMessage::SetChunkSize(_)
272            | RtmpMessage::WindowAckSize(_)
273            | RtmpMessage::SetPeerBandwidth { .. }
274            | RtmpMessage::UserControl(_) => CSID_PROTOCOL_CONTROL,
275            RtmpMessage::Command(_) | RtmpMessage::CommandAmf3(_) => CSID_COMMAND,
276            _ => CSID_COMMAND,
277        };
278
279        let chunk = RtmpChunk {
280            csid,
281            timestamp: 0,
282            message_type: msg_type,
283            stream_id: 0,
284            payload,
285        };
286
287        self.write_buf.clear();
288        self.chunk_encoder.encode(&chunk, &mut self.write_buf);
289        self.writer.write_all(&self.write_buf).await?;
290        self.writer.flush().await?;
291
292        Ok(())
293    }
294
295    /// Send a command
296    async fn send_command(&mut self, cmd: &Command) -> Result<()> {
297        self.send_message(&RtmpMessage::Command(cmd.clone())).await
298    }
299
300    /// Get the stream ID
301    pub fn stream_id(&self) -> u32 {
302        self.stream_id
303    }
304}