// arcly_stream/protocol/srt/egress.rs
use std::net::SocketAddr;
18use std::time::{Duration, Instant};
19
20use tokio::net::UdpSocket;
21use tokio::sync::mpsc;
22use tokio::time::timeout;
23use tokio_util::sync::CancellationToken;
24use tracing::{debug, info};
25
26use super::handshake::{caller_conclusion, caller_induction, SrtHandshake};
27use super::packet::build_data_packet;
28use crate::{Result, StreamError};
29
/// How long the caller waits for each handshake reply before giving up.
const SETUP_TIMEOUT: Duration = Duration::from_secs(5);

/// Maximum payload carried by one data packet: seven 188-byte units
/// (1316 bytes). Larger inputs are split into pieces of at most this size.
const TS_BYTES_PER_DATAGRAM: usize = 188 * 7;
34
/// Caller-side SRT endpoint that connects out to a remote address and pushes
/// payload data to it.
///
/// Derives `Debug` so the value can appear in logs and assertion output, and
/// `Clone` so a configured caller can be duplicated before it is consumed.
#[derive(Debug, Clone)]
pub struct SrtCaller {
    /// Remote address the UDP socket is connected to.
    addr: SocketAddr,
    /// Local SRT socket id sent in the handshake packets.
    socket_id: u32,
}
40
41impl SrtCaller {
42 pub fn new(addr: SocketAddr) -> Self {
44 let socket_id = (0x4152_434C ^ (addr.port() as u32).rotate_left(16)) | 1;
46 Self { addr, socket_id }
47 }
48
49 pub async fn run(
53 self,
54 mut ts: mpsc::Receiver<bytes::Bytes>,
55 shutdown: CancellationToken,
56 ) -> Result<()> {
57 let sock = UdpSocket::bind(("0.0.0.0", 0)).await?;
58 sock.connect(self.addr).await?;
59 let mut buf = [0u8; 1500];
60
61 sock.send(&caller_induction(self.socket_id, 0)).await?;
63 let n = timeout(SETUP_TIMEOUT, sock.recv(&mut buf))
64 .await
65 .map_err(|_| StreamError::protocol("srt induction timed out"))??;
66 let induction = SrtHandshake::parse(&buf[..n])
67 .ok_or_else(|| StreamError::protocol("malformed srt induction response"))?;
68
69 sock.send(&caller_conclusion(self.socket_id, 0, induction.cookie))
71 .await?;
72 let n = timeout(SETUP_TIMEOUT, sock.recv(&mut buf))
73 .await
74 .map_err(|_| StreamError::protocol("srt conclusion timed out"))??;
75 let agreement = SrtHandshake::parse(&buf[..n])
76 .ok_or_else(|| StreamError::protocol("malformed srt conclusion response"))?;
77 let dest = agreement.socket_id;
79 info!(addr = %self.addr, dest, "srt caller connected");
80
81 let start = Instant::now();
82 let mut seq = 0u32;
83 let mut msg = 0u32;
84 loop {
85 tokio::select! {
86 _ = shutdown.cancelled() => break,
87 chunk = ts.recv() => match chunk {
88 Some(bytes) => {
89 for piece in bytes.chunks(TS_BYTES_PER_DATAGRAM) {
90 let ts_us = start.elapsed().as_micros() as u32;
91 let pkt = build_data_packet(seq, msg, ts_us, dest, piece);
92 if sock.send(&pkt).await.is_err() {
93 return Ok(()); }
95 seq = seq.wrapping_add(1) & 0x7FFF_FFFF;
96 msg = msg.wrapping_add(1) & 0x03FF_FFFF;
97 }
98 }
99 None => break, }
101 }
102 }
103 debug!(addr = %self.addr, "srt caller finished");
104 Ok(())
105 }
106}
107
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::srt::{handshake::respond, SrtPacket};

    /// Drives a caller against a local UDP socket that answers both handshake
    /// packets via `respond`, then checks that the first datagram sent after
    /// the handshake parses as an SRT data packet.
    #[tokio::test]
    async fn caller_handshakes_and_sends_data_over_loopback() {
        let listener = UdpSocket::bind("127.0.0.1:0").await.unwrap();
        let listen_addr = listener.local_addr().unwrap();

        let (ts_tx, ts_rx) = mpsc::channel(4);
        let shutdown = CancellationToken::new();
        let caller_task = tokio::spawn({
            let token = shutdown.clone();
            let caller = SrtCaller::new(listen_addr);
            async move { caller.run(ts_rx, token).await }
        });

        let mut scratch = [0u8; 1500];
        // Two request/reply round trips: induction first, then conclusion.
        for _ in 0..2 {
            let (len, peer) = listener.recv_from(&mut scratch).await.unwrap();
            let reply = respond(&scratch[..len]).unwrap();
            listener.send_to(&reply, peer).await.unwrap();
        }

        // Exactly one datagram's worth of payload, so one data packet results.
        let payload = bytes::Bytes::from(vec![0x47u8; TS_BYTES_PER_DATAGRAM]);
        ts_tx.send(payload).await.unwrap();

        let (len, _) = timeout(Duration::from_secs(5), listener.recv_from(&mut scratch))
            .await
            .expect("data packet arrived")
            .unwrap();
        let parsed = SrtPacket::parse(&scratch[..len]);
        assert!(
            matches!(parsed, Some(SrtPacket::Data { .. })),
            "caller sent an SRT data packet after the handshake"
        );

        shutdown.cancel();
        let _ = caller_task.await;
    }
}