arcly-stream 0.1.7

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! Plain **MPEG-TS over UDP** ingest (`feature = "udp"`).
//!
//! The classic contribution feed: an encoder (or `ffmpeg -f mpegts udp://host:port`)
//! blasts 188-byte transport-stream packets in UDP datagrams — typically seven
//! per datagram (1316 bytes) — with no handshake and no reliability layer.
//! [`UdpTsHandler`] binds a UDP socket, feeds each datagram through the shared
//! [`TsDemuxer`], and publishes the resulting elementary access units onto the
//! bus.
//!
//! Unlike SRT there is no connection: the first datagram that yields media opens
//! the publish session, and the session is held until shutdown. This is the
//! lossy, firewall-friendly path for in-network contribution (including
//! multicast — join the group on the bound socket before serving).
//!
//! ```no_run
//! # use arcly_stream::prelude::*;
//! # use arcly_stream::protocol::udp::UdpTsHandler;
//! # async fn run(engine: std::sync::Arc<Engine>) -> arcly_stream::Result<()> {
//! let udp = UdpTsHandler::new("0.0.0.0:1234".parse().unwrap(), StreamKey::new("live", "feed"));
//! engine.serve(vec![Box::new(udp)], tokio_util::sync::CancellationToken::new()).await
//! # }
//! ```

use std::net::SocketAddr;

use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

use crate::inbound::{InboundProtocol, IngestContext, PublishSession};
use crate::protocol::tsdemux::{TsDemuxer, TsTrackKind};
use crate::{CodecId, MediaFrame, Result};

/// An [`InboundProtocol`] that ingests an MPEG-TS elementary stream from UDP
/// datagrams and publishes it to `key`.
pub struct UdpTsHandler {
    bind: SocketAddr,
    key: crate::StreamKey,
    /// UDP receive buffer size. Datagrams larger than this are truncated; 2048
    /// comfortably holds the usual 7×188 = 1316-byte TS bursts.
    recv_buf: usize,
}

impl UdpTsHandler {
    /// Bind `bind` and publish demuxed media to `key`.
    pub fn new(bind: SocketAddr, key: crate::StreamKey) -> Self {
        Self {
            bind,
            key,
            recv_buf: 2048,
        }
    }

    /// Override the UDP receive buffer size (default 2048 bytes).
    pub fn recv_buffer(mut self, bytes: usize) -> Self {
        self.recv_buf = bytes.max(188);
        self
    }
}

#[async_trait]
impl InboundProtocol for UdpTsHandler {
    fn name(&self) -> &'static str {
        "udp"
    }

    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
        use tokio::net::UdpSocket;

        let socket = UdpSocket::bind(self.bind).await?;
        info!(bind = %self.bind, "udp ts listener bound");
        let mut buf = vec![0u8; self.recv_buf];
        let mut demux = TsDemuxer::new();
        let mut session: Option<PublishSession> = None;

        loop {
            let n = tokio::select! {
                _ = shutdown.cancelled() => break,
                r = socket.recv_from(&mut buf) => match r {
                    Ok((n, _from)) => n,
                    Err(e) => {
                        warn!(error = %e, "udp recv failed");
                        continue;
                    }
                }
            };

            for au in demux.push(&buf[..n]) {
                if au.codec == CodecId::Unknown {
                    continue;
                }
                // Open the publish session lazily, on the first real access unit.
                if session.is_none() {
                    session = Some(ctx.open_publish(self.key.clone()).await?);
                }
                let sess = session.as_ref().unwrap();
                let pts = au.pts_ms;
                let mut frame = match au.kind {
                    TsTrackKind::Video => {
                        MediaFrame::new_video(pts, pts, au.data, au.codec, au.keyframe)
                    }
                    TsTrackKind::Audio => MediaFrame::new_audio(pts, au.data, au.codec),
                };
                if au.is_config {
                    frame.flags |= crate::FrameFlags::CONFIG;
                }
                let _ = sess.publish_frame(frame)?;
            }
        }

        if let Some(sess) = session {
            sess.finish().await?;
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::Engine;
    use std::sync::Arc;

    #[tokio::test]
    async fn binds_and_shuts_down_cleanly() {
        let engine: Arc<Engine> = Engine::builder()
            .application(crate::AppSpec::new("live"))
            .build();
        let ctx = IngestContext::new(engine);
        let handler = UdpTsHandler::new(
            "127.0.0.1:0".parse().unwrap(),
            crate::StreamKey::new("live", "feed"),
        );
        let shutdown = CancellationToken::new();

        // serve() must return promptly once the token is cancelled — a regression
        // guard for the graceful-shutdown contract every handler must honor.
        let token = shutdown.clone();
        let task = tokio::spawn(async move { handler.serve(ctx, token).await });
        tokio::task::yield_now().await;
        shutdown.cancel();
        let res = tokio::time::timeout(std::time::Duration::from_secs(5), task)
            .await
            .expect("serve returned after cancel")
            .expect("task joined");
        assert!(res.is_ok(), "clean shutdown");
    }
}