Skip to main content

arcly_stream/protocol/srt/
mod.rs

1//! Native SRT ingest handler (feature `srt`).
2//!
3//! Accepts SRT (Secure Reliable Transport) connections in **listener** mode,
4//! parses the control/data packet stream, and extracts the encapsulated
5//! MPEG-TS payload into elementary media frames that flow onto the engine bus
6//! through the standard [`InboundProtocol`] seam.
7//!
8//! [`InboundProtocol`]: crate::inbound::InboundProtocol
9//!
10//! # Pipeline
11//!
12//! ```text
13//!  UDP datagram ─▶ SrtPacket::parse ─▶ data payload ─▶ TsDemuxer ─▶ AccessUnit
14//!                       │                                              │
15//!                       └─ handshake (induction/conclusion)           ▼
16//!                                                              PublishSession
17//! ```
18//!
19//! The SRT [packet header][SrtPacket] (SRT RFC, draft-sharabayko-srt) and the
20//! [`SrtHandshake`] induction/conclusion exchange are parsed here; the MPEG-TS
21//! bytes ride in SRT *data* packets and are demuxed by [`TsDemuxer`].
22//!
23//! # Scope (per the v0.1.3 design decision)
24//!
25//! Unencrypted transport only. SRT's AES-CTR key exchange (the `KMREQ`/`KMRSP`
26//! handshake extensions) requires a crypto backend and is intentionally out of
27//! scope for the `#![forbid(unsafe_code)]`, dependency-light kernel — an
//! encrypted feed is rejected at handshake. ACK/NAK/keep-alive control packets
//! are accepted but need no action from the listener; a full ARQ retransmission
//! window is not modeled (the listener tolerates loss and re-syncs on the next
//! TS/PES boundary).
31
32mod egress;
33mod handshake;
34mod packet;
35
36pub use egress::SrtCaller;
37pub use handshake::{HandshakeType, SrtHandshake};
38pub use packet::{ControlType, SrtPacket};
39// The MPEG-TS demuxer now lives in the shared `protocol::tsdemux` module (also
40// used by `udp` ingest); re-exported here so existing paths keep working.
41pub use crate::protocol::tsdemux::{TsDemuxer, TsPayload, TsTrackKind};
42
43use crate::inbound::{InboundProtocol, IngestContext};
44use crate::{CodecId, MediaFrame, Result, StreamKey};
45use async_trait::async_trait;
46use std::net::SocketAddr;
47use tokio_util::sync::CancellationToken;
48use tracing::{debug, info, warn};
49
50/// SRT ingest worker — binds a UDP listener and demuxes one MPEG-TS feed.
51#[derive(Debug, Clone)]
52pub struct SrtHandler {
53    bind: SocketAddr,
54    key: StreamKey,
55}
56
57impl SrtHandler {
58    /// A listener bound to `bind` that publishes the received feed as `key`.
59    pub fn new(bind: SocketAddr, key: StreamKey) -> Self {
60        Self { bind, key }
61    }
62}
63
64#[async_trait]
65impl InboundProtocol for SrtHandler {
66    fn name(&self) -> &'static str {
67        "srt"
68    }
69
70    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
71        use tokio::net::UdpSocket;
72
73        let socket = UdpSocket::bind(self.bind).await?;
74        info!(bind = %self.bind, "srt listener bound");
75        let mut buf = vec![0u8; 1500];
76        let mut peer: Option<SocketAddr> = None;
77        let mut session: Option<crate::inbound::PublishSession> = None;
78        let mut demux = TsDemuxer::new();
79
80        loop {
81            let (n, from) = tokio::select! {
82                _ = shutdown.cancelled() => break,
83                r = socket.recv_from(&mut buf) => match r {
84                    Ok(v) => v,
85                    Err(e) => { warn!(error = %e, "srt recv failed"); continue; }
86                }
87            };
88            let datagram = &buf[..n];
89            let Some(pkt) = SrtPacket::parse(datagram) else {
90                continue;
91            };
92
93            match pkt {
94                SrtPacket::Control { control_type, .. } => {
95                    // Reply to the induction/conclusion handshake to bring the
96                    // session up; other control packets (ACK/NAK/keepalive) need
97                    // no action for an unencrypted, loss-tolerant TS ingest.
98                    if control_type == ControlType::Handshake {
99                        if let Some(reply) = handshake::respond(datagram) {
100                            let _ = socket.send_to(&reply, from).await;
101                            peer = Some(from);
102                            debug!(%from, "srt handshake answered");
103                        }
104                    }
105                }
106                SrtPacket::Data { payload_offset, .. } => {
107                    if peer != Some(from) {
108                        continue; // ignore data before a handshake from this peer
109                    }
110                    if session.is_none() {
111                        session = Some(ctx.open_publish(self.key.clone()).await?);
112                    }
113                    let sess = session.as_ref().unwrap();
114                    for au in demux.push(&datagram[payload_offset..]) {
115                        if au.codec == CodecId::Unknown {
116                            continue;
117                        }
118                        let pts = au.pts_ms;
119                        let mut frame = match au.kind {
120                            TsTrackKind::Video => {
121                                MediaFrame::new_video(pts, pts, au.data, au.codec, au.keyframe)
122                            }
123                            TsTrackKind::Audio => MediaFrame::new_audio(pts, au.data, au.codec),
124                        };
125                        if au.is_config {
126                            frame.flags |= crate::FrameFlags::CONFIG;
127                        }
128                        let _ = sess.publish_frame(frame)?;
129                    }
130                }
131            }
132        }
133
134        if let Some(sess) = session {
135            sess.finish().await?;
136        }
137        Ok(())
138    }
139}
140
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built handler reports the `srt` protocol name and keeps the
    /// stream key it was given.
    #[test]
    fn handler_reports_name_and_key() {
        let bind = "127.0.0.1:9000".parse().unwrap();
        let stream_key = StreamKey::new("live", "feed");
        let handler = SrtHandler::new(bind, stream_key);

        assert_eq!(handler.name(), "srt");
        assert_eq!(handler.key.stream_id.as_str(), "feed");
    }
}