arcly_stream/protocol/srt/mod.rs
1//! Native SRT ingest handler (feature `srt`).
2//!
3//! Accepts SRT (Secure Reliable Transport) connections in **listener** mode,
4//! parses the control/data packet stream, and extracts the encapsulated
5//! MPEG-TS payload into elementary media frames that flow onto the engine bus
6//! through the standard [`InboundProtocol`] seam.
7//!
8//! [`InboundProtocol`]: crate::inbound::InboundProtocol
9//!
10//! # Pipeline
11//!
12//! ```text
//! UDP datagram ─▶ SrtPacket::parse ─▶ data payload ─▶ TsDemuxer ─▶ AccessUnit
//!                        │                                            │
//!                        └─ handshake (induction/conclusion)          ▼
//!                                                               PublishSession
17//! ```
18//!
19//! The SRT [packet header][SrtPacket] (SRT RFC, draft-sharabayko-srt) and the
20//! [`SrtHandshake`] induction/conclusion exchange are parsed here; the MPEG-TS
21//! bytes ride in SRT *data* packets and are demuxed by [`TsDemuxer`].
22//!
23//! # Scope (per the v0.1.3 design decision)
24//!
25//! Unencrypted transport only. SRT's AES-CTR key exchange (the `KMREQ`/`KMRSP`
26//! handshake extensions) requires a crypto backend and is intentionally out of
27//! scope for the `#![forbid(unsafe_code)]`, dependency-light kernel — an
28//! encrypted feed is rejected at handshake. Basic ACK/NAK keep-alive is handled;
29//! a full ARQ retransmission window is not modeled (the listener tolerates loss
30//! and re-syncs on the next TS/PES boundary).
31
32mod egress;
33mod handshake;
34mod packet;
35
36pub use egress::SrtCaller;
37pub use handshake::{HandshakeType, SrtHandshake};
38pub use packet::{ControlType, SrtPacket};
39// The MPEG-TS demuxer now lives in the shared `protocol::tsdemux` module (also
40// used by `udp` ingest); re-exported here so existing paths keep working.
41pub use crate::protocol::tsdemux::{TsDemuxer, TsPayload, TsTrackKind};
42
43use crate::inbound::{InboundProtocol, IngestContext};
44use crate::{CodecId, MediaFrame, Result, StreamKey};
45use async_trait::async_trait;
46use std::net::SocketAddr;
47use tokio_util::sync::CancellationToken;
48use tracing::{debug, info, warn};
49
50/// SRT ingest worker — binds a UDP listener and demuxes one MPEG-TS feed.
51#[derive(Debug, Clone)]
52pub struct SrtHandler {
53 bind: SocketAddr,
54 key: StreamKey,
55}
56
57impl SrtHandler {
58 /// A listener bound to `bind` that publishes the received feed as `key`.
59 pub fn new(bind: SocketAddr, key: StreamKey) -> Self {
60 Self { bind, key }
61 }
62}
63
64#[async_trait]
65impl InboundProtocol for SrtHandler {
66 fn name(&self) -> &'static str {
67 "srt"
68 }
69
70 async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
71 use tokio::net::UdpSocket;
72
73 let socket = UdpSocket::bind(self.bind).await?;
74 info!(bind = %self.bind, "srt listener bound");
75 let mut buf = vec![0u8; 1500];
76 let mut peer: Option<SocketAddr> = None;
77 let mut session: Option<crate::inbound::PublishSession> = None;
78 let mut demux = TsDemuxer::new();
79
80 loop {
81 let (n, from) = tokio::select! {
82 _ = shutdown.cancelled() => break,
83 r = socket.recv_from(&mut buf) => match r {
84 Ok(v) => v,
85 Err(e) => { warn!(error = %e, "srt recv failed"); continue; }
86 }
87 };
88 let datagram = &buf[..n];
89 let Some(pkt) = SrtPacket::parse(datagram) else {
90 continue;
91 };
92
93 match pkt {
94 SrtPacket::Control { control_type, .. } => {
95 // Reply to the induction/conclusion handshake to bring the
96 // session up; other control packets (ACK/NAK/keepalive) need
97 // no action for an unencrypted, loss-tolerant TS ingest.
98 if control_type == ControlType::Handshake {
99 if let Some(reply) = handshake::respond(datagram) {
100 let _ = socket.send_to(&reply, from).await;
101 peer = Some(from);
102 debug!(%from, "srt handshake answered");
103 }
104 }
105 }
106 SrtPacket::Data { payload_offset, .. } => {
107 if peer != Some(from) {
108 continue; // ignore data before a handshake from this peer
109 }
110 if session.is_none() {
111 session = Some(ctx.open_publish(self.key.clone()).await?);
112 }
113 let sess = session.as_ref().unwrap();
114 for au in demux.push(&datagram[payload_offset..]) {
115 if au.codec == CodecId::Unknown {
116 continue;
117 }
118 let pts = au.pts_ms;
119 let mut frame = match au.kind {
120 TsTrackKind::Video => {
121 MediaFrame::new_video(pts, pts, au.data, au.codec, au.keyframe)
122 }
123 TsTrackKind::Audio => MediaFrame::new_audio(pts, au.data, au.codec),
124 };
125 if au.is_config {
126 frame.flags |= crate::FrameFlags::CONFIG;
127 }
128 let _ = sess.publish_frame(frame)?;
129 }
130 }
131 }
132 }
133
134 if let Some(sess) = session {
135 sess.finish().await?;
136 }
137 Ok(())
138 }
139}
140
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built handler advertises the `srt` protocol name and keeps
    /// the stream key it was constructed with.
    #[test]
    fn handler_reports_name_and_key() {
        let bind: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let stream_key = StreamKey::new("live", "feed");
        let handler = SrtHandler::new(bind, stream_key);

        let protocol = handler.name();
        assert_eq!(protocol, "srt");

        let stream_id = handler.key.stream_id.as_str();
        assert_eq!(stream_id, "feed");
    }
}
154}