arcly-stream 0.1.4

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! The RTMP "simple" handshake (C0/C1/C2 ↔ S0/S1/S2).
//!
//! This is the plain (non-digest) handshake from the Adobe RTMP spec, which
//! every common publisher/player (OBS, FFmpeg, librtmp) interoperates with: the
//! server echoes the client's C1 as S2 and sends its own pseudo-random S1. The
//! HMAC-SHA256 "complex" handshake required by Flash Player-era clients is
//! intentionally omitted — it pulls in a crypto dependency for no benefit to
//! non-browser clients.

use crate::{Result, StreamError};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

/// RTMP version byte exchanged as C0/S0; `accept` rejects any other C0 value.
const RTMP_VERSION: u8 = 0x03;
/// Length, in bytes, of each of the C1/C2/S1/S2 payloads.
const HANDSHAKE_SIZE: usize = 1536;

/// Run the server half of the RTMP simple handshake over an accepted stream.
///
/// The exchange is strictly sequential: consume C0 and C1, answer with
/// S0 + S1 + S2 in a single flushed write, then consume C2. Once this returns
/// `Ok(())` the handshake bytes have been exchanged and chunk traffic may begin.
///
/// # Errors
///
/// Returns [`StreamError::Handshake`] when C0 carries a version other than
/// `RTMP_VERSION`. Any I/O error raised by `stream` is propagated via `?`.
pub async fn accept<S>(stream: &mut S) -> Result<()>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    // C0 is a single version byte; refuse anything but the version we speak.
    let mut version_byte = [0u8; 1];
    stream.read_exact(&mut version_byte).await?;
    let [client_version] = version_byte;
    if client_version != RTMP_VERSION {
        return Err(StreamError::Handshake(format!(
            "unsupported RTMP version {}",
            client_version
        )));
    }

    // C1 layout: time(4) + zero(4) + random(1528). Kept whole so it can be
    // echoed back unchanged as S2.
    let mut client_hello = [0u8; HANDSHAKE_SIZE];
    stream.read_exact(&mut client_hello).await?;

    // Assemble S0 | S1 | S2 into one buffer so the reply goes out in one write.
    let server_hello = server_s1();
    let reply = [&[RTMP_VERSION][..], &server_hello[..], &client_hello[..]].concat();
    stream.write_all(&reply).await?;
    stream.flush().await?;

    // C2 is the client's echo of our S1. Its contents are not validated; the
    // bytes are only drained from the stream.
    let mut client_echo = [0u8; HANDSHAKE_SIZE];
    stream.read_exact(&mut client_echo).await?;
    Ok(())
}

/// Produce the S1 payload: eight zero bytes (the time and zero fields) followed
/// by filler drawn from a fixed-seed 32-bit xorshift generator.
///
/// The output is identical on every call. The simple handshake does not need
/// unpredictable bytes, so no crypto-RNG dependency is pulled in and the code
/// stays `#![forbid(unsafe_code)]`-clean.
fn server_s1() -> [u8; HANDSHAKE_SIZE] {
    /// Width of the leading time(4) + zero(4) fields, which are left as zeros.
    const HEADER_LEN: usize = 8;
    /// Fixed starting state of the xorshift generator.
    const SEED: u32 = 0x9E37_79B9;

    let mut packet = [0u8; HANDSHAKE_SIZE];
    let mut x = SEED;
    packet[HEADER_LEN..].fill_with(|| {
        // One xorshift32 step (13, 17, 5), then emit the low byte of the state.
        x ^= x << 13;
        x ^= x >> 17;
        x ^= x << 5;
        x.to_le_bytes()[0]
    });
    packet
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::duplex;

    /// Happy path: a client speaking the simple handshake gets S0 with the
    /// expected version and an S2 that is a verbatim copy of its C1, and the
    /// server side finishes once C2 arrives.
    #[tokio::test]
    async fn completes_simple_handshake_and_echoes_c1() {
        let (mut peer, mut accepted) = duplex(8192);

        // Run `accept` on its own task so both halves can make progress.
        let acceptor = tokio::spawn(async move {
            accept(&mut accepted).await.unwrap();
        });

        // C1 carries a recognisable, non-constant pattern so an echo is provable.
        let c1: [u8; HANDSHAKE_SIZE] = std::array::from_fn(|idx| (idx % 251) as u8);
        peer.write_u8(RTMP_VERSION).await.unwrap(); // C0
        peer.write_all(&c1).await.unwrap(); // C1

        // Pull S0 + S1 + S2 in one go, then carve the reply into its parts.
        let mut reply = [0u8; 1 + 2 * HANDSHAKE_SIZE];
        peer.read_exact(&mut reply).await.unwrap();
        let (s0, rest) = reply.split_at(1);
        let (s1, s2) = rest.split_at(HANDSHAKE_SIZE);
        assert_eq!(s0[0], RTMP_VERSION);
        assert_eq!(s2, &c1[..], "S2 must echo C1 in the simple handshake");

        // C2 echoes S1; after that the server task must complete cleanly.
        peer.write_all(s1).await.unwrap();
        acceptor.await.unwrap();
    }

    /// A C0 byte other than `RTMP_VERSION` must surface as a handshake error.
    #[tokio::test]
    async fn rejects_wrong_version() {
        let (mut writer, mut reader) = duplex(4096);
        writer.write_u8(0xFF).await.unwrap(); // bad C0
        writer.write_all(&[0u8; HANDSHAKE_SIZE]).await.unwrap();
        let outcome = accept(&mut reader).await;
        assert!(matches!(outcome, Err(StreamError::Handshake(_))));
    }
}