use heapless::Vec;
use crate::{
bus::{Envelope, SimBus},
clock::{Duration, Instant},
fault::FaultConfig,
io::NodeAddress,
rng::{Rng, Xorshift64},
};
#[derive(Debug, Clone)]
pub struct TcpFaultConfig {
pub message: FaultConfig,
pub connection_refused: (u32, u32),
pub connection_reset: (u32, u32),
pub connection_timeout: (u32, u32),
}
impl TcpFaultConfig {
pub fn none() -> Self {
Self {
message: FaultConfig::none(),
connection_refused: (0, 1),
connection_reset: (0, 1),
connection_timeout: (0, 1),
}
}
pub fn light() -> Self {
Self {
message: FaultConfig::light(),
connection_refused: (1, 200),
connection_reset: (1, 200),
connection_timeout: (1, 200),
}
}
pub fn chaos() -> Self {
Self {
message: FaultConfig::chaos(),
connection_refused: (1, 20),
connection_reset: (1, 20),
connection_timeout: (1, 20),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TcpConnectionState {
Disconnected,
Connecting {
initiated_by: NodeAddress,
at: Instant,
},
Connected {
initiator: NodeAddress,
acceptor: NodeAddress,
},
Reset,
}
impl TcpConnectionState {
pub fn is_connected(&self) -> bool {
matches!(self, Self::Connected { .. })
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TcpEvent {
ConnectionEstablished { from: NodeAddress, to: NodeAddress },
ConnectionRefused { from: NodeAddress, to: NodeAddress },
ConnectionTimeout { from: NodeAddress, to: NodeAddress },
ConnectionReset { from: NodeAddress, to: NodeAddress },
ConnectionClosed { from: NodeAddress, to: NodeAddress },
}
#[derive(Debug)]
pub struct TcpSimBus<const N: usize, const Q: usize> {
inner: SimBus<N, Q>,
tcp_faults: TcpFaultConfig,
connection: TcpConnectionState,
connect_timeout: Duration,
events: Vec<TcpEvent, 16>,
rng: Xorshift64,
}
impl<const N: usize, const Q: usize> TcpSimBus<N, Q> {
pub fn new(seed: u64, faults: TcpFaultConfig) -> Self {
Self {
inner: SimBus::new(seed, faults.message.clone()),
tcp_faults: faults,
connection: TcpConnectionState::Disconnected,
connect_timeout: Duration::from_millis(5_000),
events: Vec::new(),
rng: Xorshift64::new(seed.wrapping_add(1)),
}
}
pub fn connect(&mut self, from: NodeAddress, to: NodeAddress) {
if self.connection.is_connected() {
return;
}
let now = self.inner.now();
if self.rng.chance(
self.tcp_faults.connection_refused.0,
self.tcp_faults.connection_refused.1,
) {
let _ = self.events.push(TcpEvent::ConnectionRefused {
from: from.clone(),
to: to.clone(),
});
return;
}
if self.rng.chance(
self.tcp_faults.connection_timeout.0,
self.tcp_faults.connection_timeout.1,
) {
self.connection = TcpConnectionState::Connecting {
initiated_by: from,
at: now,
};
return;
}
self.connection = TcpConnectionState::Connected {
initiator: from.clone(),
acceptor: to.clone(),
};
let _ = self
.events
.push(TcpEvent::ConnectionEstablished { from, to });
}
pub fn disconnect(&mut self, from: NodeAddress, to: NodeAddress) {
if self.connection.is_connected() {
self.connection = TcpConnectionState::Disconnected;
let _ = self.events.push(TcpEvent::ConnectionClosed { from, to });
}
}
pub fn connection_state(&self) -> &TcpConnectionState {
&self.connection
}
pub fn is_connected(&self) -> bool {
self.connection.is_connected()
}
pub fn send(&mut self, src: NodeAddress, dst: NodeAddress, data: &[u8]) -> bool {
if !self.connection.is_connected() {
return false;
}
self.inner.send(src, dst, data)
}
pub fn tick(&mut self, duration: Duration) -> Vec<Envelope<N>, Q> {
let now = self.inner.now();
if let TcpConnectionState::Connecting { initiated_by, at } = &self.connection.clone() {
let elapsed = now.checked_duration_since(*at).unwrap_or(Duration::ZERO);
if elapsed >= self.connect_timeout {
let from = initiated_by.clone();
self.connection = TcpConnectionState::Disconnected;
let _ = self.events.push(TcpEvent::ConnectionTimeout {
from: from.clone(),
to: NodeAddress(0),
});
}
}
if self.connection.is_connected() {
if self.rng.chance(
self.tcp_faults.connection_reset.0,
self.tcp_faults.connection_reset.1,
) {
if let TcpConnectionState::Connected {
initiator,
acceptor,
} = self.connection.clone()
{
self.connection = TcpConnectionState::Reset;
let _ = self.events.push(TcpEvent::ConnectionReset {
from: initiator,
to: acceptor,
});
}
}
}
if matches!(self.connection, TcpConnectionState::Reset) {
self.connection = TcpConnectionState::Disconnected;
return Vec::new();
}
self.inner.tick(duration)
}
pub fn now(&self) -> Instant {
self.inner.now()
}
pub fn drain_events(&mut self) -> impl Iterator<Item = TcpEvent> + '_ {
self.events.drain(..)
}
pub fn set_connect_timeout(&mut self, timeout: Duration) {
self.connect_timeout = timeout;
}
pub fn set_faults(&mut self, faults: TcpFaultConfig) {
self.inner.set_faults(faults.message.clone());
self.tcp_faults = faults;
}
pub fn inner_mut(&mut self) -> &mut SimBus<N, Q> {
&mut self.inner
}
}