rtmp_rs/protocol/
message.rs

1//! RTMP message types and parsing
2//!
3//! RTMP messages are classified into:
4//! - Protocol Control Messages (types 1-6): Chunk/flow control
5//! - Command Messages (types 17, 20): AMF-encoded commands
6//! - Data Messages (types 15, 18): Metadata
7//! - Audio/Video Messages (types 8, 9): Media data
8//!
9//! Reference: RTMP Specification Section 5.4
10
11use bytes::{Buf, BufMut, Bytes, BytesMut};
12use std::collections::HashMap;
13
14use crate::amf::{Amf0Decoder, Amf0Encoder, AmfValue};
15use crate::error::{AmfError, ProtocolError, Result};
16use crate::protocol::chunk::RtmpChunk;
17use crate::protocol::constants::*;
18
19/// Parsed RTMP message
20#[derive(Debug, Clone)]
21pub enum RtmpMessage {
22    /// Set Chunk Size (type 1)
23    SetChunkSize(u32),
24
25    /// Abort Message (type 2)
26    Abort { csid: u32 },
27
28    /// Acknowledgement (type 3)
29    Acknowledgement { sequence: u32 },
30
31    /// User Control Message (type 4)
32    UserControl(UserControlEvent),
33
34    /// Window Acknowledgement Size (type 5)
35    WindowAckSize(u32),
36
37    /// Set Peer Bandwidth (type 6)
38    SetPeerBandwidth { size: u32, limit_type: u8 },
39
40    /// Audio data (type 8)
41    Audio { timestamp: u32, data: Bytes },
42
43    /// Video data (type 9)
44    Video { timestamp: u32, data: Bytes },
45
46    /// AMF0 Command (type 20)
47    Command(Command),
48
49    /// AMF0 Data message (type 18) - metadata, etc.
50    Data(DataMessage),
51
52    /// AMF3 Command (type 17)
53    CommandAmf3(Command),
54
55    /// AMF3 Data message (type 15)
56    DataAmf3(DataMessage),
57
58    /// Aggregate message (type 22)
59    Aggregate { data: Bytes },
60
61    /// Unknown message type
62    Unknown { type_id: u8, data: Bytes },
63}
64
65/// User Control Event
66#[derive(Debug, Clone)]
67pub enum UserControlEvent {
68    StreamBegin(u32),
69    StreamEof(u32),
70    StreamDry(u32),
71    SetBufferLength { stream_id: u32, buffer_ms: u32 },
72    StreamIsRecorded(u32),
73    PingRequest(u32),
74    PingResponse(u32),
75    Unknown { event_type: u16, data: Bytes },
76}
77
78/// RTMP command (connect, publish, play, etc.)
79#[derive(Debug, Clone)]
80pub struct Command {
81    /// Command name
82    pub name: String,
83    /// Transaction ID
84    pub transaction_id: f64,
85    /// Command object (often null for responses)
86    pub command_object: AmfValue,
87    /// Additional arguments
88    pub arguments: Vec<AmfValue>,
89    /// Message stream ID (from chunk)
90    pub stream_id: u32,
91}
92
93/// Data message (@setDataFrame, onMetaData, etc.)
94#[derive(Debug, Clone)]
95pub struct DataMessage {
96    /// Handler name (e.g., "@setDataFrame", "onMetaData")
97    pub name: String,
98    /// Data values
99    pub values: Vec<AmfValue>,
100    /// Message stream ID
101    pub stream_id: u32,
102}
103
104/// Connect command parameters
105#[derive(Debug, Clone, Default)]
106pub struct ConnectParams {
107    /// Application name
108    pub app: String,
109    /// Flash version
110    pub flash_ver: Option<String>,
111    /// SWF URL
112    pub swf_url: Option<String>,
113    /// TC URL (full RTMP URL)
114    pub tc_url: Option<String>,
115    /// Is FPAD
116    pub fpad: bool,
117    /// Audio codecs
118    pub audio_codecs: u32,
119    /// Video codecs
120    pub video_codecs: u32,
121    /// Video function
122    pub video_function: u32,
123    /// Page URL
124    pub page_url: Option<String>,
125    /// Object encoding (AMF version)
126    pub object_encoding: f64,
127    /// Extra properties from connect object
128    pub extra: HashMap<String, AmfValue>,
129}
130
131impl ConnectParams {
132    /// Parse from AMF command object
133    pub fn from_amf(obj: &AmfValue) -> Self {
134        let mut params = ConnectParams::default();
135
136        if let Some(map) = obj.as_object() {
137            for (key, value) in map {
138                match key.as_str() {
139                    "app" => {
140                        if let Some(s) = value.as_str() {
141                            params.app = s.to_string();
142                        }
143                    }
144                    "flashVer" | "flashver" => {
145                        params.flash_ver = value.as_str().map(|s| s.to_string());
146                    }
147                    "swfUrl" | "swfurl" => {
148                        params.swf_url = value.as_str().map(|s| s.to_string());
149                    }
150                    "tcUrl" | "tcurl" => {
151                        params.tc_url = value.as_str().map(|s| s.to_string());
152                    }
153                    "fpad" => {
154                        params.fpad = value.as_bool().unwrap_or(false);
155                    }
156                    "audioCodecs" | "audiocodecs" => {
157                        params.audio_codecs = value.as_number().unwrap_or(0.0) as u32;
158                    }
159                    "videoCodecs" | "videocodecs" => {
160                        params.video_codecs = value.as_number().unwrap_or(0.0) as u32;
161                    }
162                    "videoFunction" | "videofunction" => {
163                        params.video_function = value.as_number().unwrap_or(0.0) as u32;
164                    }
165                    "pageUrl" | "pageurl" => {
166                        params.page_url = value.as_str().map(|s| s.to_string());
167                    }
168                    "objectEncoding" | "objectencoding" => {
169                        params.object_encoding = value.as_number().unwrap_or(0.0);
170                    }
171                    _ => {
172                        params.extra.insert(key.clone(), value.clone());
173                    }
174                }
175            }
176        }
177
178        params
179    }
180}
181
182/// Publish command parameters
183#[derive(Debug, Clone)]
184pub struct PublishParams {
185    /// Stream key (name)
186    pub stream_key: String,
187    /// Publish type: "live", "record", "append"
188    pub publish_type: String,
189    /// Message stream ID
190    pub stream_id: u32,
191}
192
193/// Play command parameters
194#[derive(Debug, Clone)]
195pub struct PlayParams {
196    /// Stream name
197    pub stream_name: String,
198    /// Start time (-2 = live, -1 = live or recorded, >= 0 = specific time)
199    pub start: f64,
200    /// Duration (-1 = until end)
201    pub duration: f64,
202    /// Reset flag
203    pub reset: bool,
204    /// Message stream ID
205    pub stream_id: u32,
206}
207
208impl RtmpMessage {
209    /// Parse a message from a chunk
210    pub fn from_chunk(chunk: &RtmpChunk) -> Result<Self> {
211        let mut payload = chunk.payload.clone();
212
213        match chunk.message_type {
214            MSG_SET_CHUNK_SIZE => {
215                if payload.len() < 4 {
216                    return Err(ProtocolError::InvalidChunkHeader.into());
217                }
218                let size = payload.get_u32() & 0x7FFFFFFF; // Ignore MSB
219                Ok(RtmpMessage::SetChunkSize(size))
220            }
221
222            MSG_ABORT => {
223                if payload.len() < 4 {
224                    return Err(ProtocolError::InvalidChunkHeader.into());
225                }
226                Ok(RtmpMessage::Abort {
227                    csid: payload.get_u32(),
228                })
229            }
230
231            MSG_ACKNOWLEDGEMENT => {
232                if payload.len() < 4 {
233                    return Err(ProtocolError::InvalidChunkHeader.into());
234                }
235                Ok(RtmpMessage::Acknowledgement {
236                    sequence: payload.get_u32(),
237                })
238            }
239
240            MSG_USER_CONTROL => Self::parse_user_control(&mut payload),
241
242            MSG_WINDOW_ACK_SIZE => {
243                if payload.len() < 4 {
244                    return Err(ProtocolError::InvalidChunkHeader.into());
245                }
246                Ok(RtmpMessage::WindowAckSize(payload.get_u32()))
247            }
248
249            MSG_SET_PEER_BANDWIDTH => {
250                if payload.len() < 5 {
251                    return Err(ProtocolError::InvalidChunkHeader.into());
252                }
253                let size = payload.get_u32();
254                let limit_type = payload.get_u8();
255                Ok(RtmpMessage::SetPeerBandwidth { size, limit_type })
256            }
257
258            MSG_AUDIO => Ok(RtmpMessage::Audio {
259                timestamp: chunk.timestamp,
260                data: payload,
261            }),
262
263            MSG_VIDEO => Ok(RtmpMessage::Video {
264                timestamp: chunk.timestamp,
265                data: payload,
266            }),
267
268            MSG_COMMAND_AMF0 => {
269                let cmd = Self::parse_command(&mut payload, chunk.stream_id)?;
270                Ok(RtmpMessage::Command(cmd))
271            }
272
273            MSG_COMMAND_AMF3 => {
274                // Skip AMF3 marker byte if present
275                if !payload.is_empty() && payload[0] == 0x00 {
276                    payload.advance(1);
277                }
278                let cmd = Self::parse_command(&mut payload, chunk.stream_id)?;
279                Ok(RtmpMessage::CommandAmf3(cmd))
280            }
281
282            MSG_DATA_AMF0 => {
283                let data = Self::parse_data(&mut payload, chunk.stream_id)?;
284                Ok(RtmpMessage::Data(data))
285            }
286
287            MSG_DATA_AMF3 => {
288                if !payload.is_empty() && payload[0] == 0x00 {
289                    payload.advance(1);
290                }
291                let data = Self::parse_data(&mut payload, chunk.stream_id)?;
292                Ok(RtmpMessage::DataAmf3(data))
293            }
294
295            MSG_AGGREGATE => Ok(RtmpMessage::Aggregate { data: payload }),
296
297            _ => Ok(RtmpMessage::Unknown {
298                type_id: chunk.message_type,
299                data: payload,
300            }),
301        }
302    }
303
304    /// Parse User Control message
305    fn parse_user_control(payload: &mut Bytes) -> Result<Self> {
306        if payload.len() < 6 {
307            return Err(ProtocolError::InvalidChunkHeader.into());
308        }
309
310        let event_type = payload.get_u16();
311        let event = match event_type {
312            UC_STREAM_BEGIN => UserControlEvent::StreamBegin(payload.get_u32()),
313            UC_STREAM_EOF => UserControlEvent::StreamEof(payload.get_u32()),
314            UC_STREAM_DRY => UserControlEvent::StreamDry(payload.get_u32()),
315            UC_SET_BUFFER_LENGTH => {
316                if payload.len() < 8 {
317                    return Err(ProtocolError::InvalidChunkHeader.into());
318                }
319                let stream_id = payload.get_u32();
320                let buffer_ms = payload.get_u32();
321                UserControlEvent::SetBufferLength {
322                    stream_id,
323                    buffer_ms,
324                }
325            }
326            UC_STREAM_IS_RECORDED => UserControlEvent::StreamIsRecorded(payload.get_u32()),
327            UC_PING_REQUEST => UserControlEvent::PingRequest(payload.get_u32()),
328            UC_PING_RESPONSE => UserControlEvent::PingResponse(payload.get_u32()),
329            _ => UserControlEvent::Unknown {
330                event_type,
331                data: payload.clone(),
332            },
333        };
334
335        Ok(RtmpMessage::UserControl(event))
336    }
337
338    /// Parse AMF0 command
339    fn parse_command(payload: &mut Bytes, stream_id: u32) -> Result<Command> {
340        let mut decoder = Amf0Decoder::new();
341
342        // Command name
343        let name = match decoder.decode(payload)? {
344            AmfValue::String(s) => s,
345            _ => return Err(ProtocolError::InvalidCommand("Expected command name".into()).into()),
346        };
347
348        // Transaction ID
349        let transaction_id = match decoder.decode(payload)? {
350            AmfValue::Number(n) => n,
351            _ => 0.0, // Lenient: default to 0
352        };
353
354        // Command object (can be null)
355        let command_object = if payload.has_remaining() {
356            decoder.decode(payload)?
357        } else {
358            AmfValue::Null
359        };
360
361        // Additional arguments
362        let mut arguments = Vec::new();
363        while payload.has_remaining() {
364            match decoder.decode(payload) {
365                Ok(v) => arguments.push(v),
366                Err(AmfError::UnexpectedEof) => break,
367                Err(e) => return Err(e.into()),
368            }
369        }
370
371        Ok(Command {
372            name,
373            transaction_id,
374            command_object,
375            arguments,
376            stream_id,
377        })
378    }
379
380    /// Parse AMF0 data message
381    fn parse_data(payload: &mut Bytes, stream_id: u32) -> Result<DataMessage> {
382        let mut decoder = Amf0Decoder::new();
383
384        // Handler name
385        let name = match decoder.decode(payload)? {
386            AmfValue::String(s) => s,
387            _ => String::new(), // Lenient
388        };
389
390        // Data values
391        let mut values = Vec::new();
392        while payload.has_remaining() {
393            match decoder.decode(payload) {
394                Ok(v) => values.push(v),
395                Err(AmfError::UnexpectedEof) => break,
396                Err(e) => return Err(e.into()),
397            }
398        }
399
400        Ok(DataMessage {
401            name,
402            values,
403            stream_id,
404        })
405    }
406
407    /// Encode message to chunk payload
408    pub fn encode(&self) -> (u8, Bytes) {
409        match self {
410            RtmpMessage::SetChunkSize(size) => {
411                let mut buf = BytesMut::with_capacity(4);
412                buf.put_u32(*size);
413                (MSG_SET_CHUNK_SIZE, buf.freeze())
414            }
415
416            RtmpMessage::Abort { csid } => {
417                let mut buf = BytesMut::with_capacity(4);
418                buf.put_u32(*csid);
419                (MSG_ABORT, buf.freeze())
420            }
421
422            RtmpMessage::Acknowledgement { sequence } => {
423                let mut buf = BytesMut::with_capacity(4);
424                buf.put_u32(*sequence);
425                (MSG_ACKNOWLEDGEMENT, buf.freeze())
426            }
427
428            RtmpMessage::WindowAckSize(size) => {
429                let mut buf = BytesMut::with_capacity(4);
430                buf.put_u32(*size);
431                (MSG_WINDOW_ACK_SIZE, buf.freeze())
432            }
433
434            RtmpMessage::SetPeerBandwidth { size, limit_type } => {
435                let mut buf = BytesMut::with_capacity(5);
436                buf.put_u32(*size);
437                buf.put_u8(*limit_type);
438                (MSG_SET_PEER_BANDWIDTH, buf.freeze())
439            }
440
441            RtmpMessage::UserControl(event) => {
442                let mut buf = BytesMut::with_capacity(10);
443                match event {
444                    UserControlEvent::StreamBegin(id) => {
445                        buf.put_u16(UC_STREAM_BEGIN);
446                        buf.put_u32(*id);
447                    }
448                    UserControlEvent::StreamEof(id) => {
449                        buf.put_u16(UC_STREAM_EOF);
450                        buf.put_u32(*id);
451                    }
452                    UserControlEvent::StreamDry(id) => {
453                        buf.put_u16(UC_STREAM_DRY);
454                        buf.put_u32(*id);
455                    }
456                    UserControlEvent::SetBufferLength {
457                        stream_id,
458                        buffer_ms,
459                    } => {
460                        buf.put_u16(UC_SET_BUFFER_LENGTH);
461                        buf.put_u32(*stream_id);
462                        buf.put_u32(*buffer_ms);
463                    }
464                    UserControlEvent::StreamIsRecorded(id) => {
465                        buf.put_u16(UC_STREAM_IS_RECORDED);
466                        buf.put_u32(*id);
467                    }
468                    UserControlEvent::PingRequest(ts) => {
469                        buf.put_u16(UC_PING_REQUEST);
470                        buf.put_u32(*ts);
471                    }
472                    UserControlEvent::PingResponse(ts) => {
473                        buf.put_u16(UC_PING_RESPONSE);
474                        buf.put_u32(*ts);
475                    }
476                    UserControlEvent::Unknown { event_type, data } => {
477                        buf.put_u16(*event_type);
478                        buf.put_slice(data);
479                    }
480                }
481                (MSG_USER_CONTROL, buf.freeze())
482            }
483
484            RtmpMessage::Audio { data, .. } => (MSG_AUDIO, data.clone()),
485
486            RtmpMessage::Video { data, .. } => (MSG_VIDEO, data.clone()),
487
488            RtmpMessage::Command(cmd) => {
489                let payload = encode_command(cmd);
490                (MSG_COMMAND_AMF0, payload)
491            }
492
493            RtmpMessage::CommandAmf3(cmd) => {
494                let mut buf = BytesMut::new();
495                buf.put_u8(0x00); // AMF3 marker
496                buf.put_slice(&encode_command(cmd));
497                (MSG_COMMAND_AMF3, buf.freeze())
498            }
499
500            RtmpMessage::Data(data) => {
501                let payload = encode_data(data);
502                (MSG_DATA_AMF0, payload)
503            }
504
505            RtmpMessage::DataAmf3(data) => {
506                let mut buf = BytesMut::new();
507                buf.put_u8(0x00);
508                buf.put_slice(&encode_data(data));
509                (MSG_DATA_AMF3, buf.freeze())
510            }
511
512            RtmpMessage::Aggregate { data } => (MSG_AGGREGATE, data.clone()),
513
514            RtmpMessage::Unknown { type_id, data } => (*type_id, data.clone()),
515        }
516    }
517}
518
519/// Encode a command to AMF0 bytes
520fn encode_command(cmd: &Command) -> Bytes {
521    let mut encoder = Amf0Encoder::new();
522    encoder.encode(&AmfValue::String(cmd.name.clone()));
523    encoder.encode(&AmfValue::Number(cmd.transaction_id));
524    encoder.encode(&cmd.command_object);
525    for arg in &cmd.arguments {
526        encoder.encode(arg);
527    }
528    encoder.finish()
529}
530
531/// Encode a data message to AMF0 bytes
532fn encode_data(data: &DataMessage) -> Bytes {
533    let mut encoder = Amf0Encoder::new();
534    encoder.encode(&AmfValue::String(data.name.clone()));
535    for value in &data.values {
536        encoder.encode(value);
537    }
538    encoder.finish()
539}
540
541/// Build common response messages
542impl Command {
543    /// Create a _result response
544    pub fn result(transaction_id: f64, properties: AmfValue, info: AmfValue) -> Self {
545        Command {
546            name: CMD_RESULT.to_string(),
547            transaction_id,
548            command_object: properties,
549            arguments: vec![info],
550            stream_id: 0,
551        }
552    }
553
554    /// Create an _error response
555    pub fn error(transaction_id: f64, properties: AmfValue, info: AmfValue) -> Self {
556        Command {
557            name: CMD_ERROR.to_string(),
558            transaction_id,
559            command_object: properties,
560            arguments: vec![info],
561            stream_id: 0,
562        }
563    }
564
565    /// Create an onStatus response
566    pub fn on_status(stream_id: u32, level: &str, code: &str, description: &str) -> Self {
567        let mut info = HashMap::new();
568        info.insert("level".to_string(), AmfValue::String(level.to_string()));
569        info.insert("code".to_string(), AmfValue::String(code.to_string()));
570        info.insert(
571            "description".to_string(),
572            AmfValue::String(description.to_string()),
573        );
574
575        Command {
576            name: CMD_ON_STATUS.to_string(),
577            transaction_id: 0.0,
578            command_object: AmfValue::Null,
579            arguments: vec![AmfValue::Object(info)],
580            stream_id,
581        }
582    }
583}
584
585#[cfg(test)]
586mod tests {
587    use super::*;
588
589    #[test]
590    fn test_connect_params_parsing() {
591        let mut obj = HashMap::new();
592        obj.insert("app".to_string(), AmfValue::String("live".into()));
593        obj.insert(
594            "tcUrl".to_string(),
595            AmfValue::String("rtmp://localhost/live".into()),
596        );
597        obj.insert("objectEncoding".to_string(), AmfValue::Number(0.0));
598
599        let params = ConnectParams::from_amf(&AmfValue::Object(obj));
600        assert_eq!(params.app, "live");
601        assert_eq!(params.tc_url, Some("rtmp://localhost/live".into()));
602        assert_eq!(params.object_encoding, 0.0);
603    }
604
605    #[test]
606    fn test_command_roundtrip() {
607        let cmd = Command {
608            name: "connect".to_string(),
609            transaction_id: 1.0,
610            command_object: AmfValue::Null,
611            arguments: vec![AmfValue::String("test".into())],
612            stream_id: 0,
613        };
614
615        let payload = encode_command(&cmd);
616        let chunk = RtmpChunk {
617            csid: CSID_COMMAND,
618            timestamp: 0,
619            message_type: MSG_COMMAND_AMF0,
620            stream_id: 0,
621            payload,
622        };
623
624        let parsed = RtmpMessage::from_chunk(&chunk).unwrap();
625        if let RtmpMessage::Command(parsed_cmd) = parsed {
626            assert_eq!(parsed_cmd.name, "connect");
627            assert_eq!(parsed_cmd.transaction_id, 1.0);
628        } else {
629            panic!("Expected Command message");
630        }
631    }
632
633    #[test]
634    fn test_set_chunk_size_message() {
635        let chunk = RtmpChunk {
636            csid: CSID_PROTOCOL_CONTROL,
637            timestamp: 0,
638            message_type: MSG_SET_CHUNK_SIZE,
639            stream_id: 0,
640            payload: Bytes::from_static(&[0x00, 0x00, 0x10, 0x00]), // 4096
641        };
642
643        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
644        assert!(matches!(msg, RtmpMessage::SetChunkSize(4096)));
645
646        // Test encoding
647        let (msg_type, payload) = msg.encode();
648        assert_eq!(msg_type, MSG_SET_CHUNK_SIZE);
649        assert_eq!(&payload[..], &[0x00, 0x00, 0x10, 0x00]);
650    }
651
652    #[test]
653    fn test_abort_message() {
654        let chunk = RtmpChunk {
655            csid: CSID_PROTOCOL_CONTROL,
656            timestamp: 0,
657            message_type: MSG_ABORT,
658            stream_id: 0,
659            payload: Bytes::from_static(&[0x00, 0x00, 0x00, 0x05]), // CSID 5
660        };
661
662        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
663        if let RtmpMessage::Abort { csid } = msg {
664            assert_eq!(csid, 5);
665        } else {
666            panic!("Expected Abort message");
667        }
668    }
669
670    #[test]
671    fn test_acknowledgement_message() {
672        let chunk = RtmpChunk {
673            csid: CSID_PROTOCOL_CONTROL,
674            timestamp: 0,
675            message_type: MSG_ACKNOWLEDGEMENT,
676            stream_id: 0,
677            payload: Bytes::from_static(&[0x00, 0x10, 0x00, 0x00]), // 1048576
678        };
679
680        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
681        if let RtmpMessage::Acknowledgement { sequence } = msg {
682            assert_eq!(sequence, 1048576);
683        } else {
684            panic!("Expected Acknowledgement message");
685        }
686    }
687
688    #[test]
689    fn test_window_ack_size_message() {
690        let chunk = RtmpChunk {
691            csid: CSID_PROTOCOL_CONTROL,
692            timestamp: 0,
693            message_type: MSG_WINDOW_ACK_SIZE,
694            stream_id: 0,
695            payload: Bytes::from_static(&[0x00, 0x26, 0x25, 0xA0]), // 2500000
696        };
697
698        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
699        if let RtmpMessage::WindowAckSize(size) = msg {
700            assert_eq!(size, 2500000);
701        } else {
702            panic!("Expected WindowAckSize message");
703        }
704    }
705
706    #[test]
707    fn test_set_peer_bandwidth_message() {
708        let chunk = RtmpChunk {
709            csid: CSID_PROTOCOL_CONTROL,
710            timestamp: 0,
711            message_type: MSG_SET_PEER_BANDWIDTH,
712            stream_id: 0,
713            payload: Bytes::from_static(&[0x00, 0x26, 0x25, 0xA0, 0x02]), // 2500000, dynamic
714        };
715
716        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
717        if let RtmpMessage::SetPeerBandwidth { size, limit_type } = msg {
718            assert_eq!(size, 2500000);
719            assert_eq!(limit_type, BANDWIDTH_LIMIT_DYNAMIC);
720        } else {
721            panic!("Expected SetPeerBandwidth message");
722        }
723    }
724
725    #[test]
726    fn test_user_control_stream_begin() {
727        let chunk = RtmpChunk {
728            csid: CSID_PROTOCOL_CONTROL,
729            timestamp: 0,
730            message_type: MSG_USER_CONTROL,
731            stream_id: 0,
732            payload: Bytes::from_static(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x01]), // StreamBegin, stream 1
733        };
734
735        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
736        if let RtmpMessage::UserControl(UserControlEvent::StreamBegin(id)) = msg {
737            assert_eq!(id, 1);
738        } else {
739            panic!("Expected StreamBegin user control");
740        }
741    }
742
743    #[test]
744    fn test_user_control_stream_eof() {
745        let chunk = RtmpChunk {
746            csid: CSID_PROTOCOL_CONTROL,
747            timestamp: 0,
748            message_type: MSG_USER_CONTROL,
749            stream_id: 0,
750            payload: Bytes::from_static(&[0x00, 0x01, 0x00, 0x00, 0x00, 0x02]), // StreamEof, stream 2
751        };
752
753        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
754        if let RtmpMessage::UserControl(UserControlEvent::StreamEof(id)) = msg {
755            assert_eq!(id, 2);
756        } else {
757            panic!("Expected StreamEof user control");
758        }
759    }
760
761    #[test]
762    fn test_user_control_set_buffer_length() {
763        let chunk = RtmpChunk {
764            csid: CSID_PROTOCOL_CONTROL,
765            timestamp: 0,
766            message_type: MSG_USER_CONTROL,
767            stream_id: 0,
768            payload: Bytes::from_static(&[
769                0x00, 0x03, // SetBufferLength
770                0x00, 0x00, 0x00, 0x01, // stream_id 1
771                0x00, 0x00, 0x03, 0xE8, // 1000ms
772            ]),
773        };
774
775        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
776        if let RtmpMessage::UserControl(UserControlEvent::SetBufferLength {
777            stream_id,
778            buffer_ms,
779        }) = msg
780        {
781            assert_eq!(stream_id, 1);
782            assert_eq!(buffer_ms, 1000);
783        } else {
784            panic!("Expected SetBufferLength user control");
785        }
786    }
787
788    #[test]
789    fn test_user_control_ping_request() {
790        let chunk = RtmpChunk {
791            csid: CSID_PROTOCOL_CONTROL,
792            timestamp: 0,
793            message_type: MSG_USER_CONTROL,
794            stream_id: 0,
795            payload: Bytes::from_static(&[0x00, 0x06, 0x00, 0x01, 0x00, 0x00]), // PingRequest
796        };
797
798        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
799        if let RtmpMessage::UserControl(UserControlEvent::PingRequest(ts)) = msg {
800            assert_eq!(ts, 0x00010000);
801        } else {
802            panic!("Expected PingRequest user control");
803        }
804    }
805
806    #[test]
807    fn test_user_control_ping_response() {
808        let chunk = RtmpChunk {
809            csid: CSID_PROTOCOL_CONTROL,
810            timestamp: 0,
811            message_type: MSG_USER_CONTROL,
812            stream_id: 0,
813            payload: Bytes::from_static(&[0x00, 0x07, 0x00, 0x00, 0x00, 0x64]), // PingResponse
814        };
815
816        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
817        if let RtmpMessage::UserControl(UserControlEvent::PingResponse(ts)) = msg {
818            assert_eq!(ts, 100);
819        } else {
820            panic!("Expected PingResponse user control");
821        }
822    }
823
824    #[test]
825    fn test_audio_message() {
826        let audio_data = Bytes::from_static(&[0xAF, 0x01, 0x21, 0x00, 0x00]);
827
828        let chunk = RtmpChunk {
829            csid: CSID_AUDIO,
830            timestamp: 1000,
831            message_type: MSG_AUDIO,
832            stream_id: 1,
833            payload: audio_data.clone(),
834        };
835
836        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
837        if let RtmpMessage::Audio { timestamp, data } = msg {
838            assert_eq!(timestamp, 1000);
839            assert_eq!(data, audio_data);
840        } else {
841            panic!("Expected Audio message");
842        }
843    }
844
845    #[test]
846    fn test_video_message() {
847        let video_data = Bytes::from_static(&[0x17, 0x01, 0x00, 0x00, 0x00, 0x00]);
848
849        let chunk = RtmpChunk {
850            csid: CSID_VIDEO,
851            timestamp: 2000,
852            message_type: MSG_VIDEO,
853            stream_id: 1,
854            payload: video_data.clone(),
855        };
856
857        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
858        if let RtmpMessage::Video { timestamp, data } = msg {
859            assert_eq!(timestamp, 2000);
860            assert_eq!(data, video_data);
861        } else {
862            panic!("Expected Video message");
863        }
864    }
865
866    #[test]
867    fn test_data_message() {
868        let mut encoder = Amf0Encoder::new();
869        encoder.encode(&AmfValue::String("@setDataFrame".into()));
870        encoder.encode(&AmfValue::String("onMetaData".into()));
871        let mut metadata = HashMap::new();
872        metadata.insert("width".to_string(), AmfValue::Number(1920.0));
873        encoder.encode(&AmfValue::Object(metadata));
874
875        let chunk = RtmpChunk {
876            csid: CSID_COMMAND,
877            timestamp: 0,
878            message_type: MSG_DATA_AMF0,
879            stream_id: 1,
880            payload: encoder.finish(),
881        };
882
883        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
884        if let RtmpMessage::Data(data) = msg {
885            assert_eq!(data.name, "@setDataFrame");
886            assert_eq!(data.stream_id, 1);
887            assert_eq!(data.values.len(), 2);
888        } else {
889            panic!("Expected Data message");
890        }
891    }
892
893    #[test]
894    fn test_unknown_message_type() {
895        let chunk = RtmpChunk {
896            csid: CSID_COMMAND,
897            timestamp: 0,
898            message_type: 99, // Unknown type
899            stream_id: 0,
900            payload: Bytes::from_static(b"unknown"),
901        };
902
903        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
904        if let RtmpMessage::Unknown { type_id, data } = msg {
905            assert_eq!(type_id, 99);
906            assert_eq!(data.as_ref(), b"unknown");
907        } else {
908            panic!("Expected Unknown message");
909        }
910    }
911
912    #[test]
913    fn test_command_result() {
914        let mut props = HashMap::new();
915        props.insert(
916            "fmsVer".to_string(),
917            AmfValue::String("FMS/3,5,7,7009".into()),
918        );
919        props.insert("capabilities".to_string(), AmfValue::Number(31.0));
920
921        let result = Command::result(1.0, AmfValue::Object(props), AmfValue::Null);
922
923        assert_eq!(result.name, "_result");
924        assert_eq!(result.transaction_id, 1.0);
925    }
926
927    #[test]
928    fn test_command_error() {
929        let error = Command::error(1.0, AmfValue::Null, AmfValue::String("error".into()));
930
931        assert_eq!(error.name, "_error");
932        assert_eq!(error.transaction_id, 1.0);
933    }
934
935    #[test]
936    fn test_command_on_status() {
937        let status = Command::on_status(1, "status", NS_PUBLISH_START, "Publishing started");
938
939        assert_eq!(status.name, "onStatus");
940        assert_eq!(status.transaction_id, 0.0);
941        assert_eq!(status.stream_id, 1);
942
943        if let Some(info) = status.arguments.first() {
944            if let AmfValue::Object(props) = info {
945                assert_eq!(props.get("level").unwrap().as_str(), Some("status"));
946                assert_eq!(props.get("code").unwrap().as_str(), Some(NS_PUBLISH_START));
947            } else {
948                panic!("Expected Object in arguments");
949            }
950        } else {
951            panic!("Expected arguments");
952        }
953    }
954
955    #[test]
956    fn test_connect_params_all_fields() {
957        let mut obj = HashMap::new();
958        obj.insert("app".to_string(), AmfValue::String("live".into()));
959        obj.insert(
960            "flashVer".to_string(),
961            AmfValue::String("OBS-Studio/29.0".into()),
962        );
963        obj.insert(
964            "swfUrl".to_string(),
965            AmfValue::String("rtmp://example.com/app".into()),
966        );
967        obj.insert(
968            "tcUrl".to_string(),
969            AmfValue::String("rtmp://example.com/live".into()),
970        );
971        obj.insert("fpad".to_string(), AmfValue::Boolean(false));
972        obj.insert("audioCodecs".to_string(), AmfValue::Number(3575.0));
973        obj.insert("videoCodecs".to_string(), AmfValue::Number(252.0));
974        obj.insert("videoFunction".to_string(), AmfValue::Number(1.0));
975        obj.insert(
976            "pageUrl".to_string(),
977            AmfValue::String("http://twitch.tv".into()),
978        );
979        obj.insert("objectEncoding".to_string(), AmfValue::Number(0.0));
980        obj.insert("custom".to_string(), AmfValue::String("value".into()));
981
982        let params = ConnectParams::from_amf(&AmfValue::Object(obj));
983
984        assert_eq!(params.app, "live");
985        assert_eq!(params.flash_ver, Some("OBS-Studio/29.0".into()));
986        assert_eq!(params.swf_url, Some("rtmp://example.com/app".into()));
987        assert_eq!(params.tc_url, Some("rtmp://example.com/live".into()));
988        assert!(!params.fpad);
989        assert_eq!(params.audio_codecs, 3575);
990        assert_eq!(params.video_codecs, 252);
991        assert_eq!(params.video_function, 1);
992        assert_eq!(params.page_url, Some("http://twitch.tv".into()));
993        assert_eq!(params.object_encoding, 0.0);
994        assert!(params.extra.contains_key("custom"));
995    }
996
997    #[test]
998    fn test_connect_params_case_insensitive() {
999        // Test lowercase variants
1000        let mut obj = HashMap::new();
1001        obj.insert("flashver".to_string(), AmfValue::String("test".into()));
1002        obj.insert("tcurl".to_string(), AmfValue::String("url".into()));
1003        obj.insert("pageurl".to_string(), AmfValue::String("page".into()));
1004        obj.insert("swfurl".to_string(), AmfValue::String("swf".into()));
1005
1006        let params = ConnectParams::from_amf(&AmfValue::Object(obj));
1007
1008        assert_eq!(params.flash_ver, Some("test".into()));
1009        assert_eq!(params.tc_url, Some("url".into()));
1010        assert_eq!(params.page_url, Some("page".into()));
1011        assert_eq!(params.swf_url, Some("swf".into()));
1012    }
1013
1014    #[test]
1015    fn test_connect_params_from_non_object() {
1016        // Should handle non-object gracefully
1017        let params = ConnectParams::from_amf(&AmfValue::Null);
1018        assert_eq!(params.app, "");
1019        assert!(params.flash_ver.is_none());
1020    }
1021
1022    #[test]
1023    fn test_message_encode_roundtrip() {
1024        // Test various messages encode/decode roundtrip
1025
1026        // SetChunkSize
1027        let msg = RtmpMessage::SetChunkSize(4096);
1028        let (msg_type, payload) = msg.encode();
1029        let chunk = RtmpChunk {
1030            csid: CSID_PROTOCOL_CONTROL,
1031            timestamp: 0,
1032            message_type: msg_type,
1033            stream_id: 0,
1034            payload,
1035        };
1036        let decoded = RtmpMessage::from_chunk(&chunk).unwrap();
1037        assert!(matches!(decoded, RtmpMessage::SetChunkSize(4096)));
1038
1039        // WindowAckSize
1040        let msg = RtmpMessage::WindowAckSize(2500000);
1041        let (msg_type, payload) = msg.encode();
1042        let chunk = RtmpChunk {
1043            csid: CSID_PROTOCOL_CONTROL,
1044            timestamp: 0,
1045            message_type: msg_type,
1046            stream_id: 0,
1047            payload,
1048        };
1049        let decoded = RtmpMessage::from_chunk(&chunk).unwrap();
1050        assert!(matches!(decoded, RtmpMessage::WindowAckSize(2500000)));
1051    }
1052
1053    #[test]
1054    fn test_user_control_event_encode() {
1055        // Test encoding of user control events
1056        let events = vec![
1057            RtmpMessage::UserControl(UserControlEvent::StreamBegin(1)),
1058            RtmpMessage::UserControl(UserControlEvent::StreamEof(2)),
1059            RtmpMessage::UserControl(UserControlEvent::StreamDry(3)),
1060            RtmpMessage::UserControl(UserControlEvent::StreamIsRecorded(4)),
1061            RtmpMessage::UserControl(UserControlEvent::PingRequest(5)),
1062            RtmpMessage::UserControl(UserControlEvent::PingResponse(6)),
1063            RtmpMessage::UserControl(UserControlEvent::SetBufferLength {
1064                stream_id: 1,
1065                buffer_ms: 1000,
1066            }),
1067        ];
1068
1069        for msg in events {
1070            let (msg_type, payload) = msg.encode();
1071            assert_eq!(msg_type, MSG_USER_CONTROL);
1072            assert!(!payload.is_empty());
1073        }
1074    }
1075
1076    #[test]
1077    fn test_aggregate_message() {
1078        let chunk = RtmpChunk {
1079            csid: CSID_VIDEO,
1080            timestamp: 0,
1081            message_type: MSG_AGGREGATE,
1082            stream_id: 1,
1083            payload: Bytes::from_static(b"aggregate data"),
1084        };
1085
1086        let msg = RtmpMessage::from_chunk(&chunk).unwrap();
1087        if let RtmpMessage::Aggregate { data } = msg {
1088            assert_eq!(data.as_ref(), b"aggregate data");
1089        } else {
1090            panic!("Expected Aggregate message");
1091        }
1092    }
1093
1094    #[test]
1095    fn test_truncated_protocol_control_messages() {
1096        // SetChunkSize with less than 4 bytes
1097        let chunk = RtmpChunk {
1098            csid: CSID_PROTOCOL_CONTROL,
1099            timestamp: 0,
1100            message_type: MSG_SET_CHUNK_SIZE,
1101            stream_id: 0,
1102            payload: Bytes::from_static(&[0x00, 0x00]), // Only 2 bytes
1103        };
1104
1105        let result = RtmpMessage::from_chunk(&chunk);
1106        assert!(result.is_err());
1107
1108        // WindowAckSize with less than 4 bytes
1109        let chunk = RtmpChunk {
1110            csid: CSID_PROTOCOL_CONTROL,
1111            timestamp: 0,
1112            message_type: MSG_WINDOW_ACK_SIZE,
1113            stream_id: 0,
1114            payload: Bytes::from_static(&[0x00]),
1115        };
1116
1117        let result = RtmpMessage::from_chunk(&chunk);
1118        assert!(result.is_err());
1119    }
1120}