leptos_sync_core/transport/
mod.rs

1//! Transport layer for synchronization
2
3use crate::storage::StorageError;
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use thiserror::Error;
8
9pub mod websocket;
10pub mod memory;
11pub mod multi_transport;
12
/// Errors reported by the transports defined in this module.
///
/// Every payload-carrying variant stores the underlying failure rendered as
/// text, so the original error type is not preserved.
#[derive(Error, Debug)]
pub enum TransportError {
    /// Establishing a connection failed. The WebSocket wrapper in this module
    /// also reports `disconnect` failures through this variant.
    #[error("Connection failed: {0}")]
    ConnectionFailed(String),
    /// Sending a payload failed.
    #[error("Send failed: {0}")]
    SendFailed(String),
    /// Receiving payloads failed.
    #[error("Receive failed: {0}")]
    ReceiveFailed(String),
    /// Serialization failed. Not constructed anywhere in this file.
    // NOTE(review): presumably raised by callers that encode/decode messages — confirm.
    #[error("Serialization failed: {0}")]
    SerializationFailed(String),
    /// The transport is not connected; returned by the in-memory transport
    /// when it was constructed as disconnected.
    #[error("Not connected")]
    NotConnected,
}
26
/// Byte-oriented transport abstraction used for synchronization.
///
/// Implementors must be `Send + Sync`, and the futures returned by
/// [`send`](SyncTransport::send) and [`receive`](SyncTransport::receive)
/// must themselves be `Send`.
pub trait SyncTransport: Send + Sync {
    /// Error type produced when a transport operation fails.
    type Error: std::error::Error + Send + Sync;

    /// Sends one payload to remote peers.
    ///
    /// # Errors
    /// Resolves to `Self::Error` when the implementation cannot deliver `data`.
    fn send(
        &self,
        data: &[u8],
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;

    /// Receives payloads from remote peers, one `Vec<u8>` per payload.
    ///
    /// # Errors
    /// Resolves to `Self::Error` when the implementation cannot receive.
    fn receive(
        &self,
    ) -> impl std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send;

    /// Reports whether the transport currently considers itself connected.
    fn is_connected(&self) -> bool;
}
40
41/// In-memory transport for testing
42pub struct InMemoryTransport {
43    connected: bool,
44    message_queue: Arc<RwLock<Vec<Vec<u8>>>>,
45}
46
47impl InMemoryTransport {
48    pub fn new() -> Self {
49        Self {
50            connected: true,
51            message_queue: Arc::new(RwLock::new(Vec::new())),
52        }
53    }
54
55    pub fn with_connection_status(connected: bool) -> Self {
56        Self {
57            connected,
58            message_queue: Arc::new(RwLock::new(Vec::new())),
59        }
60    }
61}
62
63impl SyncTransport for InMemoryTransport {
64    type Error = TransportError;
65
66    async fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
67        if !self.connected {
68            return Err(TransportError::NotConnected);
69        }
70        
71        let mut queue = self.message_queue.write().await;
72        queue.push(data.to_vec());
73        Ok(())
74    }
75
76    async fn receive(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
77        if !self.connected {
78            return Err(TransportError::NotConnected);
79        }
80        
81        let mut queue = self.message_queue.write().await;
82        let messages = queue.drain(..).collect();
83        Ok(messages)
84    }
85
86    fn is_connected(&self) -> bool {
87        self.connected
88    }
89}
90
91impl Clone for InMemoryTransport {
92    fn clone(&self) -> Self {
93        Self {
94            connected: self.connected,
95            message_queue: self.message_queue.clone(),
96        }
97    }
98}
99
100/// WebSocket transport wrapper
101pub struct WebSocketTransport {
102    inner: websocket::WebSocketTransport,
103}
104
105impl WebSocketTransport {
106    pub fn new(url: String) -> Self {
107        Self {
108            inner: websocket::WebSocketTransport::new(url),
109        }
110    }
111
112    pub async fn connect(&self) -> Result<(), TransportError> {
113        self.inner.connect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
114    }
115
116    pub async fn disconnect(&self) -> Result<(), TransportError> {
117        self.inner.disconnect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
118    }
119}
120
121impl SyncTransport for WebSocketTransport {
122    type Error = TransportError;
123
124    async fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
125        self.inner.send(data).await.map_err(|e| TransportError::SendFailed(e.to_string()))
126    }
127
128    async fn receive(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
129        self.inner.receive().await.map_err(|e| TransportError::ReceiveFailed(e.to_string()))
130    }
131
132    fn is_connected(&self) -> bool {
133        self.inner.is_connected()
134    }
135}
136
137impl Clone for WebSocketTransport {
138    fn clone(&self) -> Self {
139        Self {
140            inner: self.inner.clone(),
141        }
142    }
143}
144
145/// Hybrid transport that can use multiple backends
146#[derive(Clone)]
147pub enum HybridTransport {
148    WebSocket(WebSocketTransport),
149    InMemory(InMemoryTransport),
150    Fallback {
151        primary: WebSocketTransport,
152        fallback: InMemoryTransport,
153    },
154}
155
156impl HybridTransport {
157    pub fn with_websocket(url: String) -> Self {
158        Self::WebSocket(WebSocketTransport::new(url))
159    }
160
161    pub fn with_in_memory() -> Self {
162        Self::InMemory(InMemoryTransport::new())
163    }
164
165    pub fn with_fallback(primary: WebSocketTransport, fallback: InMemoryTransport) -> Self {
166        Self::Fallback { primary, fallback }
167    }
168
169    pub fn add_transport(&mut self, transport: HybridTransport) {
170        // For now, just replace the current transport
171        // In a more sophisticated implementation, you'd maintain a list
172        *self = transport;
173    }
174}
175
176impl SyncTransport for HybridTransport {
177    type Error = TransportError;
178
179    async fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
180        match self {
181            HybridTransport::WebSocket(ws) => ws.send(data).await,
182            HybridTransport::InMemory(mem) => mem.send(data).await,
183            HybridTransport::Fallback { primary, fallback } => {
184                // Try primary first, fall back to fallback
185                match primary.send(data).await {
186                    Ok(()) => Ok(()),
187                    Err(_) => fallback.send(data).await,
188                }
189            }
190        }
191    }
192
193    async fn receive(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
194        match self {
195            HybridTransport::WebSocket(ws) => ws.receive().await,
196            HybridTransport::InMemory(mem) => mem.receive().await,
197            HybridTransport::Fallback { primary, fallback } => {
198                // Try primary first, fall back to fallback
199                match primary.receive().await {
200                    Ok(messages) => Ok(messages),
201                    Err(_) => fallback.receive().await,
202                }
203            }
204        }
205    }
206
207    fn is_connected(&self) -> bool {
208        match self {
209            HybridTransport::WebSocket(ws) => ws.is_connected(),
210            HybridTransport::InMemory(mem) => mem.is_connected(),
211            HybridTransport::Fallback { primary, fallback } => {
212                primary.is_connected() || fallback.is_connected()
213            }
214        }
215    }
216}
217
218/// Transport configuration
219#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct TransportConfig {
221    pub url: Option<String>,
222    pub timeout: std::time::Duration,
223    pub retry_attempts: u32,
224    pub heartbeat_interval: std::time::Duration,
225}
226
227impl Default for TransportConfig {
228    fn default() -> Self {
229        Self {
230            url: None,
231            timeout: std::time::Duration::from_secs(30),
232            retry_attempts: 3,
233            heartbeat_interval: std::time::Duration::from_secs(30),
234        }
235    }
236}
237
238/// Transport factory
239pub struct TransportFactory;
240
241impl TransportFactory {
242    pub fn websocket(url: String) -> WebSocketTransport {
243        WebSocketTransport::new(url)
244    }
245
246    pub fn in_memory() -> InMemoryTransport {
247        InMemoryTransport::new()
248    }
249
250    pub fn hybrid(primary_url: String) -> HybridTransport {
251        let primary = WebSocketTransport::new(primary_url);
252        let fallback = InMemoryTransport::new();
253        HybridTransport::with_fallback(primary, fallback)
254    }
255}
256
#[cfg(test)]
mod tests {
    use super::*;

    /// A payload sent through a connected in-memory transport is returned by
    /// the next `receive`.
    #[tokio::test]
    async fn test_in_memory_transport() {
        let transport = InMemoryTransport::new();

        let data = b"test message";
        assert!(transport.send(data).await.is_ok());

        let received = transport.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);
    }

    /// A transport constructed as disconnected rejects both operations.
    #[tokio::test]
    async fn test_in_memory_transport_disconnected() {
        let transport = InMemoryTransport::with_connection_status(false);

        assert!(!transport.is_connected());
        assert!(matches!(
            transport.send(b"test message").await,
            Err(TransportError::NotConnected)
        ));
        assert!(matches!(
            transport.receive().await,
            Err(TransportError::NotConnected)
        ));
    }

    /// The fallback variant reports itself connected through its in-memory
    /// side, and a clone of the fallback shares its queue.
    #[tokio::test]
    async fn test_hybrid_transport_fallback() {
        let primary = WebSocketTransport::new("ws://invalid-url".to_string());
        let fallback = InMemoryTransport::new();
        let transport = HybridTransport::with_fallback(primary, fallback.clone());

        // The fallback is connected, so the hybrid is connected whatever the
        // primary's state is.
        assert!(transport.is_connected());

        // Send message to fallback transport directly
        let data = b"test message";
        assert!(fallback.send(data).await.is_ok());

        let received = fallback.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);
    }
}
289}