leptos_sync_core/transport/
mod.rs

1//! Transport layer for synchronization
2
3use serde::{Deserialize, Serialize};
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use thiserror::Error;
7
8pub mod websocket;
9pub mod memory;
10pub mod multi_transport;
11pub mod leptos_ws_pro_transport;
12pub mod compatibility_layer;
13pub mod hybrid_transport_impl;
14
15#[cfg(test)]
16pub mod leptos_ws_pro_tests;
17
18#[cfg(test)]
19pub mod real_websocket_tests;
20
21#[cfg(test)]
22pub mod server_compatibility_tests;
23#[cfg(test)]
24pub mod hybrid_transport_tests;
25
26#[derive(Error, Debug)]
27pub enum TransportError {
28    #[error("Connection failed: {0}")]
29    ConnectionFailed(String),
30    #[error("Send failed: {0}")]
31    SendFailed(String),
32    #[error("Receive failed: {0}")]
33    ReceiveFailed(String),
34    #[error("Serialization failed: {0}")]
35    SerializationFailed(String),
36    #[error("Not connected")]
37    NotConnected,
38}
39
40// From implementation is already in leptos_ws_pro_transport.rs
41
42impl From<compatibility_layer::CompatibilityError> for TransportError {
43    fn from(err: compatibility_layer::CompatibilityError) -> Self {
44        match err {
45            compatibility_layer::CompatibilityError::Transport(transport_err) => transport_err,
46            compatibility_layer::CompatibilityError::Serialization(msg) => {
47                TransportError::SerializationFailed(msg)
48            }
49            compatibility_layer::CompatibilityError::Protocol(msg) => {
50                TransportError::ConnectionFailed(msg)
51            }
52        }
53    }
54}
55
56/// Transport trait for synchronization
57pub trait SyncTransport: Send + Sync {
58    type Error: std::error::Error + Send + Sync;
59    
60    /// Send data to remote peers
61    fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>>;
62    
63    /// Receive data from remote peers
64    fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>>;
65    
66    /// Check if transport is connected
67    fn is_connected(&self) -> bool;
68}
69
70/// In-memory transport for testing
71pub struct InMemoryTransport {
72    connected: bool,
73    message_queue: Arc<RwLock<Vec<Vec<u8>>>>,
74}
75
76impl InMemoryTransport {
77    pub fn new() -> Self {
78        Self {
79            connected: true,
80            message_queue: Arc::new(RwLock::new(Vec::new())),
81        }
82    }
83
84    pub fn with_connection_status(connected: bool) -> Self {
85        Self {
86            connected,
87            message_queue: Arc::new(RwLock::new(Vec::new())),
88        }
89    }
90}
91
92impl SyncTransport for InMemoryTransport {
93    type Error = TransportError;
94
95    fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
96        Box::pin(async move {
97            if !self.connected {
98                return Err(TransportError::NotConnected);
99            }
100            
101            let mut queue = self.message_queue.write().await;
102            queue.push(data.to_vec());
103            Ok(())
104        })
105    }
106
107    fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
108        Box::pin(async move {
109            if !self.connected {
110                return Err(TransportError::NotConnected);
111            }
112            
113            let mut queue = self.message_queue.write().await;
114            let messages = queue.drain(..).collect();
115            Ok(messages)
116        })
117    }
118
119    fn is_connected(&self) -> bool {
120        self.connected
121    }
122}
123
124impl Clone for InMemoryTransport {
125    fn clone(&self) -> Self {
126        Self {
127            connected: self.connected,
128            message_queue: self.message_queue.clone(),
129        }
130    }
131}
132
133/// WebSocket transport wrapper
134pub struct WebSocketTransport {
135    inner: websocket::WebSocketTransport,
136}
137
138impl WebSocketTransport {
139    pub fn new(url: String) -> Self {
140        Self {
141            inner: websocket::WebSocketTransport::new(url),
142        }
143    }
144
145    pub async fn connect(&self) -> Result<(), TransportError> {
146        self.inner.connect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
147    }
148
149    pub async fn disconnect(&self) -> Result<(), TransportError> {
150        self.inner.disconnect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
151    }
152}
153
154impl SyncTransport for WebSocketTransport {
155    type Error = TransportError;
156
157    fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
158        Box::pin(async move {
159            self.inner.send(data).await.map_err(|e| TransportError::SendFailed(e.to_string()))
160        })
161    }
162
163    fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
164        Box::pin(async move {
165            self.inner.receive().await.map_err(|e| TransportError::ReceiveFailed(e.to_string()))
166        })
167    }
168
169    fn is_connected(&self) -> bool {
170        self.inner.is_connected()
171    }
172}
173
174impl Clone for WebSocketTransport {
175    fn clone(&self) -> Self {
176        Self {
177            inner: self.inner.clone(),
178        }
179    }
180}
181
182// Re-export HybridTransport from the implementation module
183pub use hybrid_transport_impl::HybridTransport;
184
185/// Transport configuration
186#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct TransportConfig {
188    pub url: Option<String>,
189    pub timeout: std::time::Duration,
190    pub retry_attempts: u32,
191    pub heartbeat_interval: std::time::Duration,
192}
193
194impl Default for TransportConfig {
195    fn default() -> Self {
196        Self {
197            url: None,
198            timeout: std::time::Duration::from_secs(30),
199            retry_attempts: 3,
200            heartbeat_interval: std::time::Duration::from_secs(30),
201        }
202    }
203}
204
205/// Transport factory
206pub struct TransportFactory;
207
208impl TransportFactory {
209    pub fn websocket(url: String) -> WebSocketTransport {
210        WebSocketTransport::new(url)
211    }
212
213    pub fn leptos_ws_pro(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
214        HybridTransport::with_leptos_ws_pro(config)
215    }
216
217    pub fn compatibility(config: leptos_ws_pro_transport::LeptosWsProConfig) -> HybridTransport {
218        HybridTransport::with_compatibility(config)
219    }
220
221    pub fn in_memory() -> InMemoryTransport {
222        InMemoryTransport::new()
223    }
224
225    pub fn hybrid(primary_url: String) -> HybridTransport {
226        let primary = HybridTransport::with_leptos_ws_pro(leptos_ws_pro_transport::LeptosWsProConfig {
227            url: primary_url,
228            ..Default::default()
229        });
230        let fallback = HybridTransport::with_in_memory();
231        HybridTransport::with_fallback(primary, fallback)
232    }
233}
234
235#[cfg(test)]
236mod tests {
237    use super::*;
238
239    #[tokio::test]
240    async fn test_in_memory_transport() {
241        let transport = InMemoryTransport::new();
242        
243        // Test send
244        let data = b"test message";
245        assert!(transport.send(data).await.is_ok());
246        
247        // Test receive
248        let received = transport.receive().await.unwrap();
249        assert_eq!(received.len(), 1);
250        assert_eq!(received[0], data);
251    }
252
253    #[tokio::test]
254    async fn test_hybrid_transport_fallback() {
255        let primary = HybridTransport::with_websocket("ws://invalid-url".to_string());
256        let fallback = HybridTransport::with_in_memory();
257        let transport = HybridTransport::with_fallback(primary, fallback.clone());
258        
259        // Send message to fallback transport directly
260        let data = b"test message";
261        assert!(fallback.send(data).await.is_ok());
262        
263        let received = fallback.receive().await.unwrap();
264        assert_eq!(received.len(), 1);
265        assert_eq!(received[0], data);
266    }
267}