arcly-stream 0.1.7

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
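//! SRT handshake parsing and a minimal listener-side responder.
//!
//! The handshake control payload follows the 16-byte packet header and carries
//! the version, encryption/extension fields, sequence number, MTU/window sizes,
//! the [request type][HandshakeType], the caller's socket id, and a SYN cookie
//! (draft-sharabayko-srt §3.2.1).
//!
//! Scope note: this module covers only the unencrypted caller → listener
//! exchange. Handshakes with a non-zero encryption field are dropped, and only
//! `INDUCTION` and `CONCLUSION` requests are answered.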

/// The request kind carried in an SRT handshake's 32-bit `Handshake Type` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum HandshakeType {
    /// `INDUCTION` (1) — the caller's opening packet; the listener answers with
    /// a cookie.
    Induction,
    /// `CONCLUSION` (0xFFFFFFFF) — the caller sends the cookie back to complete
    /// setup.
    Conclusion,
    /// `WAVEAHAND` (0) — rendezvous probe.
    WaveAHand,
    /// `AGREEMENT` (0xFFFFFFFE) — rendezvous agreement.
    Agreement,
    /// Any value not listed above (including error codes), kept verbatim.
    Other(u32),
}

impl HandshakeType {
    // Wire values of the named request types.
    const WAVEAHAND: u32 = 0;
    const INDUCTION: u32 = 1;
    const AGREEMENT: u32 = 0xFFFF_FFFE;
    const CONCLUSION: u32 = 0xFFFF_FFFF;

    /// Decode the wire value; unrecognised values are preserved in
    /// [`HandshakeType::Other`].
    fn from_u32(raw: u32) -> HandshakeType {
        match raw {
            Self::WAVEAHAND => Self::WaveAHand,
            Self::INDUCTION => Self::Induction,
            Self::AGREEMENT => Self::Agreement,
            Self::CONCLUSION => Self::Conclusion,
            unknown => Self::Other(unknown),
        }
    }

    /// Encode back to the 32-bit wire value. `Other` yields the value it holds.
    fn to_u32(self) -> u32 {
        match self {
            Self::WaveAHand => Self::WAVEAHAND,
            Self::Induction => Self::INDUCTION,
            Self::Agreement => Self::AGREEMENT,
            Self::Conclusion => Self::CONCLUSION,
            Self::Other(raw) => raw,
        }
    }
}

/// The subset of SRT handshake fields that this listener looks at.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SrtHandshake {
    /// Protocol version (4 for HSv4, 5 for HSv5).
    pub version: u32,
    /// Encryption field — a non-zero value means the caller asked for an
    /// encrypted link, which this build rejects ([`respond`] drops such
    /// handshakes).
    pub encryption: u16,
    /// Initial packet sequence number.
    pub initial_sequence: u32,
    /// The handshake request type.
    pub handshake_type: HandshakeType,
    /// The caller's SRT socket id.
    pub socket_id: u32,
    /// SYN cookie (0 on the caller's induction request).
    pub cookie: u32,
}

impl SrtHandshake {
    /// Byte offset at which the handshake payload starts inside a control
    /// datagram, i.e. just past the 16-byte SRT packet header.
    const PAYLOAD: usize = 16;

    /// Decode the handshake carried by a full control datagram.
    ///
    /// Returns `None` when fewer than 32 bytes follow the packet header, since
    /// the fixed handshake body does not fit. Bytes beyond those 32 are
    /// ignored. All fields are read big-endian.
    pub fn parse(datagram: &[u8]) -> Option<SrtHandshake> {
        // One checked slice covers both "header missing" and "body too short".
        let body = datagram.get(Self::PAYLOAD..Self::PAYLOAD + 32)?;

        // Big-endian 32-bit word at `at`; every offset used below is <= 28, so
        // the 4-byte window always lies inside the 32-byte body.
        let be32 = |at: usize| {
            let mut word = [0u8; 4];
            word.copy_from_slice(&body[at..at + 4]);
            u32::from_be_bytes(word)
        };

        let version = be32(0);
        // The encryption field is the first half of the second word.
        let encryption = u16::from_be_bytes([body[4], body[5]]);
        let initial_sequence = be32(8);
        let handshake_type = HandshakeType::from_u32(be32(20));
        let socket_id = be32(24);
        let cookie = be32(28);

        Some(SrtHandshake {
            version,
            encryption,
            initial_sequence,
            handshake_type,
            socket_id,
            cookie,
        })
    }

    /// `true` when the caller asked for an encrypted link (unsupported here),
    /// i.e. the encryption field is non-zero.
    pub fn wants_encryption(&self) -> bool {
        self.encryption != 0
    }
}

/// A deterministic-but-opaque, never-zero SYN cookie derived from the caller's
/// socket id.
///
/// A production listener would mix in the peer address and a per-boot secret;
/// for an unencrypted loss-tolerant ingest the anti-spoofing value is modest, so
/// a stable derivation keeps the responder dependency-free.
///
/// The result is guaranteed to be non-zero because a cookie of 0 is what a
/// caller sends before it has been given one; handing out 0 would make the
/// induction reply indistinguishable from "no cookie installed".
fn syn_cookie(socket_id: u32) -> u32 {
    const MULTIPLIER: u32 = 0x9E37_79B1;
    const OFFSET: u32 = 0x5247_5421;

    let mixed = socket_id
        .rotate_left(13)
        .wrapping_mul(MULTIPLIER)
        .wrapping_add(OFFSET);

    // Rotation, multiplication by an odd constant and addition are each
    // bijections on u32, so exactly one socket id mixes to 0. Remap that single
    // case to a fixed non-zero value; every other id keeps its original cookie.
    if mixed == 0 {
        OFFSET
    } else {
        mixed
    }
}

/// Build a listener response to a caller's handshake datagram.
///
/// For an `INDUCTION` request the response echoes the datagram with the SYN
/// cookie derived from the caller's socket id installed. For a `CONCLUSION`
/// request the cookie the caller sent is checked against that same derivation
/// and, when it matches, the datagram is echoed to confirm agreement. The
/// returned bytes are ready to send back to the caller.
///
/// # Returns
///
/// `None` (the datagram should be dropped) when:
/// - it is too short to parse as a handshake,
/// - it requests encryption,
/// - its request type is neither `INDUCTION` nor `CONCLUSION`, or
/// - it is a `CONCLUSION` whose cookie is not the one this listener issues for
///   that socket id (a conclusion that never went through induction).
pub fn respond(datagram: &[u8]) -> Option<Vec<u8>> {
    let hs = SrtHandshake::parse(datagram)?;
    if hs.wants_encryption() {
        return None; // encrypted SRT is out of scope; drop the handshake
    }

    // The cookie this listener hands out for the caller's socket id.
    let cookie = syn_cookie(hs.socket_id);
    match hs.handshake_type {
        HandshakeType::Induction => {}
        // Only accept a conclusion that echoes the cookie we issued; without
        // this check the cookie round-trip would verify nothing.
        HandshakeType::Conclusion if hs.cookie == cookie => {}
        _ => return None,
    }

    // Copy the datagram only once we know a reply will be sent.
    let mut reply = datagram.to_vec();
    // Install the cookie at its offset (payload + 28). For a validated
    // conclusion this rewrites the value already present.
    let at = SrtHandshake::PAYLOAD + 28;
    reply
        .get_mut(at..at + 4)?
        .copy_from_slice(&cookie.to_be_bytes());
    Some(reply)
}

/// Assemble a caller-side handshake control datagram (the **egress relay** uses
/// this to dial an SRT listener). The layout mirrors the 32-byte HSv5 body that
/// this crate's [`respond`] listener understands.
///
/// The result is 48 bytes: a 16-byte control header whose only non-zero byte
/// is the control flag (`0x80`), followed by the 32-byte handshake body with
/// version 5, zero encryption/extension fields, an MTU of 1500 and a flow
/// window of 8192. All fields are big-endian.
///
/// Note: this is the minimal handshake subset (no `HSREQ`/`KMREQ` extension
/// blocks), so it interoperates with this crate's own listener; full extension
/// interop with third-party SRT stacks is a follow-up.
pub fn caller_handshake(
    req_type: HandshakeType,
    socket_id: u32,
    initial_sequence: u32,
    cookie: u32,
) -> Vec<u8> {
    const HEADER_LEN: usize = 16;
    const BODY_LEN: usize = 32;

    // Zero-filled header + body in one buffer; anything not written below
    // (header type 0x0000, body bytes 4..8 = encryption + extension) stays 0.
    let mut datagram = vec![0u8; HEADER_LEN + BODY_LEN];
    datagram[0] = 0x80; // control flag

    // (offset within the body, value) for every 32-bit field that is set.
    let fields: [(usize, u32); 7] = [
        (0, 5),                  // version HSv5
        (8, initial_sequence),   // initial packet sequence number
        (12, 1500),              // MTU
        (16, 8192),              // flow-window
        (20, req_type.to_u32()), // handshake type
        (24, socket_id),         // caller's socket id
        (28, cookie),            // SYN cookie
    ];
    for &(offset, value) in fields.iter() {
        let at = HEADER_LEN + offset;
        datagram[at..at + 4].copy_from_slice(&value.to_be_bytes());
    }
    datagram
}

/// Build the caller's opening handshake: an `INDUCTION` request whose cookie
/// field is 0, because the listener has not issued one yet.
pub fn caller_induction(socket_id: u32, initial_sequence: u32) -> Vec<u8> {
    const NO_COOKIE: u32 = 0;
    let req_type = HandshakeType::Induction;
    caller_handshake(req_type, socket_id, initial_sequence, NO_COOKIE)
}

/// Build the caller's follow-up handshake: a `CONCLUSION` request that sends
/// the listener's `cookie` back to it.
pub fn caller_conclusion(socket_id: u32, initial_sequence: u32, cookie: u32) -> Vec<u8> {
    let req_type = HandshakeType::Conclusion;
    caller_handshake(req_type, socket_id, initial_sequence, cookie)
}

// Unit tests for handshake parsing, the listener responder, and the caller-side
// builders. Everything runs in-process on raw byte buffers; no sockets are used.
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a control datagram with a handshake body for the given type/enc.
    ///
    /// The body is version 5 with socket id `0xABCD_1234`; every field that is
    /// not set explicitly (including the cookie) is left at zero.
    fn handshake_datagram(req_type: u32, encryption: u16) -> Vec<u8> {
        let mut d = vec![0u8; 16]; // control header
        d[0] = 0x80; // control flag
        let mut body = vec![0u8; 32];
        body[0..4].copy_from_slice(&5u32.to_be_bytes()); // version 5
        body[4..6].copy_from_slice(&encryption.to_be_bytes());
        body[20..24].copy_from_slice(&req_type.to_be_bytes());
        body[24..28].copy_from_slice(&0xABCD_1234u32.to_be_bytes()); // socket id
        d.extend_from_slice(&body);
        d
    }

    /// An unencrypted INDUCTION datagram parses into the expected fields.
    #[test]
    fn parses_induction_handshake() {
        let d = handshake_datagram(1, 0);
        let hs = SrtHandshake::parse(&d).unwrap();
        assert_eq!(hs.version, 5);
        assert_eq!(hs.handshake_type, HandshakeType::Induction);
        assert_eq!(hs.socket_id, 0xABCD_1234);
        assert!(!hs.wants_encryption());
    }

    /// The request is built with a zero cookie, so a non-zero cookie in the
    /// reply shows that the responder installed one.
    #[test]
    fn induction_response_installs_nonzero_cookie() {
        let d = handshake_datagram(1, 0);
        let reply = respond(&d).unwrap();
        let parsed = SrtHandshake::parse(&reply).unwrap();
        assert_ne!(parsed.cookie, 0, "cookie installed in induction response");
    }

    /// A non-zero encryption field makes the responder drop the handshake.
    #[test]
    fn encrypted_handshake_is_rejected() {
        let d = handshake_datagram(1, 0x0002);
        assert!(respond(&d).is_none());
    }

    /// Request types other than INDUCTION/CONCLUSION get no reply.
    #[test]
    fn non_handshake_request_type_has_no_response() {
        let d = handshake_datagram(0, 0); // WAVEAHAND
        assert!(respond(&d).is_none());
    }

    /// End-to-end check that the caller-side builders and the listener-side
    /// responder agree on the wire layout.
    #[test]
    fn caller_handshake_loops_through_listener() {
        // Full in-process loopback: caller INDUCTION → listener cookie →
        // caller CONCLUSION → listener agreement, all over the wire format.
        let induction = caller_induction(0x0BAD_F00D, 42);
        let hs = SrtHandshake::parse(&induction).unwrap();
        assert_eq!(hs.handshake_type, HandshakeType::Induction);
        assert_eq!(hs.version, 5);
        assert_eq!(hs.socket_id, 0x0BAD_F00D);

        // Listener answers the induction with a cookie.
        let resp = respond(&induction).expect("listener induction reply");
        let cookie = SrtHandshake::parse(&resp).unwrap().cookie;
        assert_ne!(cookie, 0, "listener installed a cookie");

        // Caller sends the cookie back in its conclusion.
        let conclusion = caller_conclusion(0x0BAD_F00D, 42, cookie);
        let chs = SrtHandshake::parse(&conclusion).unwrap();
        assert_eq!(chs.handshake_type, HandshakeType::Conclusion);
        assert_eq!(chs.cookie, cookie, "caller echoes the cookie");

        // Listener's reply to the conclusion carries the same cookie.
        let agree = respond(&conclusion).expect("listener conclusion reply");
        assert_eq!(SrtHandshake::parse(&agree).unwrap().cookie, cookie);
    }
}