ace_sim/tcp_bus.rs
1//! TCP-aware simulation bus.
2//!
3//! Models a connection-oriented transport between exactly two node addresses. Connection state is
4//! owned by the bus - nodes do not track TCP life-cycle. The bus rejects messages between
5//! disconnected nodes and can inject TCP-level faults independently of message-level faults.
6
7// region: Imports
8
9use heapless::Vec;
10
11use crate::{
12 bus::{Envelope, SimBus},
13 clock::{Duration, Instant},
14 fault::FaultConfig,
15 io::NodeAddress,
16 rng::{Rng, Xorshift64},
17};
18
19// endregion: Imports
20
21// region: TcpFaultConfig
22
23/// TCP-level fault configuration - extends `FaultConfig` with connection-oriented faults.
24///
25/// Applied independently from the message-level faults in `FaultConfig`. This allows modeling a
26/// reliable TCP connection with unreliable message delivery, or a flaky TCP connection that resets
27/// mid-session.
28#[derive(Debug, Clone)]
29pub struct TcpFaultConfig {
30 /// Underlying message-level fault config.
31 pub message: FaultConfig,
32
33 /// Probability that a new connection attempt is refused. Models a gateway that is at capacity
34 /// or unreachable.
35 pub connection_refused: (u32, u32),
36
37 /// Probability that an established connection is reset mid-session. Models ignition off,
38 /// network fault, or gateway restart.
39 pub connection_reset: (u32, u32),
40
41 /// Probability that a connection attempt times out without response.
42 pub connection_timeout: (u32, u32),
43}
44
45impl TcpFaultConfig {
46 pub fn none() -> Self {
47 Self {
48 message: FaultConfig::none(),
49 connection_refused: (0, 1),
50 connection_reset: (0, 1),
51 connection_timeout: (0, 1),
52 }
53 }
54
55 pub fn light() -> Self {
56 Self {
57 message: FaultConfig::light(),
58 connection_refused: (1, 200),
59 connection_reset: (1, 200),
60 connection_timeout: (1, 200),
61 }
62 }
63
64 pub fn chaos() -> Self {
65 Self {
66 message: FaultConfig::chaos(),
67 connection_refused: (1, 20),
68 connection_reset: (1, 20),
69 connection_timeout: (1, 20),
70 }
71 }
72}
73
74// endregion: TcpFaultConfig
75
76// region: TcpConnectionState
77
78/// The state of a TCP connection between two nodes on the bus.
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub enum TcpConnectionState {
81 /// No connection exists - messages are rejected.
82 Disconnected,
83
84 /// A connection request is in progress.
85 Connecting {
86 initiated_by: NodeAddress,
87 at: Instant,
88 },
89
90 /// Connection is established - messages flow normally.
91 Connected {
92 initiator: NodeAddress,
93 acceptor: NodeAddress,
94 },
95
96 /// Connection was reset by the bus fault injector. Nodes will observe this as a
97 /// `TcpEvent::ConnectionReset`
98 Reset,
99}
100
101impl TcpConnectionState {
102 pub fn is_connected(&self) -> bool {
103 matches!(self, Self::Connected { .. })
104 }
105}
106
107// endregion: TcpConnectionState
108
109// region: TcpEvent
110
111/// Events the `TcpSimBus` delivers to nodes alongside messages.
112///
113/// Nodes train these via `TcpSimBus::drain_events()` after each tick.
114#[derive(Debug, Clone, PartialEq, Eq)]
115pub enum TcpEvent {
116 /// A connection request from `from` to `to` was accepted.
117 ConnectionEstablished { from: NodeAddress, to: NodeAddress },
118
119 /// A connection request was refused by the bus fault injector.
120 ConnectionRefused { from: NodeAddress, to: NodeAddress },
121
122 /// A connection request timed out
123 ConnectionTimeout { from: NodeAddress, to: NodeAddress },
124
125 /// An established connection was reset by the bus fault injector. Both nodes should threat
126 /// this as a TCP RST
127 ConnectionReset { from: NodeAddress, to: NodeAddress },
128
129 /// The connection was closed cleanly by one of the nodes.
130 ConnectionClosed { from: NodeAddress, to: NodeAddress },
131}
132
133// endregion: TcpEvent
134
135// region: TcpSimBus
136
137/// A TCP-aware simulation bus.
138///
139/// Wraps `SimBus` for message delivery and adds connection state tracking and TCP-level fault
140/// injection on top. The bus owns the connection state - nodes request connections and observe
141/// events but do not track TCP state themselves.
142///
143/// `N` - max message payload bytes
144/// `Q` - max messages in-flight simultaneously
145#[derive(Debug)]
146pub struct TcpSimBus<const N: usize, const Q: usize> {
147 /// Underlying message bus - handles delivery, delays, and message faults.
148 inner: SimBus<N, Q>,
149
150 /// TCP fault configuration.
151 tcp_faults: TcpFaultConfig,
152
153 /// Current connection state between node pairs. For simplicity each bus instance models one
154 /// logical TCP connection between a tester and a gateway. Multi-connection scenarios use
155 /// multiple `TcpSimBus` instances.
156 connection: TcpConnectionState,
157
158 /// Connect timeout duration - if a connecting state persists longer than this without the bus
159 /// processing a tick, it times out.
160 connect_timeout: Duration,
161
162 /// Accumulated TCP events for nodes to drain.
163 events: Vec<TcpEvent, 16>,
164
165 /// Dedicated RNG for TCP-level fault decisions, seeded independently from the message-level
166 /// RNG so fault regimes can be composed freely.
167 rng: Xorshift64,
168}
169
170impl<const N: usize, const Q: usize> TcpSimBus<N, Q> {
171 /// Creates a new `TcpSimBus`.
172 ///
173 /// `seed` - seeds both the message bus RNG and the TCP fault RNG. The TCP RNG uses
174 /// `seed.wrapping_add(1)` so the two RNGs produce independent sequences from the same seed.
175 pub fn new(seed: u64, faults: TcpFaultConfig) -> Self {
176 Self {
177 inner: SimBus::new(seed, faults.message.clone()),
178 tcp_faults: faults,
179 connection: TcpConnectionState::Disconnected,
180 connect_timeout: Duration::from_millis(5_000),
181 events: Vec::new(),
182 rng: Xorshift64::new(seed.wrapping_add(1)),
183 }
184 }
185
186 // region: Connection management
187
188 /// Requests a TCP connection from `from` to `to`.
189 ///
190 /// The connection may be established, refused, or timeout depending on the `TcpFaultConfig`.
191 /// The outcome appears as a `TcpEvent` on the next `tick`.
192 pub fn connect(&mut self, from: NodeAddress, to: NodeAddress) {
193 if self.connection.is_connected() {
194 return;
195 }
196
197 let now = self.inner.now();
198
199 // Fault: connection refused
200 if self.rng.chance(
201 self.tcp_faults.connection_refused.0,
202 self.tcp_faults.connection_refused.1,
203 ) {
204 let _ = self.events.push(TcpEvent::ConnectionRefused {
205 from: from.clone(),
206 to: to.clone(),
207 });
208 return;
209 }
210
211 // Fault: connection timeout - record connecting state, timeout is check in tick()
212 if self.rng.chance(
213 self.tcp_faults.connection_timeout.0,
214 self.tcp_faults.connection_timeout.1,
215 ) {
216 self.connection = TcpConnectionState::Connecting {
217 initiated_by: from,
218 at: now,
219 };
220 return;
221 }
222
223 // Success
224 self.connection = TcpConnectionState::Connected {
225 initiator: from.clone(),
226 acceptor: to.clone(),
227 };
228
229 let _ = self
230 .events
231 .push(TcpEvent::ConnectionEstablished { from, to });
232 }
233
234 /// Closes the connection cleanly from the given node.
235 pub fn disconnect(&mut self, from: NodeAddress, to: NodeAddress) {
236 if self.connection.is_connected() {
237 self.connection = TcpConnectionState::Disconnected;
238 let _ = self.events.push(TcpEvent::ConnectionClosed { from, to });
239 }
240 }
241
242 pub fn connection_state(&self) -> &TcpConnectionState {
243 &self.connection
244 }
245
246 pub fn is_connected(&self) -> bool {
247 self.connection.is_connected()
248 }
249
250 // endregion: Connection management
251
252 // region: Message delivery
253
254 /// Enqueues a message - rejected if the connection is not established.
255 ///
256 /// Returns `true` if the message was accepted, `false` if rejected due to connection state or
257 /// message-level fault injection.
258 pub fn send(&mut self, src: NodeAddress, dst: NodeAddress, data: &[u8]) -> bool {
259 if !self.connection.is_connected() {
260 return false;
261 }
262 self.inner.send(src, dst, data)
263 }
264
265 // endregion: Message delivery
266
267 // region: Tick
268
269 /// Advances simulation time, delivers due messages, and checks connection-level fault
270 /// injection.
271 pub fn tick(&mut self, duration: Duration) -> Vec<Envelope<N>, Q> {
272 let now = self.inner.now();
273
274 // Check connecting timeout
275 if let TcpConnectionState::Connecting { initiated_by, at } = &self.connection.clone() {
276 let elapsed = now.checked_duration_since(*at).unwrap_or(Duration::ZERO);
277
278 if elapsed >= self.connect_timeout {
279 let from = initiated_by.clone();
280
281 self.connection = TcpConnectionState::Disconnected;
282
283 let _ = self.events.push(TcpEvent::ConnectionTimeout {
284 from: from.clone(),
285 to: NodeAddress(0),
286 });
287 }
288 }
289
290 // Check mid-session connection reset fault
291 if self.connection.is_connected() {
292 if self.rng.chance(
293 self.tcp_faults.connection_reset.0,
294 self.tcp_faults.connection_reset.1,
295 ) {
296 if let TcpConnectionState::Connected {
297 initiator,
298 acceptor,
299 } = self.connection.clone()
300 {
301 self.connection = TcpConnectionState::Reset;
302
303 let _ = self.events.push(TcpEvent::ConnectionReset {
304 from: initiator,
305 to: acceptor,
306 });
307 }
308 }
309 }
310
311 // If connection was just reset, clear the queue and return nothing
312 if matches!(self.connection, TcpConnectionState::Reset) {
313 self.connection = TcpConnectionState::Disconnected;
314
315 return Vec::new();
316 }
317
318 self.inner.tick(duration)
319 }
320
321 // endregion: Tick
322
323 // region: Accessors
324
325 pub fn now(&self) -> Instant {
326 self.inner.now()
327 }
328
329 /// Drains accumulated TCP events.
330 pub fn drain_events(&mut self) -> impl Iterator<Item = TcpEvent> + '_ {
331 self.events.drain(..)
332 }
333
334 pub fn set_connect_timeout(&mut self, timeout: Duration) {
335 self.connect_timeout = timeout;
336 }
337
338 pub fn set_faults(&mut self, faults: TcpFaultConfig) {
339 self.inner.set_faults(faults.message.clone());
340 self.tcp_faults = faults;
341 }
342
343 pub fn inner_mut(&mut self) -> &mut SimBus<N, Q> {
344 &mut self.inner
345 }
346
347 // endregion: Accessors
348}
349
350// endregion: TcpSimBus