enigma_protocol/
transport.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use tokio::sync::{mpsc, Mutex};
6
7use crate::error::{EnigmaProtocolError, Result};
8
9#[async_trait]
10pub trait Transport: Send + Sync {
11 async fn send(&self, data: Bytes) -> Result<()>;
12 async fn recv(&self) -> Result<Bytes>;
13 async fn close(&self) -> Result<()>;
14}
15
16pub struct InMemoryDuplexTransport {
17 sender: mpsc::Sender<Bytes>,
18 receiver: Mutex<mpsc::Receiver<Bytes>>,
19}
20
21impl InMemoryDuplexTransport {
22 pub fn new(sender: mpsc::Sender<Bytes>, receiver: mpsc::Receiver<Bytes>) -> Self {
23 Self {
24 sender,
25 receiver: Mutex::new(receiver),
26 }
27 }
28}
29
30#[async_trait]
31impl Transport for InMemoryDuplexTransport {
32 async fn send(&self, data: Bytes) -> Result<()> {
33 self.sender
34 .send(data)
35 .await
36 .map_err(|_| EnigmaProtocolError::Transport)
37 }
38
39 async fn recv(&self) -> Result<Bytes> {
40 let mut guard = self.receiver.lock().await;
41 guard.recv().await.ok_or(EnigmaProtocolError::Transport)
42 }
43
44 async fn close(&self) -> Result<()> {
45 let mut guard = self.receiver.lock().await;
46 guard.close();
47 Ok(())
48 }
49}
50
51pub fn in_memory_duplex_pair(buffer: usize) -> (Arc<dyn Transport>, Arc<dyn Transport>) {
52 let (a_tx, a_rx) = mpsc::channel(buffer);
53 let (b_tx, b_rx) = mpsc::channel(buffer);
54 let first: Arc<dyn Transport> = Arc::new(InMemoryDuplexTransport::new(a_tx, b_rx));
55 let second: Arc<dyn Transport> = Arc::new(InMemoryDuplexTransport::new(b_tx, a_rx));
56 (first, second)
57}