leptos_sync_core/transport/
mod.rs1use crate::storage::StorageError;
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use thiserror::Error;
8
9pub mod websocket;
10pub mod memory;
11
12#[derive(Error, Debug)]
13pub enum TransportError {
14 #[error("Connection failed: {0}")]
15 ConnectionFailed(String),
16 #[error("Send failed: {0}")]
17 SendFailed(String),
18 #[error("Receive failed: {0}")]
19 ReceiveFailed(String),
20 #[error("Serialization failed: {0}")]
21 SerializationFailed(String),
22 #[error("Not connected")]
23 NotConnected,
24}
25
26pub trait SyncTransport: Send + Sync {
28 type Error: std::error::Error + Send + Sync;
29
30 fn send(&self, data: &[u8]) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
32
33 fn receive(&self) -> impl std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send;
35
36 fn is_connected(&self) -> bool;
38}
39
40pub struct InMemoryTransport {
42 connected: bool,
43 message_queue: Arc<RwLock<Vec<Vec<u8>>>>,
44}
45
46impl InMemoryTransport {
47 pub fn new() -> Self {
48 Self {
49 connected: true,
50 message_queue: Arc::new(RwLock::new(Vec::new())),
51 }
52 }
53
54 pub fn with_connection_status(connected: bool) -> Self {
55 Self {
56 connected,
57 message_queue: Arc::new(RwLock::new(Vec::new())),
58 }
59 }
60}
61
62impl SyncTransport for InMemoryTransport {
63 type Error = TransportError;
64
65 async fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
66 if !self.connected {
67 return Err(TransportError::NotConnected);
68 }
69
70 let mut queue = self.message_queue.write().await;
71 queue.push(data.to_vec());
72 Ok(())
73 }
74
75 async fn receive(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
76 if !self.connected {
77 return Err(TransportError::NotConnected);
78 }
79
80 let mut queue = self.message_queue.write().await;
81 let messages = queue.drain(..).collect();
82 Ok(messages)
83 }
84
85 fn is_connected(&self) -> bool {
86 self.connected
87 }
88}
89
90impl Clone for InMemoryTransport {
91 fn clone(&self) -> Self {
92 Self {
93 connected: self.connected,
94 message_queue: self.message_queue.clone(),
95 }
96 }
97}
98
99pub struct WebSocketTransport {
101 inner: websocket::WebSocketTransport,
102}
103
104impl WebSocketTransport {
105 pub fn new(url: String) -> Self {
106 Self {
107 inner: websocket::WebSocketTransport::new(url),
108 }
109 }
110
111 pub async fn connect(&self) -> Result<(), TransportError> {
112 self.inner.connect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
113 }
114
115 pub async fn disconnect(&self) -> Result<(), TransportError> {
116 self.inner.disconnect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
117 }
118}
119
120impl SyncTransport for WebSocketTransport {
121 type Error = TransportError;
122
123 async fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
124 self.inner.send(data).await.map_err(|e| TransportError::SendFailed(e.to_string()))
125 }
126
127 async fn receive(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
128 self.inner.receive().await.map_err(|e| TransportError::ReceiveFailed(e.to_string()))
129 }
130
131 fn is_connected(&self) -> bool {
132 self.inner.is_connected()
133 }
134}
135
136impl Clone for WebSocketTransport {
137 fn clone(&self) -> Self {
138 Self {
139 inner: self.inner.clone(),
140 }
141 }
142}
143
144#[derive(Clone)]
146pub enum HybridTransport {
147 WebSocket(WebSocketTransport),
148 InMemory(InMemoryTransport),
149 Fallback {
150 primary: WebSocketTransport,
151 fallback: InMemoryTransport,
152 },
153}
154
155impl HybridTransport {
156 pub fn with_websocket(url: String) -> Self {
157 Self::WebSocket(WebSocketTransport::new(url))
158 }
159
160 pub fn with_in_memory() -> Self {
161 Self::InMemory(InMemoryTransport::new())
162 }
163
164 pub fn with_fallback(primary: WebSocketTransport, fallback: InMemoryTransport) -> Self {
165 Self::Fallback { primary, fallback }
166 }
167
168 pub fn add_transport(&mut self, transport: HybridTransport) {
169 *self = transport;
172 }
173}
174
175impl SyncTransport for HybridTransport {
176 type Error = TransportError;
177
178 async fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
179 match self {
180 HybridTransport::WebSocket(ws) => ws.send(data).await,
181 HybridTransport::InMemory(mem) => mem.send(data).await,
182 HybridTransport::Fallback { primary, fallback } => {
183 match primary.send(data).await {
185 Ok(()) => Ok(()),
186 Err(_) => fallback.send(data).await,
187 }
188 }
189 }
190 }
191
192 async fn receive(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
193 match self {
194 HybridTransport::WebSocket(ws) => ws.receive().await,
195 HybridTransport::InMemory(mem) => mem.receive().await,
196 HybridTransport::Fallback { primary, fallback } => {
197 match primary.receive().await {
199 Ok(messages) => Ok(messages),
200 Err(_) => fallback.receive().await,
201 }
202 }
203 }
204 }
205
206 fn is_connected(&self) -> bool {
207 match self {
208 HybridTransport::WebSocket(ws) => ws.is_connected(),
209 HybridTransport::InMemory(mem) => mem.is_connected(),
210 HybridTransport::Fallback { primary, fallback } => {
211 primary.is_connected() || fallback.is_connected()
212 }
213 }
214 }
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct TransportConfig {
220 pub url: Option<String>,
221 pub timeout: std::time::Duration,
222 pub retry_attempts: u32,
223 pub heartbeat_interval: std::time::Duration,
224}
225
226impl Default for TransportConfig {
227 fn default() -> Self {
228 Self {
229 url: None,
230 timeout: std::time::Duration::from_secs(30),
231 retry_attempts: 3,
232 heartbeat_interval: std::time::Duration::from_secs(30),
233 }
234 }
235}
236
237pub struct TransportFactory;
239
240impl TransportFactory {
241 pub fn websocket(url: String) -> WebSocketTransport {
242 WebSocketTransport::new(url)
243 }
244
245 pub fn in_memory() -> InMemoryTransport {
246 InMemoryTransport::new()
247 }
248
249 pub fn hybrid(primary_url: String) -> HybridTransport {
250 let primary = WebSocketTransport::new(primary_url);
251 let fallback = InMemoryTransport::new();
252 HybridTransport::with_fallback(primary, fallback)
253 }
254}
255
256#[cfg(test)]
257mod tests {
258 use super::*;
259
260 #[tokio::test]
261 async fn test_in_memory_transport() {
262 let transport = InMemoryTransport::new();
263
264 let data = b"test message";
266 assert!(transport.send(data).await.is_ok());
267
268 let received = transport.receive().await.unwrap();
270 assert_eq!(received.len(), 1);
271 assert_eq!(received[0], data);
272 }
273
274 #[tokio::test]
275 async fn test_hybrid_transport_fallback() {
276 let primary = WebSocketTransport::new("ws://invalid-url".to_string());
277 let fallback = InMemoryTransport::new();
278 let transport = HybridTransport::with_fallback(primary, fallback.clone());
279
280 let data = b"test message";
282 assert!(fallback.send(data).await.is_ok());
283
284 let received = fallback.receive().await.unwrap();
285 assert_eq!(received.len(), 1);
286 assert_eq!(received[0], data);
287 }
288}