leptos_sync_core/transport/
mod.rs1use crate::storage::StorageError;
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use thiserror::Error;
8
9pub mod websocket;
10pub mod memory;
11pub mod multi_transport;
12
13#[derive(Error, Debug)]
14pub enum TransportError {
15 #[error("Connection failed: {0}")]
16 ConnectionFailed(String),
17 #[error("Send failed: {0}")]
18 SendFailed(String),
19 #[error("Receive failed: {0}")]
20 ReceiveFailed(String),
21 #[error("Serialization failed: {0}")]
22 SerializationFailed(String),
23 #[error("Not connected")]
24 NotConnected,
25}
26
/// Common interface implemented by every transport in this module.
///
/// `send` and `receive` are declared as methods returning `Send` futures; the
/// implementations in this file write them as `async fn`.
pub trait SyncTransport: Send + Sync {
    /// Error type produced by this transport's operations.
    type Error: std::error::Error + Send + Sync;

    /// Sends one message consisting of the bytes in `data`.
    fn send(
        &self,
        data: &[u8],
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;

    /// Receives messages; every inner `Vec<u8>` is one message.
    fn receive(
        &self,
    ) -> impl std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send;

    /// Reports whether the transport currently considers itself connected.
    fn is_connected(&self) -> bool;
}
40
41pub struct InMemoryTransport {
43 connected: bool,
44 message_queue: Arc<RwLock<Vec<Vec<u8>>>>,
45}
46
47impl InMemoryTransport {
48 pub fn new() -> Self {
49 Self {
50 connected: true,
51 message_queue: Arc::new(RwLock::new(Vec::new())),
52 }
53 }
54
55 pub fn with_connection_status(connected: bool) -> Self {
56 Self {
57 connected,
58 message_queue: Arc::new(RwLock::new(Vec::new())),
59 }
60 }
61}
62
63impl SyncTransport for InMemoryTransport {
64 type Error = TransportError;
65
66 async fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
67 if !self.connected {
68 return Err(TransportError::NotConnected);
69 }
70
71 let mut queue = self.message_queue.write().await;
72 queue.push(data.to_vec());
73 Ok(())
74 }
75
76 async fn receive(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
77 if !self.connected {
78 return Err(TransportError::NotConnected);
79 }
80
81 let mut queue = self.message_queue.write().await;
82 let messages = queue.drain(..).collect();
83 Ok(messages)
84 }
85
86 fn is_connected(&self) -> bool {
87 self.connected
88 }
89}
90
91impl Clone for InMemoryTransport {
92 fn clone(&self) -> Self {
93 Self {
94 connected: self.connected,
95 message_queue: self.message_queue.clone(),
96 }
97 }
98}
99
100pub struct WebSocketTransport {
102 inner: websocket::WebSocketTransport,
103}
104
105impl WebSocketTransport {
106 pub fn new(url: String) -> Self {
107 Self {
108 inner: websocket::WebSocketTransport::new(url),
109 }
110 }
111
112 pub async fn connect(&self) -> Result<(), TransportError> {
113 self.inner.connect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
114 }
115
116 pub async fn disconnect(&self) -> Result<(), TransportError> {
117 self.inner.disconnect().await.map_err(|e| TransportError::ConnectionFailed(e.to_string()))
118 }
119}
120
121impl SyncTransport for WebSocketTransport {
122 type Error = TransportError;
123
124 async fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
125 self.inner.send(data).await.map_err(|e| TransportError::SendFailed(e.to_string()))
126 }
127
128 async fn receive(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
129 self.inner.receive().await.map_err(|e| TransportError::ReceiveFailed(e.to_string()))
130 }
131
132 fn is_connected(&self) -> bool {
133 self.inner.is_connected()
134 }
135}
136
137impl Clone for WebSocketTransport {
138 fn clone(&self) -> Self {
139 Self {
140 inner: self.inner.clone(),
141 }
142 }
143}
144
145#[derive(Clone)]
147pub enum HybridTransport {
148 WebSocket(WebSocketTransport),
149 InMemory(InMemoryTransport),
150 Fallback {
151 primary: WebSocketTransport,
152 fallback: InMemoryTransport,
153 },
154}
155
156impl HybridTransport {
157 pub fn with_websocket(url: String) -> Self {
158 Self::WebSocket(WebSocketTransport::new(url))
159 }
160
161 pub fn with_in_memory() -> Self {
162 Self::InMemory(InMemoryTransport::new())
163 }
164
165 pub fn with_fallback(primary: WebSocketTransport, fallback: InMemoryTransport) -> Self {
166 Self::Fallback { primary, fallback }
167 }
168
169 pub fn add_transport(&mut self, transport: HybridTransport) {
170 *self = transport;
173 }
174}
175
176impl SyncTransport for HybridTransport {
177 type Error = TransportError;
178
179 async fn send(&self, data: &[u8]) -> Result<(), Self::Error> {
180 match self {
181 HybridTransport::WebSocket(ws) => ws.send(data).await,
182 HybridTransport::InMemory(mem) => mem.send(data).await,
183 HybridTransport::Fallback { primary, fallback } => {
184 match primary.send(data).await {
186 Ok(()) => Ok(()),
187 Err(_) => fallback.send(data).await,
188 }
189 }
190 }
191 }
192
193 async fn receive(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
194 match self {
195 HybridTransport::WebSocket(ws) => ws.receive().await,
196 HybridTransport::InMemory(mem) => mem.receive().await,
197 HybridTransport::Fallback { primary, fallback } => {
198 match primary.receive().await {
200 Ok(messages) => Ok(messages),
201 Err(_) => fallback.receive().await,
202 }
203 }
204 }
205 }
206
207 fn is_connected(&self) -> bool {
208 match self {
209 HybridTransport::WebSocket(ws) => ws.is_connected(),
210 HybridTransport::InMemory(mem) => mem.is_connected(),
211 HybridTransport::Fallback { primary, fallback } => {
212 primary.is_connected() || fallback.is_connected()
213 }
214 }
215 }
216}
217
218#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct TransportConfig {
221 pub url: Option<String>,
222 pub timeout: std::time::Duration,
223 pub retry_attempts: u32,
224 pub heartbeat_interval: std::time::Duration,
225}
226
227impl Default for TransportConfig {
228 fn default() -> Self {
229 Self {
230 url: None,
231 timeout: std::time::Duration::from_secs(30),
232 retry_attempts: 3,
233 heartbeat_interval: std::time::Duration::from_secs(30),
234 }
235 }
236}
237
238pub struct TransportFactory;
240
241impl TransportFactory {
242 pub fn websocket(url: String) -> WebSocketTransport {
243 WebSocketTransport::new(url)
244 }
245
246 pub fn in_memory() -> InMemoryTransport {
247 InMemoryTransport::new()
248 }
249
250 pub fn hybrid(primary_url: String) -> HybridTransport {
251 let primary = WebSocketTransport::new(primary_url);
252 let fallback = InMemoryTransport::new();
253 HybridTransport::with_fallback(primary, fallback)
254 }
255}
256
#[cfg(test)]
mod tests {
    use super::*;

    /// A message sent through an in-memory transport is returned by the next
    /// `receive`, after which the queue is empty again.
    #[tokio::test]
    async fn test_in_memory_transport() {
        let transport = InMemoryTransport::new();
        let data = b"test message";

        assert!(transport.send(data).await.is_ok());

        let received = transport.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);

        // `receive` empties the queue, so nothing is left for a second call.
        assert!(transport.receive().await.unwrap().is_empty());
    }

    /// A transport built as disconnected rejects both operations.
    #[tokio::test]
    async fn test_in_memory_transport_disconnected() {
        let transport = InMemoryTransport::with_connection_status(false);

        assert!(!transport.is_connected());
        assert!(matches!(
            transport.send(b"ignored").await,
            Err(TransportError::NotConnected)
        ));
        assert!(matches!(
            transport.receive().await,
            Err(TransportError::NotConnected)
        ));
    }

    /// The fallback side of a hybrid transport keeps it usable.
    #[tokio::test]
    async fn test_hybrid_transport_fallback() {
        let primary = WebSocketTransport::new("ws://invalid-url".to_string());
        let fallback = InMemoryTransport::new();
        let transport = HybridTransport::with_fallback(primary, fallback.clone());

        // The hybrid counts as connected when either side is, and the
        // in-memory fallback is always constructed connected.
        assert!(transport.is_connected());

        // Exercise the fallback handle directly: going through the hybrid
        // would first touch the WebSocket primary, whose behaviour against an
        // invalid URL is outside the scope of this test.
        let data = b"test message";
        assert!(fallback.send(data).await.is_ok());

        let received = fallback.receive().await.unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], data);
    }
}