rtmp_rs/amf/
amf3.rs

1//! AMF3 encoder and decoder
2//!
3//! AMF3 is the ActionScript 3.0 serialization format. It's more efficient
4//! than AMF0 due to better string/object references and a native integer type.
5//!
6//! Most RTMP implementations use AMF0 for commands. AMF3 support is included
7//! for completeness and for handling avmplus-object markers (0x11) in AMF0 streams.
8//!
9//! Type Markers:
10//! ```text
11//! 0x00 - Undefined
12//! 0x01 - Null
13//! 0x02 - Boolean false
14//! 0x03 - Boolean true
15//! 0x04 - Integer (29-bit signed)
16//! 0x05 - Double
17//! 0x06 - String
18//! 0x07 - XML Document (legacy)
19//! 0x08 - Date
20//! 0x09 - Array
21//! 0x0A - Object
22//! 0x0B - XML
23//! 0x0C - ByteArray
24//! ```
25
26use bytes::{Buf, BufMut, Bytes, BytesMut};
27use std::collections::HashMap;
28
29use super::value::AmfValue;
30use crate::error::AmfError;
31
32// AMF3 type markers
33const MARKER_UNDEFINED: u8 = 0x00;
34const MARKER_NULL: u8 = 0x01;
35const MARKER_FALSE: u8 = 0x02;
36const MARKER_TRUE: u8 = 0x03;
37const MARKER_INTEGER: u8 = 0x04;
38const MARKER_DOUBLE: u8 = 0x05;
39const MARKER_STRING: u8 = 0x06;
40const MARKER_XML_DOC: u8 = 0x07;
41const MARKER_DATE: u8 = 0x08;
42const MARKER_ARRAY: u8 = 0x09;
43const MARKER_OBJECT: u8 = 0x0A;
44const MARKER_XML: u8 = 0x0B;
45const MARKER_BYTE_ARRAY: u8 = 0x0C;
46
47/// Maximum nesting depth
48const MAX_NESTING_DEPTH: usize = 64;
49
50/// AMF3 29-bit integer bounds
51const AMF3_INT_MAX: i32 = 0x0FFFFFFF;
52const AMF3_INT_MIN: i32 = -0x10000000;
53
54/// AMF3 decoder with reference tables
55pub struct Amf3Decoder {
56    /// String reference table
57    string_refs: Vec<String>,
58    /// Object reference table
59    object_refs: Vec<AmfValue>,
60    /// Trait reference table (class definitions)
61    trait_refs: Vec<TraitDef>,
62    /// Enable lenient parsing
63    lenient: bool,
64    /// Current nesting depth
65    depth: usize,
66}
67
68/// Trait definition for typed objects
69#[derive(Clone, Debug)]
70struct TraitDef {
71    class_name: String,
72    is_dynamic: bool,
73    properties: Vec<String>,
74}
75
76impl Amf3Decoder {
77    /// Create a new decoder
78    pub fn new() -> Self {
79        Self {
80            string_refs: Vec::new(),
81            object_refs: Vec::new(),
82            trait_refs: Vec::new(),
83            lenient: true,
84            depth: 0,
85        }
86    }
87
88    /// Reset decoder state
89    pub fn reset(&mut self) {
90        self.string_refs.clear();
91        self.object_refs.clear();
92        self.trait_refs.clear();
93        self.depth = 0;
94    }
95
96    /// Decode a single AMF3 value
97    pub fn decode(&mut self, buf: &mut Bytes) -> Result<AmfValue, AmfError> {
98        if buf.is_empty() {
99            return Err(AmfError::UnexpectedEof);
100        }
101
102        self.depth += 1;
103        if self.depth > MAX_NESTING_DEPTH {
104            return Err(AmfError::NestingTooDeep);
105        }
106
107        let marker = buf.get_u8();
108        let result = self.decode_value(marker, buf);
109        self.depth -= 1;
110        result
111    }
112
113    fn decode_value(&mut self, marker: u8, buf: &mut Bytes) -> Result<AmfValue, AmfError> {
114        match marker {
115            MARKER_UNDEFINED => Ok(AmfValue::Undefined),
116            MARKER_NULL => Ok(AmfValue::Null),
117            MARKER_FALSE => Ok(AmfValue::Boolean(false)),
118            MARKER_TRUE => Ok(AmfValue::Boolean(true)),
119            MARKER_INTEGER => self.decode_integer(buf),
120            MARKER_DOUBLE => self.decode_double(buf),
121            MARKER_STRING => self.decode_string(buf),
122            MARKER_DATE => self.decode_date(buf),
123            MARKER_ARRAY => self.decode_array(buf),
124            MARKER_OBJECT => self.decode_object(buf),
125            MARKER_BYTE_ARRAY => self.decode_byte_array(buf),
126            MARKER_XML | MARKER_XML_DOC => self.decode_xml(buf),
127            _ => {
128                if self.lenient {
129                    Ok(AmfValue::Undefined)
130                } else {
131                    Err(AmfError::UnknownMarker(marker))
132                }
133            }
134        }
135    }
136
137    fn decode_integer(&mut self, buf: &mut Bytes) -> Result<AmfValue, AmfError> {
138        let value = self.read_u29(buf)?;
139        // Sign-extend from 29 bits
140        let signed = if value & 0x10000000 != 0 {
141            (value as i32) | !0x1FFFFFFF
142        } else {
143            value as i32
144        };
145        Ok(AmfValue::Integer(signed))
146    }
147
148    fn decode_double(&mut self, buf: &mut Bytes) -> Result<AmfValue, AmfError> {
149        if buf.remaining() < 8 {
150            return Err(AmfError::UnexpectedEof);
151        }
152        Ok(AmfValue::Number(buf.get_f64()))
153    }
154
155    fn decode_string(&mut self, buf: &mut Bytes) -> Result<AmfValue, AmfError> {
156        let s = self.read_string(buf)?;
157        Ok(AmfValue::String(s))
158    }
159
160    fn decode_date(&mut self, buf: &mut Bytes) -> Result<AmfValue, AmfError> {
161        let header = self.read_u29(buf)?;
162
163        if header & 1 == 0 {
164            // Reference
165            let idx = (header >> 1) as usize;
166            if idx >= self.object_refs.len() {
167                return Err(AmfError::InvalidReference(idx as u16));
168            }
169            return Ok(self.object_refs[idx].clone());
170        }
171
172        if buf.remaining() < 8 {
173            return Err(AmfError::UnexpectedEof);
174        }
175
176        let timestamp = buf.get_f64();
177        let value = AmfValue::Date(timestamp);
178        self.object_refs.push(value.clone());
179        Ok(value)
180    }
181
182    fn decode_array(&mut self, buf: &mut Bytes) -> Result<AmfValue, AmfError> {
183        let header = self.read_u29(buf)?;
184
185        if header & 1 == 0 {
186            // Reference
187            let idx = (header >> 1) as usize;
188            if idx >= self.object_refs.len() {
189                return Err(AmfError::InvalidReference(idx as u16));
190            }
191            return Ok(self.object_refs[idx].clone());
192        }
193
194        let dense_count = (header >> 1) as usize;
195
196        // Placeholder for self-reference
197        let arr_idx = self.object_refs.len();
198        self.object_refs.push(AmfValue::Null);
199
200        // Read associative portion (key-value pairs until empty string)
201        let mut assoc = HashMap::new();
202        loop {
203            let key = self.read_string(buf)?;
204            if key.is_empty() {
205                break;
206            }
207            let value = self.decode(buf)?;
208            assoc.insert(key, value);
209        }
210
211        // Read dense portion
212        let mut dense = Vec::with_capacity(dense_count.min(1024));
213        for _ in 0..dense_count {
214            dense.push(self.decode(buf)?);
215        }
216
217        let value = if assoc.is_empty() {
218            AmfValue::Array(dense)
219        } else {
220            // Mixed array - store as ECMA array with dense values as string keys
221            for (i, v) in dense.into_iter().enumerate() {
222                assoc.insert(i.to_string(), v);
223            }
224            AmfValue::EcmaArray(assoc)
225        };
226
227        self.object_refs[arr_idx] = value.clone();
228        Ok(value)
229    }
230
231    fn decode_object(&mut self, buf: &mut Bytes) -> Result<AmfValue, AmfError> {
232        let header = self.read_u29(buf)?;
233
234        if header & 1 == 0 {
235            // Object reference
236            let idx = (header >> 1) as usize;
237            if idx >= self.object_refs.len() {
238                return Err(AmfError::InvalidReference(idx as u16));
239            }
240            return Ok(self.object_refs[idx].clone());
241        }
242
243        // Placeholder for self-reference
244        let obj_idx = self.object_refs.len();
245        self.object_refs.push(AmfValue::Null);
246
247        let trait_def = if header & 2 == 0 {
248            // Trait reference
249            let idx = (header >> 2) as usize;
250            if idx >= self.trait_refs.len() {
251                return Err(AmfError::InvalidReference(idx as u16));
252            }
253            self.trait_refs[idx].clone()
254        } else {
255            // Inline trait
256            let is_dynamic = (header & 8) != 0;
257            let sealed_count = (header >> 4) as usize;
258
259            let class_name = self.read_string(buf)?;
260
261            let mut properties = Vec::with_capacity(sealed_count);
262            for _ in 0..sealed_count {
263                properties.push(self.read_string(buf)?);
264            }
265
266            let trait_def = TraitDef {
267                class_name,
268                is_dynamic,
269                properties,
270            };
271            self.trait_refs.push(trait_def.clone());
272            trait_def
273        };
274
275        let mut props = HashMap::new();
276
277        // Read sealed properties
278        for prop_name in &trait_def.properties {
279            let value = self.decode(buf)?;
280            props.insert(prop_name.clone(), value);
281        }
282
283        // Read dynamic properties
284        if trait_def.is_dynamic {
285            loop {
286                let key = self.read_string(buf)?;
287                if key.is_empty() {
288                    break;
289                }
290                let value = self.decode(buf)?;
291                props.insert(key, value);
292            }
293        }
294
295        let value = if trait_def.class_name.is_empty() {
296            AmfValue::Object(props)
297        } else {
298            AmfValue::TypedObject {
299                class_name: trait_def.class_name,
300                properties: props,
301            }
302        };
303
304        self.object_refs[obj_idx] = value.clone();
305        Ok(value)
306    }
307
308    fn decode_byte_array(&mut self, buf: &mut Bytes) -> Result<AmfValue, AmfError> {
309        let header = self.read_u29(buf)?;
310
311        if header & 1 == 0 {
312            let idx = (header >> 1) as usize;
313            if idx >= self.object_refs.len() {
314                return Err(AmfError::InvalidReference(idx as u16));
315            }
316            return Ok(self.object_refs[idx].clone());
317        }
318
319        let len = (header >> 1) as usize;
320        if buf.remaining() < len {
321            return Err(AmfError::UnexpectedEof);
322        }
323
324        let data = buf.copy_to_bytes(len).to_vec();
325        let value = AmfValue::ByteArray(data);
326        self.object_refs.push(value.clone());
327        Ok(value)
328    }
329
330    fn decode_xml(&mut self, buf: &mut Bytes) -> Result<AmfValue, AmfError> {
331        let header = self.read_u29(buf)?;
332
333        if header & 1 == 0 {
334            let idx = (header >> 1) as usize;
335            if idx >= self.object_refs.len() {
336                return Err(AmfError::InvalidReference(idx as u16));
337            }
338            return Ok(self.object_refs[idx].clone());
339        }
340
341        let len = (header >> 1) as usize;
342        if buf.remaining() < len {
343            return Err(AmfError::UnexpectedEof);
344        }
345
346        let bytes = buf.copy_to_bytes(len);
347        let s = String::from_utf8(bytes.to_vec()).map_err(|_| AmfError::InvalidUtf8)?;
348        let value = AmfValue::Xml(s);
349        self.object_refs.push(value.clone());
350        Ok(value)
351    }
352
353    /// Read AMF3 U29 variable-length integer
354    fn read_u29(&mut self, buf: &mut Bytes) -> Result<u32, AmfError> {
355        let mut value: u32 = 0;
356
357        for i in 0..4 {
358            if buf.is_empty() {
359                return Err(AmfError::UnexpectedEof);
360            }
361
362            let byte = buf.get_u8();
363
364            if i < 3 {
365                value = (value << 7) | ((byte & 0x7F) as u32);
366                if byte & 0x80 == 0 {
367                    return Ok(value);
368                }
369            } else {
370                // Fourth byte uses all 8 bits
371                value = (value << 8) | (byte as u32);
372                return Ok(value);
373            }
374        }
375
376        Ok(value)
377    }
378
379    /// Read AMF3 string (with reference handling)
380    fn read_string(&mut self, buf: &mut Bytes) -> Result<String, AmfError> {
381        let header = self.read_u29(buf)?;
382
383        if header & 1 == 0 {
384            // Reference
385            let idx = (header >> 1) as usize;
386            if idx >= self.string_refs.len() {
387                return Err(AmfError::InvalidReference(idx as u16));
388            }
389            return Ok(self.string_refs[idx].clone());
390        }
391
392        let len = (header >> 1) as usize;
393        if len == 0 {
394            return Ok(String::new());
395        }
396
397        if buf.remaining() < len {
398            return Err(AmfError::UnexpectedEof);
399        }
400
401        let bytes = buf.copy_to_bytes(len);
402        let s = String::from_utf8(bytes.to_vec()).map_err(|_| AmfError::InvalidUtf8)?;
403
404        // Only non-empty strings go into reference table
405        self.string_refs.push(s.clone());
406        Ok(s)
407    }
408}
409
410impl Default for Amf3Decoder {
411    fn default() -> Self {
412        Self::new()
413    }
414}
415
416/// AMF3 encoder
417pub struct Amf3Encoder {
418    buf: BytesMut,
419    string_refs: HashMap<String, usize>,
420}
421
422impl Amf3Encoder {
423    /// Create a new encoder
424    pub fn new() -> Self {
425        Self {
426            buf: BytesMut::with_capacity(256),
427            string_refs: HashMap::new(),
428        }
429    }
430
431    /// Get encoded bytes and reset
432    pub fn finish(&mut self) -> Bytes {
433        self.string_refs.clear();
434        self.buf.split().freeze()
435    }
436
437    /// Encode a single AMF3 value
438    pub fn encode(&mut self, value: &AmfValue) {
439        match value {
440            AmfValue::Undefined => self.buf.put_u8(MARKER_UNDEFINED),
441            AmfValue::Null => self.buf.put_u8(MARKER_NULL),
442            AmfValue::Boolean(false) => self.buf.put_u8(MARKER_FALSE),
443            AmfValue::Boolean(true) => self.buf.put_u8(MARKER_TRUE),
444            AmfValue::Integer(i) if *i >= AMF3_INT_MIN && *i <= AMF3_INT_MAX => {
445                self.buf.put_u8(MARKER_INTEGER);
446                self.write_u29(*i as u32 & 0x1FFFFFFF);
447            }
448            AmfValue::Integer(i) => {
449                self.buf.put_u8(MARKER_DOUBLE);
450                self.buf.put_f64(*i as f64);
451            }
452            AmfValue::Number(n) => {
453                self.buf.put_u8(MARKER_DOUBLE);
454                self.buf.put_f64(*n);
455            }
456            AmfValue::String(s) => {
457                self.buf.put_u8(MARKER_STRING);
458                self.write_string(s);
459            }
460            AmfValue::Array(elements) => {
461                self.buf.put_u8(MARKER_ARRAY);
462                let header = ((elements.len() as u32) << 1) | 1;
463                self.write_u29(header);
464                // Empty associative portion
465                self.write_u29(1); // Empty string marker
466                for elem in elements {
467                    self.encode(elem);
468                }
469            }
470            AmfValue::Object(props) | AmfValue::EcmaArray(props) => {
471                self.buf.put_u8(MARKER_OBJECT);
472                // Dynamic anonymous object
473                let header = (0 << 4) | (1 << 3) | (1 << 2) | (1 << 1) | 1;
474                self.write_u29(header);
475                self.write_string(""); // Empty class name
476                for (key, val) in props {
477                    self.write_string(key);
478                    self.encode(val);
479                }
480                self.write_string(""); // End marker
481            }
482            AmfValue::TypedObject {
483                class_name,
484                properties,
485            } => {
486                self.buf.put_u8(MARKER_OBJECT);
487                let header = (0 << 4) | (1 << 3) | (1 << 2) | (1 << 1) | 1;
488                self.write_u29(header);
489                self.write_string(class_name);
490                for (key, val) in properties {
491                    self.write_string(key);
492                    self.encode(val);
493                }
494                self.write_string("");
495            }
496            AmfValue::Date(timestamp) => {
497                self.buf.put_u8(MARKER_DATE);
498                self.write_u29(1); // Inline
499                self.buf.put_f64(*timestamp);
500            }
501            AmfValue::Xml(s) => {
502                self.buf.put_u8(MARKER_XML);
503                let header = ((s.len() as u32) << 1) | 1;
504                self.write_u29(header);
505                self.buf.put_slice(s.as_bytes());
506            }
507            AmfValue::ByteArray(data) => {
508                self.buf.put_u8(MARKER_BYTE_ARRAY);
509                let header = ((data.len() as u32) << 1) | 1;
510                self.write_u29(header);
511                self.buf.put_slice(data);
512            }
513        }
514    }
515
516    /// Write U29 variable-length integer
517    fn write_u29(&mut self, value: u32) {
518        let value = value & 0x1FFFFFFF;
519
520        if value < 0x80 {
521            self.buf.put_u8(value as u8);
522        } else if value < 0x4000 {
523            self.buf.put_u8(((value >> 7) | 0x80) as u8);
524            self.buf.put_u8((value & 0x7F) as u8);
525        } else if value < 0x200000 {
526            self.buf.put_u8(((value >> 14) | 0x80) as u8);
527            self.buf.put_u8(((value >> 7) | 0x80) as u8);
528            self.buf.put_u8((value & 0x7F) as u8);
529        } else {
530            self.buf.put_u8(((value >> 22) | 0x80) as u8);
531            self.buf.put_u8(((value >> 15) | 0x80) as u8);
532            self.buf.put_u8(((value >> 8) | 0x80) as u8);
533            self.buf.put_u8((value & 0xFF) as u8);
534        }
535    }
536
537    /// Write string with reference handling
538    fn write_string(&mut self, s: &str) {
539        if s.is_empty() {
540            self.write_u29(1); // Empty string marker
541            return;
542        }
543
544        if let Some(&idx) = self.string_refs.get(s) {
545            // Reference
546            self.write_u29((idx as u32) << 1);
547        } else {
548            // Inline
549            let idx = self.string_refs.len();
550            self.string_refs.insert(s.to_string(), idx);
551            let header = ((s.len() as u32) << 1) | 1;
552            self.write_u29(header);
553            self.buf.put_slice(s.as_bytes());
554        }
555    }
556}
557
558impl Default for Amf3Encoder {
559    fn default() -> Self {
560        Self::new()
561    }
562}
563
564#[cfg(test)]
565mod tests {
566    use super::*;
567
568    #[test]
569    fn test_u29_encoding() {
570        let mut encoder = Amf3Encoder::new();
571
572        // Test various ranges
573        encoder.write_u29(0);
574        encoder.write_u29(127);
575        encoder.write_u29(128);
576        encoder.write_u29(16383);
577        encoder.write_u29(16384);
578        encoder.write_u29(2097151);
579        encoder.write_u29(2097152);
580
581        let encoded = encoder.finish();
582
583        let mut decoder = Amf3Decoder::new();
584        let mut buf = encoded;
585
586        assert_eq!(decoder.read_u29(&mut buf).unwrap(), 0);
587        assert_eq!(decoder.read_u29(&mut buf).unwrap(), 127);
588        assert_eq!(decoder.read_u29(&mut buf).unwrap(), 128);
589        assert_eq!(decoder.read_u29(&mut buf).unwrap(), 16383);
590        assert_eq!(decoder.read_u29(&mut buf).unwrap(), 16384);
591        assert_eq!(decoder.read_u29(&mut buf).unwrap(), 2097151);
592        assert_eq!(decoder.read_u29(&mut buf).unwrap(), 2097152);
593    }
594
595    #[test]
596    fn test_string_roundtrip() {
597        let mut encoder = Amf3Encoder::new();
598        encoder.encode(&AmfValue::String("hello".into()));
599        let encoded = encoder.finish();
600
601        let mut decoder = Amf3Decoder::new();
602        let mut buf = encoded;
603        let decoded = decoder.decode(&mut buf).unwrap();
604        assert_eq!(decoded, AmfValue::String("hello".into()));
605    }
606}