use std::fmt;
use std::path::PathBuf;
#[derive(Debug, Clone)]
pub enum PeerAddr {
Tcp(std::net::SocketAddr),
Unix(PathBuf),
}
impl fmt::Display for PeerAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PeerAddr::Tcp(addr) => write!(f, "{addr}"),
PeerAddr::Unix(path) => {
if path.as_os_str().is_empty() {
f.write_str("(unnamed)")
} else {
write!(f, "{}", path.display())
}
}
}
}
}
#[derive(Debug)]
pub enum RecvMode {
Multi,
#[cfg(feature = "timestamps")]
MsgMulti,
Closed,
Connecting,
}
pub struct ConnectionState {
pub recv_mode: RecvMode,
pub active: bool,
pub generation: u32,
pub outbound: bool,
pub established: bool,
pub peer_addr: Option<PeerAddr>,
pub connect_timeout_armed: bool,
#[cfg(feature = "timestamps")]
pub recv_timestamp_ns: u64,
}
impl Default for ConnectionState {
fn default() -> Self {
Self::new()
}
}
impl ConnectionState {
pub fn new() -> Self {
ConnectionState {
recv_mode: RecvMode::Closed,
active: false,
generation: 0,
outbound: false,
established: false,
peer_addr: None,
connect_timeout_armed: false,
#[cfg(feature = "timestamps")]
recv_timestamp_ns: 0,
}
}
pub fn activate(&mut self) {
self.active = true;
self.recv_mode = RecvMode::Multi;
}
pub fn activate_outbound(&mut self) {
self.active = true;
self.outbound = true;
self.established = false;
self.recv_mode = RecvMode::Connecting;
}
pub fn deactivate(&mut self) {
self.active = false;
self.recv_mode = RecvMode::Closed;
self.outbound = false;
self.established = false;
self.peer_addr = None;
self.connect_timeout_armed = false;
#[cfg(feature = "timestamps")]
{
self.recv_timestamp_ns = 0;
}
self.generation = self.generation.wrapping_add(1);
}
}
pub struct ConnectionTable {
slots: Vec<ConnectionState>,
free_list: Vec<u32>,
}
impl ConnectionTable {
pub fn new(max_connections: u32) -> Self {
let mut slots = Vec::with_capacity(max_connections as usize);
for _ in 0..max_connections {
slots.push(ConnectionState::new());
}
let free_list: Vec<u32> = (0..max_connections).rev().collect();
ConnectionTable { slots, free_list }
}
pub fn allocate(&mut self) -> Option<u32> {
let idx = self.free_list.pop()?;
self.slots[idx as usize].activate();
Some(idx)
}
pub fn allocate_outbound(&mut self) -> Option<u32> {
let idx = self.free_list.pop()?;
self.slots[idx as usize].activate_outbound();
Some(idx)
}
pub fn release(&mut self, idx: u32) {
if (idx as usize) < self.slots.len() {
if !self.slots[idx as usize].active {
return; }
self.slots[idx as usize].deactivate();
self.free_list.push(idx);
}
}
pub fn get(&self, idx: u32) -> Option<&ConnectionState> {
self.slots.get(idx as usize).filter(|s| s.active)
}
pub fn get_mut(&mut self, idx: u32) -> Option<&mut ConnectionState> {
self.slots.get_mut(idx as usize).filter(|s| s.active)
}
#[cfg_attr(not(has_io_uring), allow(dead_code))]
pub fn active_count(&self) -> usize {
self.slots.len().saturating_sub(self.free_list.len())
}
#[cfg_attr(not(has_io_uring), allow(dead_code))]
pub fn max_slots(&self) -> u32 {
self.slots.len() as u32
}
pub fn generation(&self, idx: u32) -> u32 {
self.slots[idx as usize].generation
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn allocate_returns_indices_and_marks_active() {
let mut table = ConnectionTable::new(4);
assert_eq!(table.active_count(), 0);
let idx = table.allocate().unwrap();
assert_eq!(table.active_count(), 1);
assert!(table.get(idx).is_some());
assert!(table.get(idx).unwrap().active);
assert!(matches!(table.get(idx).unwrap().recv_mode, RecvMode::Multi));
}
#[test]
fn allocate_outbound_sets_connecting_mode() {
let mut table = ConnectionTable::new(4);
let idx = table.allocate_outbound().unwrap();
let conn = table.get(idx).unwrap();
assert!(conn.outbound);
assert!(!conn.established);
assert!(matches!(conn.recv_mode, RecvMode::Connecting));
}
#[test]
fn release_makes_slot_reusable() {
let mut table = ConnectionTable::new(2);
let idx0 = table.allocate().unwrap();
let idx1 = table.allocate().unwrap();
assert_eq!(table.active_count(), 2);
assert!(table.allocate().is_none());
table.release(idx0);
assert_eq!(table.active_count(), 1);
assert!(table.get(idx0).is_none());
let idx_new = table.allocate().unwrap();
assert_eq!(idx_new, idx0);
assert_eq!(table.active_count(), 2);
table.release(idx1);
table.release(idx_new);
}
#[test]
fn release_increments_generation() {
let mut table = ConnectionTable::new(4);
let idx = table.allocate().unwrap();
assert_eq!(table.generation(idx), 0);
table.release(idx);
assert_eq!(table.generation(idx), 1);
let idx2 = table.allocate().unwrap();
assert_eq!(idx2, idx);
assert_eq!(table.generation(idx), 1);
table.release(idx);
assert_eq!(table.generation(idx), 2);
}
#[test]
fn generation_wraps_at_u32_max() {
let mut table = ConnectionTable::new(1);
let idx = table.allocate().unwrap();
table.slots[idx as usize].generation = u32::MAX;
table.release(idx);
assert_eq!(table.generation(idx), 0); }
#[test]
fn double_release_is_no_op() {
let mut table = ConnectionTable::new(4);
let idx = table.allocate().unwrap();
let gen_before = table.generation(idx);
table.release(idx);
let gen_after = table.generation(idx);
assert_eq!(gen_after, gen_before + 1);
table.release(idx);
assert_eq!(table.generation(idx), gen_after); assert_eq!(table.active_count(), 0);
let idx0 = table.allocate().unwrap();
let idx1 = table.allocate().unwrap();
let idx2 = table.allocate().unwrap();
let idx3 = table.allocate().unwrap();
assert!(table.allocate().is_none()); table.release(idx0);
table.release(idx1);
table.release(idx2);
table.release(idx3);
}
#[test]
fn get_returns_none_for_inactive_slot() {
let mut table = ConnectionTable::new(4);
assert!(table.get(0).is_none());
let idx = table.allocate().unwrap();
assert!(table.get(idx).is_some());
table.release(idx);
assert!(table.get(idx).is_none());
}
#[test]
fn get_returns_none_for_out_of_bounds() {
let table = ConnectionTable::new(4);
assert!(table.get(99).is_none());
}
#[test]
fn release_out_of_bounds_is_no_op() {
let mut table = ConnectionTable::new(4);
table.release(99);
assert_eq!(table.active_count(), 0);
}
#[test]
fn exhaust_all_slots() {
let mut table = ConnectionTable::new(3);
let a = table.allocate().unwrap();
let b = table.allocate().unwrap();
let c = table.allocate().unwrap();
assert!(table.allocate().is_none());
assert_eq!(table.active_count(), 3);
table.release(b);
assert_eq!(table.active_count(), 2);
let d = table.allocate().unwrap();
assert_eq!(d, b); assert_eq!(table.active_count(), 3);
assert!(table.allocate().is_none());
table.release(a);
table.release(c);
table.release(d);
}
#[test]
fn deactivate_resets_all_fields() {
let mut table = ConnectionTable::new(4);
let idx = table.allocate_outbound().unwrap();
if let Some(cs) = table.get_mut(idx) {
cs.established = true;
cs.connect_timeout_armed = true;
cs.peer_addr = Some(PeerAddr::Tcp("127.0.0.1:8080".parse().unwrap()));
}
table.release(idx);
let cs = &table.slots[idx as usize];
assert!(!cs.active);
assert!(!cs.outbound);
assert!(!cs.established);
assert!(!cs.connect_timeout_armed);
assert!(cs.peer_addr.is_none());
assert!(matches!(cs.recv_mode, RecvMode::Closed));
}
#[test]
fn max_slots_returns_capacity() {
let table = ConnectionTable::new(16);
assert_eq!(table.max_slots(), 16);
assert_eq!(table.active_count(), 0);
}
#[test]
fn allocate_gives_lowest_index_first() {
let table = ConnectionTable::new(4);
assert_eq!(table.free_list.last(), Some(&0));
}
}