leptos_sync_core/transport/
mod.rs1use serde::{Deserialize, Serialize};
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use thiserror::Error;
7
8pub mod websocket;
9pub mod memory;
10pub mod multi_transport;
11pub mod leptos_ws_pro_transport;
12pub mod compatibility_layer;
13pub mod hybrid_transport_impl;
14
15#[cfg(test)]
16pub mod leptos_ws_pro_tests;
17
18#[cfg(test)]
19pub mod real_websocket_tests;
20
21#[cfg(test)]
22pub mod server_compatibility_tests;
23#[cfg(test)]
24pub mod hybrid_transport_tests;
25
26#[cfg(test)]
27pub mod enhanced_features_tests;
28
29#[derive(Error, Debug)]
30pub enum TransportError {
31 #[error("Connection failed: {0}")]
32 ConnectionFailed(String),
33 #[error("Send failed: {0}")]
34 SendFailed(String),
35 #[error("Receive failed: {0}")]
36 ReceiveFailed(String),
37 #[error("Serialization failed: {0}")]
38 SerializationFailed(String),
39 #[error("Not connected")]
40 NotConnected,
41}
42
43impl From<compatibility_layer::CompatibilityError> for TransportError {
46 fn from(err: compatibility_layer::CompatibilityError) -> Self {
47 match err {
48 compatibility_layer::CompatibilityError::Transport(transport_err) => transport_err,
49 compatibility_layer::CompatibilityError::Serialization(msg) => {
50 TransportError::SerializationFailed(msg)
51 }
52 compatibility_layer::CompatibilityError::Protocol(msg) => {
53 TransportError::ConnectionFailed(msg)
54 }
55 }
56 }
57}
58
/// Byte-oriented, asynchronous transport abstraction.
///
/// The asynchronous methods return boxed, pinned, `Send` futures, and the
/// trait itself requires implementors to be `Send + Sync`.
pub trait SyncTransport: Send + Sync {
    /// Error type produced by this transport's operations.
    type Error: std::error::Error + Send + Sync;

    /// Sends `data` over the transport.
    ///
    /// The returned future borrows both `self` and `data` for `'a` and
    /// resolves to `Ok(())` or to the transport's error.
    fn send<'a>(
        &'a self,
        data: &'a [u8],
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>,
    >;

    /// Receives a batch of messages, each one a byte vector.
    ///
    /// The returned future borrows `self` and resolves to the batch or to the
    /// transport's error.
    fn receive(
        &self,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>,
    >;

    /// Reports whether the transport currently considers itself connected.
    fn is_connected(&self) -> bool;
}
72
73pub struct InMemoryTransport {
75 connected: bool,
76 message_queue: Arc<RwLock<Vec<Vec<u8>>>>,
77}
78
79impl InMemoryTransport {
80 pub fn new() -> Self {
81 Self {
82 connected: true,
83 message_queue: Arc::new(RwLock::new(Vec::new())),
84 }
85 }
86
87 pub fn with_connection_status(connected: bool) -> Self {
88 Self {
89 connected,
90 message_queue: Arc::new(RwLock::new(Vec::new())),
91 }
92 }
93}
94
95impl SyncTransport for InMemoryTransport {
96 type Error = TransportError;
97
98 fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
99 Box::pin(async move {
100 if !self.connected {
101 return Err(TransportError::NotConnected);
102 }
103
104 let mut queue = self.message_queue.write().await;
105 queue.push(data.to_vec());
106 Ok(())
107 })
108 }
109
110 fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
111 Box::pin(async move {
112 if !self.connected {
113 return Err(TransportError::NotConnected);
114 }
115
116 let mut queue = self.message_queue.write().await;
117 let messages = queue.drain(..).collect();
118 Ok(messages)
119 })
120 }
121
122 fn is_connected(&self) -> bool {
123 self.connected
124 }
125}
126
127impl Clone for InMemoryTransport {
128 fn clone(&self) -> Self {
129 Self {
130 connected: self.connected,
131 message_queue: self.message_queue.clone(),
132 }
133 }
134}
135
136pub struct WebSocketTransport {
138 inner: websocket::WebSocketTransport,
139}
140
141impl WebSocketTransport {
142 pub fn new(url: String) -> Self {
143 Self {
144 inner: websocket::WebSocketTransport::new(url),
145 }
146 }
147
148 pub async fn connect(&self) -> Result<(), TransportError> {
149 self.inner.connect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
150 }
151
152 pub async fn disconnect(&self) -> Result<(), TransportError> {
153 self.inner.disconnect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
154 }
155}
156
157impl SyncTransport for WebSocketTransport {
158 type Error = TransportError;
159
160 fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
161 Box::pin(async move {
162 self.inner.send(data).await.map_err(|e| TransportError::SendFailed(e.to_string()))
163 })
164 }
165
166 fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
167 Box::pin(async move {
168 self.inner.receive().await.map_err(|e| TransportError::ReceiveFailed(e.to_string()))
169 })
170 }
171
172 fn is_connected(&self) -> bool {
173 self.inner.is_connected()
174 }
175}
176
177impl Clone for WebSocketTransport {
178 fn clone(&self) -> Self {
179 Self {
180 inner: self.inner.clone(),
181 }
182 }
183}
184
185pub use hybrid_transport_impl::HybridTransport;
187
188#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct TransportConfig {
191 pub url: Option<String>,
192 pub timeout: std::time::Duration,
193 pub retry_attempts: u32,
194 pub heartbeat_interval: std::time::Duration,
195}
196
197impl Default for TransportConfig {
198 fn default() -> Self {
199 Self {
200 url: None,
201 timeout: std::time::Duration::from_secs(30),
202 retry_attempts: 3,
203 heartbeat_interval: std::time::Duration::from_secs(30),
204 }
205 }
206}
207
/// Stateless namespace for the transport constructor functions.
pub struct TransportFactory;
210
211impl TransportFactory {
212 pub fn websocket(url: String) -> WebSocketTransport {
213 WebSocketTransport::new(url)
214 }
215
216 pub fn leptos_ws_pro(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
217 HybridTransport::with_leptos_ws_pro(config)
218 }
219
220 pub fn compatibility(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
221 HybridTransport::with_compatibility(config)
222 }
223
224 pub fn in_memory() -> InMemoryTransport {
225 InMemoryTransport::new()
226 }
227
228 pub fn hybrid(primary_url: String) -> HybridTransport {
229 let primary = HybridTransport::with_leptos_ws_pro(leptos_ws_pro_transport::LeptosWsProConfig {
230 url: primary_url,
231 ..Default::default()
232 });
233 let fallback = HybridTransport::with_in_memory();
234 HybridTransport::with_fallback(primary, fallback)
235 }
236}
237
#[cfg(test)]
mod tests {
    use super::*;

    /// A message sent through the in-memory transport is returned by the next
    /// `receive` call.
    #[tokio::test]
    async fn test_in_memory_transport() {
        let transport = InMemoryTransport::new();

        let data = b"test message";
        assert!(transport.send(data).await.is_ok());

        let received = transport.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);
    }

    /// Builds a hybrid transport with a fallback, then round-trips a message
    /// through the fallback handle.
    #[tokio::test]
    async fn test_hybrid_transport_fallback() {
        let primary = HybridTransport::with_websocket("ws://invalid-url".to_string());
        let fallback = HybridTransport::with_in_memory();
        // Only construction of the combined transport is exercised here. The
        // underscore prefix keeps it alive for the whole test without an
        // unused-variable warning.
        let _transport = HybridTransport::with_fallback(primary, fallback.clone());

        let data = b"test message";
        assert!(fallback.send(data).await.is_ok());

        let received = fallback.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);
    }
}