Skip to main content

citadel_sync/
memory_transport.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::mpsc;
3use std::sync::Mutex;
4
5use crate::protocol::SyncMessage;
6use crate::transport::{SyncError, SyncTransport};
7
8/// In-memory transport for testing sync sessions.
9///
10/// Uses `mpsc` channels for bidirectional communication.
11/// Thread-safe (`Send + Sync`) so each side can be shared with a
12/// scoped thread via `&self`.
13///
14/// Create a connected pair with `MemoryTransport::pair()`.
15pub struct MemoryTransport {
16    tx: Mutex<mpsc::Sender<Vec<u8>>>,
17    rx: Mutex<mpsc::Receiver<Vec<u8>>>,
18    closed: AtomicBool,
19}
20
21impl MemoryTransport {
22    /// Create a connected pair of transports.
23    ///
24    /// Messages sent on one side are received by the other.
25    pub fn pair() -> (Self, Self) {
26        let (tx_a, rx_b) = mpsc::channel();
27        let (tx_b, rx_a) = mpsc::channel();
28
29        let a = MemoryTransport {
30            tx: Mutex::new(tx_a),
31            rx: Mutex::new(rx_a),
32            closed: AtomicBool::new(false),
33        };
34        let b = MemoryTransport {
35            tx: Mutex::new(tx_b),
36            rx: Mutex::new(rx_b),
37            closed: AtomicBool::new(false),
38        };
39
40        (a, b)
41    }
42}
43
44impl SyncTransport for MemoryTransport {
45    fn send(&self, msg: &SyncMessage) -> std::result::Result<(), SyncError> {
46        if self.closed.load(Ordering::Relaxed) {
47            return Err(SyncError::Closed);
48        }
49        let data = msg.serialize();
50        let tx = self.tx.lock().unwrap();
51        tx.send(data).map_err(|_| SyncError::Closed)
52    }
53
54    fn recv(&self) -> std::result::Result<SyncMessage, SyncError> {
55        if self.closed.load(Ordering::Relaxed) {
56            return Err(SyncError::Closed);
57        }
58        let rx = self.rx.lock().unwrap();
59        let data = rx.recv().map_err(|_| SyncError::Closed)?;
60        Ok(SyncMessage::deserialize(&data)?)
61    }
62
63    fn close(&self) -> std::result::Result<(), SyncError> {
64        self.closed.store(true, Ordering::Relaxed);
65        Ok(())
66    }
67}
68
69#[cfg(test)]
70#[path = "memory_transport_tests.rs"]
71mod tests;