arcly_stream/protocol/srt/mod.rs
1//! Native SRT ingest handler (feature `srt`).
2//!
3//! Accepts SRT (Secure Reliable Transport) connections in **listener** mode,
4//! parses the control/data packet stream, and extracts the encapsulated
5//! MPEG-TS payload into elementary media frames that flow onto the engine bus
6//! through the standard [`InboundProtocol`] seam.
7//!
8//! [`InboundProtocol`]: crate::inbound::InboundProtocol
9//!
10//! # Pipeline
11//!
12//! ```text
//! UDP datagram ─▶ SrtPacket::parse ─▶ data payload ─▶ TsDemuxer ─▶ AccessUnit
//!                       │                                               │
//!                       └─ handshake (induction/conclusion)             ▼
//!                                                                PublishSession
17//! ```
18//!
//! The SRT [packet header][SrtPacket] (IETF Internet-Draft
//! `draft-sharabayko-srt`) and the [`SrtHandshake`] induction/conclusion
//! exchange are parsed here; the MPEG-TS bytes ride in SRT *data* packets and
//! are demuxed by [`TsDemuxer`].
22//!
23//! # Scope (per the v0.1.3 design decision)
24//!
25//! Unencrypted transport only. SRT's AES-CTR key exchange (the `KMREQ`/`KMRSP`
26//! handshake extensions) requires a crypto backend and is intentionally out of
27//! scope for the `#![forbid(unsafe_code)]`, dependency-light kernel — an
//! encrypted feed is rejected at handshake. ACK/NAK/keep-alive control packets
//! are accepted but need no action from the listener; a full ARQ retransmission
//! window is not modeled (the listener tolerates loss and re-syncs on the next
//! TS/PES boundary).
31
32mod handshake;
33mod packet;
34mod ts;
35
36pub use handshake::{HandshakeType, SrtHandshake};
37pub use packet::{ControlType, SrtPacket};
38pub use ts::{TsDemuxer, TsPayload, TsTrackKind};
39
40use crate::inbound::{InboundProtocol, IngestContext};
41use crate::{CodecId, MediaFrame, Result, StreamKey};
42use async_trait::async_trait;
43use std::net::SocketAddr;
44use tokio_util::sync::CancellationToken;
45use tracing::{debug, info, warn};
46
47/// SRT ingest worker — binds a UDP listener and demuxes one MPEG-TS feed.
48#[derive(Debug, Clone)]
49pub struct SrtHandler {
50 bind: SocketAddr,
51 key: StreamKey,
52}
53
54impl SrtHandler {
55 /// A listener bound to `bind` that publishes the received feed as `key`.
56 pub fn new(bind: SocketAddr, key: StreamKey) -> Self {
57 Self { bind, key }
58 }
59}
60
61#[async_trait]
62impl InboundProtocol for SrtHandler {
63 fn name(&self) -> &'static str {
64 "srt"
65 }
66
67 async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
68 use tokio::net::UdpSocket;
69
70 let socket = UdpSocket::bind(self.bind).await?;
71 info!(bind = %self.bind, "srt listener bound");
72 let mut buf = vec![0u8; 1500];
73 let mut peer: Option<SocketAddr> = None;
74 let mut session: Option<crate::inbound::PublishSession> = None;
75 let mut demux = TsDemuxer::new();
76
77 loop {
78 let (n, from) = tokio::select! {
79 _ = shutdown.cancelled() => break,
80 r = socket.recv_from(&mut buf) => match r {
81 Ok(v) => v,
82 Err(e) => { warn!(error = %e, "srt recv failed"); continue; }
83 }
84 };
85 let datagram = &buf[..n];
86 let Some(pkt) = SrtPacket::parse(datagram) else {
87 continue;
88 };
89
90 match pkt {
91 SrtPacket::Control { control_type, .. } => {
92 // Reply to the induction/conclusion handshake to bring the
93 // session up; other control packets (ACK/NAK/keepalive) need
94 // no action for an unencrypted, loss-tolerant TS ingest.
95 if control_type == ControlType::Handshake {
96 if let Some(reply) = handshake::respond(datagram) {
97 let _ = socket.send_to(&reply, from).await;
98 peer = Some(from);
99 debug!(%from, "srt handshake answered");
100 }
101 }
102 }
103 SrtPacket::Data { payload_offset, .. } => {
104 if peer != Some(from) {
105 continue; // ignore data before a handshake from this peer
106 }
107 if session.is_none() {
108 session = Some(ctx.open_publish(self.key.clone()).await?);
109 }
110 let sess = session.as_ref().unwrap();
111 for au in demux.push(&datagram[payload_offset..]) {
112 if au.codec == CodecId::Unknown {
113 continue;
114 }
115 let pts = au.pts_ms;
116 let frame = match au.kind {
117 TsTrackKind::Video => {
118 MediaFrame::new_video(pts, pts, au.data, au.codec, au.keyframe)
119 }
120 TsTrackKind::Audio => MediaFrame::new_audio(pts, au.data, au.codec),
121 };
122 let _ = sess.publish_frame(frame)?;
123 }
124 }
125 }
126 }
127
128 if let Some(sess) = session {
129 sess.finish().await?;
130 }
131 Ok(())
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built handler names its protocol `"srt"` and keeps the stream
    /// key it was constructed with.
    #[test]
    fn handler_reports_name_and_key() {
        let bind: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let handler = SrtHandler::new(bind, StreamKey::new("live", "feed"));

        assert_eq!(handler.name(), "srt");
        assert_eq!(handler.key.stream_id.as_str(), "feed");
    }
}