Skip to main content

ace_sim/
tcp_bus.rs

//! TCP-aware simulation bus.
//!
//! Models a connection-oriented transport between exactly two node addresses. Connection state is
//! owned by the bus - nodes do not track the TCP lifecycle themselves. The bus rejects messages
//! while the nodes are disconnected and can inject TCP-level faults independently of message-level
//! faults.
6
7// region: Imports
8
9use heapless::Vec;
10
11use crate::{
12    bus::{Envelope, SimBus},
13    clock::{Duration, Instant},
14    fault::FaultConfig,
15    io::NodeAddress,
16    rng::{Rng, Xorshift64},
17};
18
19// endregion: Imports
20
21// region: TcpFaultConfig
22
23/// TCP-level fault configuration - extends `FaultConfig` with connection-oriented faults.
24///
25/// Applied independently from the message-level faults in `FaultConfig`. This allows modeling a
26/// reliable TCP connection with unreliable message delivery, or a flaky TCP connection that resets
27/// mid-session.
28#[derive(Debug, Clone)]
29pub struct TcpFaultConfig {
30    /// Underlying message-level fault config.
31    pub message: FaultConfig,
32
33    /// Probability that a new connection attempt is refused. Models a gateway that is at capacity
34    /// or unreachable.
35    pub connection_refused: (u32, u32),
36
37    /// Probability that an established connection is reset mid-session. Models ignition off,
38    /// network fault, or gateway restart.
39    pub connection_reset: (u32, u32),
40
41    /// Probability that a connection attempt times out without response.
42    pub connection_timeout: (u32, u32),
43}
44
45impl TcpFaultConfig {
46    pub fn none() -> Self {
47        Self {
48            message: FaultConfig::none(),
49            connection_refused: (0, 1),
50            connection_reset: (0, 1),
51            connection_timeout: (0, 1),
52        }
53    }
54
55    pub fn light() -> Self {
56        Self {
57            message: FaultConfig::light(),
58            connection_refused: (1, 200),
59            connection_reset: (1, 200),
60            connection_timeout: (1, 200),
61        }
62    }
63
64    pub fn chaos() -> Self {
65        Self {
66            message: FaultConfig::chaos(),
67            connection_refused: (1, 20),
68            connection_reset: (1, 20),
69            connection_timeout: (1, 20),
70        }
71    }
72}
73
74// endregion: TcpFaultConfig
75
76// region: TcpConnectionState
77
78/// The state of a TCP connection between two nodes on the bus.
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub enum TcpConnectionState {
81    /// No connection exists - messages are rejected.
82    Disconnected,
83
84    /// A connection request is in progress.
85    Connecting {
86        initiated_by: NodeAddress,
87        at: Instant,
88    },
89
90    /// Connection is established - messages flow normally.
91    Connected {
92        initiator: NodeAddress,
93        acceptor: NodeAddress,
94    },
95
96    /// Connection was reset by the bus fault injector. Nodes will observe this as a
97    /// `TcpEvent::ConnectionReset`
98    Reset,
99}
100
101impl TcpConnectionState {
102    pub fn is_connected(&self) -> bool {
103        matches!(self, Self::Connected { .. })
104    }
105}
106
107// endregion: TcpConnectionState
108
109// region: TcpEvent
110
111/// Events the `TcpSimBus` delivers to nodes alongside messages.
112///
113/// Nodes train these via `TcpSimBus::drain_events()` after each tick.
114#[derive(Debug, Clone, PartialEq, Eq)]
115pub enum TcpEvent {
116    /// A connection request from `from` to `to` was accepted.
117    ConnectionEstablished { from: NodeAddress, to: NodeAddress },
118
119    /// A connection request was refused by the bus fault injector.
120    ConnectionRefused { from: NodeAddress, to: NodeAddress },
121
122    /// A connection request timed out
123    ConnectionTimeout { from: NodeAddress, to: NodeAddress },
124
125    /// An established connection was reset by the bus fault injector. Both nodes should threat
126    /// this as a TCP RST
127    ConnectionReset { from: NodeAddress, to: NodeAddress },
128
129    /// The connection was closed cleanly by one of the nodes.
130    ConnectionClosed { from: NodeAddress, to: NodeAddress },
131}
132
133// endregion: TcpEvent
134
135// region: TcpSimBus
136
137/// A TCP-aware simulation bus.
138///
139/// Wraps `SimBus` for message delivery and adds connection state tracking and TCP-level fault
140/// injection on top. The bus owns the connection state - nodes request connections and observe
141/// events but do not track TCP state themselves.
142///
143/// `N` - max message payload bytes
144/// `Q` - max messages in-flight simultaneously
145pub struct TcpSimBus<const N: usize, const Q: usize> {
146    /// Underlying message bus - handles delivery, delays, and message faults.
147    inner: SimBus<N, Q>,
148
149    /// TCP fault configuration.
150    tcp_faults: TcpFaultConfig,
151
152    /// Current connection state between node pairs. For simplicity each bus instance models one
153    /// logical TCP connection between a tester and a gateway. Multi-connection scenarios use
154    /// multiple `TcpSimBus` instances.
155    connection: TcpConnectionState,
156
157    /// Connect timeout duration - if a connecting state persists longer than this without the bus
158    /// processing a tick, it times out.
159    connect_timeout: Duration,
160
161    /// Accumulated TCP events for nodes to drain.
162    events: Vec<TcpEvent, 16>,
163
164    /// Dedicated RNG for TCP-level fault decisions, seeded independently from the message-level
165    /// RNG so fault regimes can be composed freely.
166    rng: Xorshift64,
167}
168
169impl<const N: usize, const Q: usize> TcpSimBus<N, Q> {
170    /// Creates a new `TcpSimBus`.
171    ///
172    /// `seed` - seeds both the message bus RNG and the TCP fault RNG. The TCP RNG uses
173    /// `seed.wrapping_add(1)` so the two RNGs produce independent sequences from the same seed.
174    pub fn new(seed: u64, faults: TcpFaultConfig) -> Self {
175        Self {
176            inner: SimBus::new(seed, faults.message.clone()),
177            tcp_faults: faults,
178            connection: TcpConnectionState::Disconnected,
179            connect_timeout: Duration::from_millis(5_000),
180            events: Vec::new(),
181            rng: Xorshift64::new(seed.wrapping_add(1)),
182        }
183    }
184
185    // region: Connection management
186
187    /// Requests a TCP connection from `from` to `to`.
188    ///
189    /// The connection may be established, refused, or timeout depending on the `TcpFaultConfig`.
190    /// The outcome appears as a `TcpEvent` on the next `tick`.
191    pub fn connect(&mut self, from: NodeAddress, to: NodeAddress) {
192        if self.connection.is_connected() {
193            return;
194        }
195
196        let now = self.inner.now();
197
198        // Fault: connection refused
199        if self.rng.chance(
200            self.tcp_faults.connection_refused.0,
201            self.tcp_faults.connection_refused.1,
202        ) {
203            let _ = self.events.push(TcpEvent::ConnectionRefused {
204                from: from.clone(),
205                to: to.clone(),
206            });
207            return;
208        }
209
210        // Fault: connection timeout - record connecting state, timeout is check in tick()
211        if self.rng.chance(
212            self.tcp_faults.connection_timeout.0,
213            self.tcp_faults.connection_timeout.1,
214        ) {
215            self.connection = TcpConnectionState::Connecting {
216                initiated_by: from,
217                at: now,
218            };
219            return;
220        }
221
222        // Success
223        self.connection = TcpConnectionState::Connected {
224            initiator: from.clone(),
225            acceptor: to.clone(),
226        };
227
228        let _ = self
229            .events
230            .push(TcpEvent::ConnectionEstablished { from, to });
231    }
232
233    /// Closes the connection cleanly from the given node.
234    pub fn disconnect(&mut self, from: NodeAddress, to: NodeAddress) {
235        if self.connection.is_connected() {
236            self.connection = TcpConnectionState::Disconnected;
237            let _ = self.events.push(TcpEvent::ConnectionClosed { from, to });
238        }
239    }
240
241    pub fn connection_state(&self) -> &TcpConnectionState {
242        &self.connection
243    }
244
245    pub fn is_connected(&self) -> bool {
246        self.connection.is_connected()
247    }
248
249    // endregion: Connection management
250
251    // region: Message delivery
252
253    /// Enqueues a message - rejected if the connection is not established.
254    ///
255    /// Returns `true` if the message was accepted, `false` if rejected due to connection state or
256    /// message-level fault injection.
257    pub fn send(&mut self, src: NodeAddress, dst: NodeAddress, data: &[u8]) -> bool {
258        if !self.connection.is_connected() {
259            return false;
260        }
261        self.inner.send(src, dst, data)
262    }
263
264    // endregion: Message delivery
265
266    // region: Tick
267
268    /// Advances simulation time, delivers due messages, and checks connection-level fault
269    /// injection.
270    pub fn tick(&mut self, duration: Duration) -> Vec<Envelope<N>, Q> {
271        let now = self.inner.now();
272
273        // Check connecting timeout
274        if let TcpConnectionState::Connecting { initiated_by, at } = &self.connection.clone() {
275            let elapsed = now.checked_duration_since(*at).unwrap_or(Duration::ZERO);
276
277            if elapsed >= self.connect_timeout {
278                let from = initiated_by.clone();
279
280                self.connection = TcpConnectionState::Disconnected;
281
282                let _ = self.events.push(TcpEvent::ConnectionTimeout {
283                    from: from.clone(),
284                    to: NodeAddress(0),
285                });
286            }
287        }
288
289        // Check mid-session connection reset fault
290        if self.connection.is_connected() {
291            if self.rng.chance(
292                self.tcp_faults.connection_reset.0,
293                self.tcp_faults.connection_reset.1,
294            ) {
295                if let TcpConnectionState::Connected {
296                    initiator,
297                    acceptor,
298                } = self.connection.clone()
299                {
300                    self.connection = TcpConnectionState::Reset;
301
302                    let _ = self.events.push(TcpEvent::ConnectionReset {
303                        from: initiator,
304                        to: acceptor,
305                    });
306                }
307            }
308        }
309
310        // If connection was just reset, clear the queue and return nothing
311        if matches!(self.connection, TcpConnectionState::Reset) {
312            self.connection = TcpConnectionState::Disconnected;
313
314            return Vec::new();
315        }
316
317        self.inner.tick(duration)
318    }
319
320    // endregion: Tick
321
322    // region: Accessors
323
324    pub fn now(&self) -> Instant {
325        self.inner.now()
326    }
327
328    /// Drains accumulated TCP events.
329    pub fn drain_events(&mut self) -> impl Iterator<Item = TcpEvent> + '_ {
330        self.events.drain(..)
331    }
332
333    pub fn set_connect_timeout(&mut self, timeout: Duration) {
334        self.connect_timeout = timeout;
335    }
336
337    pub fn set_faults(&mut self, faults: TcpFaultConfig) {
338        self.inner.set_faults(faults.message.clone());
339        self.tcp_faults = faults;
340    }
341
342    pub fn inner_mut(&mut self) -> &mut SimBus<N, Q> {
343        &mut self.inner
344    }
345
346    // endregion: Accessors
347}
348
349// endregion: TcpSimBus