1use crate::amf::{encode_command, Amf0Value};
8use crate::caps::ConnectCapabilities;
9use crate::chunk::Message;
10use crate::error::{Error, Result};
11
// RTMP message type ids, as carried in `Message::msg_type_id`.

/// Message type id for Set Chunk Size (see [`build_set_chunk_size`]).
pub const MSG_SET_CHUNK_SIZE: u8 = 1;
/// Message type id for Abort Message (see [`build_abort`]).
pub const MSG_ABORT: u8 = 2;
/// Message type id for Acknowledgement (see [`build_ack`]).
pub const MSG_ACK: u8 = 3;
/// Message type id for User Control messages (see [`UserControlEvent`]).
pub const MSG_USER_CONTROL: u8 = 4;
/// Message type id for Window Acknowledgement Size (see [`build_window_ack_size`]).
pub const MSG_WINDOW_ACK_SIZE: u8 = 5;
/// Message type id for Set Peer Bandwidth (see [`build_set_peer_bandwidth`]).
pub const MSG_SET_PEER_BANDWIDTH: u8 = 6;
/// Message type id for audio messages.
pub const MSG_AUDIO: u8 = 8;
/// Message type id for video messages.
pub const MSG_VIDEO: u8 = 9;
/// Message type id for AMF3-encoded data messages.
pub const MSG_DATA_AMF3: u8 = 15;
/// Message type id for AMF3-encoded shared object messages.
pub const MSG_SHARED_OBJECT_AMF3: u8 = 16;
/// Message type id for AMF3-encoded command messages.
pub const MSG_COMMAND_AMF3: u8 = 17;
/// Message type id for AMF0-encoded data messages (used by [`build_set_data_frame`]).
pub const MSG_DATA_AMF0: u8 = 18;
/// Message type id for AMF0-encoded shared object messages.
pub const MSG_SHARED_OBJECT_AMF0: u8 = 19;
/// Message type id for AMF0-encoded command messages (used by every command builder here).
pub const MSG_COMMAND_AMF0: u8 = 20;
/// Message type id for aggregate messages.
pub const MSG_AGGREGATE: u8 = 22;
28
// User Control event types: the 16-bit big-endian value that opens every
// `MSG_USER_CONTROL` payload. Value 5 has no constant here, so
// `UserControlEvent::parse` reports it as `Unknown`.

/// Event type for Stream Begin; event data is a 4-byte stream id.
pub const USR_STREAM_BEGIN: u16 = 0;
/// Event type for Stream EOF; event data is a 4-byte stream id.
pub const USR_STREAM_EOF: u16 = 1;
/// Event type for Stream Dry; event data is a 4-byte stream id.
pub const USR_STREAM_DRY: u16 = 2;
/// Event type for Set Buffer Length; event data is a 4-byte stream id plus a
/// 4-byte buffer length in milliseconds.
pub const USR_SET_BUFFER_LENGTH: u16 = 3;
/// Event type for Stream Is Recorded; event data is a 4-byte stream id.
pub const USR_STREAM_IS_RECORDED: u16 = 4;
/// Event type for Ping Request; event data is a 4-byte timestamp.
pub const USR_PING_REQUEST: u16 = 6;
/// Event type for Ping Response; event data is a 4-byte timestamp.
pub const USR_PING_RESPONSE: u16 = 7;
37
// Chunk stream ids. None of them is referenced in this part of the file, so the
// per-constant notes below are inferred from the names.
// NOTE(review): confirm against the chunk writer that consumes them.

/// Chunk stream id presumably used for protocol control / user control messages.
pub const CSID_PROTOCOL_CONTROL: u32 = 2;
/// Chunk stream id presumably used for command messages.
pub const CSID_COMMAND: u32 = 3;
/// Chunk stream id presumably used for audio messages.
pub const CSID_AUDIO: u32 = 4;
/// Chunk stream id presumably used for video messages.
pub const CSID_VIDEO: u32 = 5;
/// Chunk stream id presumably used for data (metadata) messages.
pub const CSID_DATA: u32 = 6;
45
46pub fn build_set_chunk_size(size: u32) -> Message {
51 let size = size & 0x7FFF_FFFF;
53 Message {
54 msg_type_id: MSG_SET_CHUNK_SIZE,
55 msg_stream_id: 0,
56 timestamp: 0,
57 payload: size.to_be_bytes().to_vec(),
58 }
59}
60
61pub fn build_abort(chunk_stream_id: u32) -> Message {
72 Message {
73 msg_type_id: MSG_ABORT,
74 msg_stream_id: 0,
75 timestamp: 0,
76 payload: chunk_stream_id.to_be_bytes().to_vec(),
77 }
78}
79
80pub fn build_window_ack_size(size: u32) -> Message {
81 Message {
82 msg_type_id: MSG_WINDOW_ACK_SIZE,
83 msg_stream_id: 0,
84 timestamp: 0,
85 payload: size.to_be_bytes().to_vec(),
86 }
87}
88
89pub fn build_set_peer_bandwidth(size: u32, limit_type: u8) -> Message {
90 let mut p = Vec::with_capacity(5);
91 p.extend_from_slice(&size.to_be_bytes());
92 p.push(limit_type);
93 Message {
94 msg_type_id: MSG_SET_PEER_BANDWIDTH,
95 msg_stream_id: 0,
96 timestamp: 0,
97 payload: p,
98 }
99}
100
101pub fn build_user_control_stream_begin(stream_id: u32) -> Message {
102 let mut p = Vec::with_capacity(6);
103 p.extend_from_slice(&USR_STREAM_BEGIN.to_be_bytes());
104 p.extend_from_slice(&stream_id.to_be_bytes());
105 Message {
106 msg_type_id: MSG_USER_CONTROL,
107 msg_stream_id: 0,
108 timestamp: 0,
109 payload: p,
110 }
111}
112
113pub fn build_user_control_stream_eof(stream_id: u32) -> Message {
121 let mut p = Vec::with_capacity(6);
122 p.extend_from_slice(&USR_STREAM_EOF.to_be_bytes());
123 p.extend_from_slice(&stream_id.to_be_bytes());
124 Message {
125 msg_type_id: MSG_USER_CONTROL,
126 msg_stream_id: 0,
127 timestamp: 0,
128 payload: p,
129 }
130}
131
132pub fn build_user_control_stream_dry(stream_id: u32) -> Message {
142 let mut p = Vec::with_capacity(6);
143 p.extend_from_slice(&USR_STREAM_DRY.to_be_bytes());
144 p.extend_from_slice(&stream_id.to_be_bytes());
145 Message {
146 msg_type_id: MSG_USER_CONTROL,
147 msg_stream_id: 0,
148 timestamp: 0,
149 payload: p,
150 }
151}
152
153pub fn build_user_control_set_buffer_length(stream_id: u32, buffer_ms: u32) -> Message {
163 let mut p = Vec::with_capacity(10);
164 p.extend_from_slice(&USR_SET_BUFFER_LENGTH.to_be_bytes());
165 p.extend_from_slice(&stream_id.to_be_bytes());
166 p.extend_from_slice(&buffer_ms.to_be_bytes());
167 Message {
168 msg_type_id: MSG_USER_CONTROL,
169 msg_stream_id: 0,
170 timestamp: 0,
171 payload: p,
172 }
173}
174
175pub fn build_user_control_stream_is_recorded(stream_id: u32) -> Message {
182 let mut p = Vec::with_capacity(6);
183 p.extend_from_slice(&USR_STREAM_IS_RECORDED.to_be_bytes());
184 p.extend_from_slice(&stream_id.to_be_bytes());
185 Message {
186 msg_type_id: MSG_USER_CONTROL,
187 msg_stream_id: 0,
188 timestamp: 0,
189 payload: p,
190 }
191}
192
193pub fn build_user_control_ping_request(timestamp_ms: u32) -> Message {
200 let mut p = Vec::with_capacity(6);
201 p.extend_from_slice(&USR_PING_REQUEST.to_be_bytes());
202 p.extend_from_slice(×tamp_ms.to_be_bytes());
203 Message {
204 msg_type_id: MSG_USER_CONTROL,
205 msg_stream_id: 0,
206 timestamp: 0,
207 payload: p,
208 }
209}
210
211pub fn build_user_control_ping_response(timestamp_ms: u32) -> Message {
219 let mut p = Vec::with_capacity(6);
220 p.extend_from_slice(&USR_PING_RESPONSE.to_be_bytes());
221 p.extend_from_slice(×tamp_ms.to_be_bytes());
222 Message {
223 msg_type_id: MSG_USER_CONTROL,
224 msg_stream_id: 0,
225 timestamp: 0,
226 payload: p,
227 }
228}
229
/// A decoded User Control message payload.
///
/// Each variant corresponds to one 16-bit event type; event types without a
/// dedicated variant are kept verbatim in [`UserControlEvent::Unknown`] so they
/// can be re-serialised unchanged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UserControlEvent {
    /// Event type [`USR_STREAM_BEGIN`]; carries the stream id.
    StreamBegin { stream_id: u32 },
    /// Event type [`USR_STREAM_EOF`]; carries the stream id.
    StreamEof { stream_id: u32 },
    /// Event type [`USR_STREAM_DRY`]; carries the stream id.
    StreamDry { stream_id: u32 },
    /// Event type [`USR_SET_BUFFER_LENGTH`]; carries the stream id and the
    /// buffer length in milliseconds.
    SetBufferLength { stream_id: u32, buffer_ms: u32 },
    /// Event type [`USR_STREAM_IS_RECORDED`]; carries the stream id.
    StreamIsRecorded { stream_id: u32 },
    /// Event type [`USR_PING_REQUEST`]; carries a 32-bit timestamp.
    PingRequest { timestamp_ms: u32 },
    /// Event type [`USR_PING_RESPONSE`]; carries a 32-bit timestamp.
    PingResponse { timestamp_ms: u32 },
    /// Any other event type, with the bytes that followed the 2-byte type field.
    Unknown { event_type: u16, data: Vec<u8> },
}
293
294impl UserControlEvent {
295 pub fn parse(payload: &[u8]) -> Result<Self> {
307 if payload.len() < 2 {
308 return Err(Error::ProtocolViolation(
309 "UserControl: payload < 2 bytes (need event type)".into(),
310 ));
311 }
312 let event_type = u16::from_be_bytes([payload[0], payload[1]]);
313 let data = &payload[2..];
314 match event_type {
315 USR_STREAM_BEGIN => Ok(Self::StreamBegin {
316 stream_id: read_u32_be(data, "StreamBegin")?,
317 }),
318 USR_STREAM_EOF => Ok(Self::StreamEof {
319 stream_id: read_u32_be(data, "StreamEOF")?,
320 }),
321 USR_STREAM_DRY => Ok(Self::StreamDry {
322 stream_id: read_u32_be(data, "StreamDry")?,
323 }),
324 USR_SET_BUFFER_LENGTH => {
325 if data.len() < 8 {
326 return Err(Error::ProtocolViolation(format!(
327 "UserControl SetBufferLength: event data {} < 8 bytes",
328 data.len()
329 )));
330 }
331 let stream_id = u32::from_be_bytes([data[0], data[1], data[2], data[3]]);
332 let buffer_ms = u32::from_be_bytes([data[4], data[5], data[6], data[7]]);
333 Ok(Self::SetBufferLength {
334 stream_id,
335 buffer_ms,
336 })
337 }
338 USR_STREAM_IS_RECORDED => Ok(Self::StreamIsRecorded {
339 stream_id: read_u32_be(data, "StreamIsRecorded")?,
340 }),
341 USR_PING_REQUEST => Ok(Self::PingRequest {
342 timestamp_ms: read_u32_be(data, "PingRequest")?,
343 }),
344 USR_PING_RESPONSE => Ok(Self::PingResponse {
345 timestamp_ms: read_u32_be(data, "PingResponse")?,
346 }),
347 other => Ok(Self::Unknown {
348 event_type: other,
349 data: data.to_vec(),
350 }),
351 }
352 }
353
354 pub fn event_type(&self) -> u16 {
357 match self {
358 Self::StreamBegin { .. } => USR_STREAM_BEGIN,
359 Self::StreamEof { .. } => USR_STREAM_EOF,
360 Self::StreamDry { .. } => USR_STREAM_DRY,
361 Self::SetBufferLength { .. } => USR_SET_BUFFER_LENGTH,
362 Self::StreamIsRecorded { .. } => USR_STREAM_IS_RECORDED,
363 Self::PingRequest { .. } => USR_PING_REQUEST,
364 Self::PingResponse { .. } => USR_PING_RESPONSE,
365 Self::Unknown { event_type, .. } => *event_type,
366 }
367 }
368
369 pub fn is_spec_defined(&self) -> bool {
372 !matches!(self, Self::Unknown { .. })
373 }
374
375 pub fn to_message(&self) -> Message {
383 match self {
384 Self::StreamBegin { stream_id } => build_user_control_stream_begin(*stream_id),
385 Self::StreamEof { stream_id } => build_user_control_stream_eof(*stream_id),
386 Self::StreamDry { stream_id } => build_user_control_stream_dry(*stream_id),
387 Self::SetBufferLength {
388 stream_id,
389 buffer_ms,
390 } => build_user_control_set_buffer_length(*stream_id, *buffer_ms),
391 Self::StreamIsRecorded { stream_id } => {
392 build_user_control_stream_is_recorded(*stream_id)
393 }
394 Self::PingRequest { timestamp_ms } => build_user_control_ping_request(*timestamp_ms),
395 Self::PingResponse { timestamp_ms } => build_user_control_ping_response(*timestamp_ms),
396 Self::Unknown { event_type, data } => {
397 let mut p = Vec::with_capacity(2 + data.len());
398 p.extend_from_slice(&event_type.to_be_bytes());
399 p.extend_from_slice(data);
400 Message {
401 msg_type_id: MSG_USER_CONTROL,
402 msg_stream_id: 0,
403 timestamp: 0,
404 payload: p,
405 }
406 }
407 }
408 }
409}
410
411fn read_u32_be(data: &[u8], variant: &str) -> Result<u32> {
415 if data.len() < 4 {
416 return Err(Error::ProtocolViolation(format!(
417 "UserControl {variant}: event data {} < 4 bytes",
418 data.len()
419 )));
420 }
421 Ok(u32::from_be_bytes([data[0], data[1], data[2], data[3]]))
422}
423
424pub fn build_ack(bytes_received: u32) -> Message {
425 Message {
426 msg_type_id: MSG_ACK,
427 msg_stream_id: 0,
428 timestamp: 0,
429 payload: bytes_received.to_be_bytes().to_vec(),
430 }
431}
432
433pub fn build_connect(transaction_id: f64, app: &str, tc_url: &str, flash_ver: &str) -> Message {
446 build_connect_with_caps(
447 transaction_id,
448 app,
449 tc_url,
450 flash_ver,
451 &ConnectCapabilities::default(),
452 )
453}
454
455pub fn build_connect_with_caps(
469 transaction_id: f64,
470 app: &str,
471 tc_url: &str,
472 flash_ver: &str,
473 caps: &ConnectCapabilities,
474) -> Message {
475 let mut pairs: Vec<(String, Amf0Value)> = vec![
476 ("app".into(), Amf0Value::String(app.into())),
477 ("type".into(), Amf0Value::String("nonprivate".into())),
478 ("flashVer".into(), Amf0Value::String(flash_ver.into())),
479 ("tcUrl".into(), Amf0Value::String(tc_url.into())),
480 ("fpad".into(), Amf0Value::Boolean(false)),
481 ("capabilities".into(), Amf0Value::Number(15.0)),
482 ("audioCodecs".into(), Amf0Value::Number(0x0FFF as f64)),
483 ("videoCodecs".into(), Amf0Value::Number(0x00FF as f64)),
484 ("videoFunction".into(), Amf0Value::Number(1.0)),
485 ];
486 caps.encode_into(&mut pairs);
487 let payload = encode_command("connect", transaction_id, Amf0Value::Object(pairs), &[]);
488 Message {
489 msg_type_id: MSG_COMMAND_AMF0,
490 msg_stream_id: 0,
491 timestamp: 0,
492 payload,
493 }
494}
495
496pub fn build_connect_result(transaction_id: f64) -> Message {
504 build_connect_result_with_caps(transaction_id, &ConnectCapabilities::default())
505}
506
507pub fn build_connect_result_with_caps(transaction_id: f64, caps: &ConnectCapabilities) -> Message {
524 let props = Amf0Value::Object(vec![
525 ("fmsVer".into(), Amf0Value::String("FMS/3,0,1,123".into())),
526 ("capabilities".into(), Amf0Value::Number(31.0)),
527 ("mode".into(), Amf0Value::Number(1.0)),
528 ]);
529 let mut info_pairs: Vec<(String, Amf0Value)> = vec![
535 ("level".into(), Amf0Value::String("status".into())),
536 (
537 "code".into(),
538 Amf0Value::String("NetConnection.Connect.Success".into()),
539 ),
540 (
541 "description".into(),
542 Amf0Value::String("Connection accepted.".into()),
543 ),
544 ];
545 if caps.object_encoding.is_none() {
546 info_pairs.push(("objectEncoding".into(), Amf0Value::Number(0.0)));
547 }
548 caps.encode_into(&mut info_pairs);
549 let payload = encode_command(
550 "_result",
551 transaction_id,
552 props,
553 &[Amf0Value::Object(info_pairs)],
554 );
555 Message {
556 msg_type_id: MSG_COMMAND_AMF0,
557 msg_stream_id: 0,
558 timestamp: 0,
559 payload,
560 }
561}
562
563pub fn build_release_stream(transaction_id: f64, stream_name: &str) -> Message {
566 let payload = encode_command(
567 "releaseStream",
568 transaction_id,
569 Amf0Value::Null,
570 &[Amf0Value::String(stream_name.into())],
571 );
572 Message {
573 msg_type_id: MSG_COMMAND_AMF0,
574 msg_stream_id: 0,
575 timestamp: 0,
576 payload,
577 }
578}
579
580pub fn build_fc_publish(transaction_id: f64, stream_name: &str) -> Message {
584 let payload = encode_command(
585 "FCPublish",
586 transaction_id,
587 Amf0Value::Null,
588 &[Amf0Value::String(stream_name.into())],
589 );
590 Message {
591 msg_type_id: MSG_COMMAND_AMF0,
592 msg_stream_id: 0,
593 timestamp: 0,
594 payload,
595 }
596}
597
598pub fn build_create_stream(transaction_id: f64) -> Message {
602 let payload = encode_command("createStream", transaction_id, Amf0Value::Null, &[]);
603 Message {
604 msg_type_id: MSG_COMMAND_AMF0,
605 msg_stream_id: 0,
606 timestamp: 0,
607 payload,
608 }
609}
610
611pub fn build_create_stream_result(transaction_id: f64, stream_id: f64) -> Message {
612 let payload = encode_command(
613 "_result",
614 transaction_id,
615 Amf0Value::Null,
616 &[Amf0Value::Number(stream_id)],
617 );
618 Message {
619 msg_type_id: MSG_COMMAND_AMF0,
620 msg_stream_id: 0,
621 timestamp: 0,
622 payload,
623 }
624}
625
626pub fn build_publish(
630 transaction_id: f64,
631 stream_id: u32,
632 stream_name: &str,
633 publish_type: &str,
634) -> Message {
635 let payload = encode_command(
636 "publish",
637 transaction_id,
638 Amf0Value::Null,
639 &[
640 Amf0Value::String(stream_name.into()),
641 Amf0Value::String(publish_type.into()),
642 ],
643 );
644 Message {
645 msg_type_id: MSG_COMMAND_AMF0,
646 msg_stream_id: stream_id,
647 timestamp: 0,
648 payload,
649 }
650}
651
652pub fn build_on_status(stream_id: u32, level: &str, code: &str, description: &str) -> Message {
656 let info = Amf0Value::Object(vec![
657 ("level".into(), Amf0Value::String(level.into())),
658 ("code".into(), Amf0Value::String(code.into())),
659 ("description".into(), Amf0Value::String(description.into())),
660 ]);
661 let payload = encode_command("onStatus", 0.0, Amf0Value::Null, &[info]);
662 Message {
663 msg_type_id: MSG_COMMAND_AMF0,
664 msg_stream_id: stream_id,
665 timestamp: 0,
666 payload,
667 }
668}
669
/// Status `code` placed in the information object built by
/// [`build_reconnect_request`].
pub const RECONNECT_REQUEST_CODE: &str = "NetConnection.Connect.ReconnectRequest";
675
676pub fn build_reconnect_request(tc_url: Option<&str>, description: Option<&str>) -> Message {
699 let mut props = vec![
700 (
701 "code".into(),
702 Amf0Value::String(RECONNECT_REQUEST_CODE.into()),
703 ),
704 ("level".into(), Amf0Value::String("status".into())),
705 ];
706 if let Some(desc) = description {
707 props.push(("description".into(), Amf0Value::String(desc.into())));
708 }
709 if let Some(url) = tc_url {
710 props.push(("tcUrl".into(), Amf0Value::String(url.into())));
711 }
712 let payload = encode_command(
713 "onStatus",
714 0.0,
715 Amf0Value::Null,
716 &[Amf0Value::Object(props)],
717 );
718 Message {
719 msg_type_id: MSG_COMMAND_AMF0,
720 msg_stream_id: 0,
724 timestamp: 0,
725 payload,
726 }
727}
728
729pub fn build_set_data_frame(stream_id: u32, metadata: Amf0Value) -> Message {
733 let mut payload = Vec::new();
734 crate::amf::encode(&mut payload, &Amf0Value::String("@setDataFrame".into()));
735 crate::amf::encode(&mut payload, &Amf0Value::String("onMetaData".into()));
736 crate::amf::encode(&mut payload, &metadata);
737 Message {
738 msg_type_id: MSG_DATA_AMF0,
739 msg_stream_id: stream_id,
740 timestamp: 0,
741 payload,
742 }
743}
744
// Unit tests pinning the wire bytes of the builders, the round-trip behaviour
// of `UserControlEvent`, and the shape of the connect / reconnect commands.
#[cfg(test)]
mod tests {
    use super::*;

    // Stream Begin: event type 0 followed by the stream id, big-endian.
    #[test]
    fn user_control_stream_begin_wire_bytes() {
        let m = build_user_control_stream_begin(1);
        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
        assert_eq!(m.msg_stream_id, 0);
        assert_eq!(m.payload, vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x01]);
    }

    // Stream EOF: event type 1 followed by the stream id; header fields are zero.
    #[test]
    fn user_control_stream_eof_wire_bytes() {
        let m = build_user_control_stream_eof(7);
        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
        assert_eq!(m.msg_stream_id, 0);
        assert_eq!(m.timestamp, 0);
        assert_eq!(m.payload, vec![0x00, 0x01, 0x00, 0x00, 0x00, 0x07]);
    }

    // Stream Dry: a multi-byte stream id checks the big-endian byte order.
    #[test]
    fn user_control_stream_dry_wire_bytes() {
        let m = build_user_control_stream_dry(0x0010_2030);
        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
        assert_eq!(m.msg_stream_id, 0);
        assert_eq!(m.payload, vec![0x00, 0x02, 0x00, 0x10, 0x20, 0x30]);
    }

    // Set Buffer Length: event type 3, stream id, then buffer length (3000 = 0x0BB8).
    #[test]
    fn user_control_set_buffer_length_wire_bytes() {
        let m = build_user_control_set_buffer_length(1, 3000);
        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
        assert_eq!(m.msg_stream_id, 0);
        assert_eq!(
            m.payload,
            vec![0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x0B, 0xB8],
        );
    }

    // Stream Is Recorded: event type 4 followed by the stream id.
    #[test]
    fn user_control_stream_is_recorded_wire_bytes() {
        let m = build_user_control_stream_is_recorded(5);
        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
        assert_eq!(m.msg_stream_id, 0);
        assert_eq!(m.payload, vec![0x00, 0x04, 0x00, 0x00, 0x00, 0x05]);
    }

    // Ping Request: event type 6 followed by the timestamp.
    #[test]
    fn user_control_ping_request_wire_bytes() {
        let m = build_user_control_ping_request(0xDEAD_BEEF);
        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
        assert_eq!(m.msg_stream_id, 0);
        assert_eq!(m.payload, vec![0x00, 0x06, 0xDE, 0xAD, 0xBE, 0xEF]);
    }

    // Ping Response: event type 7 followed by the timestamp.
    #[test]
    fn user_control_ping_response_wire_bytes() {
        let m = build_user_control_ping_response(0xDEAD_BEEF);
        assert_eq!(m.msg_type_id, MSG_USER_CONTROL);
        assert_eq!(m.msg_stream_id, 0);
        assert_eq!(m.payload, vec![0x00, 0x07, 0xDE, 0xAD, 0xBE, 0xEF]);
    }

    // Abort: payload is just the chunk stream id, big-endian.
    #[test]
    fn abort_wire_bytes() {
        let m = build_abort(0x0001_0203);
        assert_eq!(m.msg_type_id, MSG_ABORT);
        assert_eq!(m.msg_stream_id, 0);
        assert_eq!(m.timestamp, 0);
        assert_eq!(m.payload, vec![0x00, 0x01, 0x02, 0x03]);
    }

    // `build_connect` must be byte-identical to the caps variant with default caps.
    #[test]
    fn connect_with_empty_caps_matches_legacy() {
        let legacy = build_connect(1.0, "live", "rtmp://srv/live", "FMLE/3.0");
        let caps_empty = build_connect_with_caps(
            1.0,
            "live",
            "rtmp://srv/live",
            "FMLE/3.0",
            &ConnectCapabilities::default(),
        );
        assert_eq!(legacy.payload, caps_empty.payload);
    }

    // With every capability set, the extra properties must follow the legacy
    // block (which ends at `videoFunction`) in a fixed order.
    #[test]
    fn connect_with_caps_appends_in_documented_order() {
        let mut video = crate::caps::FourCcInfoMap::new();
        video.insert("*", crate::caps::FOURCC_INFO_CAN_FORWARD);
        let mut audio = crate::caps::FourCcInfoMap::new();
        audio.insert("Opus", crate::caps::FOURCC_INFO_CAN_DECODE);
        let caps = ConnectCapabilities {
            object_encoding: Some(crate::caps::OBJECT_ENCODING_AMF3),
            fourcc_list: vec!["av01".into(), "hvc1".into()],
            video_fourcc_info_map: video,
            audio_fourcc_info_map: audio,
            caps_ex: crate::caps::CAPS_EX_RECONNECT | crate::caps::CAPS_EX_MULTITRACK,
        };

        let msg = build_connect_with_caps(1.0, "live", "rtmp://srv/live", "FMLE/3.0", &caps);
        let vals = crate::amf::decode_all(&msg.payload).unwrap();
        // vals[0] = command name, vals[1] = transaction id, vals[2] = command object.
        assert_eq!(vals[0].as_str(), Some("connect"));
        let cmd_obj = match &vals[2] {
            Amf0Value::Object(p) => p,
            other => panic!("expected Object for command object, got {other:?}"),
        };
        let names: Vec<&str> = cmd_obj.iter().map(|(k, _)| k.as_str()).collect();
        let legacy_count = names
            .iter()
            .position(|n| *n == "videoFunction")
            .expect("legacy block must end with videoFunction")
            + 1;
        let extras = &names[legacy_count..];
        assert_eq!(
            extras,
            &[
                "objectEncoding",
                "fourCcList",
                "videoFourCcInfoMap",
                "audioFourCcInfoMap",
                "capsEx",
            ],
        );
    }

    // Capabilities written into the `_result` info object must parse back out.
    #[test]
    fn connect_result_with_caps_emits_info_block() {
        let mut video = crate::caps::FourCcInfoMap::new();
        video.insert("hvc1", crate::caps::FOURCC_INFO_CAN_DECODE);
        let caps = ConnectCapabilities {
            video_fourcc_info_map: video,
            caps_ex: crate::caps::CAPS_EX_MULTITRACK | crate::caps::CAPS_EX_MOD_EX,
            ..Default::default()
        };
        let msg = build_connect_result_with_caps(1.0, &caps);

        let vals = crate::amf::decode_all(&msg.payload).unwrap();
        assert_eq!(vals[0].as_str(), Some("_result"));
        // vals[3] is the information object (after name, transaction id, properties).
        let info = &vals[3];
        assert_eq!(
            info.get("code").and_then(Amf0Value::as_str),
            Some("NetConnection.Connect.Success"),
        );
        let parsed = ConnectCapabilities::from_amf0(info);
        assert_eq!(parsed.caps_ex, caps.caps_ex);
        assert_eq!(parsed.video_fourcc_info_map.get("hvc1"), Some(1));
    }

    // `build_connect_result` must be byte-identical to the caps variant with default caps.
    #[test]
    fn connect_result_with_empty_caps_matches_legacy() {
        let legacy = build_connect_result(7.0);
        let empty = build_connect_result_with_caps(7.0, &ConnectCapabilities::default());
        assert_eq!(legacy.payload, empty.payload);
    }

    // Every event type with a dedicated variant parses into that variant and
    // reports the same event type it was read from.
    #[test]
    fn user_control_event_parse_recognises_spec_types() {
        let cases: &[(&[u8], UserControlEvent)] = &[
            (
                &[0x00, 0x00, 0x00, 0x00, 0x00, 0x01],
                UserControlEvent::StreamBegin { stream_id: 1 },
            ),
            (
                &[0x00, 0x01, 0x00, 0x00, 0x00, 0x07],
                UserControlEvent::StreamEof { stream_id: 7 },
            ),
            (
                &[0x00, 0x02, 0x00, 0x10, 0x20, 0x30],
                UserControlEvent::StreamDry {
                    stream_id: 0x0010_2030,
                },
            ),
            (
                &[0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x0B, 0xB8],
                UserControlEvent::SetBufferLength {
                    stream_id: 1,
                    buffer_ms: 3000,
                },
            ),
            (
                &[0x00, 0x04, 0x00, 0x00, 0x00, 0x05],
                UserControlEvent::StreamIsRecorded { stream_id: 5 },
            ),
            (
                &[0x00, 0x06, 0xDE, 0xAD, 0xBE, 0xEF],
                UserControlEvent::PingRequest {
                    timestamp_ms: 0xDEAD_BEEF,
                },
            ),
            (
                &[0x00, 0x07, 0xDE, 0xAD, 0xBE, 0xEF],
                UserControlEvent::PingResponse {
                    timestamp_ms: 0xDEAD_BEEF,
                },
            ),
        ];
        for (wire, expected) in cases {
            let parsed = UserControlEvent::parse(wire).expect("parse UCM");
            assert_eq!(&parsed, expected);
            assert!(parsed.is_spec_defined());
            assert_eq!(
                parsed.event_type() as usize,
                ((wire[0] as usize) << 8) | wire[1] as usize
            );
        }
    }

    // parse -> to_message must reproduce exactly what the builders emit.
    #[test]
    fn user_control_event_round_trip_matches_builder_bytes() {
        let originals = [
            build_user_control_stream_begin(1),
            build_user_control_stream_eof(7),
            build_user_control_stream_dry(0x0010_2030),
            build_user_control_set_buffer_length(1, 3000),
            build_user_control_stream_is_recorded(5),
            build_user_control_ping_request(0xDEAD_BEEF),
            build_user_control_ping_response(0xDEAD_BEEF),
        ];
        for m in &originals {
            let parsed = UserControlEvent::parse(&m.payload).expect("parse UCM");
            let rebuilt = parsed.to_message();
            assert_eq!(rebuilt.msg_type_id, MSG_USER_CONTROL);
            assert_eq!(rebuilt.msg_stream_id, 0);
            assert_eq!(rebuilt.timestamp, 0);
            assert_eq!(rebuilt.payload, m.payload);
        }
    }

    // Event types without a dedicated variant are kept verbatim and re-serialise
    // byte-for-byte, including when no event data follows the type field.
    #[test]
    fn user_control_event_unknown_preserves_event_type_and_tail() {
        // Event type 5 has no constant in this module.
        let wire: &[u8] = &[0x00, 0x05, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE];
        let parsed = UserControlEvent::parse(wire).expect("parse reserved UCM");
        assert_eq!(
            parsed,
            UserControlEvent::Unknown {
                event_type: 5,
                data: vec![0xAA, 0xBB, 0xCC, 0xDD, 0xEE],
            },
        );
        assert!(!parsed.is_spec_defined());
        assert_eq!(parsed.event_type(), 5);
        let rebuilt = parsed.to_message();
        assert_eq!(rebuilt.payload, wire);

        // A bare two-byte payload: unknown type, empty event data.
        let future: &[u8] = &[0xFF, 0xFE];
        let parsed_future = UserControlEvent::parse(future).expect("parse future UCM");
        assert_eq!(
            parsed_future,
            UserControlEvent::Unknown {
                event_type: 0xFFFE,
                data: Vec::new(),
            },
        );
        assert_eq!(parsed_future.to_message().payload, future);
    }

    // Truncated payloads must surface as `ProtocolViolation`, never panic.
    #[test]
    fn user_control_event_parse_rejects_truncated_payload() {
        // Too short to even hold the event type.
        assert!(matches!(
            UserControlEvent::parse(&[]),
            Err(Error::ProtocolViolation(_))
        ));
        assert!(matches!(
            UserControlEvent::parse(&[0x00]),
            Err(Error::ProtocolViolation(_))
        ));
        // Single-u32 events given only three data bytes.
        for type_byte in [
            USR_STREAM_BEGIN,
            USR_STREAM_EOF,
            USR_STREAM_DRY,
            USR_STREAM_IS_RECORDED,
            USR_PING_REQUEST,
            USR_PING_RESPONSE,
        ] {
            let wire = [(type_byte >> 8) as u8, type_byte as u8, 0x00, 0x00, 0x00];
            assert!(matches!(
                UserControlEvent::parse(&wire),
                Err(Error::ProtocolViolation(_))
            ));
        }
        // Set Buffer Length given seven data bytes instead of eight.
        let too_short = [0x00, 0x03, 0, 0, 0, 1, 0, 0, 11];
        assert!(matches!(
            UserControlEvent::parse(&too_short),
            Err(Error::ProtocolViolation(_))
        ));
    }

    // With both optional fields supplied, all four properties are present.
    #[test]
    fn reconnect_request_full_info_object() {
        let m = build_reconnect_request(
            Some("rtmp://foo.mydomain.com:1935/realtimeapp"),
            Some("The streaming server is undergoing updates."),
        );
        assert_eq!(m.msg_type_id, MSG_COMMAND_AMF0);
        assert_eq!(m.msg_stream_id, 0, "NetConnection command stream");
        let vals = crate::amf::decode_all(&m.payload).unwrap();
        assert_eq!(vals[0].as_str(), Some("onStatus"));
        assert_eq!(vals[1].as_f64(), Some(0.0), "transaction id 0");
        assert_eq!(vals[2], Amf0Value::Null, "no command object");
        let info = &vals[3];
        assert_eq!(
            info.get("code").and_then(Amf0Value::as_str),
            Some(RECONNECT_REQUEST_CODE)
        );
        assert_eq!(
            info.get("level").and_then(Amf0Value::as_str),
            Some("status")
        );
        assert_eq!(
            info.get("tcUrl").and_then(Amf0Value::as_str),
            Some("rtmp://foo.mydomain.com:1935/realtimeapp")
        );
        assert_eq!(
            info.get("description").and_then(Amf0Value::as_str),
            Some("The streaming server is undergoing updates.")
        );
    }

    // With neither optional field supplied, only `code` and `level` are emitted.
    #[test]
    fn reconnect_request_minimal_info_object() {
        let m = build_reconnect_request(None, None);
        let vals = crate::amf::decode_all(&m.payload).unwrap();
        let info = &vals[3];
        assert_eq!(
            info.get("code").and_then(Amf0Value::as_str),
            Some(RECONNECT_REQUEST_CODE)
        );
        assert_eq!(
            info.get("level").and_then(Amf0Value::as_str),
            Some("status")
        );
        assert!(info.get("tcUrl").is_none(), "tcUrl omitted when None");
        assert!(
            info.get("description").is_none(),
            "description omitted when None"
        );
    }
}