rapace_core/transport/
mem.rs1use std::sync::Arc;
2
3use tokio::sync::mpsc;
4
5use crate::{Frame, Payload, TransportError};
6
7use super::TransportBackend;
8
/// Buffer size, in frames, handed to `mpsc::channel` for each of the two
/// channels created by `MemTransport::pair`.
const CHANNEL_CAPACITY: usize = 64;
10
11#[derive(Clone, Debug)]
12pub struct MemTransport {
13 inner: Arc<MemInner>,
14}
15
16#[derive(Debug)]
17struct MemInner {
18 tx: mpsc::Sender<Frame>,
19 rx: tokio::sync::Mutex<mpsc::Receiver<Frame>>,
20 closed: std::sync::atomic::AtomicBool,
21}
22
23impl MemTransport {
24 pub fn pair() -> (Self, Self) {
25 let (tx_a, rx_a) = mpsc::channel(CHANNEL_CAPACITY);
26 let (tx_b, rx_b) = mpsc::channel(CHANNEL_CAPACITY);
27
28 let inner_a = Arc::new(MemInner {
29 tx: tx_b,
30 rx: tokio::sync::Mutex::new(rx_a),
31 closed: std::sync::atomic::AtomicBool::new(false),
32 });
33
34 let inner_b = Arc::new(MemInner {
35 tx: tx_a,
36 rx: tokio::sync::Mutex::new(rx_b),
37 closed: std::sync::atomic::AtomicBool::new(false),
38 });
39
40 (Self { inner: inner_a }, Self { inner: inner_b })
41 }
42
43 fn is_closed_inner(&self) -> bool {
44 self.inner.closed.load(std::sync::atomic::Ordering::Acquire)
45 }
46}
47
48impl TransportBackend for MemTransport {
49 async fn send_frame(&self, mut frame: Frame) -> Result<(), TransportError> {
50 if self.is_closed_inner() {
51 return Err(TransportError::Closed);
52 }
53
54 if frame.desc.is_inline() {
55 frame.payload = Payload::Inline;
56 }
57
58 self.inner
59 .tx
60 .send(frame)
61 .await
62 .map_err(|_| TransportError::Closed)
63 }
64
65 async fn recv_frame(&self) -> Result<Frame, TransportError> {
66 if self.is_closed_inner() {
67 return Err(TransportError::Closed);
68 }
69
70 let frame = {
71 let mut rx = self.inner.rx.lock().await;
72 rx.recv().await.ok_or(TransportError::Closed)?
73 };
74
75 Ok(frame)
76 }
77
78 fn close(&self) {
79 self.inner
80 .closed
81 .store(true, std::sync::atomic::Ordering::Release);
82 }
83
84 fn is_closed(&self) -> bool {
85 self.is_closed_inner()
86 }
87}