leptos_sync_core/transport/
mod.rs

1//! Transport layer for synchronization
2
3use crate::storage::StorageError;
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use thiserror::Error;
8
9pub mod websocket;
10pub mod memory;
11
12#[derive(Error, Debug)]
13pub enum TransportError {
14    #[error("Connection failed: {0}")]
15    ConnectionFailed(String),
16    #[error("Send failed: {0}")]
17    SendFailed(String),
18    #[error("Receive failed: {0}")]
19    ReceiveFailed(String),
20    #[error("Serialization failed: {0}")]
21    SerializationFailed(String),
22    #[error("Not connected")]
23    NotConnected,
24}
25
/// Byte-oriented transport abstraction used for synchronization.
///
/// Implementors must be `Send + Sync`, and the futures returned by `send`
/// and `receive` must be `Send`.
pub trait SyncTransport: Send + Sync {
    /// Error type returned by the fallible operations of this transport.
    type Error: std::error::Error + Send + Sync;

    /// Sends `data` to remote peers.
    fn send(
        &self,
        data: &[u8],
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;

    /// Receives data from remote peers as a batch of byte buffers.
    fn receive(
        &self,
    ) -> impl std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send;

    /// Reports whether the transport is connected.
    fn is_connected(&self) -> bool;
}
39
40/// In-memory transport for testing
41pub struct InMemoryTransport {
42    connected: bool,
43    message_queue: Arc<RwLock<Vec<Vec<u8>>>>,
44}
45
46impl InMemoryTransport {
47    pub fn new() -> Self {
48        Self {
49            connected: true,
50            message_queue: Arc::new(RwLock::new(Vec::new())),
51        }
52    }
53
54    pub fn with_connection_status(connected: bool) -> Self {
55        Self {
56            connected,
57            message_queue: Arc::new(RwLock::new(Vec::new())),
58        }
59    }
60}
61
62impl SyncTransport for InMemoryTransport {
63    type Error = TransportError;
64
65    async fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
66        if !self.connected {
67            return Err(TransportError::NotConnected);
68        }
69        
70        let mut queue = self.message_queue.write().await;
71        queue.push(data.to_vec());
72        Ok(())
73    }
74
75    async fn receive(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
76        if !self.connected {
77            return Err(TransportError::NotConnected);
78        }
79        
80        let mut queue = self.message_queue.write().await;
81        let messages = queue.drain(..).collect();
82        Ok(messages)
83    }
84
85    fn is_connected(&self) -> bool {
86        self.connected
87    }
88}
89
90impl Clone for InMemoryTransport {
91    fn clone(&self) -> Self {
92        Self {
93            connected: self.connected,
94            message_queue: self.message_queue.clone(),
95        }
96    }
97}
98
99/// WebSocket transport wrapper
100pub struct WebSocketTransport {
101    inner: websocket::WebSocketTransport,
102}
103
104impl WebSocketTransport {
105    pub fn new(url: String) -> Self {
106        Self {
107            inner: websocket::WebSocketTransport::new(url),
108        }
109    }
110
111    pub async fn connect(&self) -> Result<(), TransportError> {
112        self.inner.connect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
113    }
114
115    pub async fn disconnect(&self) -> Result<(), TransportError> {
116        self.inner.disconnect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
117    }
118}
119
120impl SyncTransport for WebSocketTransport {
121    type Error = TransportError;
122
123    async fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
124        self.inner.send(data).await.map_err(|e| TransportError::SendFailed(e.to_string()))
125    }
126
127    async fn receive(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
128        self.inner.receive().await.map_err(|e| TransportError::ReceiveFailed(e.to_string()))
129    }
130
131    fn is_connected(&self) -> bool {
132        self.inner.is_connected()
133    }
134}
135
136impl Clone for WebSocketTransport {
137    fn clone(&self) -> Self {
138        Self {
139            inner: self.inner.clone(),
140        }
141    }
142}
143
144/// Hybrid transport that can use multiple backends
145#[derive(Clone)]
146pub enum HybridTransport {
147    WebSocket(WebSocketTransport),
148    InMemory(InMemoryTransport),
149    Fallback {
150        primary: WebSocketTransport,
151        fallback: InMemoryTransport,
152    },
153}
154
155impl HybridTransport {
156    pub fn with_websocket(url: String) -> Self {
157        Self::WebSocket(WebSocketTransport::new(url))
158    }
159
160    pub fn with_in_memory() -> Self {
161        Self::InMemory(InMemoryTransport::new())
162    }
163
164    pub fn with_fallback(primary: WebSocketTransport, fallback: InMemoryTransport) -> Self {
165        Self::Fallback { primary, fallback }
166    }
167
168    pub fn add_transport(&mut self, transport: HybridTransport) {
169        // For now, just replace the current transport
170        // In a more sophisticated implementation, you'd maintain a list
171        *self = transport;
172    }
173}
174
175impl SyncTransport for HybridTransport {
176    type Error = TransportError;
177
178    async fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
179        match self {
180            HybridTransport::WebSocket(ws) => ws.send(data).await,
181            HybridTransport::InMemory(mem) => mem.send(data).await,
182            HybridTransport::Fallback { primary, fallback } => {
183                // Try primary first, fall back to fallback
184                match primary.send(data).await {
185                    Ok(()) => Ok(()),
186                    Err(_) => fallback.send(data).await,
187                }
188            }
189        }
190    }
191
192    async fn receive(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
193        match self {
194            HybridTransport::WebSocket(ws) => ws.receive().await,
195            HybridTransport::InMemory(mem) => mem.receive().await,
196            HybridTransport::Fallback { primary, fallback } => {
197                // Try primary first, fall back to fallback
198                match primary.receive().await {
199                    Ok(messages) => Ok(messages),
200                    Err(_) => fallback.receive().await,
201                }
202            }
203        }
204    }
205
206    fn is_connected(&self) -> bool {
207        match self {
208            HybridTransport::WebSocket(ws) => ws.is_connected(),
209            HybridTransport::InMemory(mem) => mem.is_connected(),
210            HybridTransport::Fallback { primary, fallback } => {
211                primary.is_connected() || fallback.is_connected()
212            }
213        }
214    }
215}
216
217/// Transport configuration
218#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct TransportConfig {
220    pub url: Option<String>,
221    pub timeout: std::time::Duration,
222    pub retry_attempts: u32,
223    pub heartbeat_interval: std::time::Duration,
224}
225
226impl Default for TransportConfig {
227    fn default() -> Self {
228        Self {
229            url: None,
230            timeout: std::time::Duration::from_secs(30),
231            retry_attempts: 3,
232            heartbeat_interval: std::time::Duration::from_secs(30),
233        }
234    }
235}
236
/// Transport factory.
///
/// A stateless unit type whose associated functions construct the transports
/// defined in this module.
pub struct TransportFactory;
239
240impl TransportFactory {
241    pub fn websocket(url: String) -> WebSocketTransport {
242        WebSocketTransport::new(url)
243    }
244
245    pub fn in_memory() -> InMemoryTransport {
246        InMemoryTransport::new()
247    }
248
249    pub fn hybrid(primary_url: String) -> HybridTransport {
250        let primary = WebSocketTransport::new(primary_url);
251        let fallback = InMemoryTransport::new();
252        HybridTransport::with_fallback(primary, fallback)
253    }
254}
255
#[cfg(test)]
mod tests {
    use super::*;

    /// A payload sent through a connected in-memory transport comes back
    /// unchanged from the next `receive`.
    #[tokio::test]
    async fn test_in_memory_transport() {
        let transport = InMemoryTransport::new();

        // Test send
        let data = b"test message";
        assert!(transport.send(data).await.is_ok());

        // Test receive
        let received = transport.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);
    }

    /// Checks a fallback-mode hybrid transport built on an in-memory fallback.
    #[tokio::test]
    async fn test_hybrid_transport_fallback() {
        let primary = WebSocketTransport::new("ws://invalid-url".to_string());
        let fallback = InMemoryTransport::new();
        let transport = HybridTransport::with_fallback(primary, fallback.clone());

        // The hybrid counts as connected when either side is connected, and
        // the in-memory fallback is connected whatever the primary reports.
        assert!(transport.is_connected());

        // Send message to fallback transport directly
        let data = b"test message";
        assert!(fallback.send(data).await.is_ok());

        let received = fallback.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);
    }
}