enigma_protocol/
transport.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use tokio::sync::{mpsc, Mutex};
6
7use crate::error::{EnigmaProtocolError, Result};
8
9#[async_trait]
10pub trait Transport: Send + Sync {
11    async fn send(&self, data: Bytes) -> Result<()>;
12    async fn recv(&self) -> Result<Bytes>;
13    async fn close(&self) -> Result<()>;
14}
15
16pub struct InMemoryDuplexTransport {
17    sender: mpsc::Sender<Bytes>,
18    receiver: Mutex<mpsc::Receiver<Bytes>>,
19}
20
21impl InMemoryDuplexTransport {
22    pub fn new(sender: mpsc::Sender<Bytes>, receiver: mpsc::Receiver<Bytes>) -> Self {
23        Self {
24            sender,
25            receiver: Mutex::new(receiver),
26        }
27    }
28}
29
30#[async_trait]
31impl Transport for InMemoryDuplexTransport {
32    async fn send(&self, data: Bytes) -> Result<()> {
33        self.sender
34            .send(data)
35            .await
36            .map_err(|_| EnigmaProtocolError::Transport)
37    }
38
39    async fn recv(&self) -> Result<Bytes> {
40        let mut guard = self.receiver.lock().await;
41        guard.recv().await.ok_or(EnigmaProtocolError::Transport)
42    }
43
44    async fn close(&self) -> Result<()> {
45        let mut guard = self.receiver.lock().await;
46        guard.close();
47        Ok(())
48    }
49}
50
51pub fn in_memory_duplex_pair(buffer: usize) -> (Arc<dyn Transport>, Arc<dyn Transport>) {
52    let (a_tx, a_rx) = mpsc::channel(buffer);
53    let (b_tx, b_rx) = mpsc::channel(buffer);
54    let first: Arc<dyn Transport> = Arc::new(InMemoryDuplexTransport::new(a_tx, b_rx));
55    let second: Arc<dyn Transport> = Arc::new(InMemoryDuplexTransport::new(b_tx, a_rx));
56    (first, second)
57}