1use bytes::{Buf, BufMut, Bytes, BytesMut};
12use std::collections::HashMap;
13
14use crate::amf::{Amf0Decoder, Amf0Encoder, AmfValue};
15use crate::error::{AmfError, ProtocolError, Result};
16use crate::media::fourcc::{AudioFourCc, VideoFourCc};
17use crate::protocol::chunk::RtmpChunk;
18use crate::protocol::constants::*;
19use crate::protocol::enhanced::{CapsEx, EnhancedCapabilities, FourCcCapability};
20
/// A decoded RTMP message.
///
/// Values are produced from a chunk by [`RtmpMessage::from_chunk`] and turned
/// back into a `(message type id, payload)` pair by [`RtmpMessage::encode`].
#[derive(Debug, Clone)]
pub enum RtmpMessage {
    /// Chunk size announcement (`MSG_SET_CHUNK_SIZE`). On decode the most
    /// significant bit of the 32-bit value is cleared.
    SetChunkSize(u32),

    /// Abort message (`MSG_ABORT`) carrying a 32-bit chunk stream id.
    Abort { csid: u32 },

    /// Acknowledgement (`MSG_ACKNOWLEDGEMENT`) carrying a 32-bit sequence value.
    Acknowledgement { sequence: u32 },

    /// User control message (`MSG_USER_CONTROL`); see [`UserControlEvent`].
    UserControl(UserControlEvent),

    /// Window acknowledgement size (`MSG_WINDOW_ACK_SIZE`) as a 32-bit value.
    WindowAckSize(u32),

    /// Peer bandwidth message (`MSG_SET_PEER_BANDWIDTH`): a 32-bit size
    /// followed by a one-byte limit type.
    SetPeerBandwidth { size: u32, limit_type: u8 },

    /// Audio message (`MSG_AUDIO`). `timestamp` is copied from the chunk and
    /// `data` is the untouched payload; `encode` emits only `data`.
    Audio { timestamp: u32, data: Bytes },

    /// Video message (`MSG_VIDEO`). `timestamp` is copied from the chunk and
    /// `data` is the untouched payload; `encode` emits only `data`.
    Video { timestamp: u32, data: Bytes },

    /// Command message decoded from an AMF0 payload (`MSG_COMMAND_AMF0`).
    Command(Command),

    /// Data message decoded from an AMF0 payload (`MSG_DATA_AMF0`).
    Data(DataMessage),

    /// Command message received as `MSG_COMMAND_AMF3`. A leading `0x00` byte
    /// is skipped if present and the rest is decoded with the AMF0 decoder.
    CommandAmf3(Command),

    /// Data message received as `MSG_DATA_AMF3`. A leading `0x00` byte is
    /// skipped if present and the rest is decoded with the AMF0 decoder.
    DataAmf3(DataMessage),

    /// Aggregate message (`MSG_AGGREGATE`); the payload is kept as-is and is
    /// not split into sub-messages here.
    Aggregate { data: Bytes },

    /// Any message type id not handled above, with its raw payload.
    Unknown { type_id: u8, data: Bytes },
}
66
/// Event carried inside a user control message.
///
/// On the wire every event is a big-endian `u16` event type followed by its
/// arguments; all single-argument variants hold one big-endian `u32`.
#[derive(Debug, Clone)]
pub enum UserControlEvent {
    /// `UC_STREAM_BEGIN` with its 32-bit argument (presumably the stream id).
    StreamBegin(u32),
    /// `UC_STREAM_EOF` with its 32-bit argument (presumably the stream id).
    StreamEof(u32),
    /// `UC_STREAM_DRY` with its 32-bit argument (presumably the stream id).
    StreamDry(u32),
    /// `UC_SET_BUFFER_LENGTH`: two consecutive 32-bit values, the stream id
    /// and then the buffer length (the field name suggests milliseconds).
    SetBufferLength { stream_id: u32, buffer_ms: u32 },
    /// `UC_STREAM_IS_RECORDED` with its 32-bit argument (presumably the stream id).
    StreamIsRecorded(u32),
    /// `UC_PING_REQUEST` with its 32-bit argument (presumably a timestamp).
    PingRequest(u32),
    /// `UC_PING_RESPONSE` with its 32-bit argument (presumably a timestamp).
    PingResponse(u32),
    /// Unrecognised event type together with the bytes that followed it.
    Unknown { event_type: u16, data: Bytes },
}
79
/// An RTMP command as decoded from, or encoded to, a sequence of AMF0 values.
#[derive(Debug, Clone)]
pub struct Command {
    /// Command name; the first AMF value, which must be a string when decoding.
    pub name: String,
    /// Second AMF value. Decoding falls back to `0.0` when it is not a number.
    pub transaction_id: f64,
    /// Third AMF value, or `AmfValue::Null` when the payload ends before it.
    pub command_object: AmfValue,
    /// Every AMF value that follows the command object, in wire order.
    pub arguments: Vec<AmfValue>,
    /// Message stream id taken from the chunk the command arrived in.
    /// It is not part of the encoded AMF payload.
    pub stream_id: u32,
}
94
/// An RTMP data message as decoded from, or encoded to, a sequence of AMF0 values.
#[derive(Debug, Clone)]
pub struct DataMessage {
    /// First AMF value when it is a string; otherwise the empty string.
    pub name: String,
    /// Every AMF value that follows the first one, in wire order.
    pub values: Vec<AmfValue>,
    /// Message stream id taken from the chunk the message arrived in.
    /// It is not part of the encoded AMF payload.
    pub stream_id: u32,
}
105
/// Parameters extracted from an AMF object by [`ConnectParams::from_amf`].
///
/// Fields keep their `Default` value when the corresponding key is absent.
#[derive(Debug, Clone, Default)]
pub struct ConnectParams {
    /// Value of `app`; only set when the value is a string.
    pub app: String,
    /// Value of `flashVer` / `flashver` when it is a string.
    pub flash_ver: Option<String>,
    /// Value of `swfUrl` / `swfurl` when it is a string.
    pub swf_url: Option<String>,
    /// Value of `tcUrl` / `tcurl` when it is a string.
    pub tc_url: Option<String>,
    /// Value of `fpad`; `false` when the value is not a boolean.
    pub fpad: bool,
    /// Value of `audioCodecs` / `audiocodecs` cast to `u32` (0 if not a number).
    pub audio_codecs: u32,
    /// Value of `videoCodecs` / `videocodecs` cast to `u32` (0 if not a number).
    pub video_codecs: u32,
    /// Value of `videoFunction` / `videofunction` cast to `u32` (0 if not a number).
    pub video_function: u32,
    /// Value of `pageUrl` / `pageurl` when it is a string.
    pub page_url: Option<String>,
    /// Value of `objectEncoding` / `objectencoding` (0.0 if not a number).
    pub object_encoding: f64,
    /// Every key that is not recognised above, with its value cloned verbatim.
    pub extra: HashMap<String, AmfValue>,

    /// String entries of the `fourCcList` array; `None` when the value is not
    /// an array or contains no strings.
    pub fourcc_list: Option<Vec<String>>,

    /// Numeric entries of the `videoFourCcInfoMap` object, keyed by the
    /// FourCC string; `None` when the value is not an object or has no
    /// numeric entries.
    pub video_fourcc_info_map: Option<HashMap<String, u32>>,

    /// Numeric entries of the `audioFourCcInfoMap` object, keyed by the
    /// FourCC string; `None` when the value is not an object or has no
    /// numeric entries.
    pub audio_fourcc_info_map: Option<HashMap<String, u32>>,

    /// Value of `capsEx` cast to `u32`; `None` when it is not a number.
    pub caps_ex: Option<u32>,
}
162
163impl ConnectParams {
164 pub fn from_amf(obj: &AmfValue) -> Self {
166 let mut params = ConnectParams::default();
167
168 if let Some(map) = obj.as_object() {
169 for (key, value) in map {
170 match key.as_str() {
171 "app" => {
172 if let Some(s) = value.as_str() {
173 params.app = s.to_string();
174 }
175 }
176 "flashVer" | "flashver" => {
177 params.flash_ver = value.as_str().map(|s| s.to_string());
178 }
179 "swfUrl" | "swfurl" => {
180 params.swf_url = value.as_str().map(|s| s.to_string());
181 }
182 "tcUrl" | "tcurl" => {
183 params.tc_url = value.as_str().map(|s| s.to_string());
184 }
185 "fpad" => {
186 params.fpad = value.as_bool().unwrap_or(false);
187 }
188 "audioCodecs" | "audiocodecs" => {
189 params.audio_codecs = value.as_number().unwrap_or(0.0) as u32;
190 }
191 "videoCodecs" | "videocodecs" => {
192 params.video_codecs = value.as_number().unwrap_or(0.0) as u32;
193 }
194 "videoFunction" | "videofunction" => {
195 params.video_function = value.as_number().unwrap_or(0.0) as u32;
196 }
197 "pageUrl" | "pageurl" => {
198 params.page_url = value.as_str().map(|s| s.to_string());
199 }
200 "objectEncoding" | "objectencoding" => {
201 params.object_encoding = value.as_number().unwrap_or(0.0);
202 }
203 "fourCcList" => {
205 params.fourcc_list = Self::parse_fourcc_list(value);
206 }
207 "videoFourCcInfoMap" => {
208 params.video_fourcc_info_map = Self::parse_fourcc_info_map(value);
209 }
210 "audioFourCcInfoMap" => {
211 params.audio_fourcc_info_map = Self::parse_fourcc_info_map(value);
212 }
213 "capsEx" => {
214 params.caps_ex = value.as_number().map(|n| n as u32);
215 }
216 _ => {
217 params.extra.insert(key.clone(), value.clone());
218 }
219 }
220 }
221 }
222
223 params
224 }
225
226 fn parse_fourcc_list(value: &AmfValue) -> Option<Vec<String>> {
228 if let AmfValue::Array(arr) = value {
229 let list: Vec<String> = arr
230 .iter()
231 .filter_map(|v: &AmfValue| v.as_str().map(|s| s.to_string()))
232 .collect();
233 if list.is_empty() {
234 None
235 } else {
236 Some(list)
237 }
238 } else {
239 None
240 }
241 }
242
243 fn parse_fourcc_info_map(value: &AmfValue) -> Option<HashMap<String, u32>> {
245 if let Some(map) = value.as_object() {
246 let info_map: HashMap<String, u32> = map
247 .iter()
248 .filter_map(|(k, v)| v.as_number().map(|n| (k.clone(), n as u32)))
249 .collect();
250 if info_map.is_empty() {
251 None
252 } else {
253 Some(info_map)
254 }
255 } else {
256 None
257 }
258 }
259
260 pub fn has_enhanced_rtmp(&self) -> bool {
264 self.fourcc_list.is_some()
265 || self.video_fourcc_info_map.is_some()
266 || self.audio_fourcc_info_map.is_some()
267 || self.caps_ex.is_some()
268 }
269
270 pub fn caps_ex_flags(&self) -> CapsEx {
272 CapsEx::from_bits(self.caps_ex.unwrap_or(0))
273 }
274
275 pub fn to_enhanced_capabilities(&self) -> EnhancedCapabilities {
279 if !self.has_enhanced_rtmp() {
280 return EnhancedCapabilities::new();
281 }
282
283 let mut caps = EnhancedCapabilities {
284 enabled: true,
285 caps_ex: self.caps_ex_flags(),
286 video_codecs: HashMap::new(),
287 audio_codecs: HashMap::new(),
288 ..Default::default()
289 };
290
291 if let Some(map) = &self.video_fourcc_info_map {
293 for (fourcc_str, capability_bits) in map {
294 if let Some(fourcc) = VideoFourCc::from_fourcc_str(fourcc_str) {
295 caps.video_codecs
296 .insert(fourcc, FourCcCapability::from_bits(*capability_bits));
297 }
298 }
299 }
300
301 if let Some(map) = &self.audio_fourcc_info_map {
303 for (fourcc_str, capability_bits) in map {
304 if let Some(fourcc) = AudioFourCc::from_fourcc_str(fourcc_str) {
305 caps.audio_codecs
306 .insert(fourcc, FourCcCapability::from_bits(*capability_bits));
307 }
308 }
309 }
310
311 if let Some(list) = &self.fourcc_list {
313 if self.video_fourcc_info_map.is_none() && self.audio_fourcc_info_map.is_none() {
314 for fourcc_str in list {
315 if let Some(fourcc) = VideoFourCc::from_fourcc_str(fourcc_str) {
317 caps.video_codecs
318 .entry(fourcc)
319 .or_insert(FourCcCapability::full());
320 }
321 if let Some(fourcc) = AudioFourCc::from_fourcc_str(fourcc_str) {
323 caps.audio_codecs
324 .entry(fourcc)
325 .or_insert(FourCcCapability::full());
326 }
327 }
328 }
329 }
330
331 caps
332 }
333}
334
/// Parameters of a publish request.
///
/// NOTE(review): field meanings below are inferred from the field names only;
/// confirm against the code that constructs this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishParams {
    /// Stream key named in the request.
    pub stream_key: String,
    /// Publish type string as sent by the peer.
    pub publish_type: String,
    /// Message stream id the request refers to.
    pub stream_id: u32,
}
345
/// Parameters of a play request.
///
/// NOTE(review): field meanings below are inferred from the field names only;
/// confirm against the code that constructs this struct.
#[derive(Debug, Clone, PartialEq)]
pub struct PlayParams {
    /// Name of the stream to play.
    pub stream_name: String,
    /// Requested start position.
    pub start: f64,
    /// Requested playback duration.
    pub duration: f64,
    /// Reset flag of the request.
    pub reset: bool,
    /// Message stream id the request refers to.
    pub stream_id: u32,
}
360
361impl RtmpMessage {
362 pub fn from_chunk(chunk: &RtmpChunk) -> Result<Self> {
364 let mut payload = chunk.payload.clone();
365
366 match chunk.message_type {
367 MSG_SET_CHUNK_SIZE => {
368 if payload.len() < 4 {
369 return Err(ProtocolError::InvalidChunkHeader.into());
370 }
371 let size = payload.get_u32() & 0x7FFFFFFF; Ok(RtmpMessage::SetChunkSize(size))
373 }
374
375 MSG_ABORT => {
376 if payload.len() < 4 {
377 return Err(ProtocolError::InvalidChunkHeader.into());
378 }
379 Ok(RtmpMessage::Abort {
380 csid: payload.get_u32(),
381 })
382 }
383
384 MSG_ACKNOWLEDGEMENT => {
385 if payload.len() < 4 {
386 return Err(ProtocolError::InvalidChunkHeader.into());
387 }
388 Ok(RtmpMessage::Acknowledgement {
389 sequence: payload.get_u32(),
390 })
391 }
392
393 MSG_USER_CONTROL => Self::parse_user_control(&mut payload),
394
395 MSG_WINDOW_ACK_SIZE => {
396 if payload.len() < 4 {
397 return Err(ProtocolError::InvalidChunkHeader.into());
398 }
399 Ok(RtmpMessage::WindowAckSize(payload.get_u32()))
400 }
401
402 MSG_SET_PEER_BANDWIDTH => {
403 if payload.len() < 5 {
404 return Err(ProtocolError::InvalidChunkHeader.into());
405 }
406 let size = payload.get_u32();
407 let limit_type = payload.get_u8();
408 Ok(RtmpMessage::SetPeerBandwidth { size, limit_type })
409 }
410
411 MSG_AUDIO => Ok(RtmpMessage::Audio {
412 timestamp: chunk.timestamp,
413 data: payload,
414 }),
415
416 MSG_VIDEO => Ok(RtmpMessage::Video {
417 timestamp: chunk.timestamp,
418 data: payload,
419 }),
420
421 MSG_COMMAND_AMF0 => {
422 let cmd = Self::parse_command(&mut payload, chunk.stream_id)?;
423 Ok(RtmpMessage::Command(cmd))
424 }
425
426 MSG_COMMAND_AMF3 => {
427 if !payload.is_empty() && payload[0] == 0x00 {
429 payload.advance(1);
430 }
431 let cmd = Self::parse_command(&mut payload, chunk.stream_id)?;
432 Ok(RtmpMessage::CommandAmf3(cmd))
433 }
434
435 MSG_DATA_AMF0 => {
436 let data = Self::parse_data(&mut payload, chunk.stream_id)?;
437 Ok(RtmpMessage::Data(data))
438 }
439
440 MSG_DATA_AMF3 => {
441 if !payload.is_empty() && payload[0] == 0x00 {
442 payload.advance(1);
443 }
444 let data = Self::parse_data(&mut payload, chunk.stream_id)?;
445 Ok(RtmpMessage::DataAmf3(data))
446 }
447
448 MSG_AGGREGATE => Ok(RtmpMessage::Aggregate { data: payload }),
449
450 _ => Ok(RtmpMessage::Unknown {
451 type_id: chunk.message_type,
452 data: payload,
453 }),
454 }
455 }
456
457 fn parse_user_control(payload: &mut Bytes) -> Result<Self> {
459 if payload.len() < 6 {
460 return Err(ProtocolError::InvalidChunkHeader.into());
461 }
462
463 let event_type = payload.get_u16();
464 let event = match event_type {
465 UC_STREAM_BEGIN => UserControlEvent::StreamBegin(payload.get_u32()),
466 UC_STREAM_EOF => UserControlEvent::StreamEof(payload.get_u32()),
467 UC_STREAM_DRY => UserControlEvent::StreamDry(payload.get_u32()),
468 UC_SET_BUFFER_LENGTH => {
469 if payload.len() < 8 {
470 return Err(ProtocolError::InvalidChunkHeader.into());
471 }
472 let stream_id = payload.get_u32();
473 let buffer_ms = payload.get_u32();
474 UserControlEvent::SetBufferLength {
475 stream_id,
476 buffer_ms,
477 }
478 }
479 UC_STREAM_IS_RECORDED => UserControlEvent::StreamIsRecorded(payload.get_u32()),
480 UC_PING_REQUEST => UserControlEvent::PingRequest(payload.get_u32()),
481 UC_PING_RESPONSE => UserControlEvent::PingResponse(payload.get_u32()),
482 _ => UserControlEvent::Unknown {
483 event_type,
484 data: payload.clone(),
485 },
486 };
487
488 Ok(RtmpMessage::UserControl(event))
489 }
490
491 fn parse_command(payload: &mut Bytes, stream_id: u32) -> Result<Command> {
493 let mut decoder = Amf0Decoder::new();
494
495 let name = match decoder.decode(payload)? {
497 AmfValue::String(s) => s,
498 _ => return Err(ProtocolError::InvalidCommand("Expected command name".into()).into()),
499 };
500
501 let transaction_id = match decoder.decode(payload)? {
503 AmfValue::Number(n) => n,
504 _ => 0.0, };
506
507 let command_object = if payload.has_remaining() {
509 decoder.decode(payload)?
510 } else {
511 AmfValue::Null
512 };
513
514 let mut arguments = Vec::new();
516 while payload.has_remaining() {
517 match decoder.decode(payload) {
518 Ok(v) => arguments.push(v),
519 Err(AmfError::UnexpectedEof) => break,
520 Err(e) => return Err(e.into()),
521 }
522 }
523
524 Ok(Command {
525 name,
526 transaction_id,
527 command_object,
528 arguments,
529 stream_id,
530 })
531 }
532
533 fn parse_data(payload: &mut Bytes, stream_id: u32) -> Result<DataMessage> {
535 let mut decoder = Amf0Decoder::new();
536
537 let name = match decoder.decode(payload)? {
539 AmfValue::String(s) => s,
540 _ => String::new(), };
542
543 let mut values = Vec::new();
545 while payload.has_remaining() {
546 match decoder.decode(payload) {
547 Ok(v) => values.push(v),
548 Err(AmfError::UnexpectedEof) => break,
549 Err(e) => return Err(e.into()),
550 }
551 }
552
553 Ok(DataMessage {
554 name,
555 values,
556 stream_id,
557 })
558 }
559
560 pub fn encode(&self) -> (u8, Bytes) {
562 match self {
563 RtmpMessage::SetChunkSize(size) => {
564 let mut buf = BytesMut::with_capacity(4);
565 buf.put_u32(*size);
566 (MSG_SET_CHUNK_SIZE, buf.freeze())
567 }
568
569 RtmpMessage::Abort { csid } => {
570 let mut buf = BytesMut::with_capacity(4);
571 buf.put_u32(*csid);
572 (MSG_ABORT, buf.freeze())
573 }
574
575 RtmpMessage::Acknowledgement { sequence } => {
576 let mut buf = BytesMut::with_capacity(4);
577 buf.put_u32(*sequence);
578 (MSG_ACKNOWLEDGEMENT, buf.freeze())
579 }
580
581 RtmpMessage::WindowAckSize(size) => {
582 let mut buf = BytesMut::with_capacity(4);
583 buf.put_u32(*size);
584 (MSG_WINDOW_ACK_SIZE, buf.freeze())
585 }
586
587 RtmpMessage::SetPeerBandwidth { size, limit_type } => {
588 let mut buf = BytesMut::with_capacity(5);
589 buf.put_u32(*size);
590 buf.put_u8(*limit_type);
591 (MSG_SET_PEER_BANDWIDTH, buf.freeze())
592 }
593
594 RtmpMessage::UserControl(event) => {
595 let mut buf = BytesMut::with_capacity(10);
596 match event {
597 UserControlEvent::StreamBegin(id) => {
598 buf.put_u16(UC_STREAM_BEGIN);
599 buf.put_u32(*id);
600 }
601 UserControlEvent::StreamEof(id) => {
602 buf.put_u16(UC_STREAM_EOF);
603 buf.put_u32(*id);
604 }
605 UserControlEvent::StreamDry(id) => {
606 buf.put_u16(UC_STREAM_DRY);
607 buf.put_u32(*id);
608 }
609 UserControlEvent::SetBufferLength {
610 stream_id,
611 buffer_ms,
612 } => {
613 buf.put_u16(UC_SET_BUFFER_LENGTH);
614 buf.put_u32(*stream_id);
615 buf.put_u32(*buffer_ms);
616 }
617 UserControlEvent::StreamIsRecorded(id) => {
618 buf.put_u16(UC_STREAM_IS_RECORDED);
619 buf.put_u32(*id);
620 }
621 UserControlEvent::PingRequest(ts) => {
622 buf.put_u16(UC_PING_REQUEST);
623 buf.put_u32(*ts);
624 }
625 UserControlEvent::PingResponse(ts) => {
626 buf.put_u16(UC_PING_RESPONSE);
627 buf.put_u32(*ts);
628 }
629 UserControlEvent::Unknown { event_type, data } => {
630 buf.put_u16(*event_type);
631 buf.put_slice(data);
632 }
633 }
634 (MSG_USER_CONTROL, buf.freeze())
635 }
636
637 RtmpMessage::Audio { data, .. } => (MSG_AUDIO, data.clone()),
638
639 RtmpMessage::Video { data, .. } => (MSG_VIDEO, data.clone()),
640
641 RtmpMessage::Command(cmd) => {
642 let payload = encode_command(cmd);
643 (MSG_COMMAND_AMF0, payload)
644 }
645
646 RtmpMessage::CommandAmf3(cmd) => {
647 let mut buf = BytesMut::new();
648 buf.put_u8(0x00); buf.put_slice(&encode_command(cmd));
650 (MSG_COMMAND_AMF3, buf.freeze())
651 }
652
653 RtmpMessage::Data(data) => {
654 let payload = encode_data(data);
655 (MSG_DATA_AMF0, payload)
656 }
657
658 RtmpMessage::DataAmf3(data) => {
659 let mut buf = BytesMut::new();
660 buf.put_u8(0x00);
661 buf.put_slice(&encode_data(data));
662 (MSG_DATA_AMF3, buf.freeze())
663 }
664
665 RtmpMessage::Aggregate { data } => (MSG_AGGREGATE, data.clone()),
666
667 RtmpMessage::Unknown { type_id, data } => (*type_id, data.clone()),
668 }
669 }
670}
671
672fn encode_command(cmd: &Command) -> Bytes {
674 let mut encoder = Amf0Encoder::new();
675 encoder.encode(&AmfValue::String(cmd.name.clone()));
676 encoder.encode(&AmfValue::Number(cmd.transaction_id));
677 encoder.encode(&cmd.command_object);
678 for arg in &cmd.arguments {
679 encoder.encode(arg);
680 }
681 encoder.finish()
682}
683
684fn encode_data(data: &DataMessage) -> Bytes {
686 let mut encoder = Amf0Encoder::new();
687 encoder.encode(&AmfValue::String(data.name.clone()));
688 for value in &data.values {
689 encoder.encode(value);
690 }
691 encoder.finish()
692}
693
694impl Command {
696 pub fn result(transaction_id: f64, properties: AmfValue, info: AmfValue) -> Self {
698 Command {
699 name: CMD_RESULT.to_string(),
700 transaction_id,
701 command_object: properties,
702 arguments: vec![info],
703 stream_id: 0,
704 }
705 }
706
707 pub fn error(transaction_id: f64, properties: AmfValue, info: AmfValue) -> Self {
709 Command {
710 name: CMD_ERROR.to_string(),
711 transaction_id,
712 command_object: properties,
713 arguments: vec![info],
714 stream_id: 0,
715 }
716 }
717
718 pub fn on_status(stream_id: u32, level: &str, code: &str, description: &str) -> Self {
720 let mut info = HashMap::new();
721 info.insert("level".to_string(), AmfValue::String(level.to_string()));
722 info.insert("code".to_string(), AmfValue::String(code.to_string()));
723 info.insert(
724 "description".to_string(),
725 AmfValue::String(description.to_string()),
726 );
727
728 Command {
729 name: CMD_ON_STATUS.to_string(),
730 transaction_id: 0.0,
731 command_object: AmfValue::Null,
732 arguments: vec![AmfValue::Object(info)],
733 stream_id,
734 }
735 }
736}
737
738#[derive(Debug, Clone, Default)]
757pub struct ConnectResponseBuilder {
758 fms_ver: String,
759 capabilities: u32,
760 enhanced_caps: Option<EnhancedCapabilities>,
761}
762
763impl ConnectResponseBuilder {
764 pub fn new() -> Self {
766 Self {
767 fms_ver: "FMS/3,5,7,7009".to_string(),
768 capabilities: 31,
769 enhanced_caps: None,
770 }
771 }
772
773 pub fn fms_ver(mut self, ver: impl Into<String>) -> Self {
775 self.fms_ver = ver.into();
776 self
777 }
778
779 pub fn capabilities(mut self, caps: u32) -> Self {
781 self.capabilities = caps;
782 self
783 }
784
785 pub fn enhanced_capabilities(mut self, caps: &EnhancedCapabilities) -> Self {
790 if caps.enabled {
791 self.enhanced_caps = Some(caps.clone());
792 }
793 self
794 }
795
796 pub fn build(self, transaction_id: f64) -> Command {
798 let properties = self.build_properties();
799
800 let mut info = HashMap::new();
801 info.insert("level".to_string(), AmfValue::String("status".to_string()));
802 info.insert(
803 "code".to_string(),
804 AmfValue::String(NC_CONNECT_SUCCESS.to_string()),
805 );
806 info.insert(
807 "description".to_string(),
808 AmfValue::String("Connection succeeded.".to_string()),
809 );
810 info.insert(
811 "objectEncoding".to_string(),
812 AmfValue::Number(0.0), );
814
815 Command::result(transaction_id, properties, AmfValue::Object(info))
816 }
817
818 fn build_properties(&self) -> AmfValue {
820 let mut props = HashMap::new();
821 props.insert("fmsVer".to_string(), AmfValue::String(self.fms_ver.clone()));
822 props.insert(
823 "capabilities".to_string(),
824 AmfValue::Number(self.capabilities as f64),
825 );
826
827 if let Some(caps) = &self.enhanced_caps {
829 props.insert(
831 "capsEx".to_string(),
832 AmfValue::Number(caps.caps_ex.bits() as f64),
833 );
834
835 if !caps.video_codecs.is_empty() {
837 let video_map: HashMap<String, AmfValue> = caps
838 .video_codecs
839 .iter()
840 .map(|(fourcc, cap)| {
841 (
842 fourcc.as_fourcc_str().to_string(),
843 AmfValue::Number(cap.bits() as f64),
844 )
845 })
846 .collect();
847 props.insert(
848 "videoFourCcInfoMap".to_string(),
849 AmfValue::Object(video_map),
850 );
851 }
852
853 if !caps.audio_codecs.is_empty() {
855 let audio_map: HashMap<String, AmfValue> = caps
856 .audio_codecs
857 .iter()
858 .map(|(fourcc, cap)| {
859 (
860 fourcc.as_fourcc_str().to_string(),
861 AmfValue::Number(cap.bits() as f64),
862 )
863 })
864 .collect();
865 props.insert(
866 "audioFourCcInfoMap".to_string(),
867 AmfValue::Object(audio_map),
868 );
869 }
870 }
871
872 AmfValue::Object(props)
873 }
874}
875
876#[cfg(test)]
877mod tests {
878 use super::*;
879
880 #[test]
881 fn test_connect_params_parsing() {
882 let mut obj = HashMap::new();
883 obj.insert("app".to_string(), AmfValue::String("live".into()));
884 obj.insert(
885 "tcUrl".to_string(),
886 AmfValue::String("rtmp://localhost/live".into()),
887 );
888 obj.insert("objectEncoding".to_string(), AmfValue::Number(0.0));
889
890 let params = ConnectParams::from_amf(&AmfValue::Object(obj));
891 assert_eq!(params.app, "live");
892 assert_eq!(params.tc_url, Some("rtmp://localhost/live".into()));
893 assert_eq!(params.object_encoding, 0.0);
894 }
895
896 #[test]
897 fn test_command_roundtrip() {
898 let cmd = Command {
899 name: "connect".to_string(),
900 transaction_id: 1.0,
901 command_object: AmfValue::Null,
902 arguments: vec![AmfValue::String("test".into())],
903 stream_id: 0,
904 };
905
906 let payload = encode_command(&cmd);
907 let chunk = RtmpChunk {
908 csid: CSID_COMMAND,
909 timestamp: 0,
910 message_type: MSG_COMMAND_AMF0,
911 stream_id: 0,
912 payload,
913 };
914
915 let parsed = RtmpMessage::from_chunk(&chunk).unwrap();
916 if let RtmpMessage::Command(parsed_cmd) = parsed {
917 assert_eq!(parsed_cmd.name, "connect");
918 assert_eq!(parsed_cmd.transaction_id, 1.0);
919 } else {
920 panic!("Expected Command message");
921 }
922 }
923
924 #[test]
925 fn test_set_chunk_size_message() {
926 let chunk = RtmpChunk {
927 csid: CSID_PROTOCOL_CONTROL,
928 timestamp: 0,
929 message_type: MSG_SET_CHUNK_SIZE,
930 stream_id: 0,
931 payload: Bytes::from_static(&[0x00, 0x00, 0x10, 0x00]), };
933
934 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
935 assert!(matches!(msg, RtmpMessage::SetChunkSize(4096)));
936
937 let (msg_type, payload) = msg.encode();
939 assert_eq!(msg_type, MSG_SET_CHUNK_SIZE);
940 assert_eq!(&payload[..], &[0x00, 0x00, 0x10, 0x00]);
941 }
942
943 #[test]
944 fn test_abort_message() {
945 let chunk = RtmpChunk {
946 csid: CSID_PROTOCOL_CONTROL,
947 timestamp: 0,
948 message_type: MSG_ABORT,
949 stream_id: 0,
950 payload: Bytes::from_static(&[0x00, 0x00, 0x00, 0x05]), };
952
953 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
954 if let RtmpMessage::Abort { csid } = msg {
955 assert_eq!(csid, 5);
956 } else {
957 panic!("Expected Abort message");
958 }
959 }
960
961 #[test]
962 fn test_acknowledgement_message() {
963 let chunk = RtmpChunk {
964 csid: CSID_PROTOCOL_CONTROL,
965 timestamp: 0,
966 message_type: MSG_ACKNOWLEDGEMENT,
967 stream_id: 0,
968 payload: Bytes::from_static(&[0x00, 0x10, 0x00, 0x00]), };
970
971 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
972 if let RtmpMessage::Acknowledgement { sequence } = msg {
973 assert_eq!(sequence, 1048576);
974 } else {
975 panic!("Expected Acknowledgement message");
976 }
977 }
978
979 #[test]
980 fn test_window_ack_size_message() {
981 let chunk = RtmpChunk {
982 csid: CSID_PROTOCOL_CONTROL,
983 timestamp: 0,
984 message_type: MSG_WINDOW_ACK_SIZE,
985 stream_id: 0,
986 payload: Bytes::from_static(&[0x00, 0x26, 0x25, 0xA0]), };
988
989 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
990 if let RtmpMessage::WindowAckSize(size) = msg {
991 assert_eq!(size, 2500000);
992 } else {
993 panic!("Expected WindowAckSize message");
994 }
995 }
996
997 #[test]
998 fn test_set_peer_bandwidth_message() {
999 let chunk = RtmpChunk {
1000 csid: CSID_PROTOCOL_CONTROL,
1001 timestamp: 0,
1002 message_type: MSG_SET_PEER_BANDWIDTH,
1003 stream_id: 0,
1004 payload: Bytes::from_static(&[0x00, 0x26, 0x25, 0xA0, 0x02]), };
1006
1007 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
1008 if let RtmpMessage::SetPeerBandwidth { size, limit_type } = msg {
1009 assert_eq!(size, 2500000);
1010 assert_eq!(limit_type, BANDWIDTH_LIMIT_DYNAMIC);
1011 } else {
1012 panic!("Expected SetPeerBandwidth message");
1013 }
1014 }
1015
1016 #[test]
1017 fn test_user_control_stream_begin() {
1018 let chunk = RtmpChunk {
1019 csid: CSID_PROTOCOL_CONTROL,
1020 timestamp: 0,
1021 message_type: MSG_USER_CONTROL,
1022 stream_id: 0,
1023 payload: Bytes::from_static(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x01]), };
1025
1026 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
1027 if let RtmpMessage::UserControl(UserControlEvent::StreamBegin(id)) = msg {
1028 assert_eq!(id, 1);
1029 } else {
1030 panic!("Expected StreamBegin user control");
1031 }
1032 }
1033
1034 #[test]
1035 fn test_user_control_stream_eof() {
1036 let chunk = RtmpChunk {
1037 csid: CSID_PROTOCOL_CONTROL,
1038 timestamp: 0,
1039 message_type: MSG_USER_CONTROL,
1040 stream_id: 0,
1041 payload: Bytes::from_static(&[0x00, 0x01, 0x00, 0x00, 0x00, 0x02]), };
1043
1044 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
1045 if let RtmpMessage::UserControl(UserControlEvent::StreamEof(id)) = msg {
1046 assert_eq!(id, 2);
1047 } else {
1048 panic!("Expected StreamEof user control");
1049 }
1050 }
1051
1052 #[test]
1053 fn test_user_control_set_buffer_length() {
1054 let chunk = RtmpChunk {
1055 csid: CSID_PROTOCOL_CONTROL,
1056 timestamp: 0,
1057 message_type: MSG_USER_CONTROL,
1058 stream_id: 0,
1059 payload: Bytes::from_static(&[
1060 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x03, 0xE8, ]),
1064 };
1065
1066 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
1067 if let RtmpMessage::UserControl(UserControlEvent::SetBufferLength {
1068 stream_id,
1069 buffer_ms,
1070 }) = msg
1071 {
1072 assert_eq!(stream_id, 1);
1073 assert_eq!(buffer_ms, 1000);
1074 } else {
1075 panic!("Expected SetBufferLength user control");
1076 }
1077 }
1078
1079 #[test]
1080 fn test_user_control_ping_request() {
1081 let chunk = RtmpChunk {
1082 csid: CSID_PROTOCOL_CONTROL,
1083 timestamp: 0,
1084 message_type: MSG_USER_CONTROL,
1085 stream_id: 0,
1086 payload: Bytes::from_static(&[0x00, 0x06, 0x00, 0x01, 0x00, 0x00]), };
1088
1089 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
1090 if let RtmpMessage::UserControl(UserControlEvent::PingRequest(ts)) = msg {
1091 assert_eq!(ts, 0x00010000);
1092 } else {
1093 panic!("Expected PingRequest user control");
1094 }
1095 }
1096
1097 #[test]
1098 fn test_user_control_ping_response() {
1099 let chunk = RtmpChunk {
1100 csid: CSID_PROTOCOL_CONTROL,
1101 timestamp: 0,
1102 message_type: MSG_USER_CONTROL,
1103 stream_id: 0,
1104 payload: Bytes::from_static(&[0x00, 0x07, 0x00, 0x00, 0x00, 0x64]), };
1106
1107 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
1108 if let RtmpMessage::UserControl(UserControlEvent::PingResponse(ts)) = msg {
1109 assert_eq!(ts, 100);
1110 } else {
1111 panic!("Expected PingResponse user control");
1112 }
1113 }
1114
1115 #[test]
1116 fn test_audio_message() {
1117 let audio_data = Bytes::from_static(&[0xAF, 0x01, 0x21, 0x00, 0x00]);
1118
1119 let chunk = RtmpChunk {
1120 csid: CSID_AUDIO,
1121 timestamp: 1000,
1122 message_type: MSG_AUDIO,
1123 stream_id: 1,
1124 payload: audio_data.clone(),
1125 };
1126
1127 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
1128 if let RtmpMessage::Audio { timestamp, data } = msg {
1129 assert_eq!(timestamp, 1000);
1130 assert_eq!(data, audio_data);
1131 } else {
1132 panic!("Expected Audio message");
1133 }
1134 }
1135
1136 #[test]
1137 fn test_video_message() {
1138 let video_data = Bytes::from_static(&[0x17, 0x01, 0x00, 0x00, 0x00, 0x00]);
1139
1140 let chunk = RtmpChunk {
1141 csid: CSID_VIDEO,
1142 timestamp: 2000,
1143 message_type: MSG_VIDEO,
1144 stream_id: 1,
1145 payload: video_data.clone(),
1146 };
1147
1148 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
1149 if let RtmpMessage::Video { timestamp, data } = msg {
1150 assert_eq!(timestamp, 2000);
1151 assert_eq!(data, video_data);
1152 } else {
1153 panic!("Expected Video message");
1154 }
1155 }
1156
1157 #[test]
1158 fn test_data_message() {
1159 let mut encoder = Amf0Encoder::new();
1160 encoder.encode(&AmfValue::String("@setDataFrame".into()));
1161 encoder.encode(&AmfValue::String("onMetaData".into()));
1162 let mut metadata = HashMap::new();
1163 metadata.insert("width".to_string(), AmfValue::Number(1920.0));
1164 encoder.encode(&AmfValue::Object(metadata));
1165
1166 let chunk = RtmpChunk {
1167 csid: CSID_COMMAND,
1168 timestamp: 0,
1169 message_type: MSG_DATA_AMF0,
1170 stream_id: 1,
1171 payload: encoder.finish(),
1172 };
1173
1174 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
1175 if let RtmpMessage::Data(data) = msg {
1176 assert_eq!(data.name, "@setDataFrame");
1177 assert_eq!(data.stream_id, 1);
1178 assert_eq!(data.values.len(), 2);
1179 } else {
1180 panic!("Expected Data message");
1181 }
1182 }
1183
1184 #[test]
1185 fn test_unknown_message_type() {
1186 let chunk = RtmpChunk {
1187 csid: CSID_COMMAND,
1188 timestamp: 0,
1189 message_type: 99, stream_id: 0,
1191 payload: Bytes::from_static(b"unknown"),
1192 };
1193
1194 let msg = RtmpMessage::from_chunk(&chunk).unwrap();
1195 if let RtmpMessage::Unknown { type_id, data } = msg {
1196 assert_eq!(type_id, 99);
1197 assert_eq!(data.as_ref(), b"unknown");
1198 } else {
1199 panic!("Expected Unknown message");
1200 }
1201 }
1202
1203 #[test]
1204 fn test_command_result() {
1205 let mut props = HashMap::new();
1206 props.insert(
1207 "fmsVer".to_string(),
1208 AmfValue::String("FMS/3,5,7,7009".into()),
1209 );
1210 props.insert("capabilities".to_string(), AmfValue::Number(31.0));
1211
1212 let result = Command::result(1.0, AmfValue::Object(props), AmfValue::Null);
1213
1214 assert_eq!(result.name, "_result");
1215 assert_eq!(result.transaction_id, 1.0);
1216 }
1217
1218 #[test]
1219 fn test_command_error() {
1220 let error = Command::error(1.0, AmfValue::Null, AmfValue::String("error".into()));
1221
1222 assert_eq!(error.name, "_error");
1223 assert_eq!(error.transaction_id, 1.0);
1224 }
1225
1226 #[test]
1227 fn test_command_on_status() {
1228 let status = Command::on_status(1, "status", NS_PUBLISH_START, "Publishing started");
1229
1230 assert_eq!(status.name, "onStatus");
1231 assert_eq!(status.transaction_id, 0.0);
1232 assert_eq!(status.stream_id, 1);
1233
1234 if let Some(info) = status.arguments.first() {
1235 if let AmfValue::Object(props) = info {
1236 assert_eq!(props.get("level").unwrap().as_str(), Some("status"));
1237 assert_eq!(props.get("code").unwrap().as_str(), Some(NS_PUBLISH_START));
1238 } else {
1239 panic!("Expected Object in arguments");
1240 }
1241 } else {
1242 panic!("Expected arguments");
1243 }
1244 }
1245
1246 #[test]
1247 fn test_connect_params_all_fields() {
1248 let mut obj = HashMap::new();
1249 obj.insert("app".to_string(), AmfValue::String("live".into()));
1250 obj.insert(
1251 "flashVer".to_string(),
1252 AmfValue::String("OBS-Studio/29.0".into()),
1253 );
1254 obj.insert(
1255 "swfUrl".to_string(),
1256 AmfValue::String("rtmp://example.com/app".into()),
1257 );
1258 obj.insert(
1259 "tcUrl".to_string(),
1260 AmfValue::String("rtmp://example.com/live".into()),
1261 );
1262 obj.insert("fpad".to_string(), AmfValue::Boolean(false));
1263 obj.insert("audioCodecs".to_string(), AmfValue::Number(3575.0));
1264 obj.insert("videoCodecs".to_string(), AmfValue::Number(252.0));
1265 obj.insert("videoFunction".to_string(), AmfValue::Number(1.0));
1266 obj.insert(
1267 "pageUrl".to_string(),
1268 AmfValue::String("http://twitch.tv".into()),
1269 );
1270 obj.insert("objectEncoding".to_string(), AmfValue::Number(0.0));
1271 obj.insert("custom".to_string(), AmfValue::String("value".into()));
1272
1273 let params = ConnectParams::from_amf(&AmfValue::Object(obj));
1274
1275 assert_eq!(params.app, "live");
1276 assert_eq!(params.flash_ver, Some("OBS-Studio/29.0".into()));
1277 assert_eq!(params.swf_url, Some("rtmp://example.com/app".into()));
1278 assert_eq!(params.tc_url, Some("rtmp://example.com/live".into()));
1279 assert!(!params.fpad);
1280 assert_eq!(params.audio_codecs, 3575);
1281 assert_eq!(params.video_codecs, 252);
1282 assert_eq!(params.video_function, 1);
1283 assert_eq!(params.page_url, Some("http://twitch.tv".into()));
1284 assert_eq!(params.object_encoding, 0.0);
1285 assert!(params.extra.contains_key("custom"));
1286 }
1287
1288 #[test]
1289 fn test_connect_params_case_insensitive() {
1290 let mut obj = HashMap::new();
1292 obj.insert("flashver".to_string(), AmfValue::String("test".into()));
1293 obj.insert("tcurl".to_string(), AmfValue::String("url".into()));
1294 obj.insert("pageurl".to_string(), AmfValue::String("page".into()));
1295 obj.insert("swfurl".to_string(), AmfValue::String("swf".into()));
1296
1297 let params = ConnectParams::from_amf(&AmfValue::Object(obj));
1298
1299 assert_eq!(params.flash_ver, Some("test".into()));
1300 assert_eq!(params.tc_url, Some("url".into()));
1301 assert_eq!(params.page_url, Some("page".into()));
1302 assert_eq!(params.swf_url, Some("swf".into()));
1303 }
1304
1305 #[test]
1306 fn test_connect_params_from_non_object() {
1307 let params = ConnectParams::from_amf(&AmfValue::Null);
1309 assert_eq!(params.app, "");
1310 assert!(params.flash_ver.is_none());
1311 }
1312
#[test]
fn test_message_encode_roundtrip() {
    // Encodes a message, wraps the result in a protocol-control chunk and
    // decodes it back into a message.
    let roundtrip = |original: RtmpMessage| -> RtmpMessage {
        let (message_type, payload) = original.encode();
        let chunk = RtmpChunk {
            csid: CSID_PROTOCOL_CONTROL,
            timestamp: 0,
            message_type,
            stream_id: 0,
            payload,
        };
        RtmpMessage::from_chunk(&chunk).unwrap()
    };

    let chunk_size = roundtrip(RtmpMessage::SetChunkSize(4096));
    assert!(matches!(chunk_size, RtmpMessage::SetChunkSize(4096)));

    let window_ack = roundtrip(RtmpMessage::WindowAckSize(2500000));
    assert!(matches!(window_ack, RtmpMessage::WindowAckSize(2500000)));
}
1343
#[test]
fn test_user_control_event_encode() {
    // Each event is wrapped in a UserControl message right before encoding.
    let events = vec![
        UserControlEvent::StreamBegin(1),
        UserControlEvent::StreamEof(2),
        UserControlEvent::StreamDry(3),
        UserControlEvent::StreamIsRecorded(4),
        UserControlEvent::PingRequest(5),
        UserControlEvent::PingResponse(6),
        UserControlEvent::SetBufferLength {
            stream_id: 1,
            buffer_ms: 1000,
        },
    ];

    for event in events {
        let message = RtmpMessage::UserControl(event);
        let (msg_type, payload) = message.encode();

        // Every variant must be tagged as user control and carry a payload.
        assert_eq!(msg_type, MSG_USER_CONTROL);
        assert!(!payload.is_empty());
    }
}
1366
#[test]
fn test_aggregate_message() {
    let aggregate_chunk = RtmpChunk {
        csid: CSID_VIDEO,
        timestamp: 0,
        message_type: MSG_AGGREGATE,
        stream_id: 1,
        payload: Bytes::from_static(b"aggregate data"),
    };

    // The decoded message must be the Aggregate variant carrying the payload
    // bytes unchanged.
    match RtmpMessage::from_chunk(&aggregate_chunk).unwrap() {
        RtmpMessage::Aggregate { data } => {
            assert_eq!(data.as_ref(), b"aggregate data");
        }
        _ => panic!("Expected Aggregate message"),
    }
}
1384
#[test]
fn test_truncated_protocol_control_messages() {
    // Builds a protocol-control chunk of the given type around a short payload.
    let truncated = |message_type, payload: &'static [u8]| RtmpChunk {
        csid: CSID_PROTOCOL_CONTROL,
        timestamp: 0,
        message_type,
        stream_id: 0,
        payload: Bytes::from_static(payload),
    };

    // Two-byte payload for SetChunkSize: decoding is expected to fail.
    let short_chunk_size = truncated(MSG_SET_CHUNK_SIZE, &[0x00, 0x00]);
    assert!(RtmpMessage::from_chunk(&short_chunk_size).is_err());

    // One-byte payload for WindowAckSize: decoding is expected to fail.
    let short_window_ack = truncated(MSG_WINDOW_ACK_SIZE, &[0x00]);
    assert!(RtmpMessage::from_chunk(&short_window_ack).is_err());
}
1411
#[test]
fn test_connect_params_ertmp_fields() {
    // Builds an AMF object that maps each FourCC key to a numeric value.
    let fourcc_object = |entries: &[(&str, f64)]| {
        AmfValue::Object(
            entries
                .iter()
                .map(|&(fourcc, value)| (fourcc.to_string(), AmfValue::Number(value)))
                .collect(),
        )
    };

    let mut obj = HashMap::new();
    obj.insert("app".to_string(), AmfValue::String("live".into()));
    obj.insert("capsEx".to_string(), AmfValue::Number(3.0));
    obj.insert(
        "videoFourCcInfoMap".to_string(),
        fourcc_object(&[("avc1", 7.0), ("hvc1", 4.0)]),
    );
    obj.insert(
        "audioFourCcInfoMap".to_string(),
        fourcc_object(&[("mp4a", 7.0), ("Opus", 4.0)]),
    );

    let params = ConnectParams::from_amf(&AmfValue::Object(obj));

    assert!(params.has_enhanced_rtmp());
    assert_eq!(params.caps_ex, Some(3));
    assert!(params.video_fourcc_info_map.is_some());
    assert!(params.audio_fourcc_info_map.is_some());

    // Numeric values must be carried over per FourCC key for video...
    let parsed_video = params.video_fourcc_info_map.unwrap();
    assert_eq!(parsed_video.get("avc1"), Some(&7));
    assert_eq!(parsed_video.get("hvc1"), Some(&4));

    // ...and for audio.
    let parsed_audio = params.audio_fourcc_info_map.unwrap();
    assert_eq!(parsed_audio.get("mp4a"), Some(&7));
    assert_eq!(parsed_audio.get("Opus"), Some(&4));
}
1455
#[test]
fn test_connect_params_fourcc_list() {
    let codecs = ["avc1", "hvc1", "mp4a"];

    let mut obj = HashMap::new();
    obj.insert("app".to_string(), AmfValue::String("live".into()));
    obj.insert(
        "fourCcList".to_string(),
        AmfValue::Array(
            codecs
                .iter()
                .map(|&code| AmfValue::String(code.into()))
                .collect(),
        ),
    );

    let params = ConnectParams::from_amf(&AmfValue::Object(obj));

    // A fourCcList alone is expected to mark the client as enhanced-RTMP capable.
    assert!(params.has_enhanced_rtmp());

    let list = params.fourcc_list.unwrap();
    assert_eq!(list.len(), 3);
    for &code in &codecs {
        assert!(list.contains(&code.to_string()));
    }
}
1480
#[test]
fn test_connect_params_no_ertmp() {
    // Only the plain `app` property is present; no enhanced-RTMP keys.
    let obj = HashMap::from([("app".to_string(), AmfValue::String("live".into()))]);

    let params = ConnectParams::from_amf(&AmfValue::Object(obj));

    assert!(!params.has_enhanced_rtmp());
    assert!(params.fourcc_list.is_none());
    assert!(params.video_fourcc_info_map.is_none());
    assert!(params.audio_fourcc_info_map.is_none());
    assert!(params.caps_ex.is_none());
}
1494
#[test]
fn test_connect_params_caps_ex_flags() {
    // capsEx = 15 (0b1111): all four capability checks below are expected to pass.
    let obj = HashMap::from([
        ("app".to_string(), AmfValue::String("live".into())),
        ("capsEx".to_string(), AmfValue::Number(15.0)),
    ]);

    let params = ConnectParams::from_amf(&AmfValue::Object(obj));
    let flags = params.caps_ex_flags();

    assert!(flags.supports_reconnect());
    assert!(flags.supports_multitrack());
    assert!(flags.supports_modex());
    assert!(flags.supports_timestamp_nano_offset());
}
1509
#[test]
fn test_connect_params_to_enhanced_capabilities() {
    // Builds an AMF object that maps each FourCC key to a numeric value.
    let fourcc_object = |entries: &[(&str, f64)]| {
        AmfValue::Object(
            entries
                .iter()
                .map(|&(fourcc, value)| (fourcc.to_string(), AmfValue::Number(value)))
                .collect(),
        )
    };

    let mut obj = HashMap::new();
    obj.insert("app".to_string(), AmfValue::String("live".into()));
    obj.insert("capsEx".to_string(), AmfValue::Number(6.0));
    obj.insert(
        "videoFourCcInfoMap".to_string(),
        fourcc_object(&[("avc1", 7.0), ("av01", 4.0)]),
    );
    obj.insert(
        "audioFourCcInfoMap".to_string(),
        fourcc_object(&[("Opus", 7.0)]),
    );

    let params = ConnectParams::from_amf(&AmfValue::Object(obj));
    let caps = params.to_enhanced_capabilities();

    // capsEx = 6: multitrack and modex are expected, reconnect is not.
    assert!(caps.enabled);
    assert!(caps.supports_multitrack());
    assert!(caps.caps_ex.supports_modex());
    assert!(!caps.caps_ex.supports_reconnect());

    // Only the video codecs listed in the map are reported as supported.
    assert!(caps.supports_video_codec(VideoFourCc::Avc));
    assert!(caps.supports_video_codec(VideoFourCc::Av1));
    assert!(!caps.supports_video_codec(VideoFourCc::Hevc));

    // Likewise for audio.
    assert!(caps.supports_audio_codec(AudioFourCc::Opus));
    assert!(!caps.supports_audio_codec(AudioFourCc::Aac));
}
1546
#[test]
fn test_connect_params_fourcc_list_to_capabilities() {
    let advertised = vec![
        AmfValue::String("avc1".into()),
        AmfValue::String("Opus".into()),
    ];
    let obj = HashMap::from([
        ("app".to_string(), AmfValue::String("live".into())),
        ("fourCcList".to_string(), AmfValue::Array(advertised)),
    ]);

    let params = ConnectParams::from_amf(&AmfValue::Object(obj));
    let caps = params.to_enhanced_capabilities();

    assert!(caps.enabled);

    // Codecs taken from a plain fourCcList are expected to be usable for
    // decode, encode and forward alike.
    let video = caps.video_codec_capability(VideoFourCc::Avc).unwrap();
    assert!(video.can_decode());
    assert!(video.can_encode());
    assert!(video.can_forward());

    let audio = caps.audio_codec_capability(AudioFourCc::Opus).unwrap();
    assert!(audio.can_decode());
    assert!(audio.can_encode());
    assert!(audio.can_forward());
}
1577
#[test]
fn test_connect_response_builder_basic() {
    let response = ConnectResponseBuilder::new()
        .fms_ver("rtmp-rs/0.5.0")
        .capabilities(31)
        .build(1.0);

    assert_eq!(response.name, "_result");
    assert_eq!(response.transaction_id, 1.0);

    let props = match &response.command_object {
        AmfValue::Object(props) => props,
        _ => panic!("Expected Object in command_object"),
    };

    assert_eq!(props.get("fmsVer").unwrap().as_str(), Some("rtmp-rs/0.5.0"));
    assert_eq!(props.get("capabilities").unwrap().as_number(), Some(31.0));

    // No enhanced capabilities were supplied, so none of the E-RTMP keys
    // may appear in the response.
    for key in ["capsEx", "videoFourCcInfoMap", "audioFourCcInfoMap"].iter() {
        assert!(!props.contains_key(*key));
    }
}
1600
#[test]
fn test_connect_response_builder_with_ertmp() {
    let caps = EnhancedCapabilities::with_defaults();

    let response = ConnectResponseBuilder::new()
        .fms_ver("rtmp-rs/0.5.0")
        .capabilities(31)
        .enhanced_capabilities(&caps)
        .build(1.0);

    let props = match &response.command_object {
        AmfValue::Object(props) => props,
        _ => panic!("Expected Object in command_object"),
    };

    // All three E-RTMP keys must be present in the response object.
    for key in ["capsEx", "videoFourCcInfoMap", "audioFourCcInfoMap"].iter() {
        assert!(props.contains_key(*key));
    }

    // The video map must be an object listing these FourCC keys.
    match props.get("videoFourCcInfoMap").unwrap() {
        AmfValue::Object(video_map) => {
            for fourcc in ["avc1", "hvc1", "av01"].iter() {
                assert!(video_map.contains_key(*fourcc));
            }
        }
        _ => panic!("Expected Object for videoFourCcInfoMap"),
    }

    // The audio map must be an object listing these FourCC keys.
    match props.get("audioFourCcInfoMap").unwrap() {
        AmfValue::Object(audio_map) => {
            for fourcc in ["mp4a", "Opus"].iter() {
                assert!(audio_map.contains_key(*fourcc));
            }
        }
        _ => panic!("Expected Object for audioFourCcInfoMap"),
    }
}
1637
#[test]
fn test_connect_response_builder_disabled_ertmp() {
    // Capabilities built with `new()` are expected to leave every E-RTMP key
    // out of the response.
    let caps = EnhancedCapabilities::new();

    let response = ConnectResponseBuilder::new()
        .enhanced_capabilities(&caps)
        .build(1.0);

    let props = match &response.command_object {
        AmfValue::Object(props) => props,
        _ => panic!("Expected Object in command_object"),
    };

    for key in ["capsEx", "videoFourCcInfoMap", "audioFourCcInfoMap"].iter() {
        assert!(!props.contains_key(*key));
    }
}
1655}