telltale_runtime/testing/
transport.rs1use async_trait::async_trait;
7use parking_lot::Mutex;
8use std::collections::{BTreeMap, VecDeque};
9use std::sync::Arc;
10use telltale_types::FixedQ32;
11
12use super::envelope::ProtocolEnvelope;
13use crate::identifiers::RoleName;
14
/// Shared, lock-protected table of message queues.
///
/// Keys are `(from, to)` role pairs (built by `InMemoryTransport::queue_key`); each value
/// holds the envelopes queued for that direction. `InMemoryTransport` appends at the back
/// on send and pops from the front on receive, so every queue is FIFO.
type MessageQueues = Arc<Mutex<BTreeMap<(RoleName, RoleName), VecDeque<ProtocolEnvelope>>>>;
18
/// Errors reported by the transports defined in this module.
///
/// Only [`TransportError::NoMessage`] and [`TransportError::RoleNotSet`] are produced by
/// the code visible in this part of the file; the remaining variants are presumably for
/// other transport implementations — confirm against their users.
#[derive(Debug, thiserror::Error)]
pub enum TransportError {
    /// The destination could not be reached; the string is included in the message.
    #[error("destination unreachable: {0}")]
    Unreachable(String),

    /// Nothing is queued from the given sender. `InMemoryTransport::recv` fills the
    /// string with the sender role rendered via `to_string()`.
    #[error("no message available from {0}")]
    NoMessage(String),

    /// The underlying channel has been closed.
    #[error("channel closed")]
    ChannelClosed,

    /// An envelope could not be serialized or deserialized; the string carries details.
    #[error("serialization error: {0}")]
    Serialization(String),

    /// Waiting for a message timed out.
    #[error("timeout waiting for message")]
    Timeout,

    /// The transport was used before a role was assigned to it
    /// (see `InMemoryTransport::set_role`).
    #[error("role not set on transport")]
    RoleNotSet,

    /// Any other failure, described by the contained string.
    #[error("transport error: {0}")]
    Other(String),
}
50
/// Convenience alias for results whose error type is [`TransportError`].
pub type TransportResult<T> = Result<T, TransportError>;
53
/// Synchronous message transport between named roles.
///
/// NOTE(review): blocking behaviour is left to the implementor. The in-file
/// `InMemoryTransport` never blocks: its `recv` fails with `TransportError::NoMessage`
/// when nothing is queued.
pub trait SimulatedTransport: Send {
    /// Sends `envelope` to the role `to`.
    fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()>;

    /// Receives the next envelope sent by the role `from`.
    fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope>;

    /// Reports whether an envelope from `from` is available, without consuming it.
    fn peek(&self, from: &RoleName) -> bool;

    /// Returns borrowed views of pending envelopes.
    ///
    /// NOTE(review): `InMemoryTransport` always returns an empty vector from this method;
    /// use `InMemoryTransport::all_messages` to inspect its queues.
    fn pending_messages(&self) -> Vec<&ProtocolEnvelope>;
}
73
/// Asynchronous counterpart of `SimulatedTransport`.
///
/// `send` and `recv` are `async`; `try_recv` and `has_message` are plain synchronous
/// queries.
#[async_trait]
pub trait AsyncSimulatedTransport: Send + Sync {
    /// Sends `envelope` to the role `to`.
    async fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()>;

    /// Receives the next envelope sent by the role `from`.
    async fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope>;

    /// Attempts to receive from `from` without waiting. In the in-file implementation,
    /// `Ok(None)` means nothing is queued.
    fn try_recv(&mut self, from: &RoleName) -> TransportResult<Option<ProtocolEnvelope>>;

    /// Reports whether an envelope from `from` is available, without consuming it.
    fn has_message(&self, from: &RoleName) -> bool;
}
91
/// Transport endpoint that exchanges envelopes through in-memory queues.
///
/// Endpoints built over the same queue table (via `with_shared_queues` or `clone`) see
/// each other's messages.
#[derive(Debug, Default)]
pub struct InMemoryTransport {
    /// Role this endpoint acts as; `None` until `set_role` is called.
    role: Option<RoleName>,
    /// Queue table, possibly shared with other endpoints.
    queues: MessageQueues,
}
102
103impl InMemoryTransport {
104 #[must_use]
106 pub fn new() -> Self {
107 Self {
108 role: None,
109 queues: Arc::new(Mutex::new(BTreeMap::new())),
110 }
111 }
112
113 #[must_use]
117 pub fn with_shared_queues(queues: MessageQueues) -> Self {
118 Self { role: None, queues }
119 }
120
121 pub fn set_role(&mut self, role: RoleName) {
123 self.role = Some(role);
124 }
125
126 #[must_use]
128 pub fn role(&self) -> Option<&RoleName> {
129 self.role.as_ref()
130 }
131
132 fn queue_key(from: &RoleName, to: &RoleName) -> (RoleName, RoleName) {
134 (from.clone(), to.clone())
135 }
136
137 #[must_use]
139 pub fn all_messages(&self) -> Vec<ProtocolEnvelope> {
140 let queues = self.queues.lock();
141 queues.values().flatten().cloned().collect()
142 }
143
144 pub fn clear(&mut self) {
146 let mut queues = self.queues.lock();
147 queues.clear();
148 }
149
150 #[must_use]
152 pub fn pending_count(&self) -> usize {
153 let queues = self.queues.lock();
154 queues.values().map(|q| q.len()).sum()
155 }
156}
157
158impl Clone for InMemoryTransport {
159 fn clone(&self) -> Self {
160 Self {
161 role: self.role.clone(),
162 queues: Arc::clone(&self.queues),
163 }
164 }
165}
166
167impl SimulatedTransport for InMemoryTransport {
168 fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()> {
169 let from = self.role.as_ref().ok_or(TransportError::RoleNotSet)?;
170 let key = Self::queue_key(from, to);
171
172 let mut queues = self.queues.lock();
173 queues.entry(key).or_default().push_back(envelope);
174 Ok(())
175 }
176
177 fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope> {
178 let to = self.role.as_ref().ok_or(TransportError::RoleNotSet)?;
179 let key = Self::queue_key(from, to);
180
181 let mut queues = self.queues.lock();
182 queues
183 .get_mut(&key)
184 .and_then(|q| q.pop_front())
185 .ok_or_else(|| TransportError::NoMessage(from.to_string()))
186 }
187
188 fn peek(&self, from: &RoleName) -> bool {
189 let Some(to) = self.role.as_ref() else {
190 return false;
191 };
192 let key = Self::queue_key(from, to);
193
194 let queues = self.queues.lock();
195 queues.get(&key).is_some_and(|q| !q.is_empty())
196 }
197
198 fn pending_messages(&self) -> Vec<&ProtocolEnvelope> {
199 Vec::new()
202 }
203}
204
205#[async_trait]
206impl AsyncSimulatedTransport for InMemoryTransport {
207 async fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()> {
208 SimulatedTransport::send(self, to, envelope)
209 }
210
211 async fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope> {
212 SimulatedTransport::recv(self, from)
214 }
215
216 fn try_recv(&mut self, from: &RoleName) -> TransportResult<Option<ProtocolEnvelope>> {
217 match SimulatedTransport::recv(self, from) {
218 Ok(env) => Ok(Some(env)),
219 Err(TransportError::NoMessage(_)) => Ok(None),
220 Err(e) => Err(e),
221 }
222 }
223
224 fn has_message(&self, from: &RoleName) -> bool {
225 self.peek(from)
226 }
227}
228
/// Wrapper that injects faults into another transport.
///
/// In the code visible here the only fault actually applied is dropping messages on
/// `send`.
pub struct FaultyTransport<T> {
    /// Transport that carries the messages that are not dropped.
    inner: T,
    /// Threshold compared against a pseudo-random draw on every send; a draw below it
    /// drops the message. Clamped to `[0, 1]` by `with_drop_rate`.
    drop_rate: FixedQ32,
    /// Set by `with_delays`. NOTE(review): not read anywhere in the visible code.
    delay: bool,
    /// Seed the generator was last initialised with.
    seed: u64,
    /// Current xorshift generator state; advanced by every draw.
    rng_state: u64,
}
241
242impl<T> FaultyTransport<T> {
243 pub fn new(inner: T) -> Self {
245 Self {
246 inner,
247 drop_rate: FixedQ32::zero(),
248 delay: false,
249 seed: 12345,
250 rng_state: 12345,
251 }
252 }
253
254 pub fn with_drop_rate(mut self, rate: FixedQ32) -> Self {
256 self.drop_rate = rate.clamp(FixedQ32::zero(), FixedQ32::one());
257 self
258 }
259
260 pub fn with_delays(mut self) -> Self {
262 self.delay = true;
263 self
264 }
265
266 pub fn with_seed(mut self, seed: u64) -> Self {
268 self.seed = seed;
269 self.rng_state = seed;
270 self
271 }
272
273 fn random_float(&mut self) -> FixedQ32 {
275 self.rng_state ^= self.rng_state << 13;
277 self.rng_state ^= self.rng_state >> 7;
278 self.rng_state ^= self.rng_state << 17;
279 let frac_bits = i64::try_from(self.rng_state >> 32).unwrap_or(i64::MAX);
282 FixedQ32::from_bits(frac_bits)
283 }
284
285 fn should_drop(&mut self) -> bool {
287 self.random_float() < self.drop_rate
288 }
289}
290
291impl<T: SimulatedTransport> SimulatedTransport for FaultyTransport<T> {
292 fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()> {
293 if self.should_drop() {
294 return Ok(());
296 }
297 self.inner.send(to, envelope)
298 }
299
300 fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope> {
301 self.inner.recv(from)
302 }
303
304 fn peek(&self, from: &RoleName) -> bool {
305 self.inner.peek(from)
306 }
307
308 fn pending_messages(&self) -> Vec<&ProtocolEnvelope> {
309 self.inner.pending_messages()
310 }
311}
312
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a small test envelope travelling from role `from` to role `to`.
    fn make_envelope(from: &str, to: &str) -> ProtocolEnvelope {
        let builder = ProtocolEnvelope::builder()
            .protocol("Test")
            .sender(RoleName::new(from).unwrap())
            .recipient(RoleName::new(to).unwrap());
        builder
            .message_type("Msg")
            .payload(vec![1, 2, 3])
            .build()
            .unwrap()
    }

    /// Two endpoints over one shared queue table can pass a message from one to the other.
    #[test]
    fn test_in_memory_transport() {
        let shared = Arc::new(Mutex::new(BTreeMap::new()));
        let client_role = RoleName::from_static("Client");
        let server_role = RoleName::from_static("Server");

        let mut client = InMemoryTransport::with_shared_queues(Arc::clone(&shared));
        client.set_role(RoleName::from_static("Client"));

        let mut server = InMemoryTransport::with_shared_queues(Arc::clone(&shared));
        server.set_role(RoleName::from_static("Server"));

        // Client -> Server.
        let outgoing = make_envelope("Client", "Server");
        SimulatedTransport::send(&mut client, &server_role, outgoing).unwrap();

        // The server sees the message and receives it intact.
        assert!(SimulatedTransport::peek(&server, &client_role));
        let incoming = SimulatedTransport::recv(&mut server, &client_role).unwrap();
        assert_eq!(incoming.from_role.as_str(), "Client");
        assert_eq!(incoming.to_role.as_str(), "Server");
    }

    /// Receiving from an empty transport yields `NoMessage`.
    #[test]
    fn test_no_message_error() {
        let mut endpoint = InMemoryTransport::new();
        endpoint.set_role(RoleName::from_static("Client"));

        let peer = RoleName::from_static("Server");
        let outcome = SimulatedTransport::recv(&mut endpoint, &peer);
        assert!(matches!(outcome, Err(TransportError::NoMessage(_))));
    }

    /// With a drop rate of one, a sent message never reaches the inner transport.
    #[test]
    fn test_faulty_transport_drops() {
        let mut faulty = FaultyTransport::new(InMemoryTransport::new())
            .with_drop_rate(FixedQ32::one())
            .with_seed(42);

        faulty.inner.set_role(RoleName::from_static("Client"));

        let server_role = RoleName::from_static("Server");
        let outgoing = make_envelope("Client", "Server");
        SimulatedTransport::send(&mut faulty, &server_role, outgoing).unwrap();

        // Nothing was queued because the message was dropped.
        assert_eq!(faulty.inner.pending_count(), 0);
    }
}