citadel_sync/
memory_transport.rs1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::mpsc;
3use std::sync::Mutex;
4
5use crate::protocol::SyncMessage;
6use crate::transport::{SyncError, SyncTransport};
7
/// One endpoint of an in-process, channel-backed transport.
///
/// Messages travel between endpoints as serialized byte buffers over
/// `std::sync::mpsc` channels. Endpoints are created in connected pairs by
/// `MemoryTransport::pair`.
#[derive(Debug)]
pub struct MemoryTransport {
    /// Outbound channel half; carries serialized messages towards the peer.
    tx: Mutex<mpsc::Sender<Vec<u8>>>,
    /// Inbound channel half. `mpsc::Receiver` is not `Sync`, so it sits behind
    /// a mutex to be usable from methods that only take `&self`.
    rx: Mutex<mpsc::Receiver<Vec<u8>>>,
    /// Set once the endpoint has been closed; consulted at the start of
    /// `send` and `recv`.
    closed: AtomicBool,
}
20
21impl MemoryTransport {
22 pub fn pair() -> (Self, Self) {
26 let (tx_a, rx_b) = mpsc::channel();
27 let (tx_b, rx_a) = mpsc::channel();
28
29 let a = MemoryTransport {
30 tx: Mutex::new(tx_a),
31 rx: Mutex::new(rx_a),
32 closed: AtomicBool::new(false),
33 };
34 let b = MemoryTransport {
35 tx: Mutex::new(tx_b),
36 rx: Mutex::new(rx_b),
37 closed: AtomicBool::new(false),
38 };
39
40 (a, b)
41 }
42}
43
44impl SyncTransport for MemoryTransport {
45 fn send(&self, msg: &SyncMessage) -> std::result::Result<(), SyncError> {
46 if self.closed.load(Ordering::Relaxed) {
47 return Err(SyncError::Closed);
48 }
49 let data = msg.serialize();
50 let tx = self.tx.lock().unwrap();
51 tx.send(data).map_err(|_| SyncError::Closed)
52 }
53
54 fn recv(&self) -> std::result::Result<SyncMessage, SyncError> {
55 if self.closed.load(Ordering::Relaxed) {
56 return Err(SyncError::Closed);
57 }
58 let rx = self.rx.lock().unwrap();
59 let data = rx.recv().map_err(|_| SyncError::Closed)?;
60 Ok(SyncMessage::deserialize(&data)?)
61 }
62
63 fn close(&self) -> std::result::Result<(), SyncError> {
64 self.closed.store(true, Ordering::Relaxed);
65 Ok(())
66 }
67}
68
// Unit tests are kept in the separate file named by `#[path]` and are only
// compiled into test builds.
#[cfg(test)]
#[path = "memory_transport_tests.rs"]
mod tests;