ace_sim/tcp_bus.rs
1//! TCP-aware simulation bus.
2//!
3//! Models a connection-oriented transport between exactly two node addresses. Connection state is
4//! owned by the bus - nodes do not track TCP life-cycle. The bus rejects messages between
5//! disconnected nodes and can inject TCP-level faults independently of message-level faults.
6
7// region: Imports
8
9use heapless::Vec;
10
11use crate::{
12 bus::{Envelope, SimBus},
13 clock::{Duration, Instant},
14 fault::FaultConfig,
15 io::NodeAddress,
16 rng::{Rng, Xorshift64},
17};
18
19// endregion: Imports
20
21// region: TcpFaultConfig
22
/// TCP-level fault configuration - extends `FaultConfig` with connection-oriented faults.
///
/// Applied independently from the message-level faults in `FaultConfig`. This allows modeling a
/// reliable TCP connection with unreliable message delivery, or a flaky TCP connection that resets
/// mid-session.
///
/// Each `(u32, u32)` pair is handed to the bus RNG's `chance` call as its two arguments, in order.
/// NOTE(review): presumably `(numerator, denominator)`, i.e. `(1, 20)` means "1 in 20" - confirm
/// against the `chance` implementation.
#[derive(Debug, Clone)]
pub struct TcpFaultConfig {
    /// Underlying message-level fault config. Forwarded to the wrapped `SimBus`.
    pub message: FaultConfig,

    /// Probability that a new connection attempt is refused. Models a gateway that is at capacity
    /// or unreachable. Rolled on every `TcpSimBus::connect` call made while not connected.
    pub connection_refused: (u32, u32),

    /// Probability that an established connection is reset mid-session. Models ignition off,
    /// network fault, or gateway restart. Rolled once per `TcpSimBus::tick` while connected.
    pub connection_reset: (u32, u32),

    /// Probability that a connection attempt times out without response. Rolled in
    /// `TcpSimBus::connect` only when the attempt was not refused.
    pub connection_timeout: (u32, u32),
}
44
45impl TcpFaultConfig {
46 pub fn none() -> Self {
47 Self {
48 message: FaultConfig::none(),
49 connection_refused: (0, 1),
50 connection_reset: (0, 1),
51 connection_timeout: (0, 1),
52 }
53 }
54
55 pub fn light() -> Self {
56 Self {
57 message: FaultConfig::light(),
58 connection_refused: (1, 200),
59 connection_reset: (1, 200),
60 connection_timeout: (1, 200),
61 }
62 }
63
64 pub fn chaos() -> Self {
65 Self {
66 message: FaultConfig::chaos(),
67 connection_refused: (1, 20),
68 connection_reset: (1, 20),
69 connection_timeout: (1, 20),
70 }
71 }
72}
73
74// endregion: TcpFaultConfig
75
76// region: TcpConnectionState
77
/// The state of a TCP connection between two nodes on the bus.
///
/// Owned and updated by `TcpSimBus`; callers read it through `TcpSimBus::connection_state`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TcpConnectionState {
    /// No connection exists - messages are rejected.
    Disconnected,

    /// A connection request is in progress.
    Connecting {
        /// Node that requested the connection.
        initiated_by: NodeAddress,
        /// Bus time at which the request was made; the connect timeout is measured from here.
        at: Instant,
    },

    /// Connection is established - messages flow normally.
    Connected {
        /// Node that requested the connection.
        initiator: NodeAddress,
        /// Node the connection was requested to.
        acceptor: NodeAddress,
    },

    /// Connection was reset by the bus fault injector. Nodes will observe this as a
    /// `TcpEvent::ConnectionReset`.
    Reset,
}
100
101impl TcpConnectionState {
102 pub fn is_connected(&self) -> bool {
103 matches!(self, Self::Connected { .. })
104 }
105}
106
107// endregion: TcpConnectionState
108
109// region: TcpEvent
110
/// Events the `TcpSimBus` delivers to nodes alongside messages.
///
/// Nodes drain these via `TcpSimBus::drain_events()` after each tick.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TcpEvent {
    /// A connection request from `from` to `to` was accepted.
    ConnectionEstablished { from: NodeAddress, to: NodeAddress },

    /// A connection request was refused by the bus fault injector.
    ConnectionRefused { from: NodeAddress, to: NodeAddress },

    /// A connection request timed out.
    ConnectionTimeout { from: NodeAddress, to: NodeAddress },

    /// An established connection was reset by the bus fault injector. Both nodes should treat
    /// this as a TCP RST. `from` is the node that initiated the connection, `to` the acceptor.
    ConnectionReset { from: NodeAddress, to: NodeAddress },

    /// The connection was closed cleanly by one of the nodes.
    ConnectionClosed { from: NodeAddress, to: NodeAddress },
}
132
133// endregion: TcpEvent
134
135// region: TcpSimBus
136
137/// A TCP-aware simulation bus.
138///
139/// Wraps `SimBus` for message delivery and adds connection state tracking and TCP-level fault
140/// injection on top. The bus owns the connection state - nodes request connections and observe
141/// events but do not track TCP state themselves.
142///
143/// `N` - max message payload bytes
144/// `Q` - max messages in-flight simultaneously
145pub struct TcpSimBus<const N: usize, const Q: usize> {
146 /// Underlying message bus - handles delivery, delays, and message faults.
147 inner: SimBus<N, Q>,
148
149 /// TCP fault configuration.
150 tcp_faults: TcpFaultConfig,
151
152 /// Current connection state between node pairs. For simplicity each bus instance models one
153 /// logical TCP connection between a tester and a gateway. Multi-connection scenarios use
154 /// multiple `TcpSimBus` instances.
155 connection: TcpConnectionState,
156
157 /// Connect timeout duration - if a connecting state persists longer than this without the bus
158 /// processing a tick, it times out.
159 connect_timeout: Duration,
160
161 /// Accumulated TCP events for nodes to drain.
162 events: Vec<TcpEvent, 16>,
163
164 /// Dedicated RNG for TCP-level fault decisions, seeded independently from the message-level
165 /// RNG so fault regimes can be composed freely.
166 rng: Xorshift64,
167}
168
169impl<const N: usize, const Q: usize> TcpSimBus<N, Q> {
170 /// Creates a new `TcpSimBus`.
171 ///
172 /// `seed` - seeds both the message bus RNG and the TCP fault RNG. The TCP RNG uses
173 /// `seed.wrapping_add(1)` so the two RNGs produce independent sequences from the same seed.
174 pub fn new(seed: u64, faults: TcpFaultConfig) -> Self {
175 Self {
176 inner: SimBus::new(seed, faults.message.clone()),
177 tcp_faults: faults,
178 connection: TcpConnectionState::Disconnected,
179 connect_timeout: Duration::from_millis(5_000),
180 events: Vec::new(),
181 rng: Xorshift64::new(seed.wrapping_add(1)),
182 }
183 }
184
185 // region: Connection management
186
187 /// Requests a TCP connection from `from` to `to`.
188 ///
189 /// The connection may be established, refused, or timeout depending on the `TcpFaultConfig`.
190 /// The outcome appears as a `TcpEvent` on the next `tick`.
191 pub fn connect(&mut self, from: NodeAddress, to: NodeAddress) {
192 if self.connection.is_connected() {
193 return;
194 }
195
196 let now = self.inner.now();
197
198 // Fault: connection refused
199 if self.rng.chance(
200 self.tcp_faults.connection_refused.0,
201 self.tcp_faults.connection_refused.1,
202 ) {
203 let _ = self.events.push(TcpEvent::ConnectionRefused {
204 from: from.clone(),
205 to: to.clone(),
206 });
207 return;
208 }
209
210 // Fault: connection timeout - record connecting state, timeout is check in tick()
211 if self.rng.chance(
212 self.tcp_faults.connection_timeout.0,
213 self.tcp_faults.connection_timeout.1,
214 ) {
215 self.connection = TcpConnectionState::Connecting {
216 initiated_by: from,
217 at: now,
218 };
219 return;
220 }
221
222 // Success
223 self.connection = TcpConnectionState::Connected {
224 initiator: from.clone(),
225 acceptor: to.clone(),
226 };
227
228 let _ = self
229 .events
230 .push(TcpEvent::ConnectionEstablished { from, to });
231 }
232
233 /// Closes the connection cleanly from the given node.
234 pub fn disconnect(&mut self, from: NodeAddress, to: NodeAddress) {
235 if self.connection.is_connected() {
236 self.connection = TcpConnectionState::Disconnected;
237 let _ = self.events.push(TcpEvent::ConnectionClosed { from, to });
238 }
239 }
240
241 pub fn connection_state(&self) -> &TcpConnectionState {
242 &self.connection
243 }
244
245 pub fn is_connected(&self) -> bool {
246 self.connection.is_connected()
247 }
248
249 // endregion: Connection management
250
251 // region: Message delivery
252
253 /// Enqueues a message - rejected if the connection is not established.
254 ///
255 /// Returns `true` if the message was accepted, `false` if rejected due to connection state or
256 /// message-level fault injection.
257 pub fn send(&mut self, src: NodeAddress, dst: NodeAddress, data: &[u8]) -> bool {
258 if !self.connection.is_connected() {
259 return false;
260 }
261 self.inner.send(src, dst, data)
262 }
263
264 // endregion: Message delivery
265
266 // region: Tick
267
268 /// Advances simulation time, delivers due messages, and checks connection-level fault
269 /// injection.
270 pub fn tick(&mut self, duration: Duration) -> Vec<Envelope<N>, Q> {
271 let now = self.inner.now();
272
273 // Check connecting timeout
274 if let TcpConnectionState::Connecting { initiated_by, at } = &self.connection.clone() {
275 let elapsed = now.checked_duration_since(*at).unwrap_or(Duration::ZERO);
276
277 if elapsed >= self.connect_timeout {
278 let from = initiated_by.clone();
279
280 self.connection = TcpConnectionState::Disconnected;
281
282 let _ = self.events.push(TcpEvent::ConnectionTimeout {
283 from: from.clone(),
284 to: NodeAddress(0),
285 });
286 }
287 }
288
289 // Check mid-session connection reset fault
290 if self.connection.is_connected() {
291 if self.rng.chance(
292 self.tcp_faults.connection_reset.0,
293 self.tcp_faults.connection_reset.1,
294 ) {
295 if let TcpConnectionState::Connected {
296 initiator,
297 acceptor,
298 } = self.connection.clone()
299 {
300 self.connection = TcpConnectionState::Reset;
301
302 let _ = self.events.push(TcpEvent::ConnectionReset {
303 from: initiator,
304 to: acceptor,
305 });
306 }
307 }
308 }
309
310 // If connection was just reset, clear the queue and return nothing
311 if matches!(self.connection, TcpConnectionState::Reset) {
312 self.connection = TcpConnectionState::Disconnected;
313
314 return Vec::new();
315 }
316
317 self.inner.tick(duration)
318 }
319
320 // endregion: Tick
321
322 // region: Accessors
323
324 pub fn now(&self) -> Instant {
325 self.inner.now()
326 }
327
328 /// Drains accumulated TCP events.
329 pub fn drain_events(&mut self) -> impl Iterator<Item = TcpEvent> + '_ {
330 self.events.drain(..)
331 }
332
333 pub fn set_connect_timeout(&mut self, timeout: Duration) {
334 self.connect_timeout = timeout;
335 }
336
337 pub fn set_faults(&mut self, faults: TcpFaultConfig) {
338 self.inner.set_faults(faults.message.clone());
339 self.tcp_faults = faults;
340 }
341
342 pub fn inner_mut(&mut self) -> &mut SimBus<N, Q> {
343 &mut self.inner
344 }
345
346 // endregion: Accessors
347}
348
349// endregion: TcpSimBus