srt_protocol/connection/
mod.rs

1pub mod status;
2pub use status::*;
3
4use std::{
5    convert::TryFrom,
6    fmt::Debug,
7    io,
8    net::SocketAddr,
9    time::{Duration, Instant},
10};
11
12use bytes::Bytes;
13
14use crate::{
15    options::*,
16    packet::*,
17    protocol::{
18        handshake::Handshake,
19        output::Output,
20        receiver::{Receiver, ReceiverContext},
21        sender::{Sender, SenderContext},
22        time::Timers,
23    },
24    settings::CipherSettings,
25    statistics::SocketStatistics,
26};
27
28#[derive(Debug, Eq, PartialEq)]
29pub struct Connection {
30    pub settings: ConnectionSettings,
31    pub handshake: Handshake,
32}
33
34#[derive(Debug, Eq, PartialEq, Clone)]
35pub struct ConnectionSettings {
36    /// The remote socket to send & receive to
37    pub remote: SocketAddr,
38
39    /// The socket id of the UDT entity on the other side
40    pub remote_sockid: SocketId,
41
42    /// The local UDT socket id
43    pub local_sockid: SocketId,
44
45    /// The time that this socket started at, used to develop timestamps
46    /// This is precisely the time that the Initiator sends the first packet (or an approximation if not the initiator, assuming symmetrical latency)
47    pub socket_start_time: Instant,
48
49    /// the initial RTT, to be used with TSBPD
50    pub rtt: Duration,
51
52    /// The first sequence number that will be sent/received
53    pub init_seq_num: SeqNumber,
54
55    /// The maximum packet size
56    pub max_packet_size: PacketSize,
57
58    /// The maximum flow size
59    pub max_flow_size: PacketCount,
60
61    /// The TSBPD of the connection--the max of each side's respective latencies
62    pub send_tsbpd_latency: Duration,
63    pub recv_tsbpd_latency: Duration,
64
65    /// The Too-Late Packet Drop (TLPKTDROP) mechanism allows the sender to
66    /// drop packets that have no chance to be delivered in time, and allows
67    /// the receiver to skip missing packets that have not been delivered in time
68    pub too_late_packet_drop: bool,
69
70    pub peer_idle_timeout: Duration,
71
72    /// Size of the receive buffer, in packets
73    pub recv_buffer_size: PacketCount,
74    /// Size of the send buffer, in packets
75    pub send_buffer_size: PacketCount,
76    pub cipher: Option<CipherSettings>,
77    pub stream_id: Option<String>,
78    pub bandwidth: LiveBandwidthMode,
79    pub statistics_interval: Duration,
80}
81
82#[derive(Debug)]
83pub struct DuplexConnection {
84    settings: ConnectionSettings,
85    timers: Timers,
86    handshake: Handshake,
87    output: Output,
88    sender: Sender,
89    receiver: Receiver,
90    stats: SocketStatistics,
91    status: ConnectionStatus,
92}
93
94#[allow(clippy::large_enum_variant)]
95#[derive(Debug, Clone, Eq, PartialEq)]
96pub enum Action<'a> {
97    ReleaseData((Instant, Bytes)),
98    SendPacket((Packet, SocketAddr)),
99    UpdateStatistics(&'a SocketStatistics),
100    WaitForData(Duration),
101    Close,
102}
103
104#[allow(clippy::large_enum_variant)]
105#[derive(Debug)]
106pub enum Input {
107    Data(Option<(Instant, Bytes)>),
108    Packet(ReceivePacketResult),
109    DataReleased,
110    PacketSent,
111    StatisticsUpdated,
112    Timer,
113}
114
115impl DuplexConnection {
116    pub fn new(connection: Connection) -> DuplexConnection {
117        let settings = connection.settings;
118
119        DuplexConnection {
120            settings: settings.clone(),
121            handshake: connection.handshake,
122            output: Output::new(&settings),
123            status: ConnectionStatus::new(settings.send_tsbpd_latency * 2), // the timeout should be larger than latency as otherwise packets that have just arrived definitely have a change to flush
124            timers: Timers::new(
125                settings.socket_start_time,
126                settings.statistics_interval,
127                settings.peer_idle_timeout,
128            ),
129            stats: SocketStatistics::new(),
130            receiver: Receiver::new(settings.clone()),
131            sender: Sender::new(settings),
132        }
133    }
134
135    pub fn handle_input(&mut self, now: Instant, input: Input) -> Action {
136        self.debug(now, "input", &input);
137
138        match input {
139            Input::Data(data) => self.handle_data_input(now, data),
140            Input::Packet(packet) => self.handle_packet_input(now, packet),
141            _ => {}
142        };
143
144        let action = if self.should_close(now) {
145            Action::Close
146        } else if self.should_update_statistics(now) {
147            self.update_statistics(now);
148            Action::UpdateStatistics(&self.stats)
149        } else if let Some(packet) = self.next_packet(now) {
150            Action::SendPacket(packet)
151        } else if let Some(data) = self.next_data(now) {
152            Action::ReleaseData(data)
153        } else {
154            Action::WaitForData(self.next_timer(now) - now)
155        };
156
157        self.debug(now, "action", &action);
158        action
159    }
160
161    pub fn is_open(&self) -> bool {
162        self.status.is_open()
163    }
164
165    pub fn settings(&self) -> &ConnectionSettings {
166        &self.settings
167    }
168
169    pub fn update_statistics(&mut self, now: Instant) {
170        self.stats.elapsed_time = now - self.settings.socket_start_time;
171        self.stats.tx_buffered_time = self.sender.tx_buffered_time();
172        self.stats.tx_buffered_data = self.sender.tx_buffered_packets();
173        self.stats.tx_buffered_bytes = self.sender.tx_buffered_bytes();
174
175        self.stats.rx_acknowledged_time = self.receiver.rx_acknowledged_time();
176    }
177
178    pub fn next_packet(&mut self, now: Instant) -> Option<(Packet, SocketAddr)> {
179        let p = self.output.pop_packet()?;
180        self.stats.tx_all_packets += 1;
181        self.stats.tx_all_bytes += u64::try_from(p.wire_size()).unwrap();
182
183        // payload length + (20 bytes IPv4 + 8 bytes UDP + 16 bytes SRT)
184        match &p {
185            Packet::Data(d) => {
186                self.stats.tx_data += 1;
187                self.stats.tx_bytes += u64::try_from(d.wire_size()).unwrap();
188            }
189            Packet::Control(c) => match c.control_type {
190                ControlTypes::Ack(ref a) => {
191                    self.stats.tx_ack += 1;
192                    if matches!(a, Acknowledgement::Lite(_)) {
193                        self.stats.tx_light_ack += 1;
194                    }
195                }
196                ControlTypes::Nak(_) => {
197                    self.stats.tx_nak += 1;
198                }
199                ControlTypes::Ack2(_) => {
200                    self.stats.tx_ack2 += 1;
201                }
202                _ => {}
203            },
204        }
205        self.debug(now, "send", &p);
206        Some((p, self.settings.remote))
207    }
208
209    pub fn next_data(&mut self, now: Instant) -> Option<(Instant, Bytes)> {
210        match self.receiver.arq.pop_next_message(now) {
211            Ok(Some(data)) => {
212                self.debug(now, "output", &data);
213                Some(data)
214            }
215            Err(error) => {
216                self.warn(now, "output", &error);
217                let dropped = error.too_late_packets.end - error.too_late_packets.start;
218                self.stats.rx_dropped_data += dropped as u64;
219                None
220            }
221            _ => None,
222        }
223    }
224
225    pub fn next_timer(&self, now: Instant) -> Instant {
226        let has_packets_to_send = self.sender.has_packets_to_send();
227        let next_message = self.receiver.arq.next_message_release_time();
228        let unacked_packets = self.receiver.arq.unacked_packet_count();
229        self.timers
230            .next_timer(now, has_packets_to_send, next_message, unacked_packets)
231    }
232
233    pub fn should_close(&mut self, now: Instant) -> bool {
234        if !self.is_open() {
235            true
236        } else {
237            self.check_timers(now);
238            false
239        }
240    }
241
242    pub fn should_update_statistics(&mut self, now: Instant) -> bool {
243        self.timers.check_statistics(now).is_some()
244    }
245
246    pub fn statistics(&self) -> &SocketStatistics {
247        &self.stats
248    }
249
250    pub fn check_timers(&mut self, now: Instant) -> Instant {
251        if self.timers.check_full_ack(now).is_some() {
252            self.receiver().on_full_ack_event(now);
253        }
254        if self.timers.check_nak(now).is_some() {
255            self.receiver().on_nak_event(now);
256        }
257        if self.timers.check_peer_idle_timeout(now).is_some() {
258            self.on_peer_idle_timeout(now);
259        }
260        if let Some(elapsed_periods) = self.timers.check_snd(now) {
261            self.sender().on_snd_event(now, elapsed_periods)
262        }
263
264        if self.status.check_receive_close_timeout(
265            now,
266            self.receiver.is_flushed(),
267            self.settings.local_sockid,
268        ) {
269            self.receiver().on_close_timeout(now);
270        }
271        if self.status.check_sender_shutdown(
272            now,
273            self.sender.is_flushed(),
274            self.receiver.is_flushed(),
275            self.output.is_empty(),
276        ) {
277            self.output.send_control(now, ControlTypes::Shutdown);
278        }
279
280        self.output.ensure_alive(now);
281
282        self.next_timer(now)
283    }
284
285    pub fn handle_data_input(&mut self, now: Instant, data: Option<(Instant, Bytes)>) {
286        self.debug(now, "input", &data);
287        match data {
288            Some(item) => {
289                self.sender().handle_data(now, item);
290            }
291            None => {
292                self.handle_data_stream_close(now);
293            }
294        }
295    }
296
297    pub fn handle_packet_input(&mut self, now: Instant, packet: ReceivePacketResult) {
298        self.debug(now, "packet", &packet);
299        use ReceivePacketError::*;
300        match packet {
301            Ok(packet) => self.handle_packet(now, packet),
302            Err(Io(error)) => self.handle_socket_close(now, error),
303            Err(Parse(e)) => self.warn(now, "packet", &e),
304        }
305    }
306
307    fn handle_data_stream_close(&mut self, now: Instant) {
308        self.info(now, "closed data", &());
309        self.status.on_data_stream_closed(now);
310    }
311
312    fn handle_socket_close(&mut self, now: Instant, error: io::Error) {
313        self.warn(now, "closed socket", &error);
314        self.status.on_socket_closed(now);
315    }
316
317    pub fn on_peer_idle_timeout(&mut self, now: Instant) {
318        self.output.send_control(now, ControlTypes::Shutdown);
319        self.status.on_peer_idle_timeout(now);
320    }
321
322    fn handle_packet(&mut self, now: Instant, (packet, from): (Packet, SocketAddr)) {
323        // TODO: record/report packets from invalid hosts?
324        // We don't care about packets from elsewhere
325        if from != self.settings.remote {
326            self.info(now, "invalid address", &(packet, from));
327            return;
328        }
329
330        if self.settings.local_sockid != packet.dest_sockid() {
331            self.info(now, "invalid socket id", &(packet, from));
332            return;
333        }
334
335        self.timers.reset_exp(now);
336
337        self.stats.rx_all_packets += 1;
338        self.stats.rx_all_bytes += u64::try_from(packet.wire_size()).unwrap();
339        match packet {
340            Packet::Data(data) => self.receiver().handle_data_packet(now, data),
341            Packet::Control(control) => self.handle_control_packet(now, control),
342        }
343    }
344
345    fn handle_control_packet(&mut self, now: Instant, control: ControlPacket) {
346        self.receiver().synchronize_clock(now, control.timestamp);
347
348        use ControlTypes::*;
349        match control.control_type {
350            // sender-responsible packets
351            Ack(ack) => self.sender().handle_ack_packet(now, ack),
352            DropRequest { range, .. } => self.receiver().handle_drop_request(now, range),
353            Handshake(shake) => self.handle_handshake_packet(now, shake),
354            Nak(nak) => self.sender().handle_nak_packet(now, nak),
355            // receiver-responsible
356            Ack2(seq_num) => self.receiver().handle_ack2_packet(now, seq_num),
357            // both
358            Shutdown => self
359                .status
360                .handle_shutdown_packet(now, self.settings.local_sockid),
361            // neither--this exists just to keep the connection alive
362            KeepAlive => {}
363            // TODO: case UMSG_CGWARNING: // 100 - Delay Warning
364            //            // One way packet delay is increasing, so decrease the sending rate
365            //            ControlTypes::DelayWarning?
366            CongestionWarning => todo!(),
367            // TODO: case UMSG_PEERERROR: // 1000 - An error has happened to the peer side
368            PeerError(_) => todo!(),
369            // TODO: case UMSG_EXT: // 0x7FFF - reserved and user defined messages
370            Srt(s) => self.handle_srt_control_packet(now, s),
371        }
372    }
373
374    fn handle_handshake_packet(&mut self, now: Instant, handshake: HandshakeControlInfo) {
375        if let Some(control) = self.handshake.handle_handshake(handshake) {
376            self.output.send_control(now, control);
377        }
378    }
379
380    fn handle_srt_control_packet(&mut self, now: Instant, pack: SrtControlPacket) {
381        use self::SrtControlPacket::*;
382        match pack {
383            HandshakeRequest(_) | HandshakeResponse(_) => self.warn(now, "handshake", &pack),
384            KeyRefreshRequest(keying_material) => self
385                .receiver()
386                .handle_key_refresh_request(now, keying_material),
387            KeyRefreshResponse(keying_material) => {
388                self.sender().handle_key_refresh_response(keying_material)
389            }
390            _ => unimplemented!("{:?}", pack),
391        }
392    }
393
394    fn sender(&mut self) -> SenderContext {
395        SenderContext::new(
396            &mut self.status,
397            &mut self.timers,
398            &mut self.output,
399            &mut self.stats,
400            &mut self.sender,
401        )
402    }
403
404    fn receiver(&mut self) -> ReceiverContext {
405        ReceiverContext::new(
406            &mut self.timers,
407            &mut self.output,
408            &mut self.stats,
409            &mut self.receiver,
410        )
411    }
412
413    fn debug(&self, now: Instant, tag: &str, debug: &impl Debug) {
414        log::debug!(
415            "{:?}|{:?}|{} - {:?}",
416            TimeSpan::from_interval(self.settings.socket_start_time, now),
417            self.settings.local_sockid,
418            tag,
419            debug
420        );
421    }
422
423    fn info(&self, now: Instant, tag: &str, debug: &impl Debug) {
424        log::info!(
425            "{:?}|{:?}|{} - {:?}",
426            TimeSpan::from_interval(self.settings.socket_start_time, now),
427            self.settings.local_sockid,
428            tag,
429            debug
430        );
431    }
432
433    fn warn(&self, now: Instant, tag: &str, debug: &impl Debug) {
434        log::warn!(
435            "{:?}|{:?}|{} - {:?}",
436            TimeSpan::from_interval(self.settings.socket_start_time, now),
437            self.settings.local_sockid,
438            tag,
439            debug
440        );
441    }
442}
443
444#[cfg(test)]
445mod duplex_connection {
446    use assert_matches::assert_matches;
447
448    use Action::*;
449    use ControlTypes::*;
450    use Packet::*;
451
452    use crate::protocol::time::Rtt;
453
454    use super::*;
455
456    const MILLIS: Duration = Duration::from_millis(1);
457    const SND: Duration = MILLIS;
458    const TSBPD: Duration = Duration::from_secs(1);
459
460    fn remote_addr() -> SocketAddr {
461        ([127, 0, 0, 1], 2223).into()
462    }
463
464    fn remote_sockid() -> SocketId {
465        SocketId(2)
466    }
467
468    fn local_sockid() -> SocketId {
469        SocketId(2)
470    }
471
472    fn new_connection(now: Instant) -> Connection {
473        Connection {
474            settings: ConnectionSettings {
475                remote: remote_addr(),
476                remote_sockid: remote_sockid(),
477                local_sockid: local_sockid(),
478                socket_start_time: now,
479                rtt: Duration::default(),
480                init_seq_num: SeqNumber::new_truncate(0),
481                max_packet_size: PacketSize(1316),
482                max_flow_size: PacketCount(8192),
483                send_tsbpd_latency: TSBPD,
484                recv_tsbpd_latency: TSBPD,
485                recv_buffer_size: PacketCount(1024),
486                send_buffer_size: PacketCount(1024),
487                cipher: None,
488                stream_id: None,
489                bandwidth: LiveBandwidthMode::Unlimited,
490                statistics_interval: Duration::from_secs(10),
491                peer_idle_timeout: Duration::from_secs(5),
492                too_late_packet_drop: true,
493            },
494            handshake: crate::protocol::handshake::Handshake::Connector,
495        }
496    }
497
498    #[test]
499    fn input_data_close() {
500        let start = Instant::now();
501        let mut connection = DuplexConnection::new(new_connection(start));
502
503        let mut now = start;
504        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(_));
505        assert_eq!(
506            connection.handle_input(now, Input::Data(Some((start, Bytes::new())))),
507            WaitForData(SND)
508        );
509        assert_eq!(
510            connection.handle_input(now, Input::Data(Some((start, Bytes::new())))),
511            WaitForData(SND)
512        );
513
514        assert_eq!(
515            connection.handle_input(now, Input::Data(None)),
516            WaitForData(SND),
517            "input data 'close' should drain the send buffers"
518        );
519
520        now += SND;
521        assert_matches!(
522            connection.handle_input(now, Input::Timer),
523            SendPacket((Data(_), _))
524        );
525        // only send packets at a rate in accordance with the calculated SND period
526        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(_));
527
528        now += SND;
529        assert_matches!(
530            connection.handle_input(now, Input::Timer),
531            SendPacket((Data(_), _))
532        );
533
534        // acknowledge first packet
535        let packet = Control(ControlPacket {
536            timestamp: TimeStamp::MIN,
537            dest_sockid: SocketId(2),
538            control_type: Ack(Acknowledgement::Full(
539                SeqNumber(1),
540                AckStatistics {
541                    rtt: Rtt::new(TimeSpan::ZERO, TimeSpan::ZERO),
542                    buffer_available: 10000,
543                    packet_receive_rate: None,
544                    estimated_link_capacity: None,
545                    data_receive_rate: None,
546                },
547                FullAckSeqNumber::INITIAL,
548            )),
549        });
550        assert_eq!(
551            connection.handle_input(now, Input::Packet(Ok((packet, remote_addr())))),
552            SendPacket((
553                Control(ControlPacket {
554                    timestamp: TimeStamp::from_micros(2_000),
555                    dest_sockid: SocketId(2),
556                    control_type: Ack2(FullAckSeqNumber::INITIAL),
557                }),
558                remote_addr()
559            ))
560        );
561
562        // if the last packet was not acknowledged wait for drain timeout to actually close
563        //  which can include sending keep alive messages too
564        now += Duration::from_secs(4);
565        assert_matches!(
566            connection.handle_input(now, Input::Timer),
567            SendPacket((
568                Control(ControlPacket {
569                    control_type: Shutdown,
570                    ..
571                }),
572                _
573            ))
574        );
575        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(_));
576        assert_eq!(connection.handle_input(now, Input::Timer), Close);
577    }
578
579    #[test]
580    fn too_late_packet_drop() {
581        let start = Instant::now();
582        let mut connection = DuplexConnection::new(new_connection(start));
583
584        let mut now = start;
585        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(_));
586        assert_eq!(
587            connection.handle_input(now, Input::Data(Some((start, Bytes::new())))),
588            WaitForData(SND)
589        );
590        // the last packet sent is kept around for retransmit on flush
591        // keeping this original behavior intact otherwise integration tests fail
592        assert_eq!(
593            connection.handle_input(now, Input::Data(Some((start, Bytes::new())))),
594            WaitForData(SND)
595        );
596
597        now += SND;
598        assert_matches!(
599            connection.handle_input(now, Input::Timer),
600            SendPacket((Data(DataPacket { seq_number, retransmitted: false, .. }), _)) if seq_number.0 == 0
601        );
602        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(SND));
603
604        // timeout : retransmits 0 and sends 1
605        now += TSBPD;
606
607        assert_matches!(
608            connection.handle_input(now, Input::Timer),
609            SendPacket((Data(DataPacket {seq_number, retransmitted: true, ..}), _)) if seq_number.0 == 0
610        );
611        assert_matches!(
612            connection.handle_input(now, Input::Timer),
613            SendPacket((Data(DataPacket {seq_number, retransmitted: false, ..}), _)) if seq_number.0 == 1
614        );
615
616        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(_));
617
618        now += TSBPD / 4; // TSBPD * 1.25
619        assert_matches!(connection.handle_input(now, Input::Timer), WaitForData(_));
620
621        // https://datatracker.ietf.org/doc/html/draft-sharabayko-srt-00#section-3.2.9
622        //
623        // 3.2.9.  Message Drop Request
624        //
625        //    A Message Drop Request control packet is sent by the sender to the
626        //    receiver when it requests the retransmission of an unacknowledged
627        //    packet (all or part of a message) which is not present in the
628        //    sender's buffer.  This may happen, for example, when a TTL parameter
629        //    (passed in the sending function) triggers a timeout for
630        //    retransmitting lost packets which constitute parts of a message,
631        //    causing these packets to be removed from the sender's buffer.
632        //
633        //    The sender notifies the receiver that it must not wait for
634        //    retransmission of this message.  Note that a Message Drop Request
635        //    control packet is not sent if the Too Late Packet Drop mechanism
636        //    (Section 4.6) causes the sender to drop a message, as in this case
637        //    the receiver is expected to drop it anyway.
638        assert_eq!(
639            connection.handle_input(
640                now,
641                Input::Packet(Ok((
642                    Control(ControlPacket {
643                        timestamp: TimeStamp::MIN + SND + TSBPD + TSBPD / 4,
644                        dest_sockid: remote_sockid(),
645                        control_type: Nak((SeqNumber(0)..SeqNumber(2)).into()),
646                    }),
647                    remote_addr()
648                )))
649            ),
650            SendPacket((
651                Control(ControlPacket {
652                    timestamp: TimeStamp::MIN + SND + TSBPD + TSBPD / 4,
653                    dest_sockid: remote_sockid(),
654                    control_type: DropRequest {
655                        msg_to_drop: MsgNumber(0),
656                        range: SeqNumber(0)..=SeqNumber(1)
657                    }
658                }),
659                remote_addr()
660            ))
661        );
662    }
663}