leptos_sync_core/transport/
mod.rs

1//! Transport layer for synchronization
2
3use serde::{Deserialize, Serialize};
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use thiserror::Error;
7
8pub mod websocket;
9pub mod memory;
10pub mod multi_transport;
11pub mod leptos_ws_pro_transport;
12pub mod compatibility_layer;
13pub mod hybrid_transport_impl;
14
15#[cfg(test)]
16pub mod leptos_ws_pro_tests;
17
18#[cfg(test)]
19pub mod real_websocket_tests;
20
21#[cfg(test)]
22pub mod server_compatibility_tests;
23#[cfg(test)]
24pub mod hybrid_transport_tests;
25
26#[cfg(test)]
27pub mod enhanced_features_tests;
28
29#[derive(Error, Debug)]
30pub enum TransportError {
31    #[error("Connection failed: {0}")]
32    ConnectionFailed(String),
33    #[error("Send failed: {0}")]
34    SendFailed(String),
35    #[error("Receive failed: {0}")]
36    ReceiveFailed(String),
37    #[error("Serialization failed: {0}")]
38    SerializationFailed(String),
39    #[error("Not connected")]
40    NotConnected,
41}
42
43// From implementation is already in leptos_ws_pro_transport.rs
44
45impl From<compatibility_layer::CompatibilityError> for TransportError {
46    fn from(err: compatibility_layer::CompatibilityError) -> Self {
47        match err {
48            compatibility_layer::CompatibilityError::Transport(transport_err) => transport_err,
49            compatibility_layer::CompatibilityError::Serialization(msg) => {
50                TransportError::SerializationFailed(msg)
51            }
52            compatibility_layer::CompatibilityError::Protocol(msg) => {
53                TransportError::ConnectionFailed(msg)
54            }
55        }
56    }
57}
58
/// Byte-oriented transport used by the synchronization layer.
///
/// The asynchronous operations return boxed, pinned, `Send` futures instead
/// of being declared `async fn`.
pub trait SyncTransport: Send + Sync {
    /// Error produced by [`send`](Self::send) and [`receive`](Self::receive).
    type Error: std::error::Error + Send + Sync;

    /// Sends `data` to remote peers.
    ///
    /// The returned future borrows both `self` and `data` for `'a`.
    fn send<'a>(
        &'a self,
        data: &'a [u8],
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>,
    >;

    /// Receives data from remote peers.
    ///
    /// On success the future resolves to a list of messages, each one a byte
    /// buffer.
    fn receive(
        &self,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>,
    >;

    /// Reports whether the transport is currently connected.
    fn is_connected(&self) -> bool;
}
72
/// In-memory transport for testing.
///
/// Sent messages are appended to an internal queue and handed back by the
/// next `receive` call.
pub struct InMemoryTransport {
    /// Connection flag reported by `is_connected`; while it is `false`,
    /// `send` and `receive` fail with `TransportError::NotConnected`.
    connected: bool,
    /// Pending messages, behind an `Arc` so clones operate on the same queue.
    message_queue: Arc<RwLock<Vec<Vec<u8>>>>,
}
78
79impl InMemoryTransport {
80    pub fn new() -> Self {
81        Self {
82            connected: true,
83            message_queue: Arc::new(RwLock::new(Vec::new())),
84        }
85    }
86
87    pub fn with_connection_status(connected: bool) -> Self {
88        Self {
89            connected,
90            message_queue: Arc::new(RwLock::new(Vec::new())),
91        }
92    }
93}
94
95impl SyncTransport for InMemoryTransport {
96    type Error = TransportError;
97
98    fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
99        Box::pin(async move {
100            if !self.connected {
101                return Err(TransportError::NotConnected);
102            }
103            
104            let mut queue = self.message_queue.write().await;
105            queue.push(data.to_vec());
106            Ok(())
107        })
108    }
109
110    fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
111        Box::pin(async move {
112            if !self.connected {
113                return Err(TransportError::NotConnected);
114            }
115            
116            let mut queue = self.message_queue.write().await;
117            let messages = queue.drain(..).collect();
118            Ok(messages)
119        })
120    }
121
122    fn is_connected(&self) -> bool {
123        self.connected
124    }
125}
126
127impl Clone for InMemoryTransport {
128    fn clone(&self) -> Self {
129        Self {
130            connected: self.connected,
131            message_queue: self.message_queue.clone(),
132        }
133    }
134}
135
136/// WebSocket transport wrapper
137pub struct WebSocketTransport {
138    inner: websocket::WebSocketTransport,
139}
140
141impl WebSocketTransport {
142    pub fn new(url: String) -> Self {
143        Self {
144            inner: websocket::WebSocketTransport::new(url),
145        }
146    }
147
148    pub async fn connect(&self) -> Result<(), TransportError> {
149        self.inner.connect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
150    }
151
152    pub async fn disconnect(&self) -> Result<(), TransportError> {
153        self.inner.disconnect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
154    }
155}
156
157impl SyncTransport for WebSocketTransport {
158    type Error = TransportError;
159
160    fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
161        Box::pin(async move {
162            self.inner.send(data).await.map_err(|e| TransportError::SendFailed(e.to_string()))
163        })
164    }
165
166    fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
167        Box::pin(async move {
168            self.inner.receive().await.map_err(|e| TransportError::ReceiveFailed(e.to_string()))
169        })
170    }
171
172    fn is_connected(&self) -> bool {
173        self.inner.is_connected()
174    }
175}
176
177impl Clone for WebSocketTransport {
178    fn clone(&self) -> Self {
179        Self {
180            inner: self.inner.clone(),
181        }
182    }
183}
184
185// Re-export HybridTransport from the implementation module
186pub use hybrid_transport_impl::HybridTransport;
187
188/// Transport configuration
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct TransportConfig {
191    pub url: Option<String>,
192    pub timeout: std::time::Duration,
193    pub retry_attempts: u32,
194    pub heartbeat_interval: std::time::Duration,
195}
196
197impl Default for TransportConfig {
198    fn default() -> Self {
199        Self {
200            url: None,
201            timeout: std::time::Duration::from_secs(30),
202            retry_attempts: 3,
203            heartbeat_interval: std::time::Duration::from_secs(30),
204        }
205    }
206}
207
208/// Transport factory
209pub struct TransportFactory;
210
211impl TransportFactory {
212    pub fn websocket(url: String) -> WebSocketTransport {
213        WebSocketTransport::new(url)
214    }
215
216    pub fn leptos_ws_pro(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
217        HybridTransport::with_leptos_ws_pro(config)
218    }
219
220    pub fn compatibility(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
221        HybridTransport::with_compatibility(config)
222    }
223
224    pub fn in_memory() -> InMemoryTransport {
225        InMemoryTransport::new()
226    }
227
228    pub fn hybrid(primary_url: String) -> HybridTransport {
229        let primary = HybridTransport::with_leptos_ws_pro(leptos_ws_pro_transport::LeptosWsProConfig {
230            url: primary_url,
231            ..Default::default()
232        });
233        let fallback = HybridTransport::with_in_memory();
234        HybridTransport::with_fallback(primary, fallback)
235    }
236}
237
#[cfg(test)]
mod tests {
    use super::*;

    /// A message sent through the in-memory transport is returned by the next
    /// `receive`, which also empties the queue.
    #[tokio::test]
    async fn test_in_memory_transport() {
        let transport = InMemoryTransport::new();

        // Test send
        let data = b"test message";
        assert!(transport.send(data).await.is_ok());

        // Test receive
        let received = transport.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);

        // The first receive drained the queue.
        assert!(transport.receive().await.unwrap().is_empty());
    }

    /// A transport created as disconnected refuses both send and receive.
    #[tokio::test]
    async fn test_in_memory_transport_not_connected() {
        let transport = InMemoryTransport::with_connection_status(false);

        assert!(!transport.is_connected());
        assert!(matches!(
            transport.send(b"test message").await,
            Err(TransportError::NotConnected)
        ));
        assert!(matches!(
            transport.receive().await,
            Err(TransportError::NotConnected)
        ));
    }

    /// Builds a primary/fallback pair and checks that the fallback transport
    /// itself round-trips a message.
    #[tokio::test]
    async fn test_hybrid_transport_fallback() {
        let primary = HybridTransport::with_websocket("ws://invalid-url".to_string());
        let fallback = HybridTransport::with_in_memory();
        // Only the construction of the combined transport is exercised; the
        // leading underscore marks the binding as intentionally unused.
        let _transport = HybridTransport::with_fallback(primary, fallback.clone());

        // Send message to fallback transport directly
        let data = b"test message";
        assert!(fallback.send(data).await.is_ok());

        let received = fallback.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);
    }
}