arcly-stream 0.1.5

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! A minimal **AMF0** encoder/decoder — the subset RTMP command messages use
//! (`connect`, `createStream`, `publish`, `play`, `onStatus`, `onMetaData`).
//!
//! AMF3 is not implemented; RTMP command/data channels are AMF0 in every common
//! client (OBS, FFmpeg, librtmp). Unsupported markers are not decoded:
//! [`decode_value`] returns `None` for them, and [`decode_all`] stops at the
//! first such value and returns only the values decoded before it.

use bytes::{BufMut, BytesMut};

// AMF0 type markers: the one-byte tag that precedes every encoded value and
// tells the decoder how to read the bytes after it. `decode_value` returns
// `None` for any marker byte it does not handle as a value.

// `number`: followed by an 8-byte big-endian IEEE-754 double.
const MARKER_NUMBER: u8 = 0x00;
// `boolean`: followed by one byte; any non-zero byte decodes as `true`.
const MARKER_BOOLEAN: u8 = 0x01;
// `string`: followed by a big-endian `u16` byte length and that many UTF-8 bytes.
const MARKER_STRING: u8 = 0x02;
// `object`: followed by key/value pairs and the `00 00 09` terminator.
const MARKER_OBJECT: u8 = 0x03;
// `null`: no payload.
const MARKER_NULL: u8 = 0x05;
// `undefined`: no payload.
const MARKER_UNDEFINED: u8 = 0x06;
// `ECMA array`: followed by a big-endian `u32` count, then an object body.
const MARKER_ECMA_ARRAY: u8 = 0x08;
// Object-end: written after an empty (length-0) key to close an object body.
const MARKER_OBJECT_END: u8 = 0x09;
// `strict array`: followed by a big-endian `u32` count and that many values.
const MARKER_STRICT_ARRAY: u8 = 0x0A;

/// A decoded AMF0 value.
///
/// There is one variant per AMF0 type this module can encode and decode.
/// Keyed containers store their pairs in a `Vec`, so the order seen on the wire
/// is preserved and duplicate keys are kept as-is.
#[derive(Debug, Clone, PartialEq)]
pub enum Amf0Value {
    /// IEEE-754 double (`number`), written big-endian on the wire.
    Number(f64),
    /// `boolean`, encoded as a single byte; any non-zero byte decodes as `true`.
    Boolean(bool),
    /// UTF-8 `string`, carried with a `u16` byte-length prefix.
    String(String),
    /// Anonymous `object` (ordered key/value pairs).
    Object(Vec<(String, Amf0Value)>),
    /// `ECMA array` — decoded identically to an object; the count that
    /// precedes the pairs on the wire is read past and not stored.
    EcmaArray(Vec<(String, Amf0Value)>),
    /// `strict array` of values, prefixed on the wire by a `u32` element count.
    StrictArray(Vec<Amf0Value>),
    /// `null` (marker only, no payload).
    Null,
    /// `undefined` (marker only, no payload).
    Undefined,
}

impl Amf0Value {
    /// Returns the text of an [`Amf0Value::String`]; every other variant
    /// yields `None`.
    pub fn as_str(&self) -> Option<&str> {
        if let Amf0Value::String(text) = self {
            Some(text.as_str())
        } else {
            None
        }
    }

    /// Returns the value of an [`Amf0Value::Number`]; every other variant
    /// yields `None`.
    pub fn as_number(&self) -> Option<f64> {
        if let Amf0Value::Number(value) = self {
            Some(*value)
        } else {
            None
        }
    }

    /// Finds the first property named `key` in an object or ECMA array.
    ///
    /// Returns `None` when this value is not a keyed container or when no
    /// property has that name.
    pub fn get(&self, key: &str) -> Option<&Amf0Value> {
        let entries = match self {
            Amf0Value::Object(entries) | Amf0Value::EcmaArray(entries) => entries,
            _ => return None,
        };
        for (name, value) in entries {
            if name == key {
                return Some(value);
            }
        }
        None
    }

    /// Appends the wire form of this value — type marker first, then the
    /// payload — to `out`. Containers encode their children recursively.
    pub fn encode(&self, out: &mut BytesMut) {
        match self {
            // Payload-free values: the marker is the whole encoding.
            Amf0Value::Null => out.put_u8(MARKER_NULL),
            Amf0Value::Undefined => out.put_u8(MARKER_UNDEFINED),
            Amf0Value::Boolean(flag) => {
                out.put_u8(MARKER_BOOLEAN);
                out.put_u8(*flag as u8);
            }
            Amf0Value::Number(value) => {
                out.put_u8(MARKER_NUMBER);
                out.put_f64(*value);
            }
            Amf0Value::String(text) => {
                out.put_u8(MARKER_STRING);
                encode_string_body(out, text);
            }
            Amf0Value::StrictArray(elements) => {
                out.put_u8(MARKER_STRICT_ARRAY);
                out.put_u32(elements.len() as u32);
                elements.iter().for_each(|element| element.encode(out));
            }
            Amf0Value::EcmaArray(entries) => {
                // Same body as an object, preceded by the entry count.
                out.put_u8(MARKER_ECMA_ARRAY);
                out.put_u32(entries.len() as u32);
                encode_object_body(out, entries);
            }
            Amf0Value::Object(entries) => {
                out.put_u8(MARKER_OBJECT);
                encode_object_body(out, entries);
            }
        }
    }
}

/// Creates an [`Amf0Value::Object`] from borrowed-key `(key, value)` pairs,
/// copying each key into an owned `String` and keeping the given order.
pub fn object(props: Vec<(&str, Amf0Value)>) -> Amf0Value {
    let mut owned = Vec::with_capacity(props.len());
    for (key, value) in props {
        owned.push((String::from(key), value));
    }
    Amf0Value::Object(owned)
}

/// Wraps anything convertible into a `String` as an [`Amf0Value::String`].
pub fn string(s: impl Into<String>) -> Amf0Value {
    let text: String = s.into();
    Amf0Value::String(text)
}

/// Writes an AMF0 string body: a big-endian `u16` byte length followed by the
/// UTF-8 bytes of `s`. No type marker is written; callers add it when needed.
///
/// The length prefix is 16 bits wide, so at most `u16::MAX` bytes can be
/// represented. Longer input is cut to the longest prefix that fits and ends
/// on a character boundary. Writing the full string with a wrapped length
/// would make the prefix disagree with the payload and corrupt every value
/// that follows in the message.
fn encode_string_body(out: &mut BytesMut, s: &str) {
    let mut len = s.len().min(usize::from(u16::MAX));
    // Back off to a char boundary so the emitted bytes stay valid UTF-8.
    // Offset 0 is always a boundary, so this terminates.
    while !s.is_char_boundary(len) {
        len -= 1;
    }
    // Lossless: `len` was clamped to `u16::MAX` above.
    out.put_u16(len as u16);
    out.put_slice(&s.as_bytes()[..len]);
}

/// Writes the body shared by objects and ECMA arrays: each property as a
/// marker-less key string followed by its encoded value, then the terminator.
fn encode_object_body(out: &mut BytesMut, props: &[(String, Amf0Value)]) {
    props.iter().for_each(|(name, value)| {
        encode_string_body(out, name);
        value.encode(out);
    });
    // Terminator: a zero-length key (`00 00`) followed by the object-end marker.
    out.put_u16(0);
    out.put_u8(MARKER_OBJECT_END);
}

/// Decodes the back-to-back AMF0 values that make up `buf` (the layout of an
/// RTMP command message).
///
/// Decoding stops at the end of the buffer or at the first value that cannot
/// be decoded; the values read up to that point are returned.
pub fn decode_all(buf: &[u8]) -> Vec<Amf0Value> {
    let mut values = Vec::new();
    let mut rest = buf;
    while !rest.is_empty() {
        let (value, consumed) = match decode_value(rest) {
            // A zero-length decode would never advance, so treat it as failure.
            Some((value, consumed)) if consumed > 0 => (value, consumed),
            _ => break,
        };
        values.push(value);
        // An over-long `consumed` leaves nothing to read, which ends the loop.
        rest = rest.get(consumed..).unwrap_or(&[]);
    }
    values
}

/// Decode a single AMF0 value, returning it plus the number of bytes consumed.
pub fn decode_value(buf: &[u8]) -> Option<(Amf0Value, usize)> {
    let marker = *buf.first()?;
    let body = &buf[1..];
    match marker {
        MARKER_NUMBER => {
            let bytes: [u8; 8] = body.get(..8)?.try_into().ok()?;
            Some((Amf0Value::Number(f64::from_be_bytes(bytes)), 9))
        }
        MARKER_BOOLEAN => Some((Amf0Value::Boolean(*body.first()? != 0), 2)),
        MARKER_STRING => {
            let (s, used) = decode_string_body(body)?;
            Some((Amf0Value::String(s), 1 + used))
        }
        MARKER_OBJECT => {
            let (props, used) = decode_object_body(body)?;
            Some((Amf0Value::Object(props), 1 + used))
        }
        MARKER_ECMA_ARRAY => {
            // 4-byte associative-count (advisory) precedes the key/value pairs.
            let count_bytes = body.get(..4)?;
            let _count = u32::from_be_bytes(count_bytes.try_into().ok()?);
            let (props, used) = decode_object_body(&body[4..])?;
            Some((Amf0Value::EcmaArray(props), 1 + 4 + used))
        }
        MARKER_STRICT_ARRAY => {
            let count = u32::from_be_bytes(body.get(..4)?.try_into().ok()?) as usize;
            let mut pos = 4;
            let mut items = Vec::with_capacity(count.min(64));
            for _ in 0..count {
                let (v, used) = decode_value(body.get(pos..)?)?;
                items.push(v);
                pos += used;
            }
            Some((Amf0Value::StrictArray(items), 1 + pos))
        }
        MARKER_NULL => Some((Amf0Value::Null, 1)),
        MARKER_UNDEFINED => Some((Amf0Value::Undefined, 1)),
        _ => None,
    }
}

/// Decode a UTF-8 string body (`u16` length + bytes), returning bytes consumed.
fn decode_string_body(buf: &[u8]) -> Option<(String, usize)> {
    let len = u16::from_be_bytes(buf.get(..2)?.try_into().ok()?) as usize;
    let s = std::str::from_utf8(buf.get(2..2 + len)?).ok()?.to_string();
    Some((s, 2 + len))
}

/// Decode object key/value pairs terminated by the `00 00 09` object-end marker.
fn decode_object_body(buf: &[u8]) -> Option<(Vec<(String, Amf0Value)>, usize)> {
    let mut props = Vec::new();
    let mut pos = 0;
    loop {
        // Object-end is an empty (length-0) key followed by the end marker.
        if buf.get(pos..pos + 3) == Some(&[0x00, 0x00, MARKER_OBJECT_END]) {
            return Some((props, pos + 3));
        }
        let (key, kused) = decode_string_body(buf.get(pos..)?)?;
        pos += kused;
        let (val, vused) = decode_value(buf.get(pos..)?)?;
        pos += vused;
        props.push((key, val));
        if pos >= buf.len() {
            return None; // ran out before the object-end marker
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// One step of the xorshift64* generator that drives the no-panic sweep.
    fn xorshift_star(state: &mut u64) -> u64 {
        *state ^= *state >> 12;
        *state ^= *state << 25;
        *state ^= *state >> 27;
        state.wrapping_mul(0x2545_F491_4F6C_DD1D)
    }

    #[test]
    fn roundtrips_a_connect_like_command() {
        // Command name, transaction id, command object, trailing null.
        let command = [
            string("connect"),
            Amf0Value::Number(1.0),
            object(vec![
                ("app", string("live")),
                ("tcUrl", string("rtmp://host/live")),
                ("fpad", Amf0Value::Boolean(false)),
            ]),
            Amf0Value::Null,
        ];
        let mut wire = BytesMut::new();
        for value in &command {
            value.encode(&mut wire);
        }

        let decoded = decode_all(&wire);
        assert_eq!(decoded[0].as_str(), Some("connect"));
        assert_eq!(decoded[1].as_number(), Some(1.0));
        assert_eq!(decoded[2].get("app").and_then(Amf0Value::as_str), Some("live"));
        assert_eq!(decoded[2].get("fpad"), Some(&Amf0Value::Boolean(false)));
        assert_eq!(decoded[3], Amf0Value::Null);
    }

    #[test]
    fn decodes_ecma_array_as_object() {
        let metadata =
            Amf0Value::EcmaArray(vec![(String::from("width"), Amf0Value::Number(1920.0))]);
        let mut wire = BytesMut::new();
        metadata.encode(&mut wire);

        let decoded = decode_all(&wire);
        let width = decoded[0].get("width").and_then(Amf0Value::as_number);
        assert_eq!(width, Some(1920.0));
    }

    #[test]
    fn truncated_input_does_not_panic() {
        // A string that announces five bytes but carries only one.
        let short_string = [MARKER_STRING, 0x00, 0x05, b'h'];
        assert!(decode_value(&short_string).is_none());
        assert!(decode_value(&[]).is_none());
        // A number marker followed by one byte of the eight it needs.
        assert!(decode_all(&[MARKER_NUMBER, 0x01]).is_empty());
    }

    #[test]
    fn random_bytes_never_panic() {
        // AMF0 arrives over the untrusted RTMP control plane. This sweeps the
        // decoder over seeded pseudo-random buffers as a stable-toolchain
        // stand-in for a fuzz target; finishing without aborting is the pass
        // condition.
        let mut seed = 0x1234_5678_9ABC_DEF0u64;
        for _ in 0..4000 {
            let len = (xorshift_star(&mut seed) % 64) as usize;
            let mut buf: Vec<u8> = Vec::with_capacity(len);
            for _ in 0..len {
                buf.push((xorshift_star(&mut seed) & 0xFF) as u8);
            }
            let _ = decode_all(&buf);
            let _ = decode_value(&buf);
        }
    }
}