leptos_sync_core/transport/
mod.rs

1//! Transport layer for synchronization
2
3use serde::{Deserialize, Serialize};
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use thiserror::Error;
7
8pub mod websocket;
9pub mod memory;
10pub mod multi_transport;
11pub mod leptos_ws_pro_transport;
12pub mod compatibility_layer;
13pub mod hybrid_transport_impl;
14pub mod message_protocol;
15pub mod websocket_client;
16pub mod websocket_integration;
17
18#[cfg(test)]
19pub mod leptos_ws_pro_tests;
20
21#[cfg(test)]
22pub mod real_websocket_tests;
23
24#[cfg(test)]
25pub mod server_compatibility_tests;
26#[cfg(test)]
27pub mod hybrid_transport_tests;
28
29#[cfg(test)]
30pub mod enhanced_features_tests;
31
32#[cfg(test)]
33pub mod websocket_integration_tests;
34
35#[derive(Error, Debug)]
36pub enum TransportError {
37    #[error("Connection failed: {0}")]
38    ConnectionFailed(String),
39    #[error("Send failed: {0}")]
40    SendFailed(String),
41    #[error("Receive failed: {0}")]
42    ReceiveFailed(String),
43    #[error("Serialization failed: {0}")]
44    SerializationFailed(String),
45    #[error("Not connected")]
46    NotConnected,
47}
48
// NOTE: a further `From` conversion into `TransportError` is already implemented in leptos_ws_pro_transport.rs, so it is not duplicated here.
50
51impl From<compatibility_layer::CompatibilityError> for TransportError {
52    fn from(err: compatibility_layer::CompatibilityError) -> Self {
53        match err {
54            compatibility_layer::CompatibilityError::Transport(transport_err) => transport_err,
55            compatibility_layer::CompatibilityError::Serialization(msg) => {
56                TransportError::SerializationFailed(msg)
57            }
58            compatibility_layer::CompatibilityError::Protocol(msg) => {
59                TransportError::ConnectionFailed(msg)
60            }
61        }
62    }
63}
64
/// Byte-oriented transport used for synchronization.
///
/// Implementors must be `Send + Sync`. The asynchronous operations hand back
/// boxed, pinned futures that are themselves `Send`.
pub trait SyncTransport: Send + Sync {
    /// Error type produced when a transport operation fails.
    type Error: std::error::Error + Send + Sync;

    /// Send `data` to remote peers.
    ///
    /// The returned future borrows both `self` and `data` for `'a` and
    /// resolves to `Ok(())` or the transport's error.
    fn send<'a>(
        &'a self,
        data: &'a [u8],
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>>;

    /// Receive data from remote peers.
    ///
    /// The returned future borrows `self` and resolves to a list of byte
    /// buffers, or the transport's error.
    fn receive(
        &self,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>,
    >;

    /// Report whether the transport is currently connected.
    fn is_connected(&self) -> bool;
}
78
79/// In-memory transport for testing
80pub struct InMemoryTransport {
81    connected: bool,
82    message_queue: Arc<RwLock<Vec<Vec<u8>>>>,
83}
84
85impl InMemoryTransport {
86    pub fn new() -> Self {
87        Self {
88            connected: true,
89            message_queue: Arc::new(RwLock::new(Vec::new())),
90        }
91    }
92
93    pub fn with_connection_status(connected: bool) -> Self {
94        Self {
95            connected,
96            message_queue: Arc::new(RwLock::new(Vec::new())),
97        }
98    }
99}
100
101impl SyncTransport for InMemoryTransport {
102    type Error = TransportError;
103
104    fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
105        Box::pin(async move {
106            if !self.connected {
107                return Err(TransportError::NotConnected);
108            }
109            
110            let mut queue = self.message_queue.write().await;
111            queue.push(data.to_vec());
112            Ok(())
113        })
114    }
115
116    fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
117        Box::pin(async move {
118            if !self.connected {
119                return Err(TransportError::NotConnected);
120            }
121            
122            let mut queue = self.message_queue.write().await;
123            let messages = queue.drain(..).collect();
124            Ok(messages)
125        })
126    }
127
128    fn is_connected(&self) -> bool {
129        self.connected
130    }
131}
132
133impl Clone for InMemoryTransport {
134    fn clone(&self) -> Self {
135        Self {
136            connected: self.connected,
137            message_queue: self.message_queue.clone(),
138        }
139    }
140}
141
142// Re-export the WebSocket transport from the websocket module
143pub use websocket::{WebSocketTransport, WebSocketConfig, WebSocketError, ConnectionState};
144
145// Re-export HybridTransport from the implementation module
146pub use hybrid_transport_impl::HybridTransport;
147
// Re-export the WebSocket client, message protocol, and WebSocket sync-engine integration types
149pub use websocket_client::{WebSocketClient, WebSocketClientConfig, WebSocketClientError};
150pub use message_protocol::{SyncMessage, MessageCodec, CrdtType, UserInfo, PresenceAction, ServerInfo};
151pub use websocket_integration::{WebSocketSyncEngine, WebSocketIntegrationConfig, WebSocketSyncEngineBuilder};
152
153/// Transport configuration
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct TransportConfig {
156    pub url: Option<String>,
157    pub timeout: std::time::Duration,
158    pub retry_attempts: u32,
159    pub heartbeat_interval: std::time::Duration,
160}
161
162impl Default for TransportConfig {
163    fn default() -> Self {
164        Self {
165            url: None,
166            timeout: std::time::Duration::from_secs(30),
167            retry_attempts: 3,
168            heartbeat_interval: std::time::Duration::from_secs(30),
169        }
170    }
171}
172
/// Transport factory.
///
/// A stateless unit type whose associated functions build the transports
/// provided by this module. The common traits are derived so that the type
/// can be printed, copied and default-constructed like any other value.
#[derive(Debug, Clone, Copy, Default)]
pub struct TransportFactory;
175
176impl TransportFactory {
177    pub fn websocket(url: String) -> WebSocketTransport {
178        WebSocketTransport::new(url)
179    }
180
181    pub fn leptos_ws_pro(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
182        HybridTransport::with_leptos_ws_pro(config)
183    }
184
185    pub fn compatibility(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
186        HybridTransport::with_compatibility(config)
187    }
188
189    pub fn in_memory() -> InMemoryTransport {
190        InMemoryTransport::new()
191    }
192
193    pub fn hybrid(primary_url: String) -> HybridTransport {
194        let primary = HybridTransport::with_leptos_ws_pro(leptos_ws_pro_transport::LeptosWsProConfig {
195            url: primary_url,
196            ..Default::default()
197        });
198        let fallback = HybridTransport::with_in_memory();
199        HybridTransport::with_fallback(primary, fallback)
200    }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// A message sent through an `InMemoryTransport` is returned by the next
    /// `receive`.
    #[tokio::test]
    async fn test_in_memory_transport() {
        let transport = InMemoryTransport::new();

        // Test send
        let data = b"test message";
        assert!(transport.send(data).await.is_ok());

        // Test receive
        let received = transport.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);
    }

    /// Builds a primary/fallback `HybridTransport` pair and round-trips a
    /// message through the fallback transport.
    #[tokio::test]
    async fn test_hybrid_transport_fallback() {
        let primary = HybridTransport::with_websocket("ws://invalid-url".to_string());
        let fallback = HybridTransport::with_in_memory();
        // Only constructed to check that composing the two transports works;
        // the leading underscore silences the unused-variable warning.
        // NOTE(review): the combined transport itself is never exercised here.
        let _transport = HybridTransport::with_fallback(primary, fallback.clone());

        // Send message to fallback transport directly
        let data = b"test message";
        assert!(fallback.send(data).await.is_ok());

        let received = fallback.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);
    }
}