rtmp_rs/protocol/
message.rs

1//! RTMP message types and parsing
2//!
3//! RTMP messages are classified into:
4//! - Protocol Control Messages (types 1-6): Chunk/flow control
5//! - Command Messages (types 17, 20): AMF-encoded commands
6//! - Data Messages (types 15, 18): Metadata
7//! - Audio/Video Messages (types 8, 9): Media data
8//!
9//! Reference: RTMP Specification Section 5.4
10
11use bytes::{Buf, BufMut, Bytes, BytesMut};
12use std::collections::HashMap;
13
14use crate::amf::{Amf0Decoder, Amf0Encoder, AmfValue};
15use crate::error::{AmfError, ProtocolError, Result};
16use crate::protocol::chunk::RtmpChunk;
17use crate::protocol::constants::*;
18
19/// Parsed RTMP message
20#[derive(Debug, Clone)]
21pub enum RtmpMessage {
22    /// Set Chunk Size (type 1)
23    SetChunkSize(u32),
24
25    /// Abort Message (type 2)
26    Abort { csid: u32 },
27
28    /// Acknowledgement (type 3)
29    Acknowledgement { sequence: u32 },
30
31    /// User Control Message (type 4)
32    UserControl(UserControlEvent),
33
34    /// Window Acknowledgement Size (type 5)
35    WindowAckSize(u32),
36
37    /// Set Peer Bandwidth (type 6)
38    SetPeerBandwidth { size: u32, limit_type: u8 },
39
40    /// Audio data (type 8)
41    Audio { timestamp: u32, data: Bytes },
42
43    /// Video data (type 9)
44    Video { timestamp: u32, data: Bytes },
45
46    /// AMF0 Command (type 20)
47    Command(Command),
48
49    /// AMF0 Data message (type 18) - metadata, etc.
50    Data(DataMessage),
51
52    /// AMF3 Command (type 17)
53    CommandAmf3(Command),
54
55    /// AMF3 Data message (type 15)
56    DataAmf3(DataMessage),
57
58    /// Aggregate message (type 22)
59    Aggregate { data: Bytes },
60
61    /// Unknown message type
62    Unknown { type_id: u8, data: Bytes },
63}
64
65/// User Control Event
66#[derive(Debug, Clone)]
67pub enum UserControlEvent {
68    StreamBegin(u32),
69    StreamEof(u32),
70    StreamDry(u32),
71    SetBufferLength { stream_id: u32, buffer_ms: u32 },
72    StreamIsRecorded(u32),
73    PingRequest(u32),
74    PingResponse(u32),
75    Unknown { event_type: u16, data: Bytes },
76}
77
78/// RTMP command (connect, publish, play, etc.)
79#[derive(Debug, Clone)]
80pub struct Command {
81    /// Command name
82    pub name: String,
83    /// Transaction ID
84    pub transaction_id: f64,
85    /// Command object (often null for responses)
86    pub command_object: AmfValue,
87    /// Additional arguments
88    pub arguments: Vec<AmfValue>,
89    /// Message stream ID (from chunk)
90    pub stream_id: u32,
91}
92
93/// Data message (@setDataFrame, onMetaData, etc.)
94#[derive(Debug, Clone)]
95pub struct DataMessage {
96    /// Handler name (e.g., "@setDataFrame", "onMetaData")
97    pub name: String,
98    /// Data values
99    pub values: Vec<AmfValue>,
100    /// Message stream ID
101    pub stream_id: u32,
102}
103
104/// Connect command parameters
105#[derive(Debug, Clone, Default)]
106pub struct ConnectParams {
107    /// Application name
108    pub app: String,
109    /// Flash version
110    pub flash_ver: Option<String>,
111    /// SWF URL
112    pub swf_url: Option<String>,
113    /// TC URL (full RTMP URL)
114    pub tc_url: Option<String>,
115    /// Is FPAD
116    pub fpad: bool,
117    /// Audio codecs
118    pub audio_codecs: u32,
119    /// Video codecs
120    pub video_codecs: u32,
121    /// Video function
122    pub video_function: u32,
123    /// Page URL
124    pub page_url: Option<String>,
125    /// Object encoding (AMF version)
126    pub object_encoding: f64,
127    /// Extra properties from connect object
128    pub extra: HashMap<String, AmfValue>,
129}
130
131impl ConnectParams {
132    /// Parse from AMF command object
133    pub fn from_amf(obj: &AmfValue) -> Self {
134        let mut params = ConnectParams::default();
135
136        if let Some(map) = obj.as_object() {
137            for (key, value) in map {
138                match key.as_str() {
139                    "app" => {
140                        if let Some(s) = value.as_str() {
141                            params.app = s.to_string();
142                        }
143                    }
144                    "flashVer" | "flashver" => {
145                        params.flash_ver = value.as_str().map(|s| s.to_string());
146                    }
147                    "swfUrl" | "swfurl" => {
148                        params.swf_url = value.as_str().map(|s| s.to_string());
149                    }
150                    "tcUrl" | "tcurl" => {
151                        params.tc_url = value.as_str().map(|s| s.to_string());
152                    }
153                    "fpad" => {
154                        params.fpad = value.as_bool().unwrap_or(false);
155                    }
156                    "audioCodecs" | "audiocodecs" => {
157                        params.audio_codecs = value.as_number().unwrap_or(0.0) as u32;
158                    }
159                    "videoCodecs" | "videocodecs" => {
160                        params.video_codecs = value.as_number().unwrap_or(0.0) as u32;
161                    }
162                    "videoFunction" | "videofunction" => {
163                        params.video_function = value.as_number().unwrap_or(0.0) as u32;
164                    }
165                    "pageUrl" | "pageurl" => {
166                        params.page_url = value.as_str().map(|s| s.to_string());
167                    }
168                    "objectEncoding" | "objectencoding" => {
169                        params.object_encoding = value.as_number().unwrap_or(0.0);
170                    }
171                    _ => {
172                        params.extra.insert(key.clone(), value.clone());
173                    }
174                }
175            }
176        }
177
178        params
179    }
180}
181
182/// Publish command parameters
183#[derive(Debug, Clone)]
184pub struct PublishParams {
185    /// Stream key (name)
186    pub stream_key: String,
187    /// Publish type: "live", "record", "append"
188    pub publish_type: String,
189    /// Message stream ID
190    pub stream_id: u32,
191}
192
193/// Play command parameters
194#[derive(Debug, Clone)]
195pub struct PlayParams {
196    /// Stream name
197    pub stream_name: String,
198    /// Start time (-2 = live, -1 = live or recorded, >= 0 = specific time)
199    pub start: f64,
200    /// Duration (-1 = until end)
201    pub duration: f64,
202    /// Reset flag
203    pub reset: bool,
204    /// Message stream ID
205    pub stream_id: u32,
206}
207
208impl RtmpMessage {
209    /// Parse a message from a chunk
210    pub fn from_chunk(chunk: &RtmpChunk) -> Result<Self> {
211        let mut payload = chunk.payload.clone();
212
213        match chunk.message_type {
214            MSG_SET_CHUNK_SIZE => {
215                if payload.len() < 4 {
216                    return Err(ProtocolError::InvalidChunkHeader.into());
217                }
218                let size = payload.get_u32() & 0x7FFFFFFF; // Ignore MSB
219                Ok(RtmpMessage::SetChunkSize(size))
220            }
221
222            MSG_ABORT => {
223                if payload.len() < 4 {
224                    return Err(ProtocolError::InvalidChunkHeader.into());
225                }
226                Ok(RtmpMessage::Abort {
227                    csid: payload.get_u32(),
228                })
229            }
230
231            MSG_ACKNOWLEDGEMENT => {
232                if payload.len() < 4 {
233                    return Err(ProtocolError::InvalidChunkHeader.into());
234                }
235                Ok(RtmpMessage::Acknowledgement {
236                    sequence: payload.get_u32(),
237                })
238            }
239
240            MSG_USER_CONTROL => Self::parse_user_control(&mut payload),
241
242            MSG_WINDOW_ACK_SIZE => {
243                if payload.len() < 4 {
244                    return Err(ProtocolError::InvalidChunkHeader.into());
245                }
246                Ok(RtmpMessage::WindowAckSize(payload.get_u32()))
247            }
248
249            MSG_SET_PEER_BANDWIDTH => {
250                if payload.len() < 5 {
251                    return Err(ProtocolError::InvalidChunkHeader.into());
252                }
253                let size = payload.get_u32();
254                let limit_type = payload.get_u8();
255                Ok(RtmpMessage::SetPeerBandwidth { size, limit_type })
256            }
257
258            MSG_AUDIO => Ok(RtmpMessage::Audio {
259                timestamp: chunk.timestamp,
260                data: payload,
261            }),
262
263            MSG_VIDEO => Ok(RtmpMessage::Video {
264                timestamp: chunk.timestamp,
265                data: payload,
266            }),
267
268            MSG_COMMAND_AMF0 => {
269                let cmd = Self::parse_command(&mut payload, chunk.stream_id)?;
270                Ok(RtmpMessage::Command(cmd))
271            }
272
273            MSG_COMMAND_AMF3 => {
274                // Skip AMF3 marker byte if present
275                if !payload.is_empty() && payload[0] == 0x00 {
276                    payload.advance(1);
277                }
278                let cmd = Self::parse_command(&mut payload, chunk.stream_id)?;
279                Ok(RtmpMessage::CommandAmf3(cmd))
280            }
281
282            MSG_DATA_AMF0 => {
283                let data = Self::parse_data(&mut payload, chunk.stream_id)?;
284                Ok(RtmpMessage::Data(data))
285            }
286
287            MSG_DATA_AMF3 => {
288                if !payload.is_empty() && payload[0] == 0x00 {
289                    payload.advance(1);
290                }
291                let data = Self::parse_data(&mut payload, chunk.stream_id)?;
292                Ok(RtmpMessage::DataAmf3(data))
293            }
294
295            MSG_AGGREGATE => Ok(RtmpMessage::Aggregate { data: payload }),
296
297            _ => Ok(RtmpMessage::Unknown {
298                type_id: chunk.message_type,
299                data: payload,
300            }),
301        }
302    }
303
304    /// Parse User Control message
305    fn parse_user_control(payload: &mut Bytes) -> Result<Self> {
306        if payload.len() < 6 {
307            return Err(ProtocolError::InvalidChunkHeader.into());
308        }
309
310        let event_type = payload.get_u16();
311        let event = match event_type {
312            UC_STREAM_BEGIN => UserControlEvent::StreamBegin(payload.get_u32()),
313            UC_STREAM_EOF => UserControlEvent::StreamEof(payload.get_u32()),
314            UC_STREAM_DRY => UserControlEvent::StreamDry(payload.get_u32()),
315            UC_SET_BUFFER_LENGTH => {
316                if payload.len() < 8 {
317                    return Err(ProtocolError::InvalidChunkHeader.into());
318                }
319                let stream_id = payload.get_u32();
320                let buffer_ms = payload.get_u32();
321                UserControlEvent::SetBufferLength {
322                    stream_id,
323                    buffer_ms,
324                }
325            }
326            UC_STREAM_IS_RECORDED => UserControlEvent::StreamIsRecorded(payload.get_u32()),
327            UC_PING_REQUEST => UserControlEvent::PingRequest(payload.get_u32()),
328            UC_PING_RESPONSE => UserControlEvent::PingResponse(payload.get_u32()),
329            _ => UserControlEvent::Unknown {
330                event_type,
331                data: payload.clone(),
332            },
333        };
334
335        Ok(RtmpMessage::UserControl(event))
336    }
337
338    /// Parse AMF0 command
339    fn parse_command(payload: &mut Bytes, stream_id: u32) -> Result<Command> {
340        let mut decoder = Amf0Decoder::new();
341
342        // Command name
343        let name = match decoder.decode(payload)? {
344            AmfValue::String(s) => s,
345            _ => return Err(ProtocolError::InvalidCommand("Expected command name".into()).into()),
346        };
347
348        // Transaction ID
349        let transaction_id = match decoder.decode(payload)? {
350            AmfValue::Number(n) => n,
351            _ => 0.0, // Lenient: default to 0
352        };
353
354        // Command object (can be null)
355        let command_object = if payload.has_remaining() {
356            decoder.decode(payload)?
357        } else {
358            AmfValue::Null
359        };
360
361        // Additional arguments
362        let mut arguments = Vec::new();
363        while payload.has_remaining() {
364            match decoder.decode(payload) {
365                Ok(v) => arguments.push(v),
366                Err(AmfError::UnexpectedEof) => break,
367                Err(e) => return Err(e.into()),
368            }
369        }
370
371        Ok(Command {
372            name,
373            transaction_id,
374            command_object,
375            arguments,
376            stream_id,
377        })
378    }
379
380    /// Parse AMF0 data message
381    fn parse_data(payload: &mut Bytes, stream_id: u32) -> Result<DataMessage> {
382        let mut decoder = Amf0Decoder::new();
383
384        // Handler name
385        let name = match decoder.decode(payload)? {
386            AmfValue::String(s) => s,
387            _ => String::new(), // Lenient
388        };
389
390        // Data values
391        let mut values = Vec::new();
392        while payload.has_remaining() {
393            match decoder.decode(payload) {
394                Ok(v) => values.push(v),
395                Err(AmfError::UnexpectedEof) => break,
396                Err(e) => return Err(e.into()),
397            }
398        }
399
400        Ok(DataMessage {
401            name,
402            values,
403            stream_id,
404        })
405    }
406
407    /// Encode message to chunk payload
408    pub fn encode(&self) -> (u8, Bytes) {
409        match self {
410            RtmpMessage::SetChunkSize(size) => {
411                let mut buf = BytesMut::with_capacity(4);
412                buf.put_u32(*size);
413                (MSG_SET_CHUNK_SIZE, buf.freeze())
414            }
415
416            RtmpMessage::Abort { csid } => {
417                let mut buf = BytesMut::with_capacity(4);
418                buf.put_u32(*csid);
419                (MSG_ABORT, buf.freeze())
420            }
421
422            RtmpMessage::Acknowledgement { sequence } => {
423                let mut buf = BytesMut::with_capacity(4);
424                buf.put_u32(*sequence);
425                (MSG_ACKNOWLEDGEMENT, buf.freeze())
426            }
427
428            RtmpMessage::WindowAckSize(size) => {
429                let mut buf = BytesMut::with_capacity(4);
430                buf.put_u32(*size);
431                (MSG_WINDOW_ACK_SIZE, buf.freeze())
432            }
433
434            RtmpMessage::SetPeerBandwidth { size, limit_type } => {
435                let mut buf = BytesMut::with_capacity(5);
436                buf.put_u32(*size);
437                buf.put_u8(*limit_type);
438                (MSG_SET_PEER_BANDWIDTH, buf.freeze())
439            }
440
441            RtmpMessage::UserControl(event) => {
442                let mut buf = BytesMut::with_capacity(10);
443                match event {
444                    UserControlEvent::StreamBegin(id) => {
445                        buf.put_u16(UC_STREAM_BEGIN);
446                        buf.put_u32(*id);
447                    }
448                    UserControlEvent::StreamEof(id) => {
449                        buf.put_u16(UC_STREAM_EOF);
450                        buf.put_u32(*id);
451                    }
452                    UserControlEvent::StreamDry(id) => {
453                        buf.put_u16(UC_STREAM_DRY);
454                        buf.put_u32(*id);
455                    }
456                    UserControlEvent::SetBufferLength {
457                        stream_id,
458                        buffer_ms,
459                    } => {
460                        buf.put_u16(UC_SET_BUFFER_LENGTH);
461                        buf.put_u32(*stream_id);
462                        buf.put_u32(*buffer_ms);
463                    }
464                    UserControlEvent::StreamIsRecorded(id) => {
465                        buf.put_u16(UC_STREAM_IS_RECORDED);
466                        buf.put_u32(*id);
467                    }
468                    UserControlEvent::PingRequest(ts) => {
469                        buf.put_u16(UC_PING_REQUEST);
470                        buf.put_u32(*ts);
471                    }
472                    UserControlEvent::PingResponse(ts) => {
473                        buf.put_u16(UC_PING_RESPONSE);
474                        buf.put_u32(*ts);
475                    }
476                    UserControlEvent::Unknown { event_type, data } => {
477                        buf.put_u16(*event_type);
478                        buf.put_slice(data);
479                    }
480                }
481                (MSG_USER_CONTROL, buf.freeze())
482            }
483
484            RtmpMessage::Audio { data, .. } => (MSG_AUDIO, data.clone()),
485
486            RtmpMessage::Video { data, .. } => (MSG_VIDEO, data.clone()),
487
488            RtmpMessage::Command(cmd) => {
489                let payload = encode_command(cmd);
490                (MSG_COMMAND_AMF0, payload)
491            }
492
493            RtmpMessage::CommandAmf3(cmd) => {
494                let mut buf = BytesMut::new();
495                buf.put_u8(0x00); // AMF3 marker
496                buf.put_slice(&encode_command(cmd));
497                (MSG_COMMAND_AMF3, buf.freeze())
498            }
499
500            RtmpMessage::Data(data) => {
501                let payload = encode_data(data);
502                (MSG_DATA_AMF0, payload)
503            }
504
505            RtmpMessage::DataAmf3(data) => {
506                let mut buf = BytesMut::new();
507                buf.put_u8(0x00);
508                buf.put_slice(&encode_data(data));
509                (MSG_DATA_AMF3, buf.freeze())
510            }
511
512            RtmpMessage::Aggregate { data } => (MSG_AGGREGATE, data.clone()),
513
514            RtmpMessage::Unknown { type_id, data } => (*type_id, data.clone()),
515        }
516    }
517}
518
519/// Encode a command to AMF0 bytes
520fn encode_command(cmd: &Command) -> Bytes {
521    let mut encoder = Amf0Encoder::new();
522    encoder.encode(&AmfValue::String(cmd.name.clone()));
523    encoder.encode(&AmfValue::Number(cmd.transaction_id));
524    encoder.encode(&cmd.command_object);
525    for arg in &cmd.arguments {
526        encoder.encode(arg);
527    }
528    encoder.finish()
529}
530
531/// Encode a data message to AMF0 bytes
532fn encode_data(data: &DataMessage) -> Bytes {
533    let mut encoder = Amf0Encoder::new();
534    encoder.encode(&AmfValue::String(data.name.clone()));
535    for value in &data.values {
536        encoder.encode(value);
537    }
538    encoder.finish()
539}
540
541/// Build common response messages
542impl Command {
543    /// Create a _result response
544    pub fn result(transaction_id: f64, properties: AmfValue, info: AmfValue) -> Self {
545        Command {
546            name: CMD_RESULT.to_string(),
547            transaction_id,
548            command_object: properties,
549            arguments: vec![info],
550            stream_id: 0,
551        }
552    }
553
554    /// Create an _error response
555    pub fn error(transaction_id: f64, properties: AmfValue, info: AmfValue) -> Self {
556        Command {
557            name: CMD_ERROR.to_string(),
558            transaction_id,
559            command_object: properties,
560            arguments: vec![info],
561            stream_id: 0,
562        }
563    }
564
565    /// Create an onStatus response
566    pub fn on_status(stream_id: u32, level: &str, code: &str, description: &str) -> Self {
567        let mut info = HashMap::new();
568        info.insert("level".to_string(), AmfValue::String(level.to_string()));
569        info.insert("code".to_string(), AmfValue::String(code.to_string()));
570        info.insert(
571            "description".to_string(),
572            AmfValue::String(description.to_string()),
573        );
574
575        Command {
576            name: CMD_ON_STATUS.to_string(),
577            transaction_id: 0.0,
578            command_object: AmfValue::Null,
579            arguments: vec![AmfValue::Object(info)],
580            stream_id,
581        }
582    }
583}
584
585#[cfg(test)]
586mod tests {
587    use super::*;
588
589    #[test]
590    fn test_connect_params_parsing() {
591        let mut obj = HashMap::new();
592        obj.insert("app".to_string(), AmfValue::String("live".into()));
593        obj.insert(
594            "tcUrl".to_string(),
595            AmfValue::String("rtmp://localhost/live".into()),
596        );
597        obj.insert("objectEncoding".to_string(), AmfValue::Number(0.0));
598
599        let params = ConnectParams::from_amf(&AmfValue::Object(obj));
600        assert_eq!(params.app, "live");
601        assert_eq!(params.tc_url, Some("rtmp://localhost/live".into()));
602        assert_eq!(params.object_encoding, 0.0);
603    }
604
605    #[test]
606    fn test_command_roundtrip() {
607        let cmd = Command {
608            name: "connect".to_string(),
609            transaction_id: 1.0,
610            command_object: AmfValue::Null,
611            arguments: vec![AmfValue::String("test".into())],
612            stream_id: 0,
613        };
614
615        let payload = encode_command(&cmd);
616        let chunk = RtmpChunk {
617            csid: CSID_COMMAND,
618            timestamp: 0,
619            message_type: MSG_COMMAND_AMF0,
620            stream_id: 0,
621            payload,
622        };
623
624        let parsed = RtmpMessage::from_chunk(&chunk).unwrap();
625        if let RtmpMessage::Command(parsed_cmd) = parsed {
626            assert_eq!(parsed_cmd.name, "connect");
627            assert_eq!(parsed_cmd.transaction_id, 1.0);
628        } else {
629            panic!("Expected Command message");
630        }
631    }
632}