1use bytes::{Buf, BufMut, Bytes, BytesMut};
12use std::collections::HashMap;
13
14use crate::amf::{Amf0Decoder, Amf0Encoder, AmfValue};
15use crate::error::{AmfError, ProtocolError, Result};
16use crate::protocol::chunk::RtmpChunk;
17use crate::protocol::constants::*;
18
19#[derive(Debug, Clone)]
21pub enum RtmpMessage {
22 SetChunkSize(u32),
24
25 Abort { csid: u32 },
27
28 Acknowledgement { sequence: u32 },
30
31 UserControl(UserControlEvent),
33
34 WindowAckSize(u32),
36
37 SetPeerBandwidth { size: u32, limit_type: u8 },
39
40 Audio { timestamp: u32, data: Bytes },
42
43 Video { timestamp: u32, data: Bytes },
45
46 Command(Command),
48
49 Data(DataMessage),
51
52 CommandAmf3(Command),
54
55 DataAmf3(DataMessage),
57
58 Aggregate { data: Bytes },
60
61 Unknown { type_id: u8, data: Bytes },
63}
64
65#[derive(Debug, Clone)]
67pub enum UserControlEvent {
68 StreamBegin(u32),
69 StreamEof(u32),
70 StreamDry(u32),
71 SetBufferLength { stream_id: u32, buffer_ms: u32 },
72 StreamIsRecorded(u32),
73 PingRequest(u32),
74 PingResponse(u32),
75 Unknown { event_type: u16, data: Bytes },
76}
77
78#[derive(Debug, Clone)]
80pub struct Command {
81 pub name: String,
83 pub transaction_id: f64,
85 pub command_object: AmfValue,
87 pub arguments: Vec<AmfValue>,
89 pub stream_id: u32,
91}
92
93#[derive(Debug, Clone)]
95pub struct DataMessage {
96 pub name: String,
98 pub values: Vec<AmfValue>,
100 pub stream_id: u32,
102}
103
104#[derive(Debug, Clone, Default)]
106pub struct ConnectParams {
107 pub app: String,
109 pub flash_ver: Option<String>,
111 pub swf_url: Option<String>,
113 pub tc_url: Option<String>,
115 pub fpad: bool,
117 pub audio_codecs: u32,
119 pub video_codecs: u32,
121 pub video_function: u32,
123 pub page_url: Option<String>,
125 pub object_encoding: f64,
127 pub extra: HashMap<String, AmfValue>,
129}
130
131impl ConnectParams {
132 pub fn from_amf(obj: &AmfValue) -> Self {
134 let mut params = ConnectParams::default();
135
136 if let Some(map) = obj.as_object() {
137 for (key, value) in map {
138 match key.as_str() {
139 "app" => {
140 if let Some(s) = value.as_str() {
141 params.app = s.to_string();
142 }
143 }
144 "flashVer" | "flashver" => {
145 params.flash_ver = value.as_str().map(|s| s.to_string());
146 }
147 "swfUrl" | "swfurl" => {
148 params.swf_url = value.as_str().map(|s| s.to_string());
149 }
150 "tcUrl" | "tcurl" => {
151 params.tc_url = value.as_str().map(|s| s.to_string());
152 }
153 "fpad" => {
154 params.fpad = value.as_bool().unwrap_or(false);
155 }
156 "audioCodecs" | "audiocodecs" => {
157 params.audio_codecs = value.as_number().unwrap_or(0.0) as u32;
158 }
159 "videoCodecs" | "videocodecs" => {
160 params.video_codecs = value.as_number().unwrap_or(0.0) as u32;
161 }
162 "videoFunction" | "videofunction" => {
163 params.video_function = value.as_number().unwrap_or(0.0) as u32;
164 }
165 "pageUrl" | "pageurl" => {
166 params.page_url = value.as_str().map(|s| s.to_string());
167 }
168 "objectEncoding" | "objectencoding" => {
169 params.object_encoding = value.as_number().unwrap_or(0.0);
170 }
171 _ => {
172 params.extra.insert(key.clone(), value.clone());
173 }
174 }
175 }
176 }
177
178 params
179 }
180}
181
182#[derive(Debug, Clone)]
184pub struct PublishParams {
185 pub stream_key: String,
187 pub publish_type: String,
189 pub stream_id: u32,
191}
192
193#[derive(Debug, Clone)]
195pub struct PlayParams {
196 pub stream_name: String,
198 pub start: f64,
200 pub duration: f64,
202 pub reset: bool,
204 pub stream_id: u32,
206}
207
208impl RtmpMessage {
209 pub fn from_chunk(chunk: &RtmpChunk) -> Result<Self> {
211 let mut payload = chunk.payload.clone();
212
213 match chunk.message_type {
214 MSG_SET_CHUNK_SIZE => {
215 if payload.len() < 4 {
216 return Err(ProtocolError::InvalidChunkHeader.into());
217 }
218 let size = payload.get_u32() & 0x7FFFFFFF; Ok(RtmpMessage::SetChunkSize(size))
220 }
221
222 MSG_ABORT => {
223 if payload.len() < 4 {
224 return Err(ProtocolError::InvalidChunkHeader.into());
225 }
226 Ok(RtmpMessage::Abort {
227 csid: payload.get_u32(),
228 })
229 }
230
231 MSG_ACKNOWLEDGEMENT => {
232 if payload.len() < 4 {
233 return Err(ProtocolError::InvalidChunkHeader.into());
234 }
235 Ok(RtmpMessage::Acknowledgement {
236 sequence: payload.get_u32(),
237 })
238 }
239
240 MSG_USER_CONTROL => Self::parse_user_control(&mut payload),
241
242 MSG_WINDOW_ACK_SIZE => {
243 if payload.len() < 4 {
244 return Err(ProtocolError::InvalidChunkHeader.into());
245 }
246 Ok(RtmpMessage::WindowAckSize(payload.get_u32()))
247 }
248
249 MSG_SET_PEER_BANDWIDTH => {
250 if payload.len() < 5 {
251 return Err(ProtocolError::InvalidChunkHeader.into());
252 }
253 let size = payload.get_u32();
254 let limit_type = payload.get_u8();
255 Ok(RtmpMessage::SetPeerBandwidth { size, limit_type })
256 }
257
258 MSG_AUDIO => Ok(RtmpMessage::Audio {
259 timestamp: chunk.timestamp,
260 data: payload,
261 }),
262
263 MSG_VIDEO => Ok(RtmpMessage::Video {
264 timestamp: chunk.timestamp,
265 data: payload,
266 }),
267
268 MSG_COMMAND_AMF0 => {
269 let cmd = Self::parse_command(&mut payload, chunk.stream_id)?;
270 Ok(RtmpMessage::Command(cmd))
271 }
272
273 MSG_COMMAND_AMF3 => {
274 if !payload.is_empty() && payload[0] == 0x00 {
276 payload.advance(1);
277 }
278 let cmd = Self::parse_command(&mut payload, chunk.stream_id)?;
279 Ok(RtmpMessage::CommandAmf3(cmd))
280 }
281
282 MSG_DATA_AMF0 => {
283 let data = Self::parse_data(&mut payload, chunk.stream_id)?;
284 Ok(RtmpMessage::Data(data))
285 }
286
287 MSG_DATA_AMF3 => {
288 if !payload.is_empty() && payload[0] == 0x00 {
289 payload.advance(1);
290 }
291 let data = Self::parse_data(&mut payload, chunk.stream_id)?;
292 Ok(RtmpMessage::DataAmf3(data))
293 }
294
295 MSG_AGGREGATE => Ok(RtmpMessage::Aggregate { data: payload }),
296
297 _ => Ok(RtmpMessage::Unknown {
298 type_id: chunk.message_type,
299 data: payload,
300 }),
301 }
302 }
303
304 fn parse_user_control(payload: &mut Bytes) -> Result<Self> {
306 if payload.len() < 6 {
307 return Err(ProtocolError::InvalidChunkHeader.into());
308 }
309
310 let event_type = payload.get_u16();
311 let event = match event_type {
312 UC_STREAM_BEGIN => UserControlEvent::StreamBegin(payload.get_u32()),
313 UC_STREAM_EOF => UserControlEvent::StreamEof(payload.get_u32()),
314 UC_STREAM_DRY => UserControlEvent::StreamDry(payload.get_u32()),
315 UC_SET_BUFFER_LENGTH => {
316 if payload.len() < 8 {
317 return Err(ProtocolError::InvalidChunkHeader.into());
318 }
319 let stream_id = payload.get_u32();
320 let buffer_ms = payload.get_u32();
321 UserControlEvent::SetBufferLength {
322 stream_id,
323 buffer_ms,
324 }
325 }
326 UC_STREAM_IS_RECORDED => UserControlEvent::StreamIsRecorded(payload.get_u32()),
327 UC_PING_REQUEST => UserControlEvent::PingRequest(payload.get_u32()),
328 UC_PING_RESPONSE => UserControlEvent::PingResponse(payload.get_u32()),
329 _ => UserControlEvent::Unknown {
330 event_type,
331 data: payload.clone(),
332 },
333 };
334
335 Ok(RtmpMessage::UserControl(event))
336 }
337
338 fn parse_command(payload: &mut Bytes, stream_id: u32) -> Result<Command> {
340 let mut decoder = Amf0Decoder::new();
341
342 let name = match decoder.decode(payload)? {
344 AmfValue::String(s) => s,
345 _ => return Err(ProtocolError::InvalidCommand("Expected command name".into()).into()),
346 };
347
348 let transaction_id = match decoder.decode(payload)? {
350 AmfValue::Number(n) => n,
351 _ => 0.0, };
353
354 let command_object = if payload.has_remaining() {
356 decoder.decode(payload)?
357 } else {
358 AmfValue::Null
359 };
360
361 let mut arguments = Vec::new();
363 while payload.has_remaining() {
364 match decoder.decode(payload) {
365 Ok(v) => arguments.push(v),
366 Err(AmfError::UnexpectedEof) => break,
367 Err(e) => return Err(e.into()),
368 }
369 }
370
371 Ok(Command {
372 name,
373 transaction_id,
374 command_object,
375 arguments,
376 stream_id,
377 })
378 }
379
380 fn parse_data(payload: &mut Bytes, stream_id: u32) -> Result<DataMessage> {
382 let mut decoder = Amf0Decoder::new();
383
384 let name = match decoder.decode(payload)? {
386 AmfValue::String(s) => s,
387 _ => String::new(), };
389
390 let mut values = Vec::new();
392 while payload.has_remaining() {
393 match decoder.decode(payload) {
394 Ok(v) => values.push(v),
395 Err(AmfError::UnexpectedEof) => break,
396 Err(e) => return Err(e.into()),
397 }
398 }
399
400 Ok(DataMessage {
401 name,
402 values,
403 stream_id,
404 })
405 }
406
407 pub fn encode(&self) -> (u8, Bytes) {
409 match self {
410 RtmpMessage::SetChunkSize(size) => {
411 let mut buf = BytesMut::with_capacity(4);
412 buf.put_u32(*size);
413 (MSG_SET_CHUNK_SIZE, buf.freeze())
414 }
415
416 RtmpMessage::Abort { csid } => {
417 let mut buf = BytesMut::with_capacity(4);
418 buf.put_u32(*csid);
419 (MSG_ABORT, buf.freeze())
420 }
421
422 RtmpMessage::Acknowledgement { sequence } => {
423 let mut buf = BytesMut::with_capacity(4);
424 buf.put_u32(*sequence);
425 (MSG_ACKNOWLEDGEMENT, buf.freeze())
426 }
427
428 RtmpMessage::WindowAckSize(size) => {
429 let mut buf = BytesMut::with_capacity(4);
430 buf.put_u32(*size);
431 (MSG_WINDOW_ACK_SIZE, buf.freeze())
432 }
433
434 RtmpMessage::SetPeerBandwidth { size, limit_type } => {
435 let mut buf = BytesMut::with_capacity(5);
436 buf.put_u32(*size);
437 buf.put_u8(*limit_type);
438 (MSG_SET_PEER_BANDWIDTH, buf.freeze())
439 }
440
441 RtmpMessage::UserControl(event) => {
442 let mut buf = BytesMut::with_capacity(10);
443 match event {
444 UserControlEvent::StreamBegin(id) => {
445 buf.put_u16(UC_STREAM_BEGIN);
446 buf.put_u32(*id);
447 }
448 UserControlEvent::StreamEof(id) => {
449 buf.put_u16(UC_STREAM_EOF);
450 buf.put_u32(*id);
451 }
452 UserControlEvent::StreamDry(id) => {
453 buf.put_u16(UC_STREAM_DRY);
454 buf.put_u32(*id);
455 }
456 UserControlEvent::SetBufferLength {
457 stream_id,
458 buffer_ms,
459 } => {
460 buf.put_u16(UC_SET_BUFFER_LENGTH);
461 buf.put_u32(*stream_id);
462 buf.put_u32(*buffer_ms);
463 }
464 UserControlEvent::StreamIsRecorded(id) => {
465 buf.put_u16(UC_STREAM_IS_RECORDED);
466 buf.put_u32(*id);
467 }
468 UserControlEvent::PingRequest(ts) => {
469 buf.put_u16(UC_PING_REQUEST);
470 buf.put_u32(*ts);
471 }
472 UserControlEvent::PingResponse(ts) => {
473 buf.put_u16(UC_PING_RESPONSE);
474 buf.put_u32(*ts);
475 }
476 UserControlEvent::Unknown { event_type, data } => {
477 buf.put_u16(*event_type);
478 buf.put_slice(data);
479 }
480 }
481 (MSG_USER_CONTROL, buf.freeze())
482 }
483
484 RtmpMessage::Audio { data, .. } => (MSG_AUDIO, data.clone()),
485
486 RtmpMessage::Video { data, .. } => (MSG_VIDEO, data.clone()),
487
488 RtmpMessage::Command(cmd) => {
489 let payload = encode_command(cmd);
490 (MSG_COMMAND_AMF0, payload)
491 }
492
493 RtmpMessage::CommandAmf3(cmd) => {
494 let mut buf = BytesMut::new();
495 buf.put_u8(0x00); buf.put_slice(&encode_command(cmd));
497 (MSG_COMMAND_AMF3, buf.freeze())
498 }
499
500 RtmpMessage::Data(data) => {
501 let payload = encode_data(data);
502 (MSG_DATA_AMF0, payload)
503 }
504
505 RtmpMessage::DataAmf3(data) => {
506 let mut buf = BytesMut::new();
507 buf.put_u8(0x00);
508 buf.put_slice(&encode_data(data));
509 (MSG_DATA_AMF3, buf.freeze())
510 }
511
512 RtmpMessage::Aggregate { data } => (MSG_AGGREGATE, data.clone()),
513
514 RtmpMessage::Unknown { type_id, data } => (*type_id, data.clone()),
515 }
516 }
517}
518
519fn encode_command(cmd: &Command) -> Bytes {
521 let mut encoder = Amf0Encoder::new();
522 encoder.encode(&AmfValue::String(cmd.name.clone()));
523 encoder.encode(&AmfValue::Number(cmd.transaction_id));
524 encoder.encode(&cmd.command_object);
525 for arg in &cmd.arguments {
526 encoder.encode(arg);
527 }
528 encoder.finish()
529}
530
531fn encode_data(data: &DataMessage) -> Bytes {
533 let mut encoder = Amf0Encoder::new();
534 encoder.encode(&AmfValue::String(data.name.clone()));
535 for value in &data.values {
536 encoder.encode(value);
537 }
538 encoder.finish()
539}
540
541impl Command {
543 pub fn result(transaction_id: f64, properties: AmfValue, info: AmfValue) -> Self {
545 Command {
546 name: CMD_RESULT.to_string(),
547 transaction_id,
548 command_object: properties,
549 arguments: vec![info],
550 stream_id: 0,
551 }
552 }
553
554 pub fn error(transaction_id: f64, properties: AmfValue, info: AmfValue) -> Self {
556 Command {
557 name: CMD_ERROR.to_string(),
558 transaction_id,
559 command_object: properties,
560 arguments: vec![info],
561 stream_id: 0,
562 }
563 }
564
565 pub fn on_status(stream_id: u32, level: &str, code: &str, description: &str) -> Self {
567 let mut info = HashMap::new();
568 info.insert("level".to_string(), AmfValue::String(level.to_string()));
569 info.insert("code".to_string(), AmfValue::String(code.to_string()));
570 info.insert(
571 "description".to_string(),
572 AmfValue::String(description.to_string()),
573 );
574
575 Command {
576 name: CMD_ON_STATUS.to_string(),
577 transaction_id: 0.0,
578 command_object: AmfValue::Null,
579 arguments: vec![AmfValue::Object(info)],
580 stream_id,
581 }
582 }
583}
584
585#[cfg(test)]
586mod tests {
587 use super::*;
588
589 #[test]
590 fn test_connect_params_parsing() {
591 let mut obj = HashMap::new();
592 obj.insert("app".to_string(), AmfValue::String("live".into()));
593 obj.insert(
594 "tcUrl".to_string(),
595 AmfValue::String("rtmp://localhost/live".into()),
596 );
597 obj.insert("objectEncoding".to_string(), AmfValue::Number(0.0));
598
599 let params = ConnectParams::from_amf(&AmfValue::Object(obj));
600 assert_eq!(params.app, "live");
601 assert_eq!(params.tc_url, Some("rtmp://localhost/live".into()));
602 assert_eq!(params.object_encoding, 0.0);
603 }
604
605 #[test]
606 fn test_command_roundtrip() {
607 let cmd = Command {
608 name: "connect".to_string(),
609 transaction_id: 1.0,
610 command_object: AmfValue::Null,
611 arguments: vec![AmfValue::String("test".into())],
612 stream_id: 0,
613 };
614
615 let payload = encode_command(&cmd);
616 let chunk = RtmpChunk {
617 csid: CSID_COMMAND,
618 timestamp: 0,
619 message_type: MSG_COMMAND_AMF0,
620 stream_id: 0,
621 payload,
622 };
623
624 let parsed = RtmpMessage::from_chunk(&chunk).unwrap();
625 if let RtmpMessage::Command(parsed_cmd) = parsed {
626 assert_eq!(parsed_cmd.name, "connect");
627 assert_eq!(parsed_cmd.transaction_id, 1.0);
628 } else {
629 panic!("Expected Command message");
630 }
631 }
632}