srt_protocol/connection/
status.rs

1use std::time::{Duration, Instant};
2
3use log::info;
4
5use crate::settings::SocketId;
6
/// Lifecycle phase of the connection as a whole, or of one of its two sides.
///
/// Every payload is a `Copy` type, so the enum is `Copy` as well; this lets
/// state values be read out of `&self`/`&mut self` and compared without
/// explicit clones.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum Status {
    /// Open; carries the flush timeout to apply once closing begins.
    Open(Duration),
    /// Shutting down; carries the flush deadline.
    Shutdown(Instant),
    /// Draining; carries the drain deadline.
    Drain(Instant),
    /// Fully closed.
    Closed,
}
14
15#[derive(Debug)]
16pub struct ConnectionStatus {
17    connection: Status,
18    sender: Status,
19    receiver: Status,
20}
21
22impl ConnectionStatus {
23    pub fn new(flush_timeout: Duration) -> Self {
24        Self {
25            connection: Status::Open(flush_timeout),
26            receiver: Status::Open(flush_timeout),
27            sender: Status::Open(flush_timeout),
28        }
29    }
30
31    pub fn is_open(&self) -> bool {
32        !self.is_closed()
33    }
34
35    pub fn is_closed(&self) -> bool {
36        matches!(self.connection, Status::Closed)
37    }
38
39    pub fn should_drain_send_buffer(&self) -> bool {
40        use Status::*;
41        matches!(self.sender, Shutdown(_) | Drain(_))
42    }
43
44    pub fn on_data_stream_closed(&mut self, now: Instant) {
45        use Status::*;
46        if let Open(timeout) = self.sender {
47            info!("data stream closed, sender is in shutdown");
48            self.sender = Shutdown(now + timeout);
49        }
50    }
51
52    pub fn on_socket_closed(&mut self, now: Instant) {
53        use Status::*;
54        if let Open(timeout) = self.receiver {
55            info!("socket closed, receiver is draining");
56            self.receiver = Drain(now + timeout);
57        }
58    }
59
60    pub fn on_peer_idle_timeout(&mut self, now: Instant) {
61        use Status::*;
62        if let Open(timeout) = self.receiver {
63            info!("peer idle timeout, receiver is draining");
64            self.receiver = Drain(now + timeout);
65        }
66    }
67
68    pub fn handle_shutdown_packet(&mut self, now: Instant, log_sockid: SocketId) {
69        use Status::*;
70        if let Open(timeout) = self.receiver {
71            info!("{log_sockid:?} received shutdown packet, draining for {timeout:?}");
72            self.receiver = Drain(now + timeout);
73        }
74    }
75
76    pub fn check_sender_shutdown(
77        &mut self,
78        now: Instant,
79        send_buffer_flushed: bool,
80        receive_buffer_flushed: bool,
81        output_empty: bool,
82    ) -> bool {
83        use Status::*;
84        let result = match self.sender {
85            Shutdown(timeout) if send_buffer_flushed && output_empty || now > timeout => {
86                info!("sender Shutdown -> Drain");
87                self.sender = Drain(timeout);
88                true
89            }
90            Drain(timeout) if send_buffer_flushed && output_empty || now > timeout => {
91                info!("sender Drain -> Closed");
92                self.sender = Closed;
93                false
94            }
95            _ => false,
96        };
97        if matches!(self.sender, Closed) && receive_buffer_flushed && output_empty {
98            info!("sender closed and receiver flushed, socket is closed");
99            self.connection = Closed;
100        }
101        result
102    }
103
104    pub fn check_receive_close_timeout(
105        &mut self,
106        now: Instant,
107        receive_buffer_flushed: bool,
108        log_sockid: SocketId,
109    ) -> bool {
110        use Status::*;
111        match self.receiver {
112            Shutdown(_) | Drain(_) if receive_buffer_flushed => {
113                self.receiver = Closed;
114                self.connection = Closed;
115                info!("{log_sockid:?} reciever closed and flushed, connection is closed");
116                false
117            }
118            Shutdown(timeout) | Drain(timeout) if now > timeout => {
119                self.receiver = Closed;
120                self.connection = Closed;
121                info!("{log_sockid:?} reciever timed out flushing ({:?} too late), connection is closed", now - timeout);
122                true
123            }
124            _ => false,
125        }
126    }
127}
128
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that the connection reports open and that the sender has not
    /// started draining its send buffer.
    fn assert_open_and_not_draining(status: &ConnectionStatus) {
        assert!(status.is_open());
        assert!(!status.is_closed());
        assert!(!status.should_drain_send_buffer());
    }

    /// Closing the socket only affects the receiver: the connection still
    /// reports open and the sender is not asked to drain.
    #[test]
    fn open_close() {
        let mut status = ConnectionStatus::new(Duration::from_secs(10));
        assert_open_and_not_draining(&status);

        status.on_socket_closed(Instant::now());
        assert_open_and_not_draining(&status);
    }
}