arcly-stream 0.1.2

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS) — runtime, config, and metrics free.
Documentation
//! FLV audio/video tag bodies as carried inside RTMP message payloads.
//!
//! Handles the two codecs every RTMP deployment uses — **H.264/AVC** video and
//! **AAC** audio — in both directions:
//!
//! * **Ingest:** decode the AVC decoder-configuration record and AudioSpecificConfig,
//!   turn AVCC NAL units into Annex-B and raw AAC into ADTS, so the engine stores
//!   frames in a player-ready elementary form (what [`MpegTsMuxer`] consumes).
//! * **Egress (play):** rebuild the configuration records and FLV tag bodies from
//!   the engine's Annex-B / ADTS frames.
//!
//! [`MpegTsMuxer`]: crate::packager::MpegTsMuxer

use bytes::{Buf, BufMut, Bytes, BytesMut};

/// FLV codec id for AVC (H.264) in a video tag; carried in the low nibble of
/// the tag body's first byte.
pub const VIDEO_CODEC_AVC: u8 = 7;
/// FLV sound format for AAC in an audio tag; carried in the high nibble of the
/// tag body's first byte.
pub const AUDIO_FORMAT_AAC: u8 = 10;

/// FLV `FrameType` nibble (high nibble of a video tag's first byte): a
/// keyframe (IDR).
const FRAME_TYPE_KEY: u8 = 1;
/// FLV `FrameType` nibble (high nibble of a video tag's first byte): an inter
/// (delta) frame.
const FRAME_TYPE_INTER: u8 = 2;

/// `AVCPacketType` / `AACPacketType` (second byte of the tag body): sequence
/// (decoder-config) header.
pub const PKT_SEQUENCE_HEADER: u8 = 0;
/// `AVCPacketType` / `AACPacketType` (second byte of the tag body): raw NALU /
/// raw AAC.
pub const PKT_RAW: u8 = 1;

// ── Video: AVCDecoderConfigurationRecord ────────────────────────────────────

/// Parsed AVC decoder configuration (the `avcC` record from a video sequence
/// header), retaining the parameter sets needed to (a) build Annex-B config for
/// the engine and (b) self-contain each keyframe.
///
/// The derived `Default` is an empty configuration: no parameter sets and a
/// `nal_length_size` of 0.
#[derive(Debug, Clone, Default)]
pub struct AvcConfig {
    /// NAL length prefix width in bytes (`lengthSizeMinusOne + 1`); a value
    /// produced by [`AvcConfig::parse`] is therefore always in `1..=4`.
    pub nal_length_size: usize,
    /// Sequence parameter sets, each stored as bare NAL bytes (no start code
    /// and no length prefix).
    pub sps: Vec<Vec<u8>>,
    /// Picture parameter sets, stored the same way as `sps`.
    pub pps: Vec<Vec<u8>>,
}

impl AvcConfig {
    /// Parse an AVCDecoderConfigurationRecord.
    ///
    /// Returns `None` when `rec` is shorter than 7 bytes, its
    /// `configurationVersion` byte is not 1, or a declared parameter set runs
    /// past the end of `rec`. Bytes after the last PPS are ignored.
    pub fn parse(rec: &[u8]) -> Option<Self> {
        if rec.len() < 7 || rec[0] != 1 {
            return None;
        }
        // Low two bits of byte 4 are `lengthSizeMinusOne`.
        let nal_length_size = usize::from(rec[4] & 0x03) + 1;
        // Low five bits of byte 5 are the SPS count; the upper three are reserved.
        let num_sps = usize::from(rec[5] & 0x1F);
        let (sps, pos) = Self::read_param_sets(rec, 6, num_sps)?;
        // The PPS count uses the whole byte.
        let num_pps = usize::from(*rec.get(pos)?);
        let (pps, _) = Self::read_param_sets(rec, pos + 1, num_pps)?;
        Some(Self {
            nal_length_size,
            sps,
            pps,
        })
    }

    /// Read `count` parameter sets starting at `pos`, each encoded as a 16-bit
    /// big-endian length followed by that many bytes.
    ///
    /// Returns the sets together with the offset just past the last one, or
    /// `None` if `rec` ends before all of them were read.
    fn read_param_sets(
        rec: &[u8],
        mut pos: usize,
        count: usize,
    ) -> Option<(Vec<Vec<u8>>, usize)> {
        let mut sets = Vec::with_capacity(count);
        for _ in 0..count {
            let len = usize::from(u16::from_be_bytes([*rec.get(pos)?, *rec.get(pos + 1)?]));
            pos += 2;
            sets.push(rec.get(pos..pos + len)?.to_vec());
            pos += len;
        }
        Some((sets, pos))
    }

    /// The SPS/PPS as an Annex-B access unit (each NAL prefixed with `00 00 00 01`).
    ///
    /// All SPS are emitted first, then all PPS, in stored order. An empty
    /// configuration yields an empty buffer.
    pub fn to_annexb(&self) -> Bytes {
        const START_CODE: [u8; 4] = [0, 0, 0, 1];
        let total: usize = self
            .sps
            .iter()
            .chain(self.pps.iter())
            .map(|nal| START_CODE.len() + nal.len())
            .sum();
        let mut out = BytesMut::with_capacity(total);
        for nal in self.sps.iter().chain(self.pps.iter()) {
            out.put_slice(&START_CODE);
            out.put_slice(nal);
        }
        out.freeze()
    }

    /// Rebuild an AVCDecoderConfigurationRecord from the stored parameter sets
    /// (used when serving a `play` request).
    ///
    /// The record always advertises 4-byte NAL length prefixes, independent of
    /// `nal_length_size`. The profile/compatibility/level bytes are copied from
    /// bytes 1..=3 of the first SPS written, or are 0 when there is none.
    ///
    /// The record format can only express up to 31 SPS, up to 255 PPS, and
    /// parameter sets of at most 65535 bytes. Sets that do not fit are left out
    /// so the written counts and lengths always match the bytes that follow.
    pub fn to_avc_record(&self) -> Bytes {
        const MAX_SPS: usize = 0x1F; // 5-bit numOfSequenceParameterSets
        const MAX_PPS: usize = 0xFF; // 8-bit numOfPictureParameterSets

        // Select what is encodable *before* writing any count, so a count can
        // never disagree with the entries written after it.
        fn encodable(sets: &[Vec<u8>], max: usize) -> Vec<&[u8]> {
            sets.iter()
                .map(Vec::as_slice)
                .filter(|set| set.len() <= usize::from(u16::MAX))
                .take(max)
                .collect()
        }
        let sps = encodable(&self.sps, MAX_SPS);
        let pps = encodable(&self.pps, MAX_PPS);

        let sps0: &[u8] = sps.first().copied().unwrap_or_default();
        let sps0_byte = |idx: usize| sps0.get(idx).copied().unwrap_or(0);

        let payload: usize = sps.iter().chain(pps.iter()).map(|set| 2 + set.len()).sum();
        let mut out = BytesMut::with_capacity(7 + payload);
        out.put_u8(1); // configurationVersion
        out.put_u8(sps0_byte(1)); // AVCProfileIndication
        out.put_u8(sps0_byte(2)); // profile_compatibility
        out.put_u8(sps0_byte(3)); // AVCLevelIndication
        out.put_u8(0xFF); // 6 reserved bits + lengthSizeMinusOne = 3
        out.put_u8(0xE0 | sps.len() as u8); // 3 reserved bits + count (<= 31, cannot truncate)
        for set in &sps {
            out.put_u16(set.len() as u16); // <= u16::MAX by construction
            out.put_slice(set);
        }
        out.put_u8(pps.len() as u8); // <= 255, cannot truncate
        for set in &pps {
            out.put_u16(set.len() as u16); // <= u16::MAX by construction
            out.put_slice(set);
        }
        out.freeze()
    }

    /// Build an Annex-B access unit from this tag's length-prefixed (AVCC) NAL
    /// data, optionally prepending SPS/PPS so a keyframe is self-decodable.
    ///
    /// Returns `None` when the crate's AVCC→Annex-B conversion rejects `avcc`
    /// for this configuration's `nal_length_size`.
    pub fn avcc_to_annexb(&self, avcc: &[u8], prepend_params: bool) -> Option<Bytes> {
        let body = crate::codec::h264::avcc_to_annexb(avcc, self.nal_length_size)?;
        if !prepend_params {
            return Some(body);
        }
        let params = self.to_annexb();
        let mut out = BytesMut::with_capacity(params.len() + body.len());
        out.put(params);
        out.put(body);
        Some(out.freeze())
    }
}

/// A parsed AVC video tag header: keyframe flag, AVC packet type, and the
/// composition-time offset (PTS − DTS, in milliseconds), plus where the tag
/// body starts.
#[derive(Debug, Clone, Copy)]
pub struct VideoTagHeader {
    /// `true` when the tag's `FrameType` nibble marks a keyframe.
    pub is_keyframe: bool,
    /// The tag's `AVCPacketType` byte (sequence header, raw NALUs, …).
    pub avc_packet_type: u8,
    /// Signed composition-time offset, sign-extended from its 24-bit encoding.
    pub composition_time: i32,
    /// Offset of the body following the 5-byte AVC header.
    pub body_offset: usize,
}

/// Decode the fixed 5-byte AVC header at the start of a video tag body.
///
/// Yields `None` when the codec nibble is not AVC, or when `payload` holds
/// fewer than the five header bytes.
pub fn parse_video_header(payload: &[u8]) -> Option<VideoTagHeader> {
    let lead = *payload.first()?;
    // High nibble: frame type. Low nibble: codec id.
    let (frame_type, codec_id) = (lead >> 4, lead & 0x0F);
    if codec_id != VIDEO_CODEC_AVC {
        return None;
    }
    let avc_packet_type = *payload.get(1)?;

    // Bytes 2..5 hold the composition-time offset as a 24-bit big-endian
    // two's-complement number; bit 23 is its sign bit.
    let mut cts_field = payload.get(2..5)?;
    let unsigned = cts_field.get_uint(3) as i32;
    let composition_time = if unsigned & 0x0080_0000 != 0 {
        unsigned - 0x0100_0000
    } else {
        unsigned
    };

    let is_keyframe = frame_type == FRAME_TYPE_KEY;
    Some(VideoTagHeader {
        is_keyframe,
        avc_packet_type,
        composition_time,
        body_offset: 5,
    })
}

/// Assemble an egress video tag body laid out as
/// `[frametype|codec][avc_pkt_type][cts:24][body]`.
///
/// Only the low 24 bits of `cts` are written (big-endian), so a negative offset
/// is stored in 24-bit two's-complement form.
pub fn build_video_tag(is_keyframe: bool, avc_packet_type: u8, cts: i32, body: &[u8]) -> Bytes {
    let lead = if is_keyframe {
        (FRAME_TYPE_KEY << 4) | VIDEO_CODEC_AVC
    } else {
        (FRAME_TYPE_INTER << 4) | VIDEO_CODEC_AVC
    };
    // Drop the top byte of the big-endian form to keep exactly 24 bits.
    let [_, cts_hi, cts_mid, cts_lo] = ((cts & 0x00FF_FFFF) as u32).to_be_bytes();

    let mut tag = BytesMut::with_capacity(5 + body.len());
    tag.put_slice(&[lead, avc_packet_type, cts_hi, cts_mid, cts_lo]);
    tag.put_slice(body);
    tag.freeze()
}

// ── Audio: AudioSpecificConfig ↔ ADTS ───────────────────────────────────────

/// Parsed AAC `AudioSpecificConfig` (the audio sequence header).
///
/// Holds only the three leading fields of the config; the derived `Default`
/// is all zeros.
#[derive(Debug, Clone, Copy, Default)]
pub struct AudioConfig {
    /// MPEG-4 audio object type (2 = AAC-LC).
    pub object_type: u8,
    /// MPEG-4 sampling-frequency index (4 = 44.1 kHz, 3 = 48 kHz, …).
    pub freq_index: u8,
    /// Channel configuration (1 = mono, 2 = stereo, …).
    pub channels: u8,
}

impl AudioConfig {
    /// Parse a 2-byte (or longer) AudioSpecificConfig.
    ///
    /// Reads the 5-bit object type, 4-bit sampling-frequency index and 4-bit
    /// channel configuration from the first two bytes; anything after them is
    /// ignored. Returns `None` when fewer than two bytes are supplied.
    pub fn parse(asc: &[u8]) -> Option<Self> {
        let (b0, b1) = (*asc.first()?, *asc.get(1)?);
        Some(Self {
            object_type: b0 >> 3,
            freq_index: ((b0 & 0x07) << 1) | (b1 >> 7),
            channels: (b1 >> 3) & 0x0F,
        })
    }

    /// Reconstruct the 2-byte AudioSpecificConfig.
    ///
    /// Each field is masked to its bit width (5/4/4 bits) so an out-of-range
    /// value cannot spill into a neighbouring field.
    pub fn to_asc(self) -> Bytes {
        let object_type = self.object_type & 0x1F;
        let freq_index = self.freq_index & 0x0F;
        let channels = self.channels & 0x0F;
        let b0 = (object_type << 3) | (freq_index >> 1);
        let b1 = ((freq_index & 0x01) << 7) | (channels << 3);
        Bytes::copy_from_slice(&[b0, b1])
    }

    /// Prefix raw AAC with a 7-byte ADTS header (what MPEG-TS carries).
    ///
    /// The ADTS profile field is only two bits wide (`AOT − 1` for object types
    /// 1..=4). Object types above 4 — notably 5 (SBR / HE-AAC) and 29 (PS /
    /// HE-AACv2), whose core stream is AAC-LC — are therefore signalled as
    /// AAC-LC rather than letting the shifted value overflow into an unrelated
    /// profile.
    ///
    /// The frame-length field is 13 bits wide; `raw` longer than 8184 bytes
    /// cannot be represented and only the low 13 bits of the length are written.
    pub fn to_adts(self, raw: &[u8]) -> Bytes {
        const ADTS_PROFILE_LC: u8 = 1;
        let frame_len = raw.len() + 7;
        let profile = match self.object_type {
            aot @ 0..=4 => aot.saturating_sub(1), // ADTS profile = AOT − 1
            _ => ADTS_PROFILE_LC,
        };
        // Mask to the field widths so stray high bits cannot corrupt neighbours.
        let freq_index = self.freq_index & 0x0F;
        let channels = self.channels & 0x07;
        let hdr = [
            0xFF,
            0xF1, // MPEG-4, layer 0, no CRC
            (profile << 6) | (freq_index << 2) | (channels >> 2),
            ((channels & 0x03) << 6) | ((frame_len >> 11) & 0x03) as u8,
            ((frame_len >> 3) & 0xFF) as u8,
            (((frame_len & 0x07) << 5) as u8) | 0x1F, // + high bits of buffer fullness
            0xFC, // rest of buffer fullness (all ones), one raw data block
        ];
        let mut out = BytesMut::with_capacity(frame_len);
        out.put_slice(&hdr);
        out.put_slice(raw);
        out.freeze()
    }

    /// Derive the config from a 7-byte ADTS header (used on the egress path,
    /// where the engine stored ADTS frames).
    ///
    /// Returns `None` when fewer than 7 bytes are supplied or the 12-bit
    /// `0xFFF` syncword is missing. The object type is recovered as
    /// `profile + 1`.
    pub fn from_adts(hdr: &[u8]) -> Option<Self> {
        if hdr.len() < 7 || hdr[0] != 0xFF || (hdr[1] & 0xF0) != 0xF0 {
            return None;
        }
        let profile = (hdr[2] >> 6) & 0x03;
        let freq_index = (hdr[2] >> 2) & 0x0F;
        // Channel configuration straddles bytes 2 and 3: one bit, then two.
        let channels = ((hdr[2] & 0x01) << 2) | (hdr[3] >> 6);
        Some(Self {
            object_type: profile + 1,
            freq_index,
            channels,
        })
    }
}

/// Assemble an egress audio tag body laid out as
/// `[soundfmt|rate|size|type][aac_pkt][body]`.
///
/// The low nibble of the first byte (sound rate/size/type) is always AAC's
/// conventional `0xF` (44 kHz, 16-bit, stereo) — decoders take the real values
/// from the AudioSpecificConfig.
pub fn build_audio_tag(aac_packet_type: u8, body: &[u8]) -> Bytes {
    let lead = (AUDIO_FORMAT_AAC << 4) | 0x0F;
    let mut tag = BytesMut::with_capacity(2 + body.len());
    tag.put_slice(&[lead, aac_packet_type]);
    tag.put_slice(body);
    tag.freeze()
}

// Unit tests: round-trips between the FLV-side records/tags and the
// Annex-B / ADTS forms produced by this module.
#[cfg(test)]
mod tests {
    use super::*;

    // Hand-builds an avcC record with one SPS and one PPS, then checks that
    // parsing, Annex-B output and record rebuilding all agree on them.
    #[test]
    fn avc_config_roundtrips_through_annexb_and_record() {
        let sps = vec![0x67, 0x42, 0x00, 0x1F, 0xAA];
        let pps = vec![0x68, 0xCE, 0x3C, 0x80];
        let rec = {
            // version, profile, compat, level, lengthSizeMinusOne = 3, one SPS.
            let mut r = vec![1, sps[1], sps[2], sps[3], 0xFF, 0xE1];
            r.extend_from_slice(&(sps.len() as u16).to_be_bytes());
            r.extend_from_slice(&sps);
            r.push(1);
            r.extend_from_slice(&(pps.len() as u16).to_be_bytes());
            r.extend_from_slice(&pps);
            r
        };
        let cfg = AvcConfig::parse(&rec).expect("parse");
        assert_eq!(cfg.nal_length_size, 4);
        assert_eq!(cfg.sps, vec![sps.clone()]);
        assert_eq!(cfg.pps, vec![pps.clone()]);

        // Annex-B form starts with a 4-byte start code then the SPS.
        let annexb = cfg.to_annexb();
        assert_eq!(&annexb[..5], &[0, 0, 0, 1, 0x67]);

        // Rebuilt record parses back to the same parameter sets.
        let rebuilt = AvcConfig::parse(&cfg.to_avc_record()).expect("reparse");
        assert_eq!(rebuilt.sps, cfg.sps);
        assert_eq!(rebuilt.pps, cfg.pps);
    }

    // Converts a single length-prefixed NAL both without and with the
    // parameter sets prepended.
    #[test]
    fn avcc_nalus_become_annexb_with_optional_params() {
        let cfg = AvcConfig {
            nal_length_size: 4,
            sps: vec![vec![0x67, 0x42]],
            pps: vec![vec![0x68, 0xCE]],
        };
        // One AVCC NAL: length 2 + [0x65, 0xAA].
        let avcc = [0, 0, 0, 2, 0x65, 0xAA];
        let plain = cfg.avcc_to_annexb(&avcc, false).unwrap();
        assert_eq!(&plain[..], &[0, 0, 0, 1, 0x65, 0xAA]);
        let with_params = cfg.avcc_to_annexb(&avcc, true).unwrap();
        // SPS + PPS + slice, each start-code prefixed.
        assert_eq!(&with_params[..5], &[0, 0, 0, 1, 0x67]);
        assert!(with_params.ends_with(&[0x65, 0xAA]));
    }

    // The same config must survive both the ASC and the ADTS encodings.
    #[test]
    fn audio_config_asc_and_adts_roundtrip() {
        // AAC-LC (AOT 2), 44.1 kHz (index 4), stereo.
        let cfg = AudioConfig {
            object_type: 2,
            freq_index: 4,
            channels: 2,
        };
        let asc = cfg.to_asc();
        let reparsed = AudioConfig::parse(&asc).unwrap();
        assert_eq!(reparsed.object_type, 2);
        assert_eq!(reparsed.freq_index, 4);
        assert_eq!(reparsed.channels, 2);

        let adts = cfg.to_adts(&[0x11, 0x22, 0x33]);
        assert_eq!(adts.len(), 10); // 7 header + 3 payload
        assert_eq!(adts[0], 0xFF);
        let from_hdr = AudioConfig::from_adts(&adts).unwrap();
        assert_eq!(from_hdr.freq_index, 4);
        assert_eq!(from_hdr.channels, 2);
        assert_eq!(from_hdr.object_type, 2);
    }

    // A built keyframe tag parses back to the same header fields and body.
    #[test]
    fn video_header_parses_keyframe_and_cts() {
        // keyframe (1) + AVC (7), pkt type 1, cts = 0x000040 (64).
        let tag = build_video_tag(true, PKT_RAW, 64, &[0xAB]);
        let h = parse_video_header(&tag).unwrap();
        assert!(h.is_keyframe);
        assert_eq!(h.avc_packet_type, PKT_RAW);
        assert_eq!(h.composition_time, 64);
        assert_eq!(&tag[h.body_offset..], &[0xAB]);
    }

    // A negative offset must come back negative after the 24-bit encoding.
    #[test]
    fn negative_composition_time_sign_extends() {
        let tag = build_video_tag(false, PKT_RAW, -10, &[]);
        let h = parse_video_header(&tag).unwrap();
        assert_eq!(h.composition_time, -10);
    }
}