1use bytes::{Buf, BufMut, Bytes, BytesMut};
12use std::collections::HashMap;
13
14use crate::amf::{Amf0Decoder, Amf0Encoder, AmfValue};
15use crate::error::{AmfError, ProtocolError, Result};
16use crate::protocol::chunk::RtmpChunk;
17use crate::protocol::constants::*;
18
19#[derive(Debug, Clone)]
21pub enum RtmpMessage {
22 SetChunkSize(u32),
24
25 Abort { csid: u32 },
27
28 Acknowledgement { sequence: u32 },
30
31 UserControl(UserControlEvent),
33
34 WindowAckSize(u32),
36
37 SetPeerBandwidth { size: u32, limit_type: u8 },
39
40 Audio { timestamp: u32, data: Bytes },
42
43 Video { timestamp: u32, data: Bytes },
45
46 Command(Command),
48
49 Data(DataMessage),
51
52 CommandAmf3(Command),
54
55 DataAmf3(DataMessage),
57
58 Aggregate { data: Bytes },
60
61 Unknown { type_id: u8, data: Bytes },
63}
64
65#[derive(Debug, Clone)]
67pub enum UserControlEvent {
68 StreamBegin(u32),
69 StreamEof(u32),
70 StreamDry(u32),
71 SetBufferLength { stream_id: u32, buffer_ms: u32 },
72 StreamIsRecorded(u32),
73 PingRequest(u32),
74 PingResponse(u32),
75 Unknown { event_type: u16, data: Bytes },
76}
77
78#[derive(Debug, Clone)]
80pub struct Command {
81 pub name: String,
83 pub transaction_id: f64,
85 pub command_object: AmfValue,
87 pub arguments: Vec<AmfValue>,
89 pub stream_id: u32,
91}
92
93#[derive(Debug, Clone)]
95pub struct DataMessage {
96 pub name: String,
98 pub values: Vec<AmfValue>,
100 pub stream_id: u32,
102}
103
104#[derive(Debug, Clone, Default)]
106pub struct ConnectParams {
107 pub app: String,
109 pub flash_ver: Option<String>,
111 pub swf_url: Option<String>,
113 pub tc_url: Option<String>,
115 pub fpad: bool,
117 pub audio_codecs: u32,
119 pub video_codecs: u32,
121 pub video_function: u32,
123 pub page_url: Option<String>,
125 pub object_encoding: f64,
127 pub extra: HashMap<String, AmfValue>,
129}
130
131impl ConnectParams {
132 pub fn from_amf(obj: &AmfValue) -> Self {
134 let mut params = ConnectParams::default();
135
136 if let Some(map) = obj.as_object() {
137 for (key, value) in map {
138 match key.as_str() {
139 "app" => {
140 if let Some(s) = value.as_str() {
141 params.app = s.to_string();
142 }
143 }
144 "flashVer" | "flashver" => {
145 params.flash_ver = value.as_str().map(|s| s.to_string());
146 }
147 "swfUrl" | "swfurl" => {
148 params.swf_url = value.as_str().map(|s| s.to_string());
149 }
150 "tcUrl" | "tcurl" => {
151 params.tc_url = value.as_str().map(|s| s.to_string());
152 }
153 "fpad" => {
154 params.fpad = value.as_bool().unwrap_or(false);
155 }
156 "audioCodecs" | "audiocodecs" => {
157 params.audio_codecs = value.as_number().unwrap_or(0.0) as u32;
158 }
159 "videoCodecs" | "videocodecs" => {
160 params.video_codecs = value.as_number().unwrap_or(0.0) as u32;
161 }
162 "videoFunction" | "videofunction" => {
163 params.video_function = value.as_number().unwrap_or(0.0) as u32;
164 }
165 "pageUrl" | "pageurl" => {
166 params.page_url = value.as_str().map(|s| s.to_string());
167 }
168 "objectEncoding" | "objectencoding" => {
169 params.object_encoding = value.as_number().unwrap_or(0.0);
170 }
171 _ => {
172 params.extra.insert(key.clone(), value.clone());
173 }
174 }
175 }
176 }
177
178 params
179 }
180}
181
182#[derive(Debug, Clone)]
184pub struct PublishParams {
185 pub stream_key: String,
187 pub publish_type: String,
189 pub stream_id: u32,
191}
192
193#[derive(Debug, Clone)]
195pub struct PlayParams {
196 pub stream_name: String,
198 pub start: f64,
200 pub duration: f64,
202 pub reset: bool,
204 pub stream_id: u32,
206}
207
208impl RtmpMessage {
209 pub fn from_chunk(chunk: &RtmpChunk) -> Result<Self> {
211 let mut payload = chunk.payload.clone();
212
213 match chunk.message_type {
214 MSG_SET_CHUNK_SIZE => {
215 if payload.len() < 4 {
216 return Err(ProtocolError::InvalidChunkHeader.into());
217 }
218 let size = payload.get_u32() & 0x7FFFFFFF; Ok(RtmpMessage::SetChunkSize(size))
220 }
221
222 MSG_ABORT => {
223 if payload.len() < 4 {
224 return Err(ProtocolError::InvalidChunkHeader.into());
225 }
226 Ok(RtmpMessage::Abort {
227 csid: payload.get_u32(),
228 })
229 }
230
231 MSG_ACKNOWLEDGEMENT => {
232 if payload.len() < 4 {
233 return Err(ProtocolError::InvalidChunkHeader.into());
234 }
235 Ok(RtmpMessage::Acknowledgement {
236 sequence: payload.get_u32(),
237 })
238 }
239
240 MSG_USER_CONTROL => Self::parse_user_control(&mut payload),
241
242 MSG_WINDOW_ACK_SIZE => {
243 if payload.len() < 4 {
244 return Err(ProtocolError::InvalidChunkHeader.into());
245 }
246 Ok(RtmpMessage::WindowAckSize(payload.get_u32()))
247 }
248
249 MSG_SET_PEER_BANDWIDTH => {
250 if payload.len() < 5 {
251 return Err(ProtocolError::InvalidChunkHeader.into());
252 }
253 let size = payload.get_u32();
254 let limit_type = payload.get_u8();
255 Ok(RtmpMessage::SetPeerBandwidth { size, limit_type })
256 }
257
258 MSG_AUDIO => Ok(RtmpMessage::Audio {
259 timestamp: chunk.timestamp,
260 data: payload,
261 }),
262
263 MSG_VIDEO => Ok(RtmpMessage::Video {
264 timestamp: chunk.timestamp,
265 data: payload,
266 }),
267
268 MSG_COMMAND_AMF0 => {
269 let cmd = Self::parse_command(&mut payload, chunk.stream_id)?;
270 Ok(RtmpMessage::Command(cmd))
271 }
272
273 MSG_COMMAND_AMF3 => {
274 if !payload.is_empty() && payload[0] == 0x00 {
276 payload.advance(1);
277 }
278 let cmd = Self::parse_command(&mut payload, chunk.stream_id)?;
279 Ok(RtmpMessage::CommandAmf3(cmd))
280 }
281
282 MSG_DATA_AMF0 => {
283 let data = Self::parse_data(&mut payload, chunk.stream_id)?;
284 Ok(RtmpMessage::Data(data))
285 }
286
287 MSG_DATA_AMF3 => {
288 if !payload.is_empty() && payload[0] == 0x00 {
289 payload.advance(1);
290 }
291 let data = Self::parse_data(&mut payload, chunk.stream_id)?;
292 Ok(RtmpMessage::DataAmf3(data))
293 }
294
295 MSG_AGGREGATE => Ok(RtmpMessage::Aggregate { data: payload }),
296
297 _ => Ok(RtmpMessage::Unknown {
298 type_id: chunk.message_type,
299 data: payload,
300 }),
301 }
302 }
303
304 fn parse_user_control(payload: &mut Bytes) -> Result<Self> {
306 if payload.len() < 6 {
307 return Err(ProtocolError::InvalidChunkHeader.into());
308 }
309
310 let event_type = payload.get_u16();
311 let event = match event_type {
312 UC_STREAM_BEGIN => UserControlEvent::StreamBegin(payload.get_u32()),
313 UC_STREAM_EOF => UserControlEvent::StreamEof(payload.get_u32()),
314 UC_STREAM_DRY => UserControlEvent::StreamDry(payload.get_u32()),
315 UC_SET_BUFFER_LENGTH => {
316 if payload.len() < 8 {
317 return Err(ProtocolError::InvalidChunkHeader.into());
318 }
319 let stream_id = payload.get_u32();
320 let buffer_ms = payload.get_u32();
321 UserControlEvent::SetBufferLength {
322 stream_id,
323 buffer_ms,
324 }
325 }
326 UC_STREAM_IS_RECORDED => UserControlEvent::StreamIsRecorded(payload.get_u32()),
327 UC_PING_REQUEST => UserControlEvent::PingRequest(payload.get_u32()),
328 UC_PING_RESPONSE => UserControlEvent::PingResponse(payload.get_u32()),
329 _ => UserControlEvent::Unknown {
330 event_type,
331 data: payload.clone(),
332 },
333 };
334
335 Ok(RtmpMessage::UserControl(event))
336 }
337
338 fn parse_command(payload: &mut Bytes, stream_id: u32) -> Result<Command> {
340 let mut decoder = Amf0Decoder::new();
341
342 let name = match decoder.decode(payload)? {
344 AmfValue::String(s) => s,
345 _ => return Err(ProtocolError::InvalidCommand("Expected command name".into()).into()),
346 };
347
348 let transaction_id = match decoder.decode(payload)? {
350 AmfValue::Number(n) => n,
351 _ => 0.0, };
353
354 let command_object = if payload.has_remaining() {
356 decoder.decode(payload)?
357 } else {
358 AmfValue::Null
359 };
360
361 let mut arguments = Vec::new();
363 while payload.has_remaining() {
364 match decoder.decode(payload) {
365 Ok(v) => arguments.push(v),
366 Err(AmfError::UnexpectedEof) => break,
367 Err(e) => return Err(e.into()),
368 }
369 }
370
371 Ok(Command {
372 name,
373 transaction_id,
374 command_object,
375 arguments,
376 stream_id,
377 })
378 }
379
380 fn parse_data(payload: &mut Bytes, stream_id: u32) -> Result<DataMessage> {
382 let mut decoder = Amf0Decoder::new();
383
384 let name = match decoder.decode(payload)? {
386 AmfValue::String(s) => s,
387 _ => String::new(), };
389
390 let mut values = Vec::new();
392 while payload.has_remaining() {
393 match decoder.decode(payload) {
394 Ok(v) => values.push(v),
395 Err(AmfError::UnexpectedEof) => break,
396 Err(e) => return Err(e.into()),
397 }
398 }
399
400 Ok(DataMessage {
401 name,
402 values,
403 stream_id,
404 })
405 }
406
407 pub fn encode(&self) -> (u8, Bytes) {
409 match self {
410 RtmpMessage::SetChunkSize(size) => {
411 let mut buf = BytesMut::with_capacity(4);
412 buf.put_u32(*size);
413 (MSG_SET_CHUNK_SIZE, buf.freeze())
414 }
415
416 RtmpMessage::Abort { csid } => {
417 let mut buf = BytesMut::with_capacity(4);
418 buf.put_u32(*csid);
419 (MSG_ABORT, buf.freeze())
420 }
421
422 RtmpMessage::Acknowledgement { sequence } => {
423 let mut buf = BytesMut::with_capacity(4);
424 buf.put_u32(*sequence);
425 (MSG_ACKNOWLEDGEMENT, buf.freeze())
426 }
427
428 RtmpMessage::WindowAckSize(size) => {
429 let mut buf = BytesMut::with_capacity(4);
430 buf.put_u32(*size);
431 (MSG_WINDOW_ACK_SIZE, buf.freeze())
432 }
433
434 RtmpMessage::SetPeerBandwidth { size, limit_type } => {
435 let mut buf = BytesMut::with_capacity(5);
436 buf.put_u32(*size);
437 buf.put_u8(*limit_type);
438 (MSG_SET_PEER_BANDWIDTH, buf.freeze())
439 }
440
441 RtmpMessage::UserControl(event) => {
442 let mut buf = BytesMut::with_capacity(10);
443 match event {
444 UserControlEvent::StreamBegin(id) => {
445 buf.put_u16(UC_STREAM_BEGIN);
446 buf.put_u32(*id);
447 }
448 UserControlEvent::StreamEof(id) => {
449 buf.put_u16(UC_STREAM_EOF);
450 buf.put_u32(*id);
451 }
452 UserControlEvent::StreamDry(id) => {
453 buf.put_u16(UC_STREAM_DRY);
454 buf.put_u32(*id);
455 }
456 UserControlEvent::SetBufferLength {
457 stream_id,
458 buffer_ms,
459 } => {
460 buf.put_u16(UC_SET_BUFFER_LENGTH);
461 buf.put_u32(*stream_id);
462 buf.put_u32(*buffer_ms);
463 }
464 UserControlEvent::StreamIsRecorded(id) => {
465 buf.put_u16(UC_STREAM_IS_RECORDED);
466 buf.put_u32(*id);
467 }
468 UserControlEvent::PingRequest(ts) => {
469 buf.put_u16(UC_PING_REQUEST);
470 buf.put_u32(*ts);
471 }
472 UserControlEvent::PingResponse(ts) => {
473 buf.put_u16(UC_PING_RESPONSE);
474 buf.put_u32(*ts);
475 }
476 UserControlEvent::Unknown { event_type, data } => {
477 buf.put_u16(*event_type);
478 buf.put_slice(data);
479 }
480 }
481 (MSG_USER_CONTROL, buf.freeze())
482 }
483
484 RtmpMessage::Audio { data, .. } => (MSG_AUDIO, data.clone()),
485
486 RtmpMessage::Video { data, .. } => (MSG_VIDEO, data.clone()),
487
488 RtmpMessage::Command(cmd) => {
489 let payload = encode_command(cmd);
490 (MSG_COMMAND_AMF0, payload)
491 }
492
493 RtmpMessage::CommandAmf3(cmd) => {
494 let mut buf = BytesMut::new();
495 buf.put_u8(0x00); buf.put_slice(&encode_command(cmd));
497 (MSG_COMMAND_AMF3, buf.freeze())
498 }
499
500 RtmpMessage::Data(data) => {
501 let payload = encode_data(data);
502 (MSG_DATA_AMF0, payload)
503 }
504
505 RtmpMessage::DataAmf3(data) => {
506 let mut buf = BytesMut::new();
507 buf.put_u8(0x00);
508 buf.put_slice(&encode_data(data));
509 (MSG_DATA_AMF3, buf.freeze())
510 }
511
512 RtmpMessage::Aggregate { data } => (MSG_AGGREGATE, data.clone()),
513
514 RtmpMessage::Unknown { type_id, data } => (*type_id, data.clone()),
515 }
516 }
517}
518
519fn encode_command(cmd: &Command) -> Bytes {
521 let mut encoder = Amf0Encoder::new();
522 encoder.encode(&AmfValue::String(cmd.name.clone()));
523 encoder.encode(&AmfValue::Number(cmd.transaction_id));
524 encoder.encode(&cmd.command_object);
525 for arg in &cmd.arguments {
526 encoder.encode(arg);
527 }
528 encoder.finish()
529}
530
531fn encode_data(data: &DataMessage) -> Bytes {
533 let mut encoder = Amf0Encoder::new();
534 encoder.encode(&AmfValue::String(data.name.clone()));
535 for value in &data.values {
536 encoder.encode(value);
537 }
538 encoder.finish()
539}
540
541impl Command {
543 pub fn result(transaction_id: f64, properties: AmfValue, info: AmfValue) -> Self {
545 Command {
546 name: CMD_RESULT.to_string(),
547 transaction_id,
548 command_object: properties,
549 arguments: vec![info],
550 stream_id: 0,
551 }
552 }
553
554 pub fn error(transaction_id: f64, properties: AmfValue, info: AmfValue) -> Self {
556 Command {
557 name: CMD_ERROR.to_string(),
558 transaction_id,
559 command_object: properties,
560 arguments: vec![info],
561 stream_id: 0,
562 }
563 }
564
565 pub fn on_status(stream_id: u32, level: &str, code: &str, description: &str) -> Self {
567 let mut info = HashMap::new();
568 info.insert("level".to_string(), AmfValue::String(level.to_string()));
569 info.insert("code".to_string(), AmfValue::String(code.to_string()));
570 info.insert(
571 "description".to_string(),
572 AmfValue::String(description.to_string()),
573 );
574
575 Command {
576 name: CMD_ON_STATUS.to_string(),
577 transaction_id: 0.0,
578 command_object: AmfValue::Null,
579 arguments: vec![AmfValue::Object(info)],
580 stream_id,
581 }
582 }
583}
584
585#[cfg(test)]
586mod tests {
587 use super::*;
588
589 #[test]
590 fn test_connect_params_parsing() {
591 let mut obj = HashMap::new();
592 obj.insert("app".to_string(), AmfValue::String("live".into()));
593 obj.insert(
594 "tcUrl".to_string(),
595 AmfValue::String("rtmp://localhost/live".into()),
596 );
597 obj.insert("objectEncoding".to_string(), AmfValue::Number(0.0));
598
599 let params = ConnectParams::from_amf(&AmfValue::Object(obj));
600 assert_eq!(params.app, "live");
601 assert_eq!(params.tc_url, Some("rtmp://localhost/live".into()));
602 assert_eq!(params.object_encoding, 0.0);
603 }
604
605 #[test]
606 fn test_command_roundtrip() {
607 let cmd = Command {
608 name: "connect".to_string(),
609 transaction_id: 1.0,
610 command_object: AmfValue::Null,
611 arguments: vec![AmfValue::String("test".into())],
612 stream_id: 0,
613 };
614
615 let payload = encode_command(&cmd);
616 let chunk = RtmpChunk {
617 csid: CSID_COMMAND,
618 timestamp: 0,
619 message_type: MSG_COMMAND_AMF0,
620 stream_id: 0,
621 payload,
622 };
623
624 let parsed = RtmpMessage::from_chunk(&chunk).unwrap();
625 if let RtmpMessage::Command(parsed_cmd) = parsed {
626 assert_eq!(parsed_cmd.name, "connect");
627 assert_eq!(parsed_cmd.transaction_id, 1.0);
628 } else {
629 panic!("Expected Command message");
630 }
631 }
632
633 #[test]
634 fn test_set_chunk_size_message() {
635 let chunk = RtmpChunk {
636 csid: CSID_PROTOCOL_CONTROL,
637 timestamp: 0,
638 message_type: MSG_SET_CHUNK_SIZE,
639 stream_id: 0,
640 payload: Bytes::from_static(&[0x00, 0x00, 0x10, 0x00]), };
642
643 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
644 assert!(matches!(msg, RtmpMessage::SetChunkSize(4096)));
645
646 let (msg_type, payload) = msg.encode();
648 assert_eq!(msg_type, MSG_SET_CHUNK_SIZE);
649 assert_eq!(&payload[..], &[0x00, 0x00, 0x10, 0x00]);
650 }
651
652 #[test]
653 fn test_abort_message() {
654 let chunk = RtmpChunk {
655 csid: CSID_PROTOCOL_CONTROL,
656 timestamp: 0,
657 message_type: MSG_ABORT,
658 stream_id: 0,
659 payload: Bytes::from_static(&[0x00, 0x00, 0x00, 0x05]), };
661
662 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
663 if let RtmpMessage::Abort { csid } = msg {
664 assert_eq!(csid, 5);
665 } else {
666 panic!("Expected Abort message");
667 }
668 }
669
670 #[test]
671 fn test_acknowledgement_message() {
672 let chunk = RtmpChunk {
673 csid: CSID_PROTOCOL_CONTROL,
674 timestamp: 0,
675 message_type: MSG_ACKNOWLEDGEMENT,
676 stream_id: 0,
677 payload: Bytes::from_static(&[0x00, 0x10, 0x00, 0x00]), };
679
680 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
681 if let RtmpMessage::Acknowledgement { sequence } = msg {
682 assert_eq!(sequence, 1048576);
683 } else {
684 panic!("Expected Acknowledgement message");
685 }
686 }
687
688 #[test]
689 fn test_window_ack_size_message() {
690 let chunk = RtmpChunk {
691 csid: CSID_PROTOCOL_CONTROL,
692 timestamp: 0,
693 message_type: MSG_WINDOW_ACK_SIZE,
694 stream_id: 0,
695 payload: Bytes::from_static(&[0x00, 0x26, 0x25, 0xA0]), };
697
698 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
699 if let RtmpMessage::WindowAckSize(size) = msg {
700 assert_eq!(size, 2500000);
701 } else {
702 panic!("Expected WindowAckSize message");
703 }
704 }
705
706 #[test]
707 fn test_set_peer_bandwidth_message() {
708 let chunk = RtmpChunk {
709 csid: CSID_PROTOCOL_CONTROL,
710 timestamp: 0,
711 message_type: MSG_SET_PEER_BANDWIDTH,
712 stream_id: 0,
713 payload: Bytes::from_static(&[0x00, 0x26, 0x25, 0xA0, 0x02]), };
715
716 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
717 if let RtmpMessage::SetPeerBandwidth { size, limit_type } = msg {
718 assert_eq!(size, 2500000);
719 assert_eq!(limit_type, BANDWIDTH_LIMIT_DYNAMIC);
720 } else {
721 panic!("Expected SetPeerBandwidth message");
722 }
723 }
724
725 #[test]
726 fn test_user_control_stream_begin() {
727 let chunk = RtmpChunk {
728 csid: CSID_PROTOCOL_CONTROL,
729 timestamp: 0,
730 message_type: MSG_USER_CONTROL,
731 stream_id: 0,
732 payload: Bytes::from_static(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x01]), };
734
735 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
736 if let RtmpMessage::UserControl(UserControlEvent::StreamBegin(id)) = msg {
737 assert_eq!(id, 1);
738 } else {
739 panic!("Expected StreamBegin user control");
740 }
741 }
742
743 #[test]
744 fn test_user_control_stream_eof() {
745 let chunk = RtmpChunk {
746 csid: CSID_PROTOCOL_CONTROL,
747 timestamp: 0,
748 message_type: MSG_USER_CONTROL,
749 stream_id: 0,
750 payload: Bytes::from_static(&[0x00, 0x01, 0x00, 0x00, 0x00, 0x02]), };
752
753 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
754 if let RtmpMessage::UserControl(UserControlEvent::StreamEof(id)) = msg {
755 assert_eq!(id, 2);
756 } else {
757 panic!("Expected StreamEof user control");
758 }
759 }
760
761 #[test]
762 fn test_user_control_set_buffer_length() {
763 let chunk = RtmpChunk {
764 csid: CSID_PROTOCOL_CONTROL,
765 timestamp: 0,
766 message_type: MSG_USER_CONTROL,
767 stream_id: 0,
768 payload: Bytes::from_static(&[
769 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x03, 0xE8, ]),
773 };
774
775 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
776 if let RtmpMessage::UserControl(UserControlEvent::SetBufferLength {
777 stream_id,
778 buffer_ms,
779 }) = msg
780 {
781 assert_eq!(stream_id, 1);
782 assert_eq!(buffer_ms, 1000);
783 } else {
784 panic!("Expected SetBufferLength user control");
785 }
786 }
787
788 #[test]
789 fn test_user_control_ping_request() {
790 let chunk = RtmpChunk {
791 csid: CSID_PROTOCOL_CONTROL,
792 timestamp: 0,
793 message_type: MSG_USER_CONTROL,
794 stream_id: 0,
795 payload: Bytes::from_static(&[0x00, 0x06, 0x00, 0x01, 0x00, 0x00]), };
797
798 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
799 if let RtmpMessage::UserControl(UserControlEvent::PingRequest(ts)) = msg {
800 assert_eq!(ts, 0x00010000);
801 } else {
802 panic!("Expected PingRequest user control");
803 }
804 }
805
806 #[test]
807 fn test_user_control_ping_response() {
808 let chunk = RtmpChunk {
809 csid: CSID_PROTOCOL_CONTROL,
810 timestamp: 0,
811 message_type: MSG_USER_CONTROL,
812 stream_id: 0,
813 payload: Bytes::from_static(&[0x00, 0x07, 0x00, 0x00, 0x00, 0x64]), };
815
816 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
817 if let RtmpMessage::UserControl(UserControlEvent::PingResponse(ts)) = msg {
818 assert_eq!(ts, 100);
819 } else {
820 panic!("Expected PingResponse user control");
821 }
822 }
823
824 #[test]
825 fn test_audio_message() {
826 let audio_data = Bytes::from_static(&[0xAF, 0x01, 0x21, 0x00, 0x00]);
827
828 let chunk = RtmpChunk {
829 csid: CSID_AUDIO,
830 timestamp: 1000,
831 message_type: MSG_AUDIO,
832 stream_id: 1,
833 payload: audio_data.clone(),
834 };
835
836 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
837 if let RtmpMessage::Audio { timestamp, data } = msg {
838 assert_eq!(timestamp, 1000);
839 assert_eq!(data, audio_data);
840 } else {
841 panic!("Expected Audio message");
842 }
843 }
844
845 #[test]
846 fn test_video_message() {
847 let video_data = Bytes::from_static(&[0x17, 0x01, 0x00, 0x00, 0x00, 0x00]);
848
849 let chunk = RtmpChunk {
850 csid: CSID_VIDEO,
851 timestamp: 2000,
852 message_type: MSG_VIDEO,
853 stream_id: 1,
854 payload: video_data.clone(),
855 };
856
857 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
858 if let RtmpMessage::Video { timestamp, data } = msg {
859 assert_eq!(timestamp, 2000);
860 assert_eq!(data, video_data);
861 } else {
862 panic!("Expected Video message");
863 }
864 }
865
866 #[test]
867 fn test_data_message() {
868 let mut encoder = Amf0Encoder::new();
869 encoder.encode(&AmfValue::String("@setDataFrame".into()));
870 encoder.encode(&AmfValue::String("onMetaData".into()));
871 let mut metadata = HashMap::new();
872 metadata.insert("width".to_string(), AmfValue::Number(1920.0));
873 encoder.encode(&AmfValue::Object(metadata));
874
875 let chunk = RtmpChunk {
876 csid: CSID_COMMAND,
877 timestamp: 0,
878 message_type: MSG_DATA_AMF0,
879 stream_id: 1,
880 payload: encoder.finish(),
881 };
882
883 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
884 if let RtmpMessage::Data(data) = msg {
885 assert_eq!(data.name, "@setDataFrame");
886 assert_eq!(data.stream_id, 1);
887 assert_eq!(data.values.len(), 2);
888 } else {
889 panic!("Expected Data message");
890 }
891 }
892
893 #[test]
894 fn test_unknown_message_type() {
895 let chunk = RtmpChunk {
896 csid: CSID_COMMAND,
897 timestamp: 0,
898 message_type: 99, stream_id: 0,
900 payload: Bytes::from_static(b"unknown"),
901 };
902
903 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
904 if let RtmpMessage::Unknown { type_id, data } = msg {
905 assert_eq!(type_id, 99);
906 assert_eq!(data.as_ref(), b"unknown");
907 } else {
908 panic!("Expected Unknown message");
909 }
910 }
911
912 #[test]
913 fn test_command_result() {
914 let mut props = HashMap::new();
915 props.insert(
916 "fmsVer".to_string(),
917 AmfValue::String("FMS/3,5,7,7009".into()),
918 );
919 props.insert("capabilities".to_string(), AmfValue::Number(31.0));
920
921 let result = Command::result(1.0, AmfValue::Object(props), AmfValue::Null);
922
923 assert_eq!(result.name, "_result");
924 assert_eq!(result.transaction_id, 1.0);
925 }
926
927 #[test]
928 fn test_command_error() {
929 let error = Command::error(1.0, AmfValue::Null, AmfValue::String("error".into()));
930
931 assert_eq!(error.name, "_error");
932 assert_eq!(error.transaction_id, 1.0);
933 }
934
935 #[test]
936 fn test_command_on_status() {
937 let status = Command::on_status(1, "status", NS_PUBLISH_START, "Publishing started");
938
939 assert_eq!(status.name, "onStatus");
940 assert_eq!(status.transaction_id, 0.0);
941 assert_eq!(status.stream_id, 1);
942
943 if let Some(info) = status.arguments.first() {
944 if let AmfValue::Object(props) = info {
945 assert_eq!(props.get("level").unwrap().as_str(), Some("status"));
946 assert_eq!(props.get("code").unwrap().as_str(), Some(NS_PUBLISH_START));
947 } else {
948 panic!("Expected Object in arguments");
949 }
950 } else {
951 panic!("Expected arguments");
952 }
953 }
954
955 #[test]
956 fn test_connect_params_all_fields() {
957 let mut obj = HashMap::new();
958 obj.insert("app".to_string(), AmfValue::String("live".into()));
959 obj.insert(
960 "flashVer".to_string(),
961 AmfValue::String("OBS-Studio/29.0".into()),
962 );
963 obj.insert(
964 "swfUrl".to_string(),
965 AmfValue::String("rtmp://example.com/app".into()),
966 );
967 obj.insert(
968 "tcUrl".to_string(),
969 AmfValue::String("rtmp://example.com/live".into()),
970 );
971 obj.insert("fpad".to_string(), AmfValue::Boolean(false));
972 obj.insert("audioCodecs".to_string(), AmfValue::Number(3575.0));
973 obj.insert("videoCodecs".to_string(), AmfValue::Number(252.0));
974 obj.insert("videoFunction".to_string(), AmfValue::Number(1.0));
975 obj.insert(
976 "pageUrl".to_string(),
977 AmfValue::String("http://twitch.tv".into()),
978 );
979 obj.insert("objectEncoding".to_string(), AmfValue::Number(0.0));
980 obj.insert("custom".to_string(), AmfValue::String("value".into()));
981
982 let params = ConnectParams::from_amf(&AmfValue::Object(obj));
983
984 assert_eq!(params.app, "live");
985 assert_eq!(params.flash_ver, Some("OBS-Studio/29.0".into()));
986 assert_eq!(params.swf_url, Some("rtmp://example.com/app".into()));
987 assert_eq!(params.tc_url, Some("rtmp://example.com/live".into()));
988 assert!(!params.fpad);
989 assert_eq!(params.audio_codecs, 3575);
990 assert_eq!(params.video_codecs, 252);
991 assert_eq!(params.video_function, 1);
992 assert_eq!(params.page_url, Some("http://twitch.tv".into()));
993 assert_eq!(params.object_encoding, 0.0);
994 assert!(params.extra.contains_key("custom"));
995 }
996
997 #[test]
998 fn test_connect_params_case_insensitive() {
999 let mut obj = HashMap::new();
1001 obj.insert("flashver".to_string(), AmfValue::String("test".into()));
1002 obj.insert("tcurl".to_string(), AmfValue::String("url".into()));
1003 obj.insert("pageurl".to_string(), AmfValue::String("page".into()));
1004 obj.insert("swfurl".to_string(), AmfValue::String("swf".into()));
1005
1006 let params = ConnectParams::from_amf(&AmfValue::Object(obj));
1007
1008 assert_eq!(params.flash_ver, Some("test".into()));
1009 assert_eq!(params.tc_url, Some("url".into()));
1010 assert_eq!(params.page_url, Some("page".into()));
1011 assert_eq!(params.swf_url, Some("swf".into()));
1012 }
1013
1014 #[test]
1015 fn test_connect_params_from_non_object() {
1016 let params = ConnectParams::from_amf(&AmfValue::Null);
1018 assert_eq!(params.app, "");
1019 assert!(params.flash_ver.is_none());
1020 }
1021
1022 #[test]
1023 fn test_message_encode_roundtrip() {
1024 let msg = RtmpMessage::SetChunkSize(4096);
1028 let (msg_type, payload) = msg.encode();
1029 let chunk = RtmpChunk {
1030 csid: CSID_PROTOCOL_CONTROL,
1031 timestamp: 0,
1032 message_type: msg_type,
1033 stream_id: 0,
1034 payload,
1035 };
1036 let decoded = RtmpMessage::from_chunk(&chunk).unwrap();
1037 assert!(matches!(decoded, RtmpMessage::SetChunkSize(4096)));
1038
1039 let msg = RtmpMessage::WindowAckSize(2500000);
1041 let (msg_type, payload) = msg.encode();
1042 let chunk = RtmpChunk {
1043 csid: CSID_PROTOCOL_CONTROL,
1044 timestamp: 0,
1045 message_type: msg_type,
1046 stream_id: 0,
1047 payload,
1048 };
1049 let decoded = RtmpMessage::from_chunk(&chunk).unwrap();
1050 assert!(matches!(decoded, RtmpMessage::WindowAckSize(2500000)));
1051 }
1052
1053 #[test]
1054 fn test_user_control_event_encode() {
1055 let events = vec![
1057 RtmpMessage::UserControl(UserControlEvent::StreamBegin(1)),
1058 RtmpMessage::UserControl(UserControlEvent::StreamEof(2)),
1059 RtmpMessage::UserControl(UserControlEvent::StreamDry(3)),
1060 RtmpMessage::UserControl(UserControlEvent::StreamIsRecorded(4)),
1061 RtmpMessage::UserControl(UserControlEvent::PingRequest(5)),
1062 RtmpMessage::UserControl(UserControlEvent::PingResponse(6)),
1063 RtmpMessage::UserControl(UserControlEvent::SetBufferLength {
1064 stream_id: 1,
1065 buffer_ms: 1000,
1066 }),
1067 ];
1068
1069 for msg in events {
1070 let (msg_type, payload) = msg.encode();
1071 assert_eq!(msg_type, MSG_USER_CONTROL);
1072 assert!(!payload.is_empty());
1073 }
1074 }
1075
1076 #[test]
1077 fn test_aggregate_message() {
1078 let chunk = RtmpChunk {
1079 csid: CSID_VIDEO,
1080 timestamp: 0,
1081 message_type: MSG_AGGREGATE,
1082 stream_id: 1,
1083 payload: Bytes::from_static(b"aggregate data"),
1084 };
1085
1086 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
1087 if let RtmpMessage::Aggregate { data } = msg {
1088 assert_eq!(data.as_ref(), b"aggregate data");
1089 } else {
1090 panic!("Expected Aggregate message");
1091 }
1092 }
1093
1094 #[test]
1095 fn test_truncated_protocol_control_messages() {
1096 let chunk = RtmpChunk {
1098 csid: CSID_PROTOCOL_CONTROL,
1099 timestamp: 0,
1100 message_type: MSG_SET_CHUNK_SIZE,
1101 stream_id: 0,
1102 payload: Bytes::from_static(&[0x00, 0x00]), };
1104
1105 let result = RtmpMessage::from_chunk(&chunk);
1106 assert!(result.is_err());
1107
1108 let chunk = RtmpChunk {
1110 csid: CSID_PROTOCOL_CONTROL,
1111 timestamp: 0,
1112 message_type: MSG_WINDOW_ACK_SIZE,
1113 stream_id: 0,
1114 payload: Bytes::from_static(&[0x00]),
1115 };
1116
1117 let result = RtmpMessage::from_chunk(&chunk);
1118 assert!(result.is_err());
1119 }
1120}