1use enum_dispatch::enum_dispatch;
9
10use crate::{Frame, TransportError};
11
12#[enum_dispatch]
13pub(crate) trait TransportBackend: Send + Sync + Clone + 'static {
14 async fn send_frame(&self, frame: Frame) -> Result<(), TransportError>;
15 async fn recv_frame(&self) -> Result<Frame, TransportError>;
16 fn close(&self);
17 fn is_closed(&self) -> bool;
18}
19
20#[enum_dispatch(TransportBackend)]
21#[derive(Clone, Debug)]
22pub enum Transport {
23 #[cfg(feature = "mem")]
24 Mem(mem::MemTransport),
25 #[cfg(all(feature = "stream", not(target_arch = "wasm32")))]
26 Stream(stream::StreamTransport),
27 #[cfg(all(feature = "shm", not(target_arch = "wasm32")))]
28 Shm(shm::ShmTransport),
29 #[cfg(feature = "websocket")]
30 WebSocket(websocket::WebSocketTransport),
31}
32
33impl Transport {
34 pub async fn send_frame(&self, frame: Frame) -> Result<(), TransportError> {
35 TransportBackend::send_frame(self, frame).await
36 }
37
38 pub async fn recv_frame(&self) -> Result<Frame, TransportError> {
39 TransportBackend::recv_frame(self).await
40 }
41
42 pub fn close(&self) {
43 TransportBackend::close(self);
44 }
45
46 pub fn is_closed(&self) -> bool {
47 TransportBackend::is_closed(self)
48 }
49
50 #[cfg(feature = "mem")]
51 pub fn mem_pair() -> (Self, Self) {
52 let (a, b) = mem::MemTransport::pair();
53 (Transport::Mem(a), Transport::Mem(b))
54 }
55
56 #[cfg(all(feature = "shm", not(target_arch = "wasm32")))]
57 pub fn shm(session: std::sync::Arc<shm::ShmSession>) -> Self {
58 Transport::Shm(shm::ShmTransport::new(session))
59 }
60
61 #[cfg(all(feature = "shm", not(target_arch = "wasm32")))]
62 pub fn shm_pair() -> (Self, Self) {
63 let (a, b) = shm::ShmTransport::pair().expect("failed to create shm transport pair");
64 (Transport::Shm(a), Transport::Shm(b))
65 }
66
67 #[cfg(all(feature = "stream", not(target_arch = "wasm32")))]
68 pub fn stream<S>(stream: S) -> Self
69 where
70 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + Sync + 'static,
71 {
72 Transport::Stream(stream::StreamTransport::new(stream))
73 }
74
75 #[cfg(all(feature = "stream", not(target_arch = "wasm32")))]
76 pub fn stream_pair() -> (Self, Self) {
77 let (a, b) = stream::StreamTransport::pair();
78 (Transport::Stream(a), Transport::Stream(b))
79 }
80
81 #[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
82 pub fn websocket<S>(ws: tokio_tungstenite::WebSocketStream<S>) -> Self
83 where
84 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
85 {
86 Transport::WebSocket(websocket::WebSocketTransport::new(ws))
87 }
88
89 #[cfg(all(feature = "websocket", not(target_arch = "wasm32")))]
90 pub async fn websocket_pair() -> (Self, Self) {
91 let (a, b) = websocket::WebSocketTransport::pair().await;
92 (Transport::WebSocket(a), Transport::WebSocket(b))
93 }
94
95 #[cfg(all(feature = "websocket-axum", not(target_arch = "wasm32")))]
96 pub fn websocket_axum(ws: axum::extract::ws::WebSocket) -> Self {
97 Transport::WebSocket(websocket::WebSocketTransport::from_axum(ws))
98 }
99}
100
// Backend modules. Each one is compiled under the same `cfg` condition as the
// `Transport` variant that wraps its transport type, so a variant never names
// a module that was compiled out.
#[cfg(feature = "mem")]
pub mod mem;
#[cfg(all(feature = "shm", not(target_arch = "wasm32")))]
pub mod shm;
#[cfg(all(feature = "stream", not(target_arch = "wasm32")))]
pub mod stream;
#[cfg(feature = "websocket")]
pub mod websocket;