Skip to main content

ace_sim/
tcp_bus.rs

1//! TCP-aware simulation bus.
2//!
3//! Models a connection-oriented transport between exactly two node addresses. Connection state is
4//! owned by the bus - nodes do not track TCP life-cycle. The bus rejects messages between
5//! disconnected nodes and can inject TCP-level faults independently of message-level faults.
6
7// region: Imports
8
9use heapless::Vec;
10
11use crate::{
12    bus::{Envelope, SimBus},
13    clock::{Duration, Instant},
14    fault::FaultConfig,
15    io::NodeAddress,
16    rng::{Rng, Xorshift64},
17};
18
19// endregion: Imports
20
21// region: TcpFaultConfig
22
23/// TCP-level fault configuration - extends `FaultConfig` with connection-oriented faults.
24///
25/// Applied independently from the message-level faults in `FaultConfig`. This allows modeling a
26/// reliable TCP connection with unreliable message delivery, or a flaky TCP connection that resets
27/// mid-session.
28#[derive(Debug, Clone)]
29pub struct TcpFaultConfig {
30    /// Underlying message-level fault config.
31    pub message: FaultConfig,
32
33    /// Probability that a new connection attempt is refused. Models a gateway that is at capacity
34    /// or unreachable.
35    pub connection_refused: (u32, u32),
36
37    /// Probability that an established connection is reset mid-session. Models ignition off,
38    /// network fault, or gateway restart.
39    pub connection_reset: (u32, u32),
40
41    /// Probability that a connection attempt times out without response.
42    pub connection_timeout: (u32, u32),
43}
44
45impl TcpFaultConfig {
46    pub fn none() -> Self {
47        Self {
48            message: FaultConfig::none(),
49            connection_refused: (0, 1),
50            connection_reset: (0, 1),
51            connection_timeout: (0, 1),
52        }
53    }
54
55    pub fn light() -> Self {
56        Self {
57            message: FaultConfig::light(),
58            connection_refused: (1, 200),
59            connection_reset: (1, 200),
60            connection_timeout: (1, 200),
61        }
62    }
63
64    pub fn chaos() -> Self {
65        Self {
66            message: FaultConfig::chaos(),
67            connection_refused: (1, 20),
68            connection_reset: (1, 20),
69            connection_timeout: (1, 20),
70        }
71    }
72}
73
74// endregion: TcpFaultConfig
75
76// region: TcpConnectionState
77
78/// The state of a TCP connection between two nodes on the bus.
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub enum TcpConnectionState {
81    /// No connection exists - messages are rejected.
82    Disconnected,
83
84    /// A connection request is in progress.
85    Connecting {
86        initiated_by: NodeAddress,
87        at: Instant,
88    },
89
90    /// Connection is established - messages flow normally.
91    Connected {
92        initiator: NodeAddress,
93        acceptor: NodeAddress,
94    },
95
96    /// Connection was reset by the bus fault injector. Nodes will observe this as a
97    /// `TcpEvent::ConnectionReset`
98    Reset,
99}
100
101impl TcpConnectionState {
102    pub fn is_connected(&self) -> bool {
103        matches!(self, Self::Connected { .. })
104    }
105}
106
107// endregion: TcpConnectionState
108
109// region: TcpEvent
110
111/// Events the `TcpSimBus` delivers to nodes alongside messages.
112///
113/// Nodes train these via `TcpSimBus::drain_events()` after each tick.
114#[derive(Debug, Clone, PartialEq, Eq)]
115pub enum TcpEvent {
116    /// A connection request from `from` to `to` was accepted.
117    ConnectionEstablished { from: NodeAddress, to: NodeAddress },
118
119    /// A connection request was refused by the bus fault injector.
120    ConnectionRefused { from: NodeAddress, to: NodeAddress },
121
122    /// A connection request timed out
123    ConnectionTimeout { from: NodeAddress, to: NodeAddress },
124
125    /// An established connection was reset by the bus fault injector. Both nodes should threat
126    /// this as a TCP RST
127    ConnectionReset { from: NodeAddress, to: NodeAddress },
128
129    /// The connection was closed cleanly by one of the nodes.
130    ConnectionClosed { from: NodeAddress, to: NodeAddress },
131}
132
133// endregion: TcpEvent
134
135// region: TcpSimBus
136
137/// A TCP-aware simulation bus.
138///
139/// Wraps `SimBus` for message delivery and adds connection state tracking and TCP-level fault
140/// injection on top. The bus owns the connection state - nodes request connections and observe
141/// events but do not track TCP state themselves.
142///
143/// `N` - max message payload bytes
144/// `Q` - max messages in-flight simultaneously
145#[derive(Debug)]
146pub struct TcpSimBus<const N: usize, const Q: usize> {
147    /// Underlying message bus - handles delivery, delays, and message faults.
148    inner: SimBus<N, Q>,
149
150    /// TCP fault configuration.
151    tcp_faults: TcpFaultConfig,
152
153    /// Current connection state between node pairs. For simplicity each bus instance models one
154    /// logical TCP connection between a tester and a gateway. Multi-connection scenarios use
155    /// multiple `TcpSimBus` instances.
156    connection: TcpConnectionState,
157
158    /// Connect timeout duration - if a connecting state persists longer than this without the bus
159    /// processing a tick, it times out.
160    connect_timeout: Duration,
161
162    /// Accumulated TCP events for nodes to drain.
163    events: Vec<TcpEvent, 16>,
164
165    /// Dedicated RNG for TCP-level fault decisions, seeded independently from the message-level
166    /// RNG so fault regimes can be composed freely.
167    rng: Xorshift64,
168}
169
170impl<const N: usize, const Q: usize> TcpSimBus<N, Q> {
171    /// Creates a new `TcpSimBus`.
172    ///
173    /// `seed` - seeds both the message bus RNG and the TCP fault RNG. The TCP RNG uses
174    /// `seed.wrapping_add(1)` so the two RNGs produce independent sequences from the same seed.
175    pub fn new(seed: u64, faults: TcpFaultConfig) -> Self {
176        Self {
177            inner: SimBus::new(seed, faults.message.clone()),
178            tcp_faults: faults,
179            connection: TcpConnectionState::Disconnected,
180            connect_timeout: Duration::from_millis(5_000),
181            events: Vec::new(),
182            rng: Xorshift64::new(seed.wrapping_add(1)),
183        }
184    }
185
186    // region: Connection management
187
188    /// Requests a TCP connection from `from` to `to`.
189    ///
190    /// The connection may be established, refused, or timeout depending on the `TcpFaultConfig`.
191    /// The outcome appears as a `TcpEvent` on the next `tick`.
192    pub fn connect(&mut self, from: NodeAddress, to: NodeAddress) {
193        if self.connection.is_connected() {
194            return;
195        }
196
197        let now = self.inner.now();
198
199        // Fault: connection refused
200        if self.rng.chance(
201            self.tcp_faults.connection_refused.0,
202            self.tcp_faults.connection_refused.1,
203        ) {
204            let _ = self.events.push(TcpEvent::ConnectionRefused {
205                from: from.clone(),
206                to: to.clone(),
207            });
208            return;
209        }
210
211        // Fault: connection timeout - record connecting state, timeout is check in tick()
212        if self.rng.chance(
213            self.tcp_faults.connection_timeout.0,
214            self.tcp_faults.connection_timeout.1,
215        ) {
216            self.connection = TcpConnectionState::Connecting {
217                initiated_by: from,
218                at: now,
219            };
220            return;
221        }
222
223        // Success
224        self.connection = TcpConnectionState::Connected {
225            initiator: from.clone(),
226            acceptor: to.clone(),
227        };
228
229        let _ = self
230            .events
231            .push(TcpEvent::ConnectionEstablished { from, to });
232    }
233
234    /// Closes the connection cleanly from the given node.
235    pub fn disconnect(&mut self, from: NodeAddress, to: NodeAddress) {
236        if self.connection.is_connected() {
237            self.connection = TcpConnectionState::Disconnected;
238            let _ = self.events.push(TcpEvent::ConnectionClosed { from, to });
239        }
240    }
241
242    pub fn connection_state(&self) -> &TcpConnectionState {
243        &self.connection
244    }
245
246    pub fn is_connected(&self) -> bool {
247        self.connection.is_connected()
248    }
249
250    // endregion: Connection management
251
252    // region: Message delivery
253
254    /// Enqueues a message - rejected if the connection is not established.
255    ///
256    /// Returns `true` if the message was accepted, `false` if rejected due to connection state or
257    /// message-level fault injection.
258    pub fn send(&mut self, src: NodeAddress, dst: NodeAddress, data: &[u8]) -> bool {
259        if !self.connection.is_connected() {
260            return false;
261        }
262        self.inner.send(src, dst, data)
263    }
264
265    // endregion: Message delivery
266
267    // region: Tick
268
269    /// Advances simulation time, delivers due messages, and checks connection-level fault
270    /// injection.
271    pub fn tick(&mut self, duration: Duration) -> Vec<Envelope<N>, Q> {
272        let now = self.inner.now();
273
274        // Check connecting timeout
275        if let TcpConnectionState::Connecting { initiated_by, at } = &self.connection.clone() {
276            let elapsed = now.checked_duration_since(*at).unwrap_or(Duration::ZERO);
277
278            if elapsed >= self.connect_timeout {
279                let from = initiated_by.clone();
280
281                self.connection = TcpConnectionState::Disconnected;
282
283                let _ = self.events.push(TcpEvent::ConnectionTimeout {
284                    from: from.clone(),
285                    to: NodeAddress(0),
286                });
287            }
288        }
289
290        // Check mid-session connection reset fault
291        if self.connection.is_connected() {
292            if self.rng.chance(
293                self.tcp_faults.connection_reset.0,
294                self.tcp_faults.connection_reset.1,
295            ) {
296                if let TcpConnectionState::Connected {
297                    initiator,
298                    acceptor,
299                } = self.connection.clone()
300                {
301                    self.connection = TcpConnectionState::Reset;
302
303                    let _ = self.events.push(TcpEvent::ConnectionReset {
304                        from: initiator,
305                        to: acceptor,
306                    });
307                }
308            }
309        }
310
311        // If connection was just reset, clear the queue and return nothing
312        if matches!(self.connection, TcpConnectionState::Reset) {
313            self.connection = TcpConnectionState::Disconnected;
314
315            return Vec::new();
316        }
317
318        self.inner.tick(duration)
319    }
320
321    // endregion: Tick
322
323    // region: Accessors
324
325    pub fn now(&self) -> Instant {
326        self.inner.now()
327    }
328
329    /// Drains accumulated TCP events.
330    pub fn drain_events(&mut self) -> impl Iterator<Item = TcpEvent> + '_ {
331        self.events.drain(..)
332    }
333
334    pub fn set_connect_timeout(&mut self, timeout: Duration) {
335        self.connect_timeout = timeout;
336    }
337
338    pub fn set_faults(&mut self, faults: TcpFaultConfig) {
339        self.inner.set_faults(faults.message.clone());
340        self.tcp_faults = faults;
341    }
342
343    pub fn inner_mut(&mut self) -> &mut SimBus<N, Q> {
344        &mut self.inner
345    }
346
347    // endregion: Accessors
348}
349
350// endregion: TcpSimBus