leptos_sync_core/transport/
mod.rs1use serde::{Deserialize, Serialize};
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use thiserror::Error;
7
8pub mod websocket;
9pub mod memory;
10pub mod multi_transport;
11pub mod leptos_ws_pro_transport;
12pub mod compatibility_layer;
13pub mod hybrid_transport_impl;
14
15#[cfg(test)]
16pub mod leptos_ws_pro_tests;
17
18#[cfg(test)]
19pub mod real_websocket_tests;
20
21#[cfg(test)]
22pub mod server_compatibility_tests;
23#[cfg(test)]
24pub mod hybrid_transport_tests;
25
26#[derive(Error, Debug)]
27pub enum TransportError {
28 #[error("Connection failed: {0}")]
29 ConnectionFailed(String),
30 #[error("Send failed: {0}")]
31 SendFailed(String),
32 #[error("Receive failed: {0}")]
33 ReceiveFailed(String),
34 #[error("Serialization failed: {0}")]
35 SerializationFailed(String),
36 #[error("Not connected")]
37 NotConnected,
38}
39
40impl From<compatibility_layer::CompatibilityError> for TransportError {
43 fn from(err: compatibility_layer::CompatibilityError) -> Self {
44 match err {
45 compatibility_layer::CompatibilityError::Transport(transport_err) => transport_err,
46 compatibility_layer::CompatibilityError::Serialization(msg) => {
47 TransportError::SerializationFailed(msg)
48 }
49 compatibility_layer::CompatibilityError::Protocol(msg) => {
50 TransportError::ConnectionFailed(msg)
51 }
52 }
53 }
54}
55
56pub trait SyncTransport: Send + Sync {
58 type Error: std::error::Error + Send + Sync;
59
60 fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>>;
62
63 fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>>;
65
66 fn is_connected(&self) -> bool;
68}
69
70pub struct InMemoryTransport {
72 connected: bool,
73 message_queue: Arc<RwLock<Vec<Vec<u8>>>>,
74}
75
76impl InMemoryTransport {
77 pub fn new() -> Self {
78 Self {
79 connected: true,
80 message_queue: Arc::new(RwLock::new(Vec::new())),
81 }
82 }
83
84 pub fn with_connection_status(connected: bool) -> Self {
85 Self {
86 connected,
87 message_queue: Arc::new(RwLock::new(Vec::new())),
88 }
89 }
90}
91
92impl SyncTransport for InMemoryTransport {
93 type Error = TransportError;
94
95 fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
96 Box::pin(async move {
97 if !self.connected {
98 return Err(TransportError::NotConnected);
99 }
100
101 let mut queue = self.message_queue.write().await;
102 queue.push(data.to_vec());
103 Ok(())
104 })
105 }
106
107 fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
108 Box::pin(async move {
109 if !self.connected {
110 return Err(TransportError::NotConnected);
111 }
112
113 let mut queue = self.message_queue.write().await;
114 let messages = queue.drain(..).collect();
115 Ok(messages)
116 })
117 }
118
119 fn is_connected(&self) -> bool {
120 self.connected
121 }
122}
123
124impl Clone for InMemoryTransport {
125 fn clone(&self) -> Self {
126 Self {
127 connected: self.connected,
128 message_queue: self.message_queue.clone(),
129 }
130 }
131}
132
133pub struct WebSocketTransport {
135 inner: websocket::WebSocketTransport,
136}
137
138impl WebSocketTransport {
139 pub fn new(url: String) -> Self {
140 Self {
141 inner: websocket::WebSocketTransport::new(url),
142 }
143 }
144
145 pub async fn connect(&self) -> Result<(), TransportError> {
146 self.inner.connect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
147 }
148
149 pub async fn disconnect(&self) -> Result<(), TransportError> {
150 self.inner.disconnect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
151 }
152}
153
154impl SyncTransport for WebSocketTransport {
155 type Error = TransportError;
156
157 fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
158 Box::pin(async move {
159 self.inner.send(data).await.map_err(|e| TransportError::SendFailed(e.to_string()))
160 })
161 }
162
163 fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
164 Box::pin(async move {
165 self.inner.receive().await.map_err(|e| TransportError::ReceiveFailed(e.to_string()))
166 })
167 }
168
169 fn is_connected(&self) -> bool {
170 self.inner.is_connected()
171 }
172}
173
174impl Clone for WebSocketTransport {
175 fn clone(&self) -> Self {
176 Self {
177 inner: self.inner.clone(),
178 }
179 }
180}
181
182pub use hybrid_transport_impl::HybridTransport;
184
185#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct TransportConfig {
188 pub url: Option<String>,
189 pub timeout: std::time::Duration,
190 pub retry_attempts: u32,
191 pub heartbeat_interval: std::time::Duration,
192}
193
194impl Default for TransportConfig {
195 fn default() -> Self {
196 Self {
197 url: None,
198 timeout: std::time::Duration::from_secs(30),
199 retry_attempts: 3,
200 heartbeat_interval: std::time::Duration::from_secs(30),
201 }
202 }
203}
204
205pub struct TransportFactory;
207
208impl TransportFactory {
209 pub fn websocket(url: String) -> WebSocketTransport {
210 WebSocketTransport::new(url)
211 }
212
213 pub fn leptos_ws_pro(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
214 HybridTransport::with_leptos_ws_pro(config)
215 }
216
217 pub fn compatibility(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
218 HybridTransport::with_compatibility(config)
219 }
220
221 pub fn in_memory() -> InMemoryTransport {
222 InMemoryTransport::new()
223 }
224
225 pub fn hybrid(primary_url: String) -> HybridTransport {
226 let primary = HybridTransport::with_leptos_ws_pro(leptos_ws_pro_transport::LeptosWsProConfig {
227 url: primary_url,
228 ..Default::default()
229 });
230 let fallback = HybridTransport::with_in_memory();
231 HybridTransport::with_fallback(primary, fallback)
232 }
233}
234
235#[cfg(test)]
236mod tests {
237 use super::*;
238
239 #[tokio::test]
240 async fn test_in_memory_transport() {
241 let transport = InMemoryTransport::new();
242
243 let data = b"test message";
245 assert!(transport.send(data).await.is_ok());
246
247 let received = transport.receive().await.unwrap();
249 assert_eq!(received.len(), 1);
250 assert_eq!(received[0], data);
251 }
252
253 #[tokio::test]
254 async fn test_hybrid_transport_fallback() {
255 let primary = HybridTransport::with_websocket("ws://invalid-url".to_string());
256 let fallback = HybridTransport::with_in_memory();
257 let transport = HybridTransport::with_fallback(primary, fallback.clone());
258
259 let data = b"test message";
261 assert!(fallback.send(data).await.is_ok());
262
263 let received = fallback.receive().await.unwrap();
264 assert_eq!(received.len(), 1);
265 assert_eq!(received[0], data);
266 }
267}