rapace-core 0.5.0

Core types and traits for rapace RPC
Documentation
use std::sync::Arc;

use tokio::sync::mpsc;

use crate::{Frame, Payload, TransportError};

use super::TransportBackend;

/// Capacity handed to each `mpsc::channel` created by `MemTransport::pair`,
/// i.e. the number of frames that may be queued per direction.
const CHANNEL_CAPACITY: usize = 64;

/// In-memory transport endpoint backed by tokio `mpsc` channels.
///
/// The state lives behind an `Arc`, so cloning a `MemTransport` yields another
/// handle to the same endpoint (same sender, receiver and closed flag) rather
/// than a new, independent endpoint.
#[derive(Clone, Debug)]
pub struct MemTransport {
    // Shared endpoint state; every clone points at the same `MemInner`.
    inner: Arc<MemInner>,
}

/// State shared by all clones of one `MemTransport` endpoint.
#[derive(Debug)]
struct MemInner {
    // Sending half of the channel used by `send_frame`.
    tx: mpsc::Sender<Frame>,
    // Receiving half used by `recv_frame`. Wrapped in an async mutex because
    // `Receiver::recv` needs `&mut self` while the transport API takes `&self`.
    rx: tokio::sync::Mutex<mpsc::Receiver<Frame>>,
    // Set to `true` by `close()`; checked at the start of send/recv.
    closed: std::sync::atomic::AtomicBool,
}

impl MemTransport {
    pub fn pair() -> (Self, Self) {
        let (tx_a, rx_a) = mpsc::channel(CHANNEL_CAPACITY);
        let (tx_b, rx_b) = mpsc::channel(CHANNEL_CAPACITY);

        let inner_a = Arc::new(MemInner {
            tx: tx_b,
            rx: tokio::sync::Mutex::new(rx_a),
            closed: std::sync::atomic::AtomicBool::new(false),
        });

        let inner_b = Arc::new(MemInner {
            tx: tx_a,
            rx: tokio::sync::Mutex::new(rx_b),
            closed: std::sync::atomic::AtomicBool::new(false),
        });

        (Self { inner: inner_a }, Self { inner: inner_b })
    }

    fn is_closed_inner(&self) -> bool {
        self.inner.closed.load(std::sync::atomic::Ordering::Acquire)
    }
}

impl TransportBackend for MemTransport {
    /// Queues `frame` on the outgoing channel.
    ///
    /// If the frame's descriptor reports `is_inline()`, the payload is
    /// replaced with `Payload::Inline` before sending.
    ///
    /// # Errors
    ///
    /// Returns `TransportError::Closed` if this endpoint has been closed, or
    /// if the channel rejects the frame.
    async fn send_frame(&self, mut frame: Frame) -> Result<(), TransportError> {
        if self.is_closed_inner() {
            return Err(TransportError::Closed);
        }

        if frame.desc.is_inline() {
            frame.payload = Payload::Inline;
        }

        // The rejected frame carried by the send error is intentionally dropped.
        match self.inner.tx.send(frame).await {
            Ok(()) => Ok(()),
            Err(_) => Err(TransportError::Closed),
        }
    }

    /// Waits for the next incoming frame.
    ///
    /// The closed flag is only checked before waiting; a call that is already
    /// waiting is not interrupted by a later `close()` on this endpoint.
    ///
    /// # Errors
    ///
    /// Returns `TransportError::Closed` if this endpoint has been closed, or
    /// if the channel yields no more frames.
    async fn recv_frame(&self) -> Result<Frame, TransportError> {
        if self.is_closed_inner() {
            return Err(TransportError::Closed);
        }

        // The async mutex serialises concurrent receivers on the same endpoint.
        let mut inbox = self.inner.rx.lock().await;
        match inbox.recv().await {
            Some(frame) => Ok(frame),
            None => Err(TransportError::Closed),
        }
    }

    /// Marks this endpoint as closed (release store).
    ///
    /// Only the local flag is set; the underlying channels are left untouched.
    fn close(&self) {
        use std::sync::atomic::Ordering;

        self.inner.closed.store(true, Ordering::Release);
    }

    /// Reports whether `close()` has been called on this endpoint.
    fn is_closed(&self) -> bool {
        self.is_closed_inner()
    }
}