arcly-stream 0.1.3

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! Native SRT ingest handler (feature `srt`).
//!
//! Accepts SRT (Secure Reliable Transport) connections in **listener** mode,
//! parses the control/data packet stream, and extracts the encapsulated
//! MPEG-TS payload into elementary media frames that flow onto the engine bus
//! through the standard [`InboundProtocol`] seam.
//!
//! [`InboundProtocol`]: crate::inbound::InboundProtocol
//!
//! # Pipeline
//!
//! ```text
//!  UDP datagram ─▶ SrtPacket::parse ─▶ data payload ─▶ TsDemuxer ─▶ AccessUnit
//!                       │                                              │
//!                       └─ handshake (induction/conclusion)           ▼
//!                                                              PublishSession
//! ```
//!
//! The SRT [packet header][SrtPacket] (SRT RFC, draft-sharabayko-srt) and the
//! [`SrtHandshake`] induction/conclusion exchange are parsed here; the MPEG-TS
//! bytes ride in SRT *data* packets and are demuxed by [`TsDemuxer`].
//!
//! # Scope (per the v0.1.3 design decision)
//!
//! Unencrypted transport only. SRT's AES-CTR key exchange (the `KMREQ`/`KMRSP`
//! handshake extensions) requires a crypto backend and is intentionally out of
//! scope for the `#![forbid(unsafe_code)]`, dependency-light kernel — an
//! encrypted feed is rejected at handshake. Basic ACK/NAK keep-alive is handled;
//! a full ARQ retransmission window is not modeled (the listener tolerates loss
//! and re-syncs on the next TS/PES boundary).

mod handshake;
mod packet;
mod ts;

pub use handshake::{HandshakeType, SrtHandshake};
pub use packet::{ControlType, SrtPacket};
pub use ts::{TsDemuxer, TsPayload};

use crate::inbound::{InboundProtocol, IngestContext};
use crate::{CodecId, MediaFrame, Result, StreamKey};
use async_trait::async_trait;
use std::net::SocketAddr;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};

/// SRT ingest worker — binds a UDP listener and demuxes one MPEG-TS feed.
#[derive(Debug, Clone)]
pub struct SrtHandler {
    bind: SocketAddr,
    key: StreamKey,
}

impl SrtHandler {
    /// A listener bound to `bind` that publishes the received feed as `key`.
    pub fn new(bind: SocketAddr, key: StreamKey) -> Self {
        Self { bind, key }
    }
}

#[async_trait]
impl InboundProtocol for SrtHandler {
    fn name(&self) -> &'static str {
        "srt"
    }

    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
        use tokio::net::UdpSocket;

        let socket = UdpSocket::bind(self.bind).await?;
        info!(bind = %self.bind, "srt listener bound");
        let mut buf = vec![0u8; 1500];
        let mut peer: Option<SocketAddr> = None;
        let mut session: Option<crate::inbound::PublishSession> = None;
        let mut demux = TsDemuxer::new();

        loop {
            let (n, from) = tokio::select! {
                _ = shutdown.cancelled() => break,
                r = socket.recv_from(&mut buf) => match r {
                    Ok(v) => v,
                    Err(e) => { warn!(error = %e, "srt recv failed"); continue; }
                }
            };
            let datagram = &buf[..n];
            let Some(pkt) = SrtPacket::parse(datagram) else {
                continue;
            };

            match pkt {
                SrtPacket::Control { control_type, .. } => {
                    // Reply to the induction/conclusion handshake to bring the
                    // session up; other control packets (ACK/NAK/keepalive) need
                    // no action for an unencrypted, loss-tolerant TS ingest.
                    if control_type == ControlType::Handshake {
                        if let Some(reply) = handshake::respond(datagram) {
                            let _ = socket.send_to(&reply, from).await;
                            peer = Some(from);
                            debug!(%from, "srt handshake answered");
                        }
                    }
                }
                SrtPacket::Data { payload_offset, .. } => {
                    if peer != Some(from) {
                        continue; // ignore data before a handshake from this peer
                    }
                    if session.is_none() {
                        session = Some(ctx.open_publish(self.key.clone()).await?);
                    }
                    let sess = session.as_ref().unwrap();
                    for au in demux.push(&datagram[payload_offset..]) {
                        if au.codec == CodecId::Unknown {
                            continue;
                        }
                        let pts = au.pts_ms;
                        let frame = MediaFrame::new_video(pts, pts, au.data, au.codec, au.keyframe);
                        let _ = sess.publish_frame(frame)?;
                    }
                }
            }
        }

        if let Some(sess) = session {
            sess.finish().await?;
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn handler_reports_name_and_key() {
        let h = SrtHandler::new(
            "127.0.0.1:9000".parse().unwrap(),
            StreamKey::new("live", "feed"),
        );
        assert_eq!(h.name(), "srt");
        assert_eq!(h.key.stream_id.as_str(), "feed");
    }
}