rapace_core/
transport.rs

1//! Transport enum and internal backend trait.
2//!
3//! The public API is the [`Transport`] enum. Each backend lives in its own
4//! module under `transport/` and implements the internal [`TransportBackend`]
5//! trait. We use `enum_dispatch` to forward calls without handwritten `match`
6//! boilerplate.
7
8use enum_dispatch::enum_dispatch;
9
10use crate::{Frame, TransportError};
11
12#[enum_dispatch]
13pub(crate) trait TransportBackend: Send + Sync + Clone + 'static {
14    async fn send_frame(&self, frame: Frame) -> Result<(), TransportError>;
15    async fn recv_frame(&self) -> Result<Frame, TransportError>;
16    fn close(&self);
17    fn is_closed(&self) -> bool;
18}
19
20#[enum_dispatch(TransportBackend)]
21#[derive(Clone, Debug)]
22pub enum Transport {
23    #[cfg(feature = "mem")]
24    Mem(mem::MemTransport),
25    #[cfg(all(feature = "stream", not(target_arch = "wasm32")))]
26    Stream(stream::StreamTransport),
27    #[cfg(all(feature = "shm", not(target_arch = "wasm32")))]
28    Shm(shm::ShmTransport),
29    #[cfg(feature = "websocket")]
30    WebSocket(websocket::WebSocketTransport),
31}
32
33impl Transport {
34    pub async fn send_frame(&self, frame: Frame) -> Result<(), TransportError> {
35        TransportBackend::send_frame(self, frame).await
36    }
37
38    pub async fn recv_frame(&self) -> Result<Frame, TransportError> {
39        TransportBackend::recv_frame(self).await
40    }
41
42    pub fn close(&self) {
43        TransportBackend::close(self);
44    }
45
46    pub fn is_closed(&self) -> bool {
47        TransportBackend::is_closed(self)
48    }
49
50    #[cfg(feature = "mem")]
51    pub fn mem_pair() -> (Self, Self) {
52        let (a, b) = mem::MemTransport::pair();
53        (Transport::Mem(a), Transport::Mem(b))
54    }
55
56    #[cfg(all(feature = "shm", not(target_arch = "wasm32")))]
57    pub fn shm(session: std::sync::Arc<shm::ShmSession>) -> Self {
58        Transport::Shm(shm::ShmTransport::new(session))
59    }
60
61    #[cfg(all(feature = "shm", not(target_arch = "wasm32")))]
62    pub fn shm_pair() -> (Self, Self) {
63        let (a, b) = shm::ShmTransport::pair().expect("failed to create shm transport pair");
64        (Transport::Shm(a), Transport::Shm(b))
65    }
66
67    #[cfg(all(feature = "stream", not(target_arch = "wasm32")))]
68    pub fn stream<S>(stream: S) -> Self
69    where
70        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + Sync + 'static,
71    {
72        Transport::Stream(stream::StreamTransport::new(stream))
73    }
74
75    #[cfg(all(feature = "stream", not(target_arch = "wasm32")))]
76    pub fn stream_pair() -> (Self, Self) {
77        let (a, b) = stream::StreamTransport::pair();
78        (Transport::Stream(a), Transport::Stream(b))
79    }
80
81    #[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
82    pub fn websocket<S>(ws: tokio_tungstenite::WebSocketStream<S>) -> Self
83    where
84        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
85    {
86        Transport::WebSocket(websocket::WebSocketTransport::new(ws))
87    }
88
89    #[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
90    pub async fn websocket_pair() -> (Self, Self) {
91        let (a, b) = websocket::WebSocketTransport::pair().await;
92        (Transport::WebSocket(a), Transport::WebSocket(b))
93    }
94
95    #[cfg(all(feature = "websocket-axum", not(target_arch = "wasm32")))]
96    pub fn websocket_axum(ws: axum::extract::ws::WebSocket) -> Self {
97        Transport::WebSocket(websocket::WebSocketTransport::from_axum(ws))
98    }
99}
100
101#[cfg(feature = "mem")]
102pub mod mem;
103#[cfg(all(feature = "shm", not(target_arch = "wasm32")))]
104pub mod shm;
105#[cfg(all(feature = "stream", not(target_arch = "wasm32")))]
106pub mod stream;
107#[cfg(feature = "websocket")]
108pub mod websocket;