rtmp_rs/client/
connector.rs1use std::collections::HashMap;
6
7use bytes::{Buf, Bytes, BytesMut};
8use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
9use tokio::net::TcpStream;
10use tokio::time::timeout;
11
12use crate::amf::AmfValue;
13use crate::error::{Error, Result};
14use crate::protocol::chunk::{ChunkDecoder, ChunkEncoder, RtmpChunk};
15use crate::protocol::constants::*;
16use crate::protocol::handshake::{Handshake, HandshakeRole};
17use crate::protocol::message::{Command, RtmpMessage};
18
19use super::config::{ClientConfig, ParsedUrl};
20
21pub struct RtmpConnector {
23 config: ClientConfig,
24 parsed_url: ParsedUrl,
25 reader: BufReader<tokio::io::ReadHalf<TcpStream>>,
26 writer: BufWriter<tokio::io::WriteHalf<TcpStream>>,
27 read_buf: BytesMut,
28 write_buf: BytesMut,
29 chunk_decoder: ChunkDecoder,
30 chunk_encoder: ChunkEncoder,
31 stream_id: u32,
32}
33
34impl RtmpConnector {
35 pub async fn connect(config: ClientConfig) -> Result<Self> {
37 let parsed_url = config
38 .parse_url()
39 .ok_or_else(|| Error::Config("Invalid RTMP URL".into()))?;
40
41 let addr = format!("{}:{}", parsed_url.host, parsed_url.port);
42
43 let socket = timeout(config.connect_timeout, TcpStream::connect(&addr))
44 .await
45 .map_err(|_| Error::Timeout)?
46 .map_err(Error::Io)?;
47
48 if config.tcp_nodelay {
49 socket.set_nodelay(true)?;
50 }
51
52 let (read_half, write_half) = tokio::io::split(socket);
53
54 let mut connector = Self {
55 config,
56 parsed_url,
57 reader: BufReader::with_capacity(64 * 1024, read_half),
58 writer: BufWriter::with_capacity(64 * 1024, write_half),
59 read_buf: BytesMut::with_capacity(64 * 1024),
60 write_buf: BytesMut::with_capacity(64 * 1024),
61 chunk_decoder: ChunkDecoder::new(),
62 chunk_encoder: ChunkEncoder::new(),
63 stream_id: 0,
64 };
65
66 connector.do_handshake().await?;
67 connector.do_connect().await?;
68
69 Ok(connector)
70 }
71
72 async fn do_handshake(&mut self) -> Result<()> {
74 let mut handshake = Handshake::new(HandshakeRole::Client);
75
76 let c0c1 = handshake.generate_initial().ok_or(Error::Protocol(
78 crate::error::ProtocolError::InvalidChunkHeader,
79 ))?;
80 self.writer.write_all(&c0c1).await?;
81 self.writer.flush().await?;
82
83 let timeout_duration = self.config.connect_timeout;
85 timeout(timeout_duration, async {
86 loop {
87 let n = self.reader.read_buf(&mut self.read_buf).await?;
88 if n == 0 {
89 return Err(Error::ConnectionClosed);
90 }
91
92 let mut buf = Bytes::copy_from_slice(&self.read_buf);
93 if let Some(response) = handshake.process(&mut buf)? {
94 let consumed = self.read_buf.len() - buf.len();
95 self.read_buf.advance(consumed);
96
97 self.writer.write_all(&response).await?;
98 self.writer.flush().await?;
99 }
100
101 if handshake.is_done() {
102 break;
103 }
104 }
105 Ok::<_, Error>(())
106 })
107 .await
108 .map_err(|_| Error::Timeout)??;
109
110 Ok(())
111 }
112
113 async fn do_connect(&mut self) -> Result<()> {
115 let mut obj = HashMap::new();
116 obj.insert(
117 "app".to_string(),
118 AmfValue::String(self.parsed_url.app.clone()),
119 );
120 obj.insert("type".to_string(), AmfValue::String("nonprivate".into()));
121 obj.insert(
122 "flashVer".to_string(),
123 AmfValue::String(self.config.flash_ver.clone()),
124 );
125 obj.insert(
126 "tcUrl".to_string(),
127 AmfValue::String(self.config.url.clone()),
128 );
129 obj.insert("fpad".to_string(), AmfValue::Boolean(false));
130 obj.insert("capabilities".to_string(), AmfValue::Number(15.0));
131 obj.insert("audioCodecs".to_string(), AmfValue::Number(3191.0));
132 obj.insert("videoCodecs".to_string(), AmfValue::Number(252.0));
133 obj.insert("videoFunction".to_string(), AmfValue::Number(1.0));
134
135 let cmd = Command {
136 name: CMD_CONNECT.to_string(),
137 transaction_id: 1.0,
138 command_object: AmfValue::Object(obj),
139 arguments: vec![],
140 stream_id: 0,
141 };
142
143 self.send_command(&cmd).await?;
144
145 loop {
147 let msg = self.read_message().await?;
148 match msg {
149 RtmpMessage::Command(cmd) if cmd.name == CMD_RESULT => {
150 break;
152 }
153 RtmpMessage::Command(cmd) if cmd.name == CMD_ERROR => {
154 return Err(Error::Rejected("Connect rejected".into()));
155 }
156 RtmpMessage::SetChunkSize(size) => {
157 self.chunk_decoder.set_chunk_size(size);
158 }
159 RtmpMessage::WindowAckSize(_) | RtmpMessage::SetPeerBandwidth { .. } => {
160 }
162 _ => {}
163 }
164 }
165
166 self.chunk_encoder.set_chunk_size(RECOMMENDED_CHUNK_SIZE);
168 self.send_message(&RtmpMessage::SetChunkSize(RECOMMENDED_CHUNK_SIZE))
169 .await?;
170
171 Ok(())
172 }
173
174 pub async fn create_stream(&mut self) -> Result<u32> {
176 let cmd = Command {
177 name: CMD_CREATE_STREAM.to_string(),
178 transaction_id: 2.0,
179 command_object: AmfValue::Null,
180 arguments: vec![],
181 stream_id: 0,
182 };
183
184 self.send_command(&cmd).await?;
185
186 loop {
188 let msg = self.read_message().await?;
189 if let RtmpMessage::Command(result) = msg {
190 if result.name == CMD_RESULT && result.transaction_id == 2.0 {
191 if let Some(id) = result.arguments.first().and_then(|v| v.as_number()) {
192 self.stream_id = id as u32;
193 return Ok(self.stream_id);
194 }
195 }
196 }
197 }
198 }
199
200 pub async fn play(&mut self, stream_name: &str) -> Result<()> {
202 if self.stream_id == 0 {
203 self.create_stream().await?;
204 }
205
206 self.send_message(&RtmpMessage::UserControl(
208 crate::protocol::message::UserControlEvent::SetBufferLength {
209 stream_id: self.stream_id,
210 buffer_ms: self.config.buffer_length,
211 },
212 ))
213 .await?;
214
215 let cmd = Command {
217 name: CMD_PLAY.to_string(),
218 transaction_id: 0.0,
219 command_object: AmfValue::Null,
220 arguments: vec![
221 AmfValue::String(stream_name.to_string()),
222 AmfValue::Number(-2.0), AmfValue::Number(-1.0), AmfValue::Boolean(true), ],
226 stream_id: self.stream_id,
227 };
228
229 self.send_command(&cmd).await?;
230
231 loop {
233 let msg = self.read_message().await?;
234 if let RtmpMessage::Command(status) = msg {
235 if status.name == CMD_ON_STATUS {
236 if let Some(info) = status.arguments.first().and_then(|v| v.as_object()) {
237 if let Some(code) = info.get("code").and_then(|v| v.as_str()) {
238 if code == NS_PLAY_START {
239 return Ok(());
240 } else if code.contains("Failed") || code.contains("Error") {
241 return Err(Error::Rejected(code.to_string()));
242 }
243 }
244 }
245 }
246 }
247 }
248 }
249
250 pub async fn read_message(&mut self) -> Result<RtmpMessage> {
252 loop {
253 while let Some(chunk) = self.chunk_decoder.decode(&mut self.read_buf)? {
255 return RtmpMessage::from_chunk(&chunk);
256 }
257
258 let n = self.reader.read_buf(&mut self.read_buf).await?;
260 if n == 0 {
261 return Err(Error::ConnectionClosed);
262 }
263 }
264 }
265
266 async fn send_message(&mut self, msg: &RtmpMessage) -> Result<()> {
268 let (msg_type, payload) = msg.encode();
269
270 let csid = match msg {
271 RtmpMessage::SetChunkSize(_)
272 | RtmpMessage::WindowAckSize(_)
273 | RtmpMessage::SetPeerBandwidth { .. }
274 | RtmpMessage::UserControl(_) => CSID_PROTOCOL_CONTROL,
275 RtmpMessage::Command(_) | RtmpMessage::CommandAmf3(_) => CSID_COMMAND,
276 _ => CSID_COMMAND,
277 };
278
279 let chunk = RtmpChunk {
280 csid,
281 timestamp: 0,
282 message_type: msg_type,
283 stream_id: 0,
284 payload,
285 };
286
287 self.write_buf.clear();
288 self.chunk_encoder.encode(&chunk, &mut self.write_buf);
289 self.writer.write_all(&self.write_buf).await?;
290 self.writer.flush().await?;
291
292 Ok(())
293 }
294
295 async fn send_command(&mut self, cmd: &Command) -> Result<()> {
297 self.send_message(&RtmpMessage::Command(cmd.clone())).await
298 }
299
300 pub fn stream_id(&self) -> u32 {
302 self.stream_id
303 }
304}