leptos_sync_core/transport/
mod.rs1use serde::{Deserialize, Serialize};
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use thiserror::Error;
7
8pub mod websocket;
9pub mod memory;
10pub mod multi_transport;
11pub mod leptos_ws_pro_transport;
12pub mod compatibility_layer;
13pub mod hybrid_transport_impl;
14pub mod message_protocol;
15pub mod websocket_client;
16pub mod websocket_integration;
17
18#[cfg(test)]
19pub mod leptos_ws_pro_tests;
20
21#[cfg(test)]
22pub mod real_websocket_tests;
23
24#[cfg(test)]
25pub mod server_compatibility_tests;
26#[cfg(test)]
27pub mod hybrid_transport_tests;
28
29#[cfg(test)]
30pub mod enhanced_features_tests;
31
32#[cfg(test)]
33pub mod websocket_integration_tests;
34
/// Errors reported by the transports in this module.
///
/// The `Display` text of each variant is the string in its `#[error(...)]`
/// attribute; for the variants carrying a `String`, that payload is the
/// detail interpolated after the colon.
#[derive(Error, Debug)]
pub enum TransportError {
    /// Displayed as `Connection failed: <detail>`. Also the variant that
    /// `CompatibilityError::Protocol` is converted into by the `From` impl
    /// in this module.
    #[error("Connection failed: {0}")]
    ConnectionFailed(String),
    /// Displayed as `Send failed: <detail>`.
    #[error("Send failed: {0}")]
    SendFailed(String),
    /// Displayed as `Receive failed: <detail>`.
    #[error("Receive failed: {0}")]
    ReceiveFailed(String),
    /// Displayed as `Serialization failed: <detail>`. Also the variant that
    /// `CompatibilityError::Serialization` is converted into by the `From`
    /// impl in this module.
    #[error("Serialization failed: {0}")]
    SerializationFailed(String),
    /// Displayed as `Not connected`. `InMemoryTransport` returns this from
    /// `send` and `receive` when its `connected` flag is `false`.
    #[error("Not connected")]
    NotConnected,
}
48
49impl From<compatibility_layer::CompatibilityError> for TransportError {
52 fn from(err: compatibility_layer::CompatibilityError) -> Self {
53 match err {
54 compatibility_layer::CompatibilityError::Transport(transport_err) => transport_err,
55 compatibility_layer::CompatibilityError::Serialization(msg) => {
56 TransportError::SerializationFailed(msg)
57 }
58 compatibility_layer::CompatibilityError::Protocol(msg) => {
59 TransportError::ConnectionFailed(msg)
60 }
61 }
62 }
63}
64
/// Byte-oriented transport interface.
///
/// Implementors must be `Send + Sync`. The asynchronous operations are
/// expressed as methods returning boxed, pinned, `Send` futures rather than
/// as `async fn`.
pub trait SyncTransport: Send + Sync {
    /// Error type produced by the futures returned from `send` and `receive`.
    type Error: std::error::Error + Send + Sync;

    /// Returns a future that sends `data`.
    ///
    /// The future borrows both `self` and `data` for `'a` and resolves to
    /// `Ok(())` or to `Self::Error`.
    fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>>;

    /// Returns a future that resolves to a batch of received messages, one
    /// `Vec<u8>` per message, or to `Self::Error`.
    ///
    /// The future borrows `self` for its whole lifetime.
    fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>>;

    /// Reports whether the transport currently considers itself connected.
    fn is_connected(&self) -> bool;
}
78
/// Transport that keeps sent messages in an in-process queue.
///
/// `send` appends to the queue and `receive` empties it, so a message sent
/// through one handle can be received through any clone of that handle.
pub struct InMemoryTransport {
    /// Connection flag chosen at construction; `send` and `receive` fail
    /// with `TransportError::NotConnected` when it is `false`.
    connected: bool,
    /// Pending messages, oldest first. Held behind an `Arc` so that clones
    /// of this transport share the same queue.
    message_queue: Arc<RwLock<Vec<Vec<u8>>>>,
}
84
85impl InMemoryTransport {
86 pub fn new() -> Self {
87 Self {
88 connected: true,
89 message_queue: Arc::new(RwLock::new(Vec::new())),
90 }
91 }
92
93 pub fn with_connection_status(connected: bool) -> Self {
94 Self {
95 connected,
96 message_queue: Arc::new(RwLock::new(Vec::new())),
97 }
98 }
99}
100
101impl SyncTransport for InMemoryTransport {
102 type Error = TransportError;
103
104 fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
105 Box::pin(async move {
106 if !self.connected {
107 return Err(TransportError::NotConnected);
108 }
109
110 let mut queue = self.message_queue.write().await;
111 queue.push(data.to_vec());
112 Ok(())
113 })
114 }
115
116 fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
117 Box::pin(async move {
118 if !self.connected {
119 return Err(TransportError::NotConnected);
120 }
121
122 let mut queue = self.message_queue.write().await;
123 let messages = queue.drain(..).collect();
124 Ok(messages)
125 })
126 }
127
128 fn is_connected(&self) -> bool {
129 self.connected
130 }
131}
132
133impl Clone for InMemoryTransport {
134 fn clone(&self) -> Self {
135 Self {
136 connected: self.connected,
137 message_queue: self.message_queue.clone(),
138 }
139 }
140}
141
142pub use websocket::{WebSocketTransport, WebSocketConfig, WebSocketError, ConnectionState};
144
145pub use hybrid_transport_impl::HybridTransport;
147
148pub use websocket_client::{WebSocketClient, WebSocketClientConfig, WebSocketClientError};
150pub use message_protocol::{SyncMessage, MessageCodec, CrdtType, UserInfo, PresenceAction, ServerInfo};
151pub use websocket_integration::{WebSocketSyncEngine, WebSocketIntegrationConfig, WebSocketSyncEngineBuilder};
152
/// Serializable settings for a transport.
///
/// The `Default` impl in this module yields no URL, a 30-second timeout,
/// 3 retry attempts and a 30-second heartbeat interval.
// NOTE(review): how each field is consumed is not visible in this file;
// the per-field descriptions below are inferred from the names — confirm
// against the transports that read this config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportConfig {
    /// Endpoint URL, if any; `None` by default.
    pub url: Option<String>,
    /// Timeout duration; 30 seconds by default.
    pub timeout: std::time::Duration,
    /// Number of retry attempts; 3 by default.
    pub retry_attempts: u32,
    /// Heartbeat interval; 30 seconds by default.
    pub heartbeat_interval: std::time::Duration,
}
161
162impl Default for TransportConfig {
163 fn default() -> Self {
164 Self {
165 url: None,
166 timeout: std::time::Duration::from_secs(30),
167 retry_attempts: 3,
168 heartbeat_interval: std::time::Duration::from_secs(30),
169 }
170 }
171}
172
/// Namespace for transport constructor functions; the type itself holds no
/// data and all of its functions in this module are associated functions.
pub struct TransportFactory;
175
176impl TransportFactory {
177 pub fn websocket(url: String) -> WebSocketTransport {
178 WebSocketTransport::new(url)
179 }
180
181 pub fn leptos_ws_pro(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
182 HybridTransport::with_leptos_ws_pro(config)
183 }
184
185 pub fn compatibility(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
186 HybridTransport::with_compatibility(config)
187 }
188
189 pub fn in_memory() -> InMemoryTransport {
190 InMemoryTransport::new()
191 }
192
193 pub fn hybrid(primary_url: String) -> HybridTransport {
194 let primary = HybridTransport::with_leptos_ws_pro(leptos_ws_pro_transport::LeptosWsProConfig {
195 url: primary_url,
196 ..Default::default()
197 });
198 let fallback = HybridTransport::with_in_memory();
199 HybridTransport::with_fallback(primary, fallback)
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// A message sent through a connected in-memory transport comes back,
    /// unchanged, from the next `receive`.
    #[tokio::test]
    async fn test_in_memory_transport() {
        let transport = InMemoryTransport::new();

        let data = b"test message";
        assert!(transport.send(data).await.is_ok());

        let received = transport.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);
    }

    /// Builds a primary/fallback pair and checks that the fallback handle
    /// round-trips a message.
    #[tokio::test]
    async fn test_hybrid_transport_fallback() {
        let primary = HybridTransport::with_websocket("ws://invalid-url".to_string());
        let fallback = HybridTransport::with_in_memory();
        // Only construction of the combined transport is checked here; the
        // send/receive below go through `fallback` directly. The leading
        // underscore silences the unused-variable warning the original
        // binding produced.
        let _transport = HybridTransport::with_fallback(primary, fallback.clone());

        let data = b"test message";
        assert!(fallback.send(data).await.is_ok());

        let received = fallback.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);
    }
}