1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
use std::time::{Duration, Instant};
/// Lifecycle state of one tracked part of a connection (the connection as a
/// whole, its sender, or its receiver).
///
/// The states are only ever advanced in the order
/// `Open` -> `Shutdown` -> `Drain` -> `Closed` (some steps may be skipped) by
/// the methods of `ConnectionStatus`.
#[derive(Debug, Clone, Eq, PartialEq)]
enum Status {
/// Still open. The `Duration` is the timeout that gets added to the current
/// time to compute the deadline stored in the state entered next.
Open(Duration),
/// Shutting down. The `Instant` is the deadline that `now` is compared
/// against when deciding whether to move on regardless of buffer state.
/// In the code visible here only the sender is put into this state.
Shutdown(Instant),
/// Draining. The `Instant` is the deadline after which the state is left
/// even if the relevant buffers have not been flushed.
Drain(Instant),
/// Finished. No method of `ConnectionStatus` moves a field out of this state.
Closed,
}
/// Tracks the close/shutdown progress of a connection as three independent
/// `Status` values: one for the connection overall and one for each direction.
#[derive(Debug)]
pub struct ConnectionStatus {
// Overall state; this is the only field `is_open`/`is_closed` look at.
connection: Status,
// Sending side; advanced by `on_data_stream_closed` and `check_sender_shutdown`.
sender: Status,
// Receiving side; advanced by `on_socket_closed`, `on_peer_idle_timeout`,
// `handle_shutdown_packet` and `check_receive_close_timeout`.
receiver: Status,
}
impl ConnectionStatus {
pub fn new(timeout: Duration) -> Self {
Self {
connection: Status::Open(timeout),
receiver: Status::Open(timeout),
sender: Status::Open(timeout),
}
}
pub fn is_open(&self) -> bool {
!self.is_closed()
}
pub fn is_closed(&self) -> bool {
matches!(self.connection, Status::Closed)
}
pub fn should_drain_send_buffer(&self) -> bool {
use Status::*;
matches!(self.sender, Shutdown(_) | Drain(_))
}
pub fn on_data_stream_closed(&mut self, now: Instant) {
use Status::*;
if let Open(timeout) = self.sender {
self.sender = Shutdown(now + timeout);
}
}
pub fn on_socket_closed(&mut self, now: Instant) {
use Status::*;
if let Open(timeout) = self.receiver {
self.receiver = Drain(now + timeout);
}
}
pub fn on_peer_idle_timeout(&mut self, now: Instant) {
use Status::*;
if let Open(timeout) = self.receiver {
self.receiver = Drain(now + timeout);
}
}
pub fn handle_shutdown_packet(&mut self, now: Instant) {
use Status::*;
if let Open(timeout) = self.receiver {
self.receiver = Drain(now + timeout);
}
}
pub fn check_sender_shutdown(
&mut self,
now: Instant,
send_buffer_flushed: bool,
receive_buffer_flushed: bool,
output_empty: bool,
) -> bool {
use Status::*;
let result = match self.sender {
Shutdown(timeout) if send_buffer_flushed && output_empty || now > timeout => {
self.sender = Drain(timeout);
true
}
Drain(timeout) if send_buffer_flushed && output_empty || now > timeout => {
self.sender = Closed;
false
}
_ => false,
};
if matches!(self.sender, Closed) && receive_buffer_flushed && output_empty {
self.connection = Closed;
}
result
}
pub fn check_receive_close_timeout(
&mut self,
now: Instant,
receive_buffer_flushed: bool,
) -> bool {
use Status::*;
match self.receiver {
Shutdown(timeout) | Drain(timeout) if now > timeout => {
self.receiver = Closed;
self.connection = Closed;
true
}
Shutdown(_) | Drain(_) if receive_buffer_flushed => {
self.receiver = Closed;
self.connection = Closed;
false
}
_ => false,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh status is open, and closing the socket alone neither closes the
    /// connection nor asks for the send buffer to be drained.
    #[test]
    fn open_close() {
        // Shared expectations checked both before and after the socket closes.
        fn assert_open_and_not_draining(conn: &ConnectionStatus) {
            assert!(conn.is_open());
            assert!(!conn.is_closed());
            assert!(!conn.should_drain_send_buffer());
        }

        let mut conn = ConnectionStatus::new(Duration::from_secs(10));
        assert_open_and_not_draining(&conn);

        conn.on_socket_closed(Instant::now());
        assert_open_and_not_draining(&conn);
    }
}