Skip to main content

telltale_runtime/testing/
transport.rs

1//! Simulated transport traits for protocol execution
2//!
3//! These traits abstract message transport for simulation and testing,
4//! allowing custom delivery semantics (delays, reordering, failures).
5
6use async_trait::async_trait;
7use parking_lot::Mutex;
8use std::collections::{BTreeMap, VecDeque};
9use std::sync::Arc;
10use telltale_types::FixedQ32;
11
12use super::envelope::ProtocolEnvelope;
13use crate::identifiers::RoleName;
14
/// Type alias for the message queue storage shared between transports.
///
/// The map is keyed by `(sender, receiver)` role pairs; each value is the
/// queue of envelopes sent by that sender to that receiver. The `Arc<Mutex<..>>`
/// wrapper lets several transport handles refer to the same storage.
/// Uses BTreeMap for deterministic iteration order in simulation.
type MessageQueues = Arc<Mutex<BTreeMap<(RoleName, RoleName), VecDeque<ProtocolEnvelope>>>>;
18
/// Errors that can occur during transport operations.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
/// Of these variants, only `NoMessage` and `RoleNotSet` are constructed by the
/// code visible in this file; the others are presumably intended for other
/// transport implementations (not confirmed from here).
#[derive(Debug, thiserror::Error)]
pub enum TransportError {
    /// The destination is not reachable.
    ///
    /// The payload is interpolated into the error message.
    #[error("destination unreachable: {0}")]
    Unreachable(String),

    /// No message available for receive.
    ///
    /// The payload is the stringified source role, as produced by
    /// `InMemoryTransport::recv`.
    #[error("no message available from {0}")]
    NoMessage(String),

    /// The channel is closed.
    #[error("channel closed")]
    ChannelClosed,

    /// Serialization/deserialization error.
    ///
    /// The payload is interpolated into the error message.
    #[error("serialization error: {0}")]
    Serialization(String),

    /// Timeout waiting for message.
    #[error("timeout waiting for message")]
    Timeout,

    /// Role was not set on the transport before use.
    ///
    /// Returned by `InMemoryTransport` when `send` or `recv` is called while
    /// its role is still `None`.
    #[error("role not set on transport")]
    RoleNotSet,

    /// Generic transport error.
    ///
    /// The payload is interpolated into the error message.
    #[error("transport error: {0}")]
    Other(String),
}
50
/// Result type for transport operations.
///
/// Shorthand for `Result<T, TransportError>`, used by every transport method
/// in this module that can fail.
pub type TransportResult<T> = Result<T, TransportError>;
53
/// Synchronous simulated transport trait.
///
/// This trait is used for step-by-step simulation where the simulator
/// controls message delivery timing.
///
/// Implementors must be `Send`. The sending/receiving role is not a parameter
/// of these methods; each implementation decides how it is determined.
pub trait SimulatedTransport: Send {
    /// Send a message to a destination.
    ///
    /// # Errors
    ///
    /// Implementation-defined. `InMemoryTransport` returns
    /// `TransportError::RoleNotSet` when no role has been assigned.
    fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()>;

    /// Receive a message from a source.
    ///
    /// Returns `Err(TransportError::NoMessage)` if no message is available.
    fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope>;

    /// Check if a message is available from a source without consuming it.
    ///
    /// Returns `true` when a subsequent `recv(from)` is expected to find a message.
    fn peek(&self, from: &RoleName) -> bool;

    /// Get all pending messages (for debugging/inspection).
    ///
    /// Implementations that cannot hand out references may return an empty
    /// vector; `InMemoryTransport` always does, and offers `all_messages()`
    /// for owned copies instead.
    fn pending_messages(&self) -> Vec<&ProtocolEnvelope>;
}
73
/// Asynchronous simulated transport trait.
///
/// This trait is used for async protocol execution with simulated transport.
/// Implementors must be `Send + Sync`; the async methods are provided through
/// the `async_trait` macro.
#[async_trait]
pub trait AsyncSimulatedTransport: Send + Sync {
    /// Send a message to a destination.
    async fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()>;

    /// Receive a message from a source, waiting if necessary.
    ///
    /// Note: the `InMemoryTransport` implementation does not wait; it returns
    /// immediately with `TransportError::NoMessage` when nothing is queued.
    async fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope>;

    /// Try to receive a message without blocking.
    ///
    /// `Ok(None)` means no message was available; `Err` is reserved for
    /// other failures.
    fn try_recv(&mut self, from: &RoleName) -> TransportResult<Option<ProtocolEnvelope>>;

    /// Check if a message is available from a source.
    fn has_message(&self, from: &RoleName) -> bool;
}
91
/// In-memory transport for testing.
///
/// Messages are delivered in FIFO order per sender-receiver pair.
///
/// A transport starts without a role; `send` and `recv` fail with
/// `TransportError::RoleNotSet` until `set_role` is called. Cloning a
/// transport yields a handle onto the same underlying queues.
#[derive(Debug, Default)]
pub struct InMemoryTransport {
    /// Current role using this transport.
    /// Used as the sender for `send` and as the receiver for `recv`/`peek`.
    role: Option<RoleName>,
    /// Message queues: (from_role, to_role) -> queue.
    /// May be shared with other transports (see `with_shared_queues`).
    queues: MessageQueues,
}
102
103impl InMemoryTransport {
104    /// Create a new in-memory transport.
105    #[must_use]
106    pub fn new() -> Self {
107        Self {
108            role: None,
109            queues: Arc::new(Mutex::new(BTreeMap::new())),
110        }
111    }
112
113    /// Create a new transport with shared queues.
114    ///
115    /// Multiple transports sharing queues can communicate with each other.
116    #[must_use]
117    pub fn with_shared_queues(queues: MessageQueues) -> Self {
118        Self { role: None, queues }
119    }
120
121    /// Set the role for this transport.
122    pub fn set_role(&mut self, role: RoleName) {
123        self.role = Some(role);
124    }
125
126    /// Get the current role.
127    #[must_use]
128    pub fn role(&self) -> Option<&RoleName> {
129        self.role.as_ref()
130    }
131
132    /// Get the queue key for a sender-receiver pair.
133    fn queue_key(from: &RoleName, to: &RoleName) -> (RoleName, RoleName) {
134        (from.clone(), to.clone())
135    }
136
137    /// Get all messages in transit (for debugging).
138    #[must_use]
139    pub fn all_messages(&self) -> Vec<ProtocolEnvelope> {
140        let queues = self.queues.lock();
141        queues.values().flatten().cloned().collect()
142    }
143
144    /// Clear all message queues.
145    pub fn clear(&mut self) {
146        let mut queues = self.queues.lock();
147        queues.clear();
148    }
149
150    /// Get the number of pending messages.
151    #[must_use]
152    pub fn pending_count(&self) -> usize {
153        let queues = self.queues.lock();
154        queues.values().map(|q| q.len()).sum()
155    }
156}
157
158impl Clone for InMemoryTransport {
159    fn clone(&self) -> Self {
160        Self {
161            role: self.role.clone(),
162            queues: Arc::clone(&self.queues),
163        }
164    }
165}
166
167impl SimulatedTransport for InMemoryTransport {
168    fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()> {
169        let from = self.role.as_ref().ok_or(TransportError::RoleNotSet)?;
170        let key = Self::queue_key(from, to);
171
172        let mut queues = self.queues.lock();
173        queues.entry(key).or_default().push_back(envelope);
174        Ok(())
175    }
176
177    fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope> {
178        let to = self.role.as_ref().ok_or(TransportError::RoleNotSet)?;
179        let key = Self::queue_key(from, to);
180
181        let mut queues = self.queues.lock();
182        queues
183            .get_mut(&key)
184            .and_then(|q| q.pop_front())
185            .ok_or_else(|| TransportError::NoMessage(from.to_string()))
186    }
187
188    fn peek(&self, from: &RoleName) -> bool {
189        let Some(to) = self.role.as_ref() else {
190            return false;
191        };
192        let key = Self::queue_key(from, to);
193
194        let queues = self.queues.lock();
195        queues.get(&key).is_some_and(|q| !q.is_empty())
196    }
197
198    fn pending_messages(&self) -> Vec<&ProtocolEnvelope> {
199        // Note: Can't return references with Mutex, so return empty
200        // Use all_messages() for owned values instead
201        Vec::new()
202    }
203}
204
205#[async_trait]
206impl AsyncSimulatedTransport for InMemoryTransport {
207    async fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()> {
208        SimulatedTransport::send(self, to, envelope)
209    }
210
211    async fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope> {
212        // Simulated recv: returns immediately (no blocking) for deterministic testing
213        SimulatedTransport::recv(self, from)
214    }
215
216    fn try_recv(&mut self, from: &RoleName) -> TransportResult<Option<ProtocolEnvelope>> {
217        match SimulatedTransport::recv(self, from) {
218            Ok(env) => Ok(Some(env)),
219            Err(TransportError::NoMessage(_)) => Ok(None),
220            Err(e) => Err(e),
221        }
222    }
223
224    fn has_message(&self, from: &RoleName) -> bool {
225        self.peek(from)
226    }
227}
228
/// A transport wrapper that can inject delays and failures.
///
/// Outgoing messages are dropped pseudo-randomly according to `drop_rate`;
/// all other operations are forwarded to the wrapped transport.
pub struct FaultyTransport<T> {
    /// The wrapped transport that performs the actual delivery.
    inner: T,
    /// Drop probability (0.0 to 1.0).
    drop_rate: FixedQ32,
    /// Whether to delay messages.
    // NOTE(review): set by `with_delays`, but no code visible in this file reads it.
    delay: bool,
    /// Random seed for reproducibility.
    // NOTE(review): recorded by `new`/`with_seed`; not read elsewhere in the visible code.
    seed: u64,
    /// Current random state.
    /// Advanced on every call to `random_float`.
    rng_state: u64,
}
241
242impl<T> FaultyTransport<T> {
243    /// Create a new faulty transport wrapper.
244    pub fn new(inner: T) -> Self {
245        Self {
246            inner,
247            drop_rate: FixedQ32::zero(),
248            delay: false,
249            seed: 12345,
250            rng_state: 12345,
251        }
252    }
253
254    /// Set the message drop rate (0.0 to 1.0).
255    pub fn with_drop_rate(mut self, rate: FixedQ32) -> Self {
256        self.drop_rate = rate.clamp(FixedQ32::zero(), FixedQ32::one());
257        self
258    }
259
260    /// Enable random delays.
261    pub fn with_delays(mut self) -> Self {
262        self.delay = true;
263        self
264    }
265
266    /// Set the random seed for reproducibility.
267    pub fn with_seed(mut self, seed: u64) -> Self {
268        self.seed = seed;
269        self.rng_state = seed;
270        self
271    }
272
273    /// Get a random float between 0 and 1.
274    fn random_float(&mut self) -> FixedQ32 {
275        // Simple xorshift for reproducibility
276        self.rng_state ^= self.rng_state << 13;
277        self.rng_state ^= self.rng_state >> 7;
278        self.rng_state ^= self.rng_state << 17;
279        // FixedQ32 is Q32.32: value = bits / 2^32
280        // Take upper 32 bits as fractional part to get value in [0, 1)
281        let frac_bits = i64::try_from(self.rng_state >> 32).unwrap_or(i64::MAX);
282        FixedQ32::from_bits(frac_bits)
283    }
284
285    /// Check if message should be dropped.
286    fn should_drop(&mut self) -> bool {
287        self.random_float() < self.drop_rate
288    }
289}
290
291impl<T: SimulatedTransport> SimulatedTransport for FaultyTransport<T> {
292    fn send(&mut self, to: &RoleName, envelope: ProtocolEnvelope) -> TransportResult<()> {
293        if self.should_drop() {
294            // Silently drop the message
295            return Ok(());
296        }
297        self.inner.send(to, envelope)
298    }
299
300    fn recv(&mut self, from: &RoleName) -> TransportResult<ProtocolEnvelope> {
301        self.inner.recv(from)
302    }
303
304    fn peek(&self, from: &RoleName) -> bool {
305        self.inner.peek(from)
306    }
307
308    fn pending_messages(&self) -> Vec<&ProtocolEnvelope> {
309        self.inner.pending_messages()
310    }
311}
312
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal "Test" protocol envelope travelling from `from` to `to`.
    fn make_envelope(from: &str, to: &str) -> ProtocolEnvelope {
        let sender = RoleName::new(from).unwrap();
        let recipient = RoleName::new(to).unwrap();
        ProtocolEnvelope::builder()
            .protocol("Test")
            .sender(sender)
            .recipient(recipient)
            .message_type("Msg")
            .payload(vec![1, 2, 3])
            .build()
            .unwrap()
    }

    /// Two transports over the same queues can exchange a message.
    #[test]
    fn test_in_memory_transport() {
        let client_role = RoleName::from_static("Client");
        let server_role = RoleName::from_static("Server");
        let shared = Arc::new(Mutex::new(BTreeMap::new()));

        let mut client = InMemoryTransport::with_shared_queues(Arc::clone(&shared));
        client.set_role(RoleName::from_static("Client"));

        let mut server = InMemoryTransport::with_shared_queues(Arc::clone(&shared));
        server.set_role(RoleName::from_static("Server"));

        // Client -> Server; the trait is named explicitly because both the
        // sync and async traits provide `send`/`recv`.
        let outgoing = make_envelope("Client", "Server");
        SimulatedTransport::send(&mut client, &server_role, outgoing).unwrap();

        // The server sees the message and then consumes it.
        assert!(server.peek(&client_role));
        let delivered = SimulatedTransport::recv(&mut server, &client_role).unwrap();
        assert_eq!(delivered.from_role.as_str(), "Client");
        assert_eq!(delivered.to_role.as_str(), "Server");
    }

    /// Receiving from an empty transport reports `NoMessage`.
    #[test]
    fn test_no_message_error() {
        let server_role = RoleName::from_static("Server");

        let mut transport = InMemoryTransport::new();
        transport.set_role(RoleName::from_static("Client"));

        let outcome = SimulatedTransport::recv(&mut transport, &server_role);
        assert!(matches!(outcome, Err(TransportError::NoMessage(_))));
    }

    /// With a drop rate of one, nothing reaches the wrapped transport.
    #[test]
    fn test_faulty_transport_drops() {
        let mut faulty = FaultyTransport::new(InMemoryTransport::new())
            .with_drop_rate(FixedQ32::one()) // Always drop
            .with_seed(42);

        faulty.inner.set_role(RoleName::from_static("Client"));

        let server_role = RoleName::from_static("Server");
        let outgoing = make_envelope("Client", "Server");
        SimulatedTransport::send(&mut faulty, &server_role, outgoing).unwrap();

        // The send succeeded, yet nothing was enqueued.
        assert_eq!(faulty.inner.pending_count(), 0);
    }
}