Skip to main content

arcly_stream/protocol/srt/
egress.rs

1//! SRT **caller egress** — dial an SRT listener and push an MPEG-TS feed.
2//!
3//! The egress counterpart to the [listener ingest](super::SrtHandler):
4//! [`SrtCaller`] connects to a remote SRT listener (`srt://host:port` in caller
5//! mode), runs the caller handshake (`INDUCTION` → `CONCLUSION`), then wraps an
6//! MPEG-TS byte stream into SRT data packets and sends them.
7//!
8//! The transport stays **MPEG-TS-agnostic of the engine**: the caller consumes
9//! already-muxed TS bytes from an [`mpsc`] channel, so the host pairs it with a
10//! [`MpegTsMuxer`](crate::packager::MpegTsMuxer) to get a full frame → SRT path
11//! without coupling this module to the packager.
12//!
13//! Scope: unencrypted, and the minimal HSv5 handshake (no `HSREQ`/`KMREQ`
14//! extension blocks) — it interoperates with this crate's own listener; full
15//! extension interop with third-party SRT stacks is a follow-up.
16
17use std::net::SocketAddr;
18use std::time::{Duration, Instant};
19
20use tokio::net::UdpSocket;
21use tokio::sync::mpsc;
22use tokio::time::timeout;
23use tokio_util::sync::CancellationToken;
24use tracing::{debug, info};
25
26use super::handshake::{caller_conclusion, caller_induction, SrtHandshake};
27use super::packet::build_data_packet;
28use crate::{Result, StreamError};
29
30/// 7 × 188-byte TS packets = the classic 1316-byte SRT payload.
31const TS_BYTES_PER_DATAGRAM: usize = 7 * 188;
32/// Bound the handshake so a dead listener can't wedge the caller.
33const SETUP_TIMEOUT: Duration = Duration::from_secs(5);
34
35/// Pushes an MPEG-TS feed to a remote SRT listener in caller mode.
36pub struct SrtCaller {
37    addr: SocketAddr,
38    socket_id: u32,
39}
40
41impl SrtCaller {
42    /// A caller targeting the listener at `addr`.
43    pub fn new(addr: SocketAddr) -> Self {
44        // A non-zero, stable-ish local socket id (real callers randomize).
45        let socket_id = (0x4152_434C ^ (addr.port() as u32).rotate_left(16)) | 1;
46        Self { addr, socket_id }
47    }
48
49    /// Connect, handshake, and forward MPEG-TS chunks from `ts` until the channel
50    /// closes or `shutdown` fires. Each inbound chunk is split into
51    /// 1316-byte SRT datagrams.
52    pub async fn run(
53        self,
54        mut ts: mpsc::Receiver<bytes::Bytes>,
55        shutdown: CancellationToken,
56    ) -> Result<()> {
57        let sock = UdpSocket::bind(("0.0.0.0", 0)).await?;
58        sock.connect(self.addr).await?;
59        let mut buf = [0u8; 1500];
60
61        // INDUCTION → cookie.
62        sock.send(&caller_induction(self.socket_id, 0)).await?;
63        let n = timeout(SETUP_TIMEOUT, sock.recv(&mut buf))
64            .await
65            .map_err(|_| StreamError::protocol("srt induction timed out"))??;
66        let induction = SrtHandshake::parse(&buf[..n])
67            .ok_or_else(|| StreamError::protocol("malformed srt induction response"))?;
68
69        // CONCLUSION (echo the cookie) → agreement.
70        sock.send(&caller_conclusion(self.socket_id, 0, induction.cookie))
71            .await?;
72        let n = timeout(SETUP_TIMEOUT, sock.recv(&mut buf))
73            .await
74            .map_err(|_| StreamError::protocol("srt conclusion timed out"))??;
75        let agreement = SrtHandshake::parse(&buf[..n])
76            .ok_or_else(|| StreamError::protocol("malformed srt conclusion response"))?;
77        // The data packets' destination is the socket id the listener returned.
78        let dest = agreement.socket_id;
79        info!(addr = %self.addr, dest, "srt caller connected");
80
81        let start = Instant::now();
82        let mut seq = 0u32;
83        let mut msg = 0u32;
84        loop {
85            tokio::select! {
86                _ = shutdown.cancelled() => break,
87                chunk = ts.recv() => match chunk {
88                    Some(bytes) => {
89                        for piece in bytes.chunks(TS_BYTES_PER_DATAGRAM) {
90                            let ts_us = start.elapsed().as_micros() as u32;
91                            let pkt = build_data_packet(seq, msg, ts_us, dest, piece);
92                            if sock.send(&pkt).await.is_err() {
93                                return Ok(()); // listener gone
94                            }
95                            seq = seq.wrapping_add(1) & 0x7FFF_FFFF;
96                            msg = msg.wrapping_add(1) & 0x03FF_FFFF;
97                        }
98                    }
99                    None => break, // source closed
100                }
101            }
102        }
103        debug!(addr = %self.addr, "srt caller finished");
104        Ok(())
105    }
106}
107
108#[cfg(test)]
109mod tests {
110    use super::*;
111    use crate::protocol::srt::{handshake::respond, SrtPacket};
112
113    #[tokio::test]
114    async fn caller_handshakes_and_sends_data_over_loopback() {
115        // A fake listener: this crate's own `respond` for the handshake, then it
116        // captures the first data packet — a real end-to-end UDP loopback of the
117        // caller flow (handshake + data framing).
118        let listener = UdpSocket::bind("127.0.0.1:0").await.unwrap();
119        let addr = listener.local_addr().unwrap();
120
121        let (tx, rx) = mpsc::channel(4);
122        let shutdown = CancellationToken::new();
123        let sh = shutdown.clone();
124        let caller = SrtCaller::new(addr);
125        let caller_task = tokio::spawn(async move { caller.run(rx, sh).await });
126
127        let mut buf = [0u8; 1500];
128        // Induction.
129        let (n, peer) = listener.recv_from(&mut buf).await.unwrap();
130        let reply = respond(&buf[..n]).unwrap();
131        listener.send_to(&reply, peer).await.unwrap();
132        // Conclusion.
133        let (n, peer) = listener.recv_from(&mut buf).await.unwrap();
134        let reply = respond(&buf[..n]).unwrap();
135        listener.send_to(&reply, peer).await.unwrap();
136
137        // Feed one TS datagram; expect an SRT data packet on the wire.
138        tx.send(bytes::Bytes::from(vec![0x47u8; TS_BYTES_PER_DATAGRAM]))
139            .await
140            .unwrap();
141        let (n, _) = timeout(Duration::from_secs(5), listener.recv_from(&mut buf))
142            .await
143            .expect("data packet arrived")
144            .unwrap();
145        assert!(
146            matches!(SrtPacket::parse(&buf[..n]), Some(SrtPacket::Data { .. })),
147            "caller sent an SRT data packet after the handshake"
148        );
149
150        shutdown.cancel();
151        let _ = caller_task.await;
152    }
153}