rapace_core/transport/
mem.rs

1use std::sync::Arc;
2
3use tokio::sync::mpsc;
4
5use crate::{Frame, Payload, TransportError};
6
7use super::TransportBackend;
8
/// Buffer size, in frames, handed to every bounded `mpsc` channel this module creates.
const CHANNEL_CAPACITY: usize = 64;
10
11#[derive(Clone, Debug)]
12pub struct MemTransport {
13    inner: Arc<MemInner>,
14}
15
16#[derive(Debug)]
17struct MemInner {
18    tx: mpsc::Sender<Frame>,
19    rx: tokio::sync::Mutex<mpsc::Receiver<Frame>>,
20    closed: std::sync::atomic::AtomicBool,
21}
22
23impl MemTransport {
24    pub fn pair() -> (Self, Self) {
25        let (tx_a, rx_a) = mpsc::channel(CHANNEL_CAPACITY);
26        let (tx_b, rx_b) = mpsc::channel(CHANNEL_CAPACITY);
27
28        let inner_a = Arc::new(MemInner {
29            tx: tx_b,
30            rx: tokio::sync::Mutex::new(rx_a),
31            closed: std::sync::atomic::AtomicBool::new(false),
32        });
33
34        let inner_b = Arc::new(MemInner {
35            tx: tx_a,
36            rx: tokio::sync::Mutex::new(rx_b),
37            closed: std::sync::atomic::AtomicBool::new(false),
38        });
39
40        (Self { inner: inner_a }, Self { inner: inner_b })
41    }
42
43    fn is_closed_inner(&self) -> bool {
44        self.inner.closed.load(std::sync::atomic::Ordering::Acquire)
45    }
46}
47
48impl TransportBackend for MemTransport {
49    async fn send_frame(&self, mut frame: Frame) -> Result<(), TransportError> {
50        if self.is_closed_inner() {
51            return Err(TransportError::Closed);
52        }
53
54        if frame.desc.is_inline() {
55            frame.payload = Payload::Inline;
56        }
57
58        self.inner
59            .tx
60            .send(frame)
61            .await
62            .map_err(|_| TransportError::Closed)
63    }
64
65    async fn recv_frame(&self) -> Result<Frame, TransportError> {
66        if self.is_closed_inner() {
67            return Err(TransportError::Closed);
68        }
69
70        let frame = {
71            let mut rx = self.inner.rx.lock().await;
72            rx.recv().await.ok_or(TransportError::Closed)?
73        };
74
75        Ok(frame)
76    }
77
78    fn close(&self) {
79        self.inner
80            .closed
81            .store(true, std::sync::atomic::Ordering::Release);
82    }
83
84    fn is_closed(&self) -> bool {
85        self.is_closed_inner()
86    }
87}