Skip to main content

arcly_stream/protocol/srt/
mod.rs

1//! Native SRT ingest handler (feature `srt`).
2//!
3//! Accepts SRT (Secure Reliable Transport) connections in **listener** mode,
4//! parses the control/data packet stream, and extracts the encapsulated
5//! MPEG-TS payload into elementary media frames that flow onto the engine bus
6//! through the standard [`InboundProtocol`] seam.
7//!
8//! [`InboundProtocol`]: crate::inbound::InboundProtocol
9//!
10//! # Pipeline
11//!
12//! ```text
13//!  UDP datagram ─▶ SrtPacket::parse ─▶ data payload ─▶ TsDemuxer ─▶ AccessUnit
14//!                       │                                              │
15//!                       └─ handshake (induction/conclusion)           ▼
16//!                                                              PublishSession
17//! ```
18//!
19//! The SRT [packet header][SrtPacket] (SRT RFC, draft-sharabayko-srt) and the
20//! [`SrtHandshake`] induction/conclusion exchange are parsed here; the MPEG-TS
21//! bytes ride in SRT *data* packets and are demuxed by [`TsDemuxer`].
22//!
23//! # Scope (per the v0.1.3 design decision)
24//!
25//! Unencrypted transport only. SRT's AES-CTR key exchange (the `KMREQ`/`KMRSP`
26//! handshake extensions) requires a crypto backend and is intentionally out of
27//! scope for the `#![forbid(unsafe_code)]`, dependency-light kernel — an
28//! encrypted feed is rejected at handshake. Basic ACK/NAK keep-alive is handled;
29//! a full ARQ retransmission window is not modeled (the listener tolerates loss
30//! and re-syncs on the next TS/PES boundary).
31
32mod handshake;
33mod packet;
34mod ts;
35
36pub use handshake::{HandshakeType, SrtHandshake};
37pub use packet::{ControlType, SrtPacket};
38pub use ts::{TsDemuxer, TsPayload, TsTrackKind};
39
40use crate::inbound::{InboundProtocol, IngestContext};
41use crate::{CodecId, MediaFrame, Result, StreamKey};
42use async_trait::async_trait;
43use std::net::SocketAddr;
44use tokio_util::sync::CancellationToken;
45use tracing::{debug, info, warn};
46
47/// SRT ingest worker — binds a UDP listener and demuxes one MPEG-TS feed.
48#[derive(Debug, Clone)]
49pub struct SrtHandler {
50    bind: SocketAddr,
51    key: StreamKey,
52}
53
54impl SrtHandler {
55    /// A listener bound to `bind` that publishes the received feed as `key`.
56    pub fn new(bind: SocketAddr, key: StreamKey) -> Self {
57        Self { bind, key }
58    }
59}
60
61#[async_trait]
62impl InboundProtocol for SrtHandler {
63    fn name(&self) -> &'static str {
64        "srt"
65    }
66
67    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
68        use tokio::net::UdpSocket;
69
70        let socket = UdpSocket::bind(self.bind).await?;
71        info!(bind = %self.bind, "srt listener bound");
72        let mut buf = vec![0u8; 1500];
73        let mut peer: Option<SocketAddr> = None;
74        let mut session: Option<crate::inbound::PublishSession> = None;
75        let mut demux = TsDemuxer::new();
76
77        loop {
78            let (n, from) = tokio::select! {
79                _ = shutdown.cancelled() => break,
80                r = socket.recv_from(&mut buf) => match r {
81                    Ok(v) => v,
82                    Err(e) => { warn!(error = %e, "srt recv failed"); continue; }
83                }
84            };
85            let datagram = &buf[..n];
86            let Some(pkt) = SrtPacket::parse(datagram) else {
87                continue;
88            };
89
90            match pkt {
91                SrtPacket::Control { control_type, .. } => {
92                    // Reply to the induction/conclusion handshake to bring the
93                    // session up; other control packets (ACK/NAK/keepalive) need
94                    // no action for an unencrypted, loss-tolerant TS ingest.
95                    if control_type == ControlType::Handshake {
96                        if let Some(reply) = handshake::respond(datagram) {
97                            let _ = socket.send_to(&reply, from).await;
98                            peer = Some(from);
99                            debug!(%from, "srt handshake answered");
100                        }
101                    }
102                }
103                SrtPacket::Data { payload_offset, .. } => {
104                    if peer != Some(from) {
105                        continue; // ignore data before a handshake from this peer
106                    }
107                    if session.is_none() {
108                        session = Some(ctx.open_publish(self.key.clone()).await?);
109                    }
110                    let sess = session.as_ref().unwrap();
111                    for au in demux.push(&datagram[payload_offset..]) {
112                        if au.codec == CodecId::Unknown {
113                            continue;
114                        }
115                        let pts = au.pts_ms;
116                        let frame = match au.kind {
117                            TsTrackKind::Video => {
118                                MediaFrame::new_video(pts, pts, au.data, au.codec, au.keyframe)
119                            }
120                            TsTrackKind::Audio => MediaFrame::new_audio(pts, au.data, au.codec),
121                        };
122                        let _ = sess.publish_frame(frame)?;
123                    }
124                }
125            }
126        }
127
128        if let Some(sess) = session {
129            sess.finish().await?;
130        }
131        Ok(())
132    }
133}
134
135#[cfg(test)]
136mod tests {
137    use super::*;
138
139    #[test]
140    fn handler_reports_name_and_key() {
141        let h = SrtHandler::new(
142            "127.0.0.1:9000".parse().unwrap(),
143            StreamKey::new("live", "feed"),
144        );
145        assert_eq!(h.name(), "srt");
146        assert_eq!(h.key.stream_id.as_str(), "feed");
147    }
148}