1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/// Receive mode of a connection, i.e. which kind of recv (if any) is armed.
///
/// This is a plain field-less tag, so it is `Copy` and can be compared
/// directly (`mode == RecvMode::Multi`) instead of requiring `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvMode {
    /// Multishot recv armed with provided buffer ring.
    Multi,
    /// Connection is closing, no recv armed.
    Closed,
    /// Outbound connect SQE in-flight, no recv armed yet.
    Connecting,
}
/// Per-connection state tracked by the driver.
///
/// A value starts out inactive (see [`ConnectionState::new`]), is switched on
/// with [`activate`](ConnectionState::activate) or
/// [`activate_outbound`](ConnectionState::activate_outbound), and is reset
/// again by [`deactivate`](ConnectionState::deactivate), which also advances
/// the generation counter.
#[derive(Debug)]
pub struct ConnectionState {
    /// Current recv mode.
    pub recv_mode: RecvMode,
    /// Whether the connection is active.
    pub active: bool,
    /// Generation counter to detect stale ConnTokens.
    pub generation: u32,
    /// Whether this is an outbound (connect) connection.
    pub outbound: bool,
    /// Whether the connection has been fully established (TCP+TLS handshake done).
    /// `on_close` is only fired when `established == true`.
    pub established: bool,
    /// Peer address (set on accept or connect).
    pub peer_addr: Option<std::net::SocketAddr>,
    /// Whether a connect timeout SQE is armed for this connection.
    pub connect_timeout_armed: bool,
}

impl Default for ConnectionState {
    /// Same as [`ConnectionState::new`]: an inactive, closed slot.
    fn default() -> Self {
        Self::new()
    }
}

impl ConnectionState {
    /// Create an inactive state: recv mode `Closed`, generation `0`, every
    /// flag cleared and no peer address.
    pub fn new() -> Self {
        Self {
            recv_mode: RecvMode::Closed,
            active: false,
            generation: 0,
            outbound: false,
            established: false,
            peer_addr: None,
            connect_timeout_armed: false,
        }
    }

    /// Mark the state active with multishot recv armed.
    ///
    /// Only `active` and `recv_mode` are written; all other fields keep
    /// whatever value they had.
    pub fn activate(&mut self) {
        self.recv_mode = RecvMode::Multi;
        self.active = true;
    }

    /// Activate as an outbound (connect) connection.
    ///
    /// The state becomes active and outbound, is flagged as not yet
    /// established, and its recv mode is set to `Connecting`.
    pub fn activate_outbound(&mut self) {
        self.recv_mode = RecvMode::Connecting;
        self.established = false;
        self.outbound = true;
        self.active = true;
    }

    /// Reset the state to inactive and advance the generation counter.
    ///
    /// Every flag is cleared, the peer address is dropped and the recv mode
    /// goes back to `Closed`. The generation is bumped with wrapping
    /// arithmetic, so it rolls over from `u32::MAX` to `0` instead of
    /// panicking.
    pub fn deactivate(&mut self) {
        self.recv_mode = RecvMode::Closed;
        self.active = false;
        self.outbound = false;
        self.established = false;
        self.connect_timeout_armed = false;
        self.peer_addr = None;
        self.generation = self.generation.wrapping_add(1);
    }
}
/// Fixed-size pool of connection slots with a free list, so that handing out
/// and returning a slot are both O(1).
pub struct ConnectionTable {
    slots: Vec<ConnectionState>,
    free_list: Vec<u32>,
}

impl ConnectionTable {
    /// Build a table holding `max_connections` inactive slots.
    pub fn new(max_connections: u32) -> Self {
        let slots: Vec<ConnectionState> = (0..max_connections)
            .map(|_| ConnectionState::new())
            .collect();
        // Stored in descending order so that `pop` yields the lowest index first.
        let free_list: Vec<u32> = (0..max_connections).rev().collect();
        Self { slots, free_list }
    }

    /// Take a free slot and activate it.
    ///
    /// Returns the slot index, or `None` when no slot is free.
    pub fn allocate(&mut self) -> Option<u32> {
        self.free_list.pop().map(|slot_idx| {
            self.slots[slot_idx as usize].activate();
            slot_idx
        })
    }

    /// Take a free slot and activate it as an outbound connection.
    ///
    /// Returns the slot index, or `None` when no slot is free.
    pub fn allocate_outbound(&mut self) -> Option<u32> {
        self.free_list.pop().map(|slot_idx| {
            self.slots[slot_idx as usize].activate_outbound();
            slot_idx
        })
    }

    /// Deactivate slot `idx` and return it to the free list.
    ///
    /// Out-of-range indices and slots that are not active are ignored.
    pub fn release(&mut self, idx: u32) {
        if let Some(slot) = self.slots.get_mut(idx as usize) {
            // An inactive slot has already been released; pushing it again
            // would put the same index on the free list twice.
            if slot.active {
                slot.deactivate();
                self.free_list.push(idx);
            }
        }
    }

    /// Borrow the state of slot `idx`.
    ///
    /// Returns `None` if `idx` is out of range or the slot is not active.
    pub fn get(&self, idx: u32) -> Option<&ConnectionState> {
        match self.slots.get(idx as usize) {
            Some(slot) if slot.active => Some(slot),
            _ => None,
        }
    }

    /// Mutably borrow the state of slot `idx`.
    ///
    /// Returns `None` if `idx` is out of range or the slot is not active.
    pub fn get_mut(&mut self, idx: u32) -> Option<&mut ConnectionState> {
        match self.slots.get_mut(idx as usize) {
            Some(slot) if slot.active => Some(slot),
            _ => None,
        }
    }

    /// Mutably borrow slot `idx` without checking whether it is active
    /// (for internal use).
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of range.
    #[allow(dead_code)]
    pub fn get_mut_unchecked(&mut self, idx: u32) -> &mut ConnectionState {
        let position = idx as usize;
        &mut self.slots[position]
    }

    /// Number of slots currently handed out, i.e. total slots minus free ones.
    pub fn active_count(&self) -> usize {
        let total = self.slots.len();
        let free = self.free_list.len();
        total.saturating_sub(free)
    }

    /// Total number of connection slots (max_connections).
    pub fn max_slots(&self) -> u32 {
        let total = self.slots.len();
        total as u32
    }

    /// Generation counter of slot `idx`; readable even when the slot is inactive.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of range.
    pub fn generation(&self, idx: u32) -> u32 {
        let slot = &self.slots[idx as usize];
        slot.generation
    }
}