Skip to main content

rtmp_rs/protocol/
message.rs

1//! RTMP message types and parsing
2//!
3//! RTMP messages are classified into:
4//! - Protocol Control Messages (types 1-6): Chunk/flow control
5//! - Command Messages (types 17, 20): AMF-encoded commands
6//! - Data Messages (types 15, 18): Metadata
7//! - Audio/Video Messages (types 8, 9): Media data
8//!
9//! Reference: RTMP Specification Section 5.4
10
11use bytes::{Buf, BufMut, Bytes, BytesMut};
12use std::collections::HashMap;
13
14use crate::amf::{Amf0Decoder, Amf0Encoder, AmfValue};
15use crate::error::{AmfError, ProtocolError, Result};
16use crate::media::fourcc::{AudioFourCc, VideoFourCc};
17use crate::protocol::chunk::RtmpChunk;
18use crate::protocol::constants::*;
19use crate::protocol::enhanced::{CapsEx, EnhancedCapabilities, FourCcCapability};
20
21/// Parsed RTMP message
22#[derive(Debug, Clone)]
23pub enum RtmpMessage {
24    /// Set Chunk Size (type 1)
25    SetChunkSize(u32),
26
27    /// Abort Message (type 2)
28    Abort { csid: u32 },
29
30    /// Acknowledgement (type 3)
31    Acknowledgement { sequence: u32 },
32
33    /// User Control Message (type 4)
34    UserControl(UserControlEvent),
35
36    /// Window Acknowledgement Size (type 5)
37    WindowAckSize(u32),
38
39    /// Set Peer Bandwidth (type 6)
40    SetPeerBandwidth { size: u32, limit_type: u8 },
41
42    /// Audio data (type 8)
43    Audio { timestamp: u32, data: Bytes },
44
45    /// Video data (type 9)
46    Video { timestamp: u32, data: Bytes },
47
48    /// AMF0 Command (type 20)
49    Command(Command),
50
51    /// AMF0 Data message (type 18) - metadata, etc.
52    Data(DataMessage),
53
54    /// AMF3 Command (type 17)
55    CommandAmf3(Command),
56
57    /// AMF3 Data message (type 15)
58    DataAmf3(DataMessage),
59
60    /// Aggregate message (type 22)
61    Aggregate { data: Bytes },
62
63    /// Unknown message type
64    Unknown { type_id: u8, data: Bytes },
65}
66
67/// User Control Event
68#[derive(Debug, Clone)]
69pub enum UserControlEvent {
70    StreamBegin(u32),
71    StreamEof(u32),
72    StreamDry(u32),
73    SetBufferLength { stream_id: u32, buffer_ms: u32 },
74    StreamIsRecorded(u32),
75    PingRequest(u32),
76    PingResponse(u32),
77    Unknown { event_type: u16, data: Bytes },
78}
79
80/// RTMP command (connect, publish, play, etc.)
81#[derive(Debug, Clone)]
82pub struct Command {
83    /// Command name
84    pub name: String,
85    /// Transaction ID
86    pub transaction_id: f64,
87    /// Command object (often null for responses)
88    pub command_object: AmfValue,
89    /// Additional arguments
90    pub arguments: Vec<AmfValue>,
91    /// Message stream ID (from chunk)
92    pub stream_id: u32,
93}
94
95/// Data message (@setDataFrame, onMetaData, etc.)
96#[derive(Debug, Clone)]
97pub struct DataMessage {
98    /// Handler name (e.g., "@setDataFrame", "onMetaData")
99    pub name: String,
100    /// Data values
101    pub values: Vec<AmfValue>,
102    /// Message stream ID
103    pub stream_id: u32,
104}
105
106/// Connect command parameters
107#[derive(Debug, Clone, Default)]
108pub struct ConnectParams {
109    /// Application name
110    pub app: String,
111    /// Flash version
112    pub flash_ver: Option<String>,
113    /// SWF URL
114    pub swf_url: Option<String>,
115    /// TC URL (full RTMP URL)
116    pub tc_url: Option<String>,
117    /// Is FPAD
118    pub fpad: bool,
119    /// Audio codecs
120    pub audio_codecs: u32,
121    /// Video codecs
122    pub video_codecs: u32,
123    /// Video function
124    pub video_function: u32,
125    /// Page URL
126    pub page_url: Option<String>,
127    /// Object encoding (AMF version)
128    pub object_encoding: f64,
129    /// Extra properties from connect object
130    pub extra: HashMap<String, AmfValue>,
131
132    // =========================================================================
133    // Enhanced RTMP (E-RTMP) fields
134    // =========================================================================
135    /// List of supported FOURCC codec strings (e.g., ["avc1", "hvc1", "av01"])
136    ///
137    /// This is an alternative to the info maps; if present, all listed codecs
138    /// are assumed to have full capability (decode + encode + forward).
139    pub fourcc_list: Option<Vec<String>>,
140
141    /// Video codec capabilities by FOURCC string.
142    ///
143    /// Maps FOURCC strings (e.g., "avc1", "hvc1") to capability bitmask:
144    /// - 0x01: Can decode
145    /// - 0x02: Can encode
146    /// - 0x04: Can forward/relay
147    pub video_fourcc_info_map: Option<HashMap<String, u32>>,
148
149    /// Audio codec capabilities by FOURCC string.
150    ///
151    /// Maps FOURCC strings (e.g., "mp4a", "Opus") to capability bitmask.
152    pub audio_fourcc_info_map: Option<HashMap<String, u32>>,
153
154    /// Extended capabilities bitmask (E-RTMP capsEx field).
155    ///
156    /// - 0x01: Reconnect support
157    /// - 0x02: Multitrack support
158    /// - 0x04: ModEx signal parsing
159    /// - 0x08: Nanosecond timestamp offset
160    pub caps_ex: Option<u32>,
161}
162
163impl ConnectParams {
164    /// Parse from AMF command object
165    pub fn from_amf(obj: &AmfValue) -> Self {
166        let mut params = ConnectParams::default();
167
168        if let Some(map) = obj.as_object() {
169            for (key, value) in map {
170                match key.as_str() {
171                    "app" => {
172                        if let Some(s) = value.as_str() {
173                            params.app = s.to_string();
174                        }
175                    }
176                    "flashVer" | "flashver" => {
177                        params.flash_ver = value.as_str().map(|s| s.to_string());
178                    }
179                    "swfUrl" | "swfurl" => {
180                        params.swf_url = value.as_str().map(|s| s.to_string());
181                    }
182                    "tcUrl" | "tcurl" => {
183                        params.tc_url = value.as_str().map(|s| s.to_string());
184                    }
185                    "fpad" => {
186                        params.fpad = value.as_bool().unwrap_or(false);
187                    }
188                    "audioCodecs" | "audiocodecs" => {
189                        params.audio_codecs = value.as_number().unwrap_or(0.0) as u32;
190                    }
191                    "videoCodecs" | "videocodecs" => {
192                        params.video_codecs = value.as_number().unwrap_or(0.0) as u32;
193                    }
194                    "videoFunction" | "videofunction" => {
195                        params.video_function = value.as_number().unwrap_or(0.0) as u32;
196                    }
197                    "pageUrl" | "pageurl" => {
198                        params.page_url = value.as_str().map(|s| s.to_string());
199                    }
200                    "objectEncoding" | "objectencoding" => {
201                        params.object_encoding = value.as_number().unwrap_or(0.0);
202                    }
203                    // E-RTMP fields
204                    "fourCcList" => {
205                        params.fourcc_list = Self::parse_fourcc_list(value);
206                    }
207                    "videoFourCcInfoMap" => {
208                        params.video_fourcc_info_map = Self::parse_fourcc_info_map(value);
209                    }
210                    "audioFourCcInfoMap" => {
211                        params.audio_fourcc_info_map = Self::parse_fourcc_info_map(value);
212                    }
213                    "capsEx" => {
214                        params.caps_ex = value.as_number().map(|n| n as u32);
215                    }
216                    _ => {
217                        params.extra.insert(key.clone(), value.clone());
218                    }
219                }
220            }
221        }
222
223        params
224    }
225
226    /// Parse a fourCcList array from AMF value.
227    fn parse_fourcc_list(value: &AmfValue) -> Option<Vec<String>> {
228        if let AmfValue::Array(arr) = value {
229            let list: Vec<String> = arr
230                .iter()
231                .filter_map(|v: &AmfValue| v.as_str().map(|s| s.to_string()))
232                .collect();
233            if list.is_empty() {
234                None
235            } else {
236                Some(list)
237            }
238        } else {
239            None
240        }
241    }
242
243    /// Parse a FOURCC info map (videoFourCcInfoMap/audioFourCcInfoMap) from AMF value.
244    fn parse_fourcc_info_map(value: &AmfValue) -> Option<HashMap<String, u32>> {
245        if let Some(map) = value.as_object() {
246            let info_map: HashMap<String, u32> = map
247                .iter()
248                .filter_map(|(k, v)| v.as_number().map(|n| (k.clone(), n as u32)))
249                .collect();
250            if info_map.is_empty() {
251                None
252            } else {
253                Some(info_map)
254            }
255        } else {
256            None
257        }
258    }
259
260    /// Check if this connect request includes E-RTMP capabilities.
261    ///
262    /// Returns true if any E-RTMP fields are present (fourCcList, info maps, or capsEx).
263    pub fn has_enhanced_rtmp(&self) -> bool {
264        self.fourcc_list.is_some()
265            || self.video_fourcc_info_map.is_some()
266            || self.audio_fourcc_info_map.is_some()
267            || self.caps_ex.is_some()
268    }
269
270    /// Get the CapsEx flags if present.
271    pub fn caps_ex_flags(&self) -> CapsEx {
272        CapsEx::from_bits(self.caps_ex.unwrap_or(0))
273    }
274
275    /// Convert E-RTMP fields to EnhancedCapabilities for negotiation.
276    ///
277    /// This extracts the client's declared capabilities from the connect params.
278    pub fn to_enhanced_capabilities(&self) -> EnhancedCapabilities {
279        if !self.has_enhanced_rtmp() {
280            return EnhancedCapabilities::new();
281        }
282
283        let mut caps = EnhancedCapabilities {
284            enabled: true,
285            caps_ex: self.caps_ex_flags(),
286            video_codecs: HashMap::new(),
287            audio_codecs: HashMap::new(),
288            ..Default::default()
289        };
290
291        // Parse video FOURCC info map
292        if let Some(map) = &self.video_fourcc_info_map {
293            for (fourcc_str, capability_bits) in map {
294                if let Some(fourcc) = VideoFourCc::from_fourcc_str(fourcc_str) {
295                    caps.video_codecs
296                        .insert(fourcc, FourCcCapability::from_bits(*capability_bits));
297                }
298            }
299        }
300
301        // Parse audio FOURCC info map
302        if let Some(map) = &self.audio_fourcc_info_map {
303            for (fourcc_str, capability_bits) in map {
304                if let Some(fourcc) = AudioFourCc::from_fourcc_str(fourcc_str) {
305                    caps.audio_codecs
306                        .insert(fourcc, FourCcCapability::from_bits(*capability_bits));
307                }
308            }
309        }
310
311        // If only fourCcList is provided (no info maps), treat all as full capability
312        if let Some(list) = &self.fourcc_list {
313            if self.video_fourcc_info_map.is_none() && self.audio_fourcc_info_map.is_none() {
314                for fourcc_str in list {
315                    // Try parsing as video codec
316                    if let Some(fourcc) = VideoFourCc::from_fourcc_str(fourcc_str) {
317                        caps.video_codecs
318                            .entry(fourcc)
319                            .or_insert(FourCcCapability::full());
320                    }
321                    // Try parsing as audio codec
322                    if let Some(fourcc) = AudioFourCc::from_fourcc_str(fourcc_str) {
323                        caps.audio_codecs
324                            .entry(fourcc)
325                            .or_insert(FourCcCapability::full());
326                    }
327                }
328            }
329        }
330
331        caps
332    }
333}
334
/// Parameters of a `publish` command.
#[derive(Clone, Debug)]
pub struct PublishParams {
    /// Stream key, i.e. the stream name.
    pub stream_key: String,
    /// Publish type: one of "live", "record" or "append".
    pub publish_type: String,
    /// Message stream ID the command refers to.
    pub stream_id: u32,
}
345
/// Parameters of a `play` command.
#[derive(Clone, Debug)]
pub struct PlayParams {
    /// Name of the stream to play.
    pub stream_name: String,
    /// Start time: -2 = live, -1 = live or recorded, >= 0 = specific time.
    pub start: f64,
    /// Duration; -1 means until the end.
    pub duration: f64,
    /// The reset flag.
    pub reset: bool,
    /// Message stream ID the command refers to.
    pub stream_id: u32,
}
360
361impl RtmpMessage {
362    /// Parse a message from a chunk
363    pub fn from_chunk(chunk: &RtmpChunk) -> Result<Self> {
364        let mut payload = chunk.payload.clone();
365
366        match chunk.message_type {
367            MSG_SET_CHUNK_SIZE => {
368                if payload.len() < 4 {
369                    return Err(ProtocolError::InvalidChunkHeader.into());
370                }
371                let size = payload.get_u32() & 0x7FFFFFFF; // Ignore MSB
372                Ok(RtmpMessage::SetChunkSize(size))
373            }
374
375            MSG_ABORT => {
376                if payload.len() < 4 {
377                    return Err(ProtocolError::InvalidChunkHeader.into());
378                }
379                Ok(RtmpMessage::Abort {
380                    csid: payload.get_u32(),
381                })
382            }
383
384            MSG_ACKNOWLEDGEMENT => {
385                if payload.len() < 4 {
386                    return Err(ProtocolError::InvalidChunkHeader.into());
387                }
388                Ok(RtmpMessage::Acknowledgement {
389                    sequence: payload.get_u32(),
390                })
391            }
392
393            MSG_USER_CONTROL => Self::parse_user_control(&mut payload),
394
395            MSG_WINDOW_ACK_SIZE => {
396                if payload.len() < 4 {
397                    return Err(ProtocolError::InvalidChunkHeader.into());
398                }
399                Ok(RtmpMessage::WindowAckSize(payload.get_u32()))
400            }
401
402            MSG_SET_PEER_BANDWIDTH => {
403                if payload.len() < 5 {
404                    return Err(ProtocolError::InvalidChunkHeader.into());
405                }
406                let size = payload.get_u32();
407                let limit_type = payload.get_u8();
408                Ok(RtmpMessage::SetPeerBandwidth { size, limit_type })
409            }
410
411            MSG_AUDIO => Ok(RtmpMessage::Audio {
412                timestamp: chunk.timestamp,
413                data: payload,
414            }),
415
416            MSG_VIDEO => Ok(RtmpMessage::Video {
417                timestamp: chunk.timestamp,
418                data: payload,
419            }),
420
421            MSG_COMMAND_AMF0 => {
422                let cmd = Self::parse_command(&mut payload, chunk.stream_id)?;
423                Ok(RtmpMessage::Command(cmd))
424            }
425
426            MSG_COMMAND_AMF3 => {
427                // Skip AMF3 marker byte if present
428                if !payload.is_empty() && payload[0] == 0x00 {
429                    payload.advance(1);
430                }
431                let cmd = Self::parse_command(&mut payload, chunk.stream_id)?;
432                Ok(RtmpMessage::CommandAmf3(cmd))
433            }
434
435            MSG_DATA_AMF0 => {
436                let data = Self::parse_data(&mut payload, chunk.stream_id)?;
437                Ok(RtmpMessage::Data(data))
438            }
439
440            MSG_DATA_AMF3 => {
441                if !payload.is_empty() && payload[0] == 0x00 {
442                    payload.advance(1);
443                }
444                let data = Self::parse_data(&mut payload, chunk.stream_id)?;
445                Ok(RtmpMessage::DataAmf3(data))
446            }
447
448            MSG_AGGREGATE => Ok(RtmpMessage::Aggregate { data: payload }),
449
450            _ => Ok(RtmpMessage::Unknown {
451                type_id: chunk.message_type,
452                data: payload,
453            }),
454        }
455    }
456
457    /// Parse User Control message
458    fn parse_user_control(payload: &mut Bytes) -> Result<Self> {
459        if payload.len() < 6 {
460            return Err(ProtocolError::InvalidChunkHeader.into());
461        }
462
463        let event_type = payload.get_u16();
464        let event = match event_type {
465            UC_STREAM_BEGIN => UserControlEvent::StreamBegin(payload.get_u32()),
466            UC_STREAM_EOF => UserControlEvent::StreamEof(payload.get_u32()),
467            UC_STREAM_DRY => UserControlEvent::StreamDry(payload.get_u32()),
468            UC_SET_BUFFER_LENGTH => {
469                if payload.len() < 8 {
470                    return Err(ProtocolError::InvalidChunkHeader.into());
471                }
472                let stream_id = payload.get_u32();
473                let buffer_ms = payload.get_u32();
474                UserControlEvent::SetBufferLength {
475                    stream_id,
476                    buffer_ms,
477                }
478            }
479            UC_STREAM_IS_RECORDED => UserControlEvent::StreamIsRecorded(payload.get_u32()),
480            UC_PING_REQUEST => UserControlEvent::PingRequest(payload.get_u32()),
481            UC_PING_RESPONSE => UserControlEvent::PingResponse(payload.get_u32()),
482            _ => UserControlEvent::Unknown {
483                event_type,
484                data: payload.clone(),
485            },
486        };
487
488        Ok(RtmpMessage::UserControl(event))
489    }
490
491    /// Parse AMF0 command
492    fn parse_command(payload: &mut Bytes, stream_id: u32) -> Result<Command> {
493        let mut decoder = Amf0Decoder::new();
494
495        // Command name
496        let name = match decoder.decode(payload)? {
497            AmfValue::String(s) => s,
498            _ => return Err(ProtocolError::InvalidCommand("Expected command name".into()).into()),
499        };
500
501        // Transaction ID
502        let transaction_id = match decoder.decode(payload)? {
503            AmfValue::Number(n) => n,
504            _ => 0.0, // Lenient: default to 0
505        };
506
507        // Command object (can be null)
508        let command_object = if payload.has_remaining() {
509            decoder.decode(payload)?
510        } else {
511            AmfValue::Null
512        };
513
514        // Additional arguments
515        let mut arguments = Vec::new();
516        while payload.has_remaining() {
517            match decoder.decode(payload) {
518                Ok(v) => arguments.push(v),
519                Err(AmfError::UnexpectedEof) => break,
520                Err(e) => return Err(e.into()),
521            }
522        }
523
524        Ok(Command {
525            name,
526            transaction_id,
527            command_object,
528            arguments,
529            stream_id,
530        })
531    }
532
533    /// Parse AMF0 data message
534    fn parse_data(payload: &mut Bytes, stream_id: u32) -> Result<DataMessage> {
535        let mut decoder = Amf0Decoder::new();
536
537        // Handler name
538        let name = match decoder.decode(payload)? {
539            AmfValue::String(s) => s,
540            _ => String::new(), // Lenient
541        };
542
543        // Data values
544        let mut values = Vec::new();
545        while payload.has_remaining() {
546            match decoder.decode(payload) {
547                Ok(v) => values.push(v),
548                Err(AmfError::UnexpectedEof) => break,
549                Err(e) => return Err(e.into()),
550            }
551        }
552
553        Ok(DataMessage {
554            name,
555            values,
556            stream_id,
557        })
558    }
559
560    /// Encode message to chunk payload
561    pub fn encode(&self) -> (u8, Bytes) {
562        match self {
563            RtmpMessage::SetChunkSize(size) => {
564                let mut buf = BytesMut::with_capacity(4);
565                buf.put_u32(*size);
566                (MSG_SET_CHUNK_SIZE, buf.freeze())
567            }
568
569            RtmpMessage::Abort { csid } => {
570                let mut buf = BytesMut::with_capacity(4);
571                buf.put_u32(*csid);
572                (MSG_ABORT, buf.freeze())
573            }
574
575            RtmpMessage::Acknowledgement { sequence } => {
576                let mut buf = BytesMut::with_capacity(4);
577                buf.put_u32(*sequence);
578                (MSG_ACKNOWLEDGEMENT, buf.freeze())
579            }
580
581            RtmpMessage::WindowAckSize(size) => {
582                let mut buf = BytesMut::with_capacity(4);
583                buf.put_u32(*size);
584                (MSG_WINDOW_ACK_SIZE, buf.freeze())
585            }
586
587            RtmpMessage::SetPeerBandwidth { size, limit_type } => {
588                let mut buf = BytesMut::with_capacity(5);
589                buf.put_u32(*size);
590                buf.put_u8(*limit_type);
591                (MSG_SET_PEER_BANDWIDTH, buf.freeze())
592            }
593
594            RtmpMessage::UserControl(event) => {
595                let mut buf = BytesMut::with_capacity(10);
596                match event {
597                    UserControlEvent::StreamBegin(id) => {
598                        buf.put_u16(UC_STREAM_BEGIN);
599                        buf.put_u32(*id);
600                    }
601                    UserControlEvent::StreamEof(id) => {
602                        buf.put_u16(UC_STREAM_EOF);
603                        buf.put_u32(*id);
604                    }
605                    UserControlEvent::StreamDry(id) => {
606                        buf.put_u16(UC_STREAM_DRY);
607                        buf.put_u32(*id);
608                    }
609                    UserControlEvent::SetBufferLength {
610                        stream_id,
611                        buffer_ms,
612                    } => {
613                        buf.put_u16(UC_SET_BUFFER_LENGTH);
614                        buf.put_u32(*stream_id);
615                        buf.put_u32(*buffer_ms);
616                    }
617                    UserControlEvent::StreamIsRecorded(id) => {
618                        buf.put_u16(UC_STREAM_IS_RECORDED);
619                        buf.put_u32(*id);
620                    }
621                    UserControlEvent::PingRequest(ts) => {
622                        buf.put_u16(UC_PING_REQUEST);
623                        buf.put_u32(*ts);
624                    }
625                    UserControlEvent::PingResponse(ts) => {
626                        buf.put_u16(UC_PING_RESPONSE);
627                        buf.put_u32(*ts);
628                    }
629                    UserControlEvent::Unknown { event_type, data } => {
630                        buf.put_u16(*event_type);
631                        buf.put_slice(data);
632                    }
633                }
634                (MSG_USER_CONTROL, buf.freeze())
635            }
636
637            RtmpMessage::Audio { data, .. } => (MSG_AUDIO, data.clone()),
638
639            RtmpMessage::Video { data, .. } => (MSG_VIDEO, data.clone()),
640
641            RtmpMessage::Command(cmd) => {
642                let payload = encode_command(cmd);
643                (MSG_COMMAND_AMF0, payload)
644            }
645
646            RtmpMessage::CommandAmf3(cmd) => {
647                let mut buf = BytesMut::new();
648                buf.put_u8(0x00); // AMF3 marker
649                buf.put_slice(&encode_command(cmd));
650                (MSG_COMMAND_AMF3, buf.freeze())
651            }
652
653            RtmpMessage::Data(data) => {
654                let payload = encode_data(data);
655                (MSG_DATA_AMF0, payload)
656            }
657
658            RtmpMessage::DataAmf3(data) => {
659                let mut buf = BytesMut::new();
660                buf.put_u8(0x00);
661                buf.put_slice(&encode_data(data));
662                (MSG_DATA_AMF3, buf.freeze())
663            }
664
665            RtmpMessage::Aggregate { data } => (MSG_AGGREGATE, data.clone()),
666
667            RtmpMessage::Unknown { type_id, data } => (*type_id, data.clone()),
668        }
669    }
670}
671
672/// Encode a command to AMF0 bytes
673fn encode_command(cmd: &Command) -> Bytes {
674    let mut encoder = Amf0Encoder::new();
675    encoder.encode(&AmfValue::String(cmd.name.clone()));
676    encoder.encode(&AmfValue::Number(cmd.transaction_id));
677    encoder.encode(&cmd.command_object);
678    for arg in &cmd.arguments {
679        encoder.encode(arg);
680    }
681    encoder.finish()
682}
683
684/// Encode a data message to AMF0 bytes
685fn encode_data(data: &DataMessage) -> Bytes {
686    let mut encoder = Amf0Encoder::new();
687    encoder.encode(&AmfValue::String(data.name.clone()));
688    for value in &data.values {
689        encoder.encode(value);
690    }
691    encoder.finish()
692}
693
694/// Build common response messages
695impl Command {
696    /// Create a _result response
697    pub fn result(transaction_id: f64, properties: AmfValue, info: AmfValue) -> Self {
698        Command {
699            name: CMD_RESULT.to_string(),
700            transaction_id,
701            command_object: properties,
702            arguments: vec![info],
703            stream_id: 0,
704        }
705    }
706
707    /// Create an _error response
708    pub fn error(transaction_id: f64, properties: AmfValue, info: AmfValue) -> Self {
709        Command {
710            name: CMD_ERROR.to_string(),
711            transaction_id,
712            command_object: properties,
713            arguments: vec![info],
714            stream_id: 0,
715        }
716    }
717
718    /// Create an onStatus response
719    pub fn on_status(stream_id: u32, level: &str, code: &str, description: &str) -> Self {
720        let mut info = HashMap::new();
721        info.insert("level".to_string(), AmfValue::String(level.to_string()));
722        info.insert("code".to_string(), AmfValue::String(code.to_string()));
723        info.insert(
724            "description".to_string(),
725            AmfValue::String(description.to_string()),
726        );
727
728        Command {
729            name: CMD_ON_STATUS.to_string(),
730            transaction_id: 0.0,
731            command_object: AmfValue::Null,
732            arguments: vec![AmfValue::Object(info)],
733            stream_id,
734        }
735    }
736}
737
738/// Builder for connect response with E-RTMP capability negotiation.
739///
740/// Use this to construct a proper `_result` response to a connect command,
741/// optionally including E-RTMP capability fields.
742///
743/// # Example
744///
745/// ```ignore
746/// use rtmp_rs::protocol::message::ConnectResponseBuilder;
747/// use rtmp_rs::protocol::enhanced::EnhancedCapabilities;
748///
749/// let caps = EnhancedCapabilities::with_defaults();
750/// let response = ConnectResponseBuilder::new()
751///     .fms_ver("rtmp-rs/0.5.0")
752///     .capabilities(31)
753///     .enhanced_capabilities(&caps)
754///     .build(1.0);
755/// ```
756#[derive(Debug, Clone, Default)]
757pub struct ConnectResponseBuilder {
758    fms_ver: String,
759    capabilities: u32,
760    enhanced_caps: Option<EnhancedCapabilities>,
761}
762
763impl ConnectResponseBuilder {
764    /// Create a new builder with default values.
765    pub fn new() -> Self {
766        Self {
767            fms_ver: "FMS/3,5,7,7009".to_string(),
768            capabilities: 31,
769            enhanced_caps: None,
770        }
771    }
772
773    /// Set the FMS version string.
774    pub fn fms_ver(mut self, ver: impl Into<String>) -> Self {
775        self.fms_ver = ver.into();
776        self
777    }
778
779    /// Set the capabilities bitmask.
780    pub fn capabilities(mut self, caps: u32) -> Self {
781        self.capabilities = caps;
782        self
783    }
784
785    /// Set E-RTMP enhanced capabilities.
786    ///
787    /// If the capabilities are enabled, the response will include
788    /// E-RTMP fields (capsEx, videoFourCcInfoMap, audioFourCcInfoMap).
789    pub fn enhanced_capabilities(mut self, caps: &EnhancedCapabilities) -> Self {
790        if caps.enabled {
791            self.enhanced_caps = Some(caps.clone());
792        }
793        self
794    }
795
796    /// Build the connect response command.
797    pub fn build(self, transaction_id: f64) -> Command {
798        let properties = self.build_properties();
799
800        let mut info = HashMap::new();
801        info.insert("level".to_string(), AmfValue::String("status".to_string()));
802        info.insert(
803            "code".to_string(),
804            AmfValue::String(NC_CONNECT_SUCCESS.to_string()),
805        );
806        info.insert(
807            "description".to_string(),
808            AmfValue::String("Connection succeeded.".to_string()),
809        );
810        info.insert(
811            "objectEncoding".to_string(),
812            AmfValue::Number(0.0), // AMF0
813        );
814
815        Command::result(transaction_id, properties, AmfValue::Object(info))
816    }
817
818    /// Build the properties object for the connect response.
819    fn build_properties(&self) -> AmfValue {
820        let mut props = HashMap::new();
821        props.insert("fmsVer".to_string(), AmfValue::String(self.fms_ver.clone()));
822        props.insert(
823            "capabilities".to_string(),
824            AmfValue::Number(self.capabilities as f64),
825        );
826
827        // Add E-RTMP fields if enabled
828        if let Some(caps) = &self.enhanced_caps {
829            // capsEx
830            props.insert(
831                "capsEx".to_string(),
832                AmfValue::Number(caps.caps_ex.bits() as f64),
833            );
834
835            // videoFourCcInfoMap
836            if !caps.video_codecs.is_empty() {
837                let video_map: HashMap<String, AmfValue> = caps
838                    .video_codecs
839                    .iter()
840                    .map(|(fourcc, cap)| {
841                        (
842                            fourcc.as_fourcc_str().to_string(),
843                            AmfValue::Number(cap.bits() as f64),
844                        )
845                    })
846                    .collect();
847                props.insert(
848                    "videoFourCcInfoMap".to_string(),
849                    AmfValue::Object(video_map),
850                );
851            }
852
853            // audioFourCcInfoMap
854            if !caps.audio_codecs.is_empty() {
855                let audio_map: HashMap<String, AmfValue> = caps
856                    .audio_codecs
857                    .iter()
858                    .map(|(fourcc, cap)| {
859                        (
860                            fourcc.as_fourcc_str().to_string(),
861                            AmfValue::Number(cap.bits() as f64),
862                        )
863                    })
864                    .collect();
865                props.insert(
866                    "audioFourCcInfoMap".to_string(),
867                    AmfValue::Object(audio_map),
868                );
869            }
870        }
871
872        AmfValue::Object(props)
873    }
874}
875
#[cfg(test)]
mod tests {
    use super::*;

    /// Property names the connect response carries only when E-RTMP is enabled.
    const ERTMP_RESPONSE_KEYS: [&str; 3] = ["capsEx", "videoFourCcInfoMap", "audioFourCcInfoMap"];

    /// Collects `(key, value)` pairs into the map type wrapped by `AmfValue::Object`.
    fn amf_props(entries: Vec<(&str, AmfValue)>) -> HashMap<String, AmfValue> {
        entries
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect()
    }

    /// A small connect object exposes `app`, `tcUrl` and `objectEncoding`.
    #[test]
    fn test_connect_params_parsing() {
        let connect_obj = AmfValue::Object(amf_props(vec![
            ("app", AmfValue::String("live".into())),
            ("tcUrl", AmfValue::String("rtmp://localhost/live".into())),
            ("objectEncoding", AmfValue::Number(0.0)),
        ]));

        let params = ConnectParams::from_amf(&connect_obj);
        assert_eq!(params.app, "live");
        assert_eq!(params.tc_url, Some("rtmp://localhost/live".into()));
        assert_eq!(params.object_encoding, 0.0);
    }

    /// An encoded command parses back with the same name and transaction id.
    #[test]
    fn test_command_roundtrip() {
        let original = Command {
            name: "connect".to_string(),
            transaction_id: 1.0,
            command_object: AmfValue::Null,
            arguments: vec![AmfValue::String("test".into())],
            stream_id: 0,
        };

        let chunk = RtmpChunk {
            csid: CSID_COMMAND,
            timestamp: 0,
            message_type: MSG_COMMAND_AMF0,
            stream_id: 0,
            payload: encode_command(&original),
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::Command(decoded) => {
                assert_eq!(decoded.name, "connect");
                assert_eq!(decoded.transaction_id, 1.0);
            }
            _ => panic!("Expected Command message"),
        }
    }

    /// Set Chunk Size decodes to 4096 and re-encodes to the same four bytes.
    #[test]
    fn test_set_chunk_size_message() {
        let chunk = RtmpChunk {
            csid: CSID_PROTOCOL_CONTROL,
            timestamp: 0,
            message_type: MSG_SET_CHUNK_SIZE,
            stream_id: 0,
            payload: Bytes::from_static(&[0x00, 0x00, 0x10, 0x00]), // 4096
        };

        let decoded = RtmpMessage::from_chunk(&chunk).unwrap();
        assert!(matches!(decoded, RtmpMessage::SetChunkSize(4096)));

        // Encoding direction
        let (msg_type, payload) = decoded.encode();
        assert_eq!(msg_type, MSG_SET_CHUNK_SIZE);
        assert_eq!(&payload[..], &[0x00, 0x00, 0x10, 0x00]);
    }

    /// Abort carries the chunk stream id from its payload.
    #[test]
    fn test_abort_message() {
        let chunk = RtmpChunk {
            csid: CSID_PROTOCOL_CONTROL,
            timestamp: 0,
            message_type: MSG_ABORT,
            stream_id: 0,
            payload: Bytes::from_static(&[0x00, 0x00, 0x00, 0x05]), // CSID 5
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::Abort { csid } => assert_eq!(csid, 5),
            _ => panic!("Expected Abort message"),
        }
    }

    /// Acknowledgement carries the sequence number from its payload.
    #[test]
    fn test_acknowledgement_message() {
        let chunk = RtmpChunk {
            csid: CSID_PROTOCOL_CONTROL,
            timestamp: 0,
            message_type: MSG_ACKNOWLEDGEMENT,
            stream_id: 0,
            payload: Bytes::from_static(&[0x00, 0x10, 0x00, 0x00]), // 1048576
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::Acknowledgement { sequence } => assert_eq!(sequence, 1048576),
            _ => panic!("Expected Acknowledgement message"),
        }
    }

    /// Window Acknowledgement Size carries the window size from its payload.
    #[test]
    fn test_window_ack_size_message() {
        let chunk = RtmpChunk {
            csid: CSID_PROTOCOL_CONTROL,
            timestamp: 0,
            message_type: MSG_WINDOW_ACK_SIZE,
            stream_id: 0,
            payload: Bytes::from_static(&[0x00, 0x26, 0x25, 0xA0]), // 2500000
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::WindowAckSize(size) => assert_eq!(size, 2500000),
            _ => panic!("Expected WindowAckSize message"),
        }
    }

    /// Set Peer Bandwidth carries both the size and the limit type byte.
    #[test]
    fn test_set_peer_bandwidth_message() {
        let chunk = RtmpChunk {
            csid: CSID_PROTOCOL_CONTROL,
            timestamp: 0,
            message_type: MSG_SET_PEER_BANDWIDTH,
            stream_id: 0,
            payload: Bytes::from_static(&[0x00, 0x26, 0x25, 0xA0, 0x02]), // 2500000, dynamic
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::SetPeerBandwidth { size, limit_type } => {
                assert_eq!(size, 2500000);
                assert_eq!(limit_type, BANDWIDTH_LIMIT_DYNAMIC);
            }
            _ => panic!("Expected SetPeerBandwidth message"),
        }
    }

    /// User control event type 0 decodes as StreamBegin with its stream id.
    #[test]
    fn test_user_control_stream_begin() {
        let chunk = RtmpChunk {
            csid: CSID_PROTOCOL_CONTROL,
            timestamp: 0,
            message_type: MSG_USER_CONTROL,
            stream_id: 0,
            payload: Bytes::from_static(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x01]), // StreamBegin, stream 1
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::UserControl(UserControlEvent::StreamBegin(id)) => assert_eq!(id, 1),
            _ => panic!("Expected StreamBegin user control"),
        }
    }

    /// User control event type 1 decodes as StreamEof with its stream id.
    #[test]
    fn test_user_control_stream_eof() {
        let chunk = RtmpChunk {
            csid: CSID_PROTOCOL_CONTROL,
            timestamp: 0,
            message_type: MSG_USER_CONTROL,
            stream_id: 0,
            payload: Bytes::from_static(&[0x00, 0x01, 0x00, 0x00, 0x00, 0x02]), // StreamEof, stream 2
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::UserControl(UserControlEvent::StreamEof(id)) => assert_eq!(id, 2),
            _ => panic!("Expected StreamEof user control"),
        }
    }

    /// User control event type 3 decodes as SetBufferLength with both fields.
    #[test]
    fn test_user_control_set_buffer_length() {
        let chunk = RtmpChunk {
            csid: CSID_PROTOCOL_CONTROL,
            timestamp: 0,
            message_type: MSG_USER_CONTROL,
            stream_id: 0,
            payload: Bytes::from_static(&[
                0x00, 0x03, // SetBufferLength
                0x00, 0x00, 0x00, 0x01, // stream_id 1
                0x00, 0x00, 0x03, 0xE8, // 1000ms
            ]),
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::UserControl(UserControlEvent::SetBufferLength {
                stream_id,
                buffer_ms,
            }) => {
                assert_eq!(stream_id, 1);
                assert_eq!(buffer_ms, 1000);
            }
            _ => panic!("Expected SetBufferLength user control"),
        }
    }

    /// User control event type 6 decodes as PingRequest with its timestamp.
    #[test]
    fn test_user_control_ping_request() {
        let chunk = RtmpChunk {
            csid: CSID_PROTOCOL_CONTROL,
            timestamp: 0,
            message_type: MSG_USER_CONTROL,
            stream_id: 0,
            payload: Bytes::from_static(&[0x00, 0x06, 0x00, 0x01, 0x00, 0x00]), // PingRequest
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::UserControl(UserControlEvent::PingRequest(ts)) => {
                assert_eq!(ts, 0x00010000)
            }
            _ => panic!("Expected PingRequest user control"),
        }
    }

    /// User control event type 7 decodes as PingResponse with its timestamp.
    #[test]
    fn test_user_control_ping_response() {
        let chunk = RtmpChunk {
            csid: CSID_PROTOCOL_CONTROL,
            timestamp: 0,
            message_type: MSG_USER_CONTROL,
            stream_id: 0,
            payload: Bytes::from_static(&[0x00, 0x07, 0x00, 0x00, 0x00, 0x64]), // PingResponse
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::UserControl(UserControlEvent::PingResponse(ts)) => assert_eq!(ts, 100),
            _ => panic!("Expected PingResponse user control"),
        }
    }

    /// Audio messages keep the chunk timestamp and the payload bytes.
    #[test]
    fn test_audio_message() {
        let audio_data = Bytes::from_static(&[0xAF, 0x01, 0x21, 0x00, 0x00]);

        let chunk = RtmpChunk {
            csid: CSID_AUDIO,
            timestamp: 1000,
            message_type: MSG_AUDIO,
            stream_id: 1,
            payload: audio_data.clone(),
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::Audio { timestamp, data } => {
                assert_eq!(timestamp, 1000);
                assert_eq!(data, audio_data);
            }
            _ => panic!("Expected Audio message"),
        }
    }

    /// Video messages keep the chunk timestamp and the payload bytes.
    #[test]
    fn test_video_message() {
        let video_data = Bytes::from_static(&[0x17, 0x01, 0x00, 0x00, 0x00, 0x00]);

        let chunk = RtmpChunk {
            csid: CSID_VIDEO,
            timestamp: 2000,
            message_type: MSG_VIDEO,
            stream_id: 1,
            payload: video_data.clone(),
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::Video { timestamp, data } => {
                assert_eq!(timestamp, 2000);
                assert_eq!(data, video_data);
            }
            _ => panic!("Expected Video message"),
        }
    }

    /// An AMF0 data message exposes its name, stream id and remaining values.
    #[test]
    fn test_data_message() {
        let amf_values = [
            AmfValue::String("@setDataFrame".into()),
            AmfValue::String("onMetaData".into()),
            AmfValue::Object(amf_props(vec![("width", AmfValue::Number(1920.0))])),
        ];

        let mut encoder = Amf0Encoder::new();
        for value in &amf_values {
            encoder.encode(value);
        }

        let chunk = RtmpChunk {
            csid: CSID_COMMAND,
            timestamp: 0,
            message_type: MSG_DATA_AMF0,
            stream_id: 1,
            payload: encoder.finish(),
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::Data(data) => {
                assert_eq!(data.name, "@setDataFrame");
                assert_eq!(data.stream_id, 1);
                assert_eq!(data.values.len(), 2);
            }
            _ => panic!("Expected Data message"),
        }
    }

    /// An unrecognised message type is surfaced as `Unknown` with raw bytes.
    #[test]
    fn test_unknown_message_type() {
        let chunk = RtmpChunk {
            csid: CSID_COMMAND,
            timestamp: 0,
            message_type: 99, // Unknown type
            stream_id: 0,
            payload: Bytes::from_static(b"unknown"),
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::Unknown { type_id, data } => {
                assert_eq!(type_id, 99);
                assert_eq!(data.as_ref(), b"unknown");
            }
            _ => panic!("Expected Unknown message"),
        }
    }

    /// `Command::result` produces a `_result` command with the given transaction id.
    #[test]
    fn test_command_result() {
        let props = amf_props(vec![
            ("fmsVer", AmfValue::String("FMS/3,5,7,7009".into())),
            ("capabilities", AmfValue::Number(31.0)),
        ]);

        let result = Command::result(1.0, AmfValue::Object(props), AmfValue::Null);

        assert_eq!(result.name, "_result");
        assert_eq!(result.transaction_id, 1.0);
    }

    /// `Command::error` produces an `_error` command with the given transaction id.
    #[test]
    fn test_command_error() {
        let failure = Command::error(1.0, AmfValue::Null, AmfValue::String("error".into()));

        assert_eq!(failure.name, "_error");
        assert_eq!(failure.transaction_id, 1.0);
    }

    /// `Command::on_status` produces an `onStatus` command whose first argument
    /// is an object holding the level and code.
    #[test]
    fn test_command_on_status() {
        let status = Command::on_status(1, "status", NS_PUBLISH_START, "Publishing started");

        assert_eq!(status.name, "onStatus");
        assert_eq!(status.transaction_id, 0.0);
        assert_eq!(status.stream_id, 1);

        match status.arguments.first() {
            Some(AmfValue::Object(props)) => {
                assert_eq!(props.get("level").unwrap().as_str(), Some("status"));
                assert_eq!(props.get("code").unwrap().as_str(), Some(NS_PUBLISH_START));
            }
            Some(_) => panic!("Expected Object in arguments"),
            None => panic!("Expected arguments"),
        }
    }

    /// Every legacy connect field is picked up, and unknown keys land in `extra`.
    #[test]
    fn test_connect_params_all_fields() {
        let connect_obj = AmfValue::Object(amf_props(vec![
            ("app", AmfValue::String("live".into())),
            ("flashVer", AmfValue::String("OBS-Studio/29.0".into())),
            ("swfUrl", AmfValue::String("rtmp://example.com/app".into())),
            ("tcUrl", AmfValue::String("rtmp://example.com/live".into())),
            ("fpad", AmfValue::Boolean(false)),
            ("audioCodecs", AmfValue::Number(3575.0)),
            ("videoCodecs", AmfValue::Number(252.0)),
            ("videoFunction", AmfValue::Number(1.0)),
            ("pageUrl", AmfValue::String("http://twitch.tv".into())),
            ("objectEncoding", AmfValue::Number(0.0)),
            ("custom", AmfValue::String("value".into())),
        ]));

        let params = ConnectParams::from_amf(&connect_obj);

        assert_eq!(params.app, "live");
        assert_eq!(params.flash_ver, Some("OBS-Studio/29.0".into()));
        assert_eq!(params.swf_url, Some("rtmp://example.com/app".into()));
        assert_eq!(params.tc_url, Some("rtmp://example.com/live".into()));
        assert!(!params.fpad);
        assert_eq!(params.audio_codecs, 3575);
        assert_eq!(params.video_codecs, 252);
        assert_eq!(params.video_function, 1);
        assert_eq!(params.page_url, Some("http://twitch.tv".into()));
        assert_eq!(params.object_encoding, 0.0);
        assert!(params.extra.contains_key("custom"));
    }

    /// All-lowercase spellings of the URL/version keys are accepted too.
    #[test]
    fn test_connect_params_case_insensitive() {
        // Lowercase key variants
        let connect_obj = AmfValue::Object(amf_props(vec![
            ("flashver", AmfValue::String("test".into())),
            ("tcurl", AmfValue::String("url".into())),
            ("pageurl", AmfValue::String("page".into())),
            ("swfurl", AmfValue::String("swf".into())),
        ]));

        let params = ConnectParams::from_amf(&connect_obj);

        assert_eq!(params.flash_ver, Some("test".into()));
        assert_eq!(params.tc_url, Some("url".into()));
        assert_eq!(params.page_url, Some("page".into()));
        assert_eq!(params.swf_url, Some("swf".into()));
    }

    /// A non-object AMF value yields empty/absent fields instead of failing.
    #[test]
    fn test_connect_params_from_non_object() {
        let params = ConnectParams::from_amf(&AmfValue::Null);
        assert_eq!(params.app, "");
        assert!(params.flash_ver.is_none());
    }

    /// SetChunkSize and WindowAckSize survive an encode/decode round trip.
    #[test]
    fn test_message_encode_roundtrip() {
        // Encode a message, wrap it in a protocol-control chunk, decode it again.
        let roundtrip = |msg: RtmpMessage| {
            let (message_type, payload) = msg.encode();
            let chunk = RtmpChunk {
                csid: CSID_PROTOCOL_CONTROL,
                timestamp: 0,
                message_type,
                stream_id: 0,
                payload,
            };
            RtmpMessage::from_chunk(&chunk).unwrap()
        };

        // SetChunkSize
        let decoded = roundtrip(RtmpMessage::SetChunkSize(4096));
        assert!(matches!(decoded, RtmpMessage::SetChunkSize(4096)));

        // WindowAckSize
        let decoded = roundtrip(RtmpMessage::WindowAckSize(2500000));
        assert!(matches!(decoded, RtmpMessage::WindowAckSize(2500000)));
    }

    /// Every user control event encodes as a non-empty User Control message.
    #[test]
    fn test_user_control_event_encode() {
        let events = vec![
            UserControlEvent::StreamBegin(1),
            UserControlEvent::StreamEof(2),
            UserControlEvent::StreamDry(3),
            UserControlEvent::StreamIsRecorded(4),
            UserControlEvent::PingRequest(5),
            UserControlEvent::PingResponse(6),
            UserControlEvent::SetBufferLength {
                stream_id: 1,
                buffer_ms: 1000,
            },
        ];

        for event in events {
            let (msg_type, payload) = RtmpMessage::UserControl(event).encode();
            assert_eq!(msg_type, MSG_USER_CONTROL);
            assert!(!payload.is_empty());
        }
    }

    /// Aggregate messages are passed through with their raw payload.
    #[test]
    fn test_aggregate_message() {
        let chunk = RtmpChunk {
            csid: CSID_VIDEO,
            timestamp: 0,
            message_type: MSG_AGGREGATE,
            stream_id: 1,
            payload: Bytes::from_static(b"aggregate data"),
        };

        match RtmpMessage::from_chunk(&chunk).unwrap() {
            RtmpMessage::Aggregate { data } => assert_eq!(data.as_ref(), b"aggregate data"),
            _ => panic!("Expected Aggregate message"),
        }
    }

    /// Protocol control payloads shorter than four bytes are rejected.
    #[test]
    fn test_truncated_protocol_control_messages() {
        let truncated = vec![
            // SetChunkSize with only 2 bytes
            (MSG_SET_CHUNK_SIZE, Bytes::from_static(&[0x00, 0x00])),
            // WindowAckSize with only 1 byte
            (MSG_WINDOW_ACK_SIZE, Bytes::from_static(&[0x00])),
        ];

        for (message_type, payload) in truncated {
            let chunk = RtmpChunk {
                csid: CSID_PROTOCOL_CONTROL,
                timestamp: 0,
                message_type,
                stream_id: 0,
                payload,
            };

            assert!(RtmpMessage::from_chunk(&chunk).is_err());
        }
    }

    // =========================================================================
    // E-RTMP Connect Tests
    // =========================================================================

    /// `capsEx` and both FourCC info maps are parsed from the connect object.
    #[test]
    fn test_connect_params_ertmp_fields() {
        let video_caps = amf_props(vec![
            ("avc1", AmfValue::Number(7.0)), // Full capability
            ("hvc1", AmfValue::Number(4.0)), // Forward only
        ]);
        let audio_caps = amf_props(vec![
            ("mp4a", AmfValue::Number(7.0)),
            ("Opus", AmfValue::Number(4.0)),
        ]);
        let connect_obj = AmfValue::Object(amf_props(vec![
            ("app", AmfValue::String("live".into())),
            // E-RTMP fields
            ("capsEx", AmfValue::Number(3.0)), // RECONNECT | MULTITRACK
            ("videoFourCcInfoMap", AmfValue::Object(video_caps)),
            ("audioFourCcInfoMap", AmfValue::Object(audio_caps)),
        ]));

        let params = ConnectParams::from_amf(&connect_obj);

        assert!(params.has_enhanced_rtmp());
        assert_eq!(params.caps_ex, Some(3));
        assert!(params.video_fourcc_info_map.is_some());
        assert!(params.audio_fourcc_info_map.is_some());

        let parsed_video = params.video_fourcc_info_map.unwrap();
        assert_eq!(parsed_video.get("avc1"), Some(&7));
        assert_eq!(parsed_video.get("hvc1"), Some(&4));

        let parsed_audio = params.audio_fourcc_info_map.unwrap();
        assert_eq!(parsed_audio.get("mp4a"), Some(&7));
        assert_eq!(parsed_audio.get("Opus"), Some(&4));
    }

    /// A bare `fourCcList` array is enough to flag enhanced RTMP.
    #[test]
    fn test_connect_params_fourcc_list() {
        // fourCcList as alternative to info maps
        let fourcc_values = AmfValue::Array(vec![
            AmfValue::String("avc1".into()),
            AmfValue::String("hvc1".into()),
            AmfValue::String("mp4a".into()),
        ]);
        let connect_obj = AmfValue::Object(amf_props(vec![
            ("app", AmfValue::String("live".into())),
            ("fourCcList", fourcc_values),
        ]));

        let params = ConnectParams::from_amf(&connect_obj);

        assert!(params.has_enhanced_rtmp());
        let list = params.fourcc_list.unwrap();
        assert_eq!(list.len(), 3);
        for fourcc in &["avc1", "hvc1", "mp4a"] {
            assert!(list.contains(&fourcc.to_string()));
        }
    }

    /// Without any E-RTMP keys, all enhanced fields stay unset.
    #[test]
    fn test_connect_params_no_ertmp() {
        let connect_obj =
            AmfValue::Object(amf_props(vec![("app", AmfValue::String("live".into()))]));

        let params = ConnectParams::from_amf(&connect_obj);

        assert!(!params.has_enhanced_rtmp());
        assert!(params.fourcc_list.is_none());
        assert!(params.video_fourcc_info_map.is_none());
        assert!(params.audio_fourcc_info_map.is_none());
        assert!(params.caps_ex.is_none());
    }

    /// A `capsEx` of 15 reports all four capability flags.
    #[test]
    fn test_connect_params_caps_ex_flags() {
        let connect_obj = AmfValue::Object(amf_props(vec![
            ("app", AmfValue::String("live".into())),
            ("capsEx", AmfValue::Number(15.0)), // All flags
        ]));

        let flags = ConnectParams::from_amf(&connect_obj).caps_ex_flags();

        assert!(flags.supports_reconnect());
        assert!(flags.supports_multitrack());
        assert!(flags.supports_modex());
        assert!(flags.supports_timestamp_nano_offset());
    }

    /// Info maps and `capsEx` translate into matching `EnhancedCapabilities`.
    #[test]
    fn test_connect_params_to_enhanced_capabilities() {
        let video_caps = amf_props(vec![
            ("avc1", AmfValue::Number(7.0)),
            ("av01", AmfValue::Number(4.0)),
        ]);
        let audio_caps = amf_props(vec![("Opus", AmfValue::Number(7.0))]);
        let connect_obj = AmfValue::Object(amf_props(vec![
            ("app", AmfValue::String("live".into())),
            ("capsEx", AmfValue::Number(6.0)), // MULTITRACK | MODEX
            ("videoFourCcInfoMap", AmfValue::Object(video_caps)),
            ("audioFourCcInfoMap", AmfValue::Object(audio_caps)),
        ]));

        let caps = ConnectParams::from_amf(&connect_obj).to_enhanced_capabilities();

        assert!(caps.enabled);
        assert!(caps.supports_multitrack());
        assert!(caps.caps_ex.supports_modex());
        assert!(!caps.caps_ex.supports_reconnect());

        assert!(caps.supports_video_codec(VideoFourCc::Avc));
        assert!(caps.supports_video_codec(VideoFourCc::Av1));
        assert!(!caps.supports_video_codec(VideoFourCc::Hevc));

        assert!(caps.supports_audio_codec(AudioFourCc::Opus));
        assert!(!caps.supports_audio_codec(AudioFourCc::Aac));
    }

    /// Codecs named only in `fourCcList` get decode, encode and forward capability.
    #[test]
    fn test_connect_params_fourcc_list_to_capabilities() {
        // Only fourCcList (no info maps) -> full capability assumed
        let fourcc_values = AmfValue::Array(vec![
            AmfValue::String("avc1".into()),
            AmfValue::String("Opus".into()),
        ]);
        let connect_obj = AmfValue::Object(amf_props(vec![
            ("app", AmfValue::String("live".into())),
            ("fourCcList", fourcc_values),
        ]));

        let caps = ConnectParams::from_amf(&connect_obj).to_enhanced_capabilities();

        assert!(caps.enabled);

        // Both should have full capability
        let avc_cap = caps.video_codec_capability(VideoFourCc::Avc).unwrap();
        assert!(avc_cap.can_decode());
        assert!(avc_cap.can_encode());
        assert!(avc_cap.can_forward());

        let opus_cap = caps.audio_codec_capability(AudioFourCc::Opus).unwrap();
        assert!(opus_cap.can_decode());
        assert!(opus_cap.can_encode());
        assert!(opus_cap.can_forward());
    }

    /// A builder without enhanced capabilities emits only the legacy properties.
    #[test]
    fn test_connect_response_builder_basic() {
        let response = ConnectResponseBuilder::new()
            .fms_ver("rtmp-rs/0.5.0")
            .capabilities(31)
            .build(1.0);

        assert_eq!(response.name, "_result");
        assert_eq!(response.transaction_id, 1.0);

        // Check properties
        match &response.command_object {
            AmfValue::Object(props) => {
                assert_eq!(props.get("fmsVer").unwrap().as_str(), Some("rtmp-rs/0.5.0"));
                assert_eq!(props.get("capabilities").unwrap().as_number(), Some(31.0));
                // No E-RTMP fields
                for key in &ERTMP_RESPONSE_KEYS {
                    assert!(!props.contains_key(*key));
                }
            }
            _ => panic!("Expected Object in command_object"),
        }
    }

    /// Default enhanced capabilities add `capsEx` and both FourCC info maps.
    #[test]
    fn test_connect_response_builder_with_ertmp() {
        let caps = EnhancedCapabilities::with_defaults();

        let response = ConnectResponseBuilder::new()
            .fms_ver("rtmp-rs/0.5.0")
            .capabilities(31)
            .enhanced_capabilities(&caps)
            .build(1.0);

        // Check properties include E-RTMP fields
        let props = match &response.command_object {
            AmfValue::Object(props) => props,
            _ => panic!("Expected Object in command_object"),
        };
        for key in &ERTMP_RESPONSE_KEYS {
            assert!(props.contains_key(*key));
        }

        // Check video info map
        match props.get("videoFourCcInfoMap").unwrap() {
            AmfValue::Object(video_map) => {
                for fourcc in &["avc1", "hvc1", "av01"] {
                    assert!(video_map.contains_key(*fourcc));
                }
            }
            _ => panic!("Expected Object for videoFourCcInfoMap"),
        }

        // Check audio info map
        match props.get("audioFourCcInfoMap").unwrap() {
            AmfValue::Object(audio_map) => {
                for fourcc in &["mp4a", "Opus"] {
                    assert!(audio_map.contains_key(*fourcc));
                }
            }
            _ => panic!("Expected Object for audioFourCcInfoMap"),
        }
    }

    /// Capabilities from `EnhancedCapabilities::new()` add no E-RTMP properties.
    #[test]
    fn test_connect_response_builder_disabled_ertmp() {
        let caps = EnhancedCapabilities::new(); // disabled

        let response = ConnectResponseBuilder::new()
            .enhanced_capabilities(&caps)
            .build(1.0);

        // Should not include E-RTMP fields when disabled
        match &response.command_object {
            AmfValue::Object(props) => {
                for key in &ERTMP_RESPONSE_KEYS {
                    assert!(!props.contains_key(*key));
                }
            }
            _ => panic!("Expected Object in command_object"),
        }
    }
}