1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use std::time::{Duration, Instant};

use log::info;

use crate::settings::SocketId;

/// Lifecycle phase of the connection as a whole or of one of its two sides.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Status {
    /// Still open; carries the flush timeout.
    Open(Duration),
    /// Shutting down; carries the flush deadline.
    Shutdown(Instant),
    /// Draining; carries the drain deadline.
    Drain(Instant),
    /// Closed; carries no payload.
    Closed,
}

/// Tracks the lifecycle state of a connection together with separate states
/// for its sending and receiving sides.
#[derive(Debug)]
pub struct ConnectionStatus {
    /// State of the connection as a whole; `is_closed` reports on this field.
    connection: Status,
    /// State of the sending side.
    sender: Status,
    /// State of the receiving side.
    receiver: Status,
}

impl ConnectionStatus {
    /// Creates a tracker whose connection, sender and receiver all start out open.
    ///
    /// `flush_timeout` is stored in every `Open` state; it is added to the
    /// current time to form a deadline when the sender enters shutdown or the
    /// receiver starts draining.
    pub fn new(flush_timeout: Duration) -> Self {
        Self {
            connection: Status::Open(flush_timeout),
            receiver: Status::Open(flush_timeout),
            sender: Status::Open(flush_timeout),
        }
    }

    /// Returns `true` as long as the connection as a whole has not been closed.
    pub fn is_open(&self) -> bool {
        !self.is_closed()
    }

    /// Returns `true` once the connection as a whole has been marked closed.
    pub fn is_closed(&self) -> bool {
        matches!(self.connection, Status::Closed)
    }

    /// Returns `true` while the sender is in its shutdown or drain state.
    pub fn should_drain_send_buffer(&self) -> bool {
        matches!(self.sender, Status::Shutdown(_) | Status::Drain(_))
    }

    /// Moves an open sender into shutdown, with a flush deadline of `now` plus
    /// the stored flush timeout. Does nothing if the sender is no longer open.
    pub fn on_data_stream_closed(&mut self, now: Instant) {
        if let Status::Open(timeout) = self.sender {
            info!("data stream closed, sender is in shutdown");
            self.sender = Status::Shutdown(now + timeout);
        }
    }

    /// Starts draining the receiver because the socket was closed.
    /// Does nothing if the receiver is no longer open.
    pub fn on_socket_closed(&mut self, now: Instant) {
        if self.start_receiver_drain(now).is_some() {
            info!("socket closed, receiver is draining");
        }
    }

    /// Starts draining the receiver because the peer idle timeout fired.
    /// Does nothing if the receiver is no longer open.
    pub fn on_peer_idle_timeout(&mut self, now: Instant) {
        if self.start_receiver_drain(now).is_some() {
            info!("peer idle timeout, receiver is draining");
        }
    }

    /// Starts draining the receiver because a shutdown packet was received.
    /// Does nothing if the receiver is no longer open.
    ///
    /// `log_sockid` is only used to tag the log message.
    pub fn handle_shutdown_packet(&mut self, now: Instant, log_sockid: SocketId) {
        if let Some(timeout) = self.start_receiver_drain(now) {
            info!("{log_sockid:?} received shutdown packet, draining for {timeout:?}");
        }
    }

    /// Advances the sender through `Shutdown -> Drain -> Closed`.
    ///
    /// Each step is taken when the send buffer is flushed and the output is
    /// empty, or when `now` is past the sender's deadline. At most one step is
    /// taken per call. Once the sender is closed, the connection as a whole is
    /// marked closed as soon as the receive buffer is flushed and the output
    /// is empty.
    ///
    /// Returns `true` only on the call that performs the `Shutdown -> Drain`
    /// step, and `false` in every other case.
    pub fn check_sender_shutdown(
        &mut self,
        now: Instant,
        send_buffer_flushed: bool,
        receive_buffer_flushed: bool,
        output_empty: bool,
    ) -> bool {
        use Status::*;
        let sender_flushed = send_buffer_flushed && output_empty;
        let entered_drain = match self.sender {
            Shutdown(deadline) if sender_flushed || now > deadline => {
                info!("sender Shutdown -> Drain");
                // The drain state keeps the deadline computed when shutdown began.
                self.sender = Drain(deadline);
                true
            }
            Drain(deadline) if sender_flushed || now > deadline => {
                info!("sender Drain -> Closed");
                self.sender = Closed;
                false
            }
            Open(_) | Shutdown(_) | Drain(_) | Closed => false,
        };
        // Evaluated on every call, so a sender closed by an earlier call still
        // closes the connection once the remaining conditions hold.
        if matches!(self.sender, Closed) && receive_buffer_flushed && output_empty {
            info!("sender closed and receiver flushed, socket is closed");
            self.connection = Closed;
        }
        entered_drain
    }

    /// Closes the receiver and the connection once a closing receiver has
    /// either flushed its buffer or run past its deadline.
    ///
    /// Returns `true` only when the close was forced by the deadline, i.e. the
    /// receive buffer was not flushed and `now` is past the deadline. Returns
    /// `false` when the buffer was flushed, and when nothing changed.
    ///
    /// `log_sockid` is only used to tag the log messages.
    pub fn check_receive_close_timeout(
        &mut self,
        now: Instant,
        receive_buffer_flushed: bool,
        log_sockid: SocketId,
    ) -> bool {
        use Status::*;
        match self.receiver {
            Shutdown(_) | Drain(_) if receive_buffer_flushed => {
                self.receiver = Closed;
                self.connection = Closed;
                info!("{log_sockid:?} receiver closed and flushed, connection is closed");
                false
            }
            Shutdown(deadline) | Drain(deadline) if now > deadline => {
                self.receiver = Closed;
                self.connection = Closed;
                info!(
                    "{log_sockid:?} receiver timed out flushing ({:?} too late), connection is closed",
                    now - deadline
                );
                true
            }
            Open(_) | Shutdown(_) | Drain(_) | Closed => false,
        }
    }

    /// Moves an open receiver into the drain state and returns the flush timeout
    /// that was applied, or `None` when the receiver had already left `Open`.
    fn start_receiver_drain(&mut self, now: Instant) -> Option<Duration> {
        match self.receiver {
            Status::Open(timeout) => {
                self.receiver = Status::Drain(now + timeout);
                Some(timeout)
            }
            Status::Shutdown(_) | Status::Drain(_) | Status::Closed => None,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that the connection reports open and the sender is not draining.
    fn assert_open_and_sender_idle(status: &ConnectionStatus) {
        assert!(status.is_open());
        assert!(!status.is_closed());
        assert!(!status.should_drain_send_buffer());
    }

    /// Closing the socket alone leaves the connection open and the sender
    /// untouched.
    #[test]
    fn open_close() {
        let mut status = ConnectionStatus::new(Duration::from_secs(10));
        assert_open_and_sender_idle(&status);

        status.on_socket_closed(Instant::now());
        assert_open_and_sender_idle(&status);
    }
}