srt_protocol/connection/
status.rs1use std::time::{Duration, Instant};
2
3use log::info;
4
5use crate::settings::SocketId;
6
/// Lifecycle state of one part of a connection (the connection as a whole,
/// its sender, or its receiver).
///
/// The type is `Copy` because every payload (`Duration`, `Instant`) is `Copy`,
/// so states can be read out of a field and matched on without borrowing.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum Status {
    /// Still open. Carries the flush timeout that is added to the current
    /// instant to form a deadline once closing begins.
    Open(Duration),
    /// Shutting down. Carries the deadline for this phase.
    Shutdown(Instant),
    /// Draining. Carries the deadline for this phase.
    Drain(Instant),
    /// Fully closed.
    Closed,
}
14
/// Tracks the lifecycle state of a connection as a whole together with the
/// separate states of its sending and receiving sides.
#[derive(Debug)]
pub struct ConnectionStatus {
    /// State of the connection overall; `is_closed` reports whether this is
    /// `Status::Closed`.
    connection: Status,
    /// State of the sending side.
    sender: Status,
    /// State of the receiving side.
    receiver: Status,
}
21
22impl ConnectionStatus {
23 pub fn new(flush_timeout: Duration) -> Self {
24 Self {
25 connection: Status::Open(flush_timeout),
26 receiver: Status::Open(flush_timeout),
27 sender: Status::Open(flush_timeout),
28 }
29 }
30
31 pub fn is_open(&self) -> bool {
32 !self.is_closed()
33 }
34
35 pub fn is_closed(&self) -> bool {
36 matches!(self.connection, Status::Closed)
37 }
38
39 pub fn should_drain_send_buffer(&self) -> bool {
40 use Status::*;
41 matches!(self.sender, Shutdown(_) | Drain(_))
42 }
43
44 pub fn on_data_stream_closed(&mut self, now: Instant) {
45 use Status::*;
46 if let Open(timeout) = self.sender {
47 info!("data stream closed, sender is in shutdown");
48 self.sender = Shutdown(now + timeout);
49 }
50 }
51
52 pub fn on_socket_closed(&mut self, now: Instant) {
53 use Status::*;
54 if let Open(timeout) = self.receiver {
55 info!("socket closed, receiver is draining");
56 self.receiver = Drain(now + timeout);
57 }
58 }
59
60 pub fn on_peer_idle_timeout(&mut self, now: Instant) {
61 use Status::*;
62 if let Open(timeout) = self.receiver {
63 info!("peer idle timeout, receiver is draining");
64 self.receiver = Drain(now + timeout);
65 }
66 }
67
68 pub fn handle_shutdown_packet(&mut self, now: Instant, log_sockid: SocketId) {
69 use Status::*;
70 if let Open(timeout) = self.receiver {
71 info!("{log_sockid:?} received shutdown packet, draining for {timeout:?}");
72 self.receiver = Drain(now + timeout);
73 }
74 }
75
76 pub fn check_sender_shutdown(
77 &mut self,
78 now: Instant,
79 send_buffer_flushed: bool,
80 receive_buffer_flushed: bool,
81 output_empty: bool,
82 ) -> bool {
83 use Status::*;
84 let result = match self.sender {
85 Shutdown(timeout) if send_buffer_flushed && output_empty || now > timeout => {
86 info!("sender Shutdown -> Drain");
87 self.sender = Drain(timeout);
88 true
89 }
90 Drain(timeout) if send_buffer_flushed && output_empty || now > timeout => {
91 info!("sender Drain -> Closed");
92 self.sender = Closed;
93 false
94 }
95 _ => false,
96 };
97 if matches!(self.sender, Closed) && receive_buffer_flushed && output_empty {
98 info!("sender closed and receiver flushed, socket is closed");
99 self.connection = Closed;
100 }
101 result
102 }
103
104 pub fn check_receive_close_timeout(
105 &mut self,
106 now: Instant,
107 receive_buffer_flushed: bool,
108 log_sockid: SocketId,
109 ) -> bool {
110 use Status::*;
111 match self.receiver {
112 Shutdown(_) | Drain(_) if receive_buffer_flushed => {
113 self.receiver = Closed;
114 self.connection = Closed;
115 info!("{log_sockid:?} reciever closed and flushed, connection is closed");
116 false
117 }
118 Shutdown(timeout) | Drain(timeout) if now > timeout => {
119 self.receiver = Closed;
120 self.connection = Closed;
121 info!("{log_sockid:?} reciever timed out flushing ({:?} too late), connection is closed", now - timeout);
122 true
123 }
124 _ => false,
125 }
126 }
127}
128
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that the connection reports open and that the sender is not
    /// asking for its buffer to be drained.
    #[track_caller]
    fn assert_open_without_send_drain(status: &ConnectionStatus) {
        assert!(status.is_open());
        assert!(!status.is_closed());
        assert!(!status.should_drain_send_buffer());
    }

    #[test]
    fn open_close() {
        let flush_timeout = Duration::from_secs(10);
        let mut status = ConnectionStatus::new(flush_timeout);

        // A freshly created status is fully open.
        assert_open_without_send_drain(&status);

        // Closing the socket only changes the receiver state, so the
        // connection-level and sender-level queries are unchanged.
        let closed_at = Instant::now();
        status.on_socket_closed(closed_at);

        assert_open_without_send_drain(&status);
    }
}