leptos_sync_core/transport/
websocket_integration.rs1use super::{CrdtType, SyncMessage, WebSocketClient, WebSocketClientConfig};
4use crate::crdt::{Mergeable, ReplicaId};
5use crate::storage::Storage;
6use crate::sync::{SyncEngine, SyncEngineError};
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::SystemTime;
10use thiserror::Error;
11use tokio::sync::RwLock;
12use tokio::time::{interval, Duration};
13
14#[derive(Error, Debug)]
15pub enum WebSocketIntegrationError {
16 #[error("Sync engine error: {0}")]
17 SyncEngine(#[from] SyncEngineError),
18 #[error("Transport error: {0}")]
19 Transport(#[from] super::TransportError),
20 #[error("WebSocket client error: {0}")]
21 WebSocketClient(#[from] super::WebSocketClientError),
22 #[error("Serialization error: {0}")]
23 Serialization(#[from] serde_json::Error),
24 #[error("Integration error: {0}")]
25 Integration(String),
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct WebSocketIntegrationConfig {
31 pub client_config: WebSocketClientConfig,
32 pub sync_interval: Duration,
33 pub delta_batch_size: usize,
34 pub enable_compression: bool,
35 pub enable_heartbeat: bool,
36}
37
38impl Default for WebSocketIntegrationConfig {
39 fn default() -> Self {
40 Self {
41 client_config: WebSocketClientConfig::default(),
42 sync_interval: Duration::from_millis(100),
43 delta_batch_size: 10,
44 enable_compression: false,
45 enable_heartbeat: true,
46 }
47 }
48}
49
50pub struct WebSocketSyncEngine {
52 sync_engine: Arc<SyncEngine<WebSocketClient>>,
53 websocket_client: Arc<WebSocketClient>,
54 config: WebSocketIntegrationConfig,
55 is_running: Arc<RwLock<bool>>,
56 sync_task: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
57}
58
59impl WebSocketSyncEngine {
60 pub fn new(
62 storage: Storage,
63 config: WebSocketIntegrationConfig,
64 replica_id: ReplicaId,
65 ) -> Self {
66 let websocket_client = Arc::new(WebSocketClient::new(
67 config.client_config.clone(),
68 replica_id,
69 ));
70
71 let sync_engine = Arc::new(SyncEngine::new(storage, (*websocket_client).clone()));
72
73 Self {
74 sync_engine,
75 websocket_client,
76 config,
77 is_running: Arc::new(RwLock::new(false)),
78 sync_task: Arc::new(RwLock::new(None)),
79 }
80 }
81
82 pub async fn start(&self) -> Result<(), WebSocketIntegrationError> {
84 let mut is_running = self.is_running.write().await;
85 if *is_running {
86 return Ok(());
87 }
88
89 self.websocket_client.connect().await?;
91
92 self.start_sync_task().await;
94
95 *is_running = true;
96 tracing::info!("WebSocket sync engine started");
97 Ok(())
98 }
99
100 pub async fn stop(&self) -> Result<(), WebSocketIntegrationError> {
102 let mut is_running = self.is_running.write().await;
103 if !*is_running {
104 return Ok(());
105 }
106
107 self.stop_sync_task().await;
109
110 self.websocket_client.disconnect().await?;
112
113 *is_running = false;
114 tracing::info!("WebSocket sync engine stopped");
115 Ok(())
116 }
117
118 pub async fn send_delta(
120 &self,
121 collection_id: String,
122 crdt_type: CrdtType,
123 delta: Vec<u8>,
124 ) -> Result<(), WebSocketIntegrationError> {
125 let message = SyncMessage::Delta {
126 collection_id,
127 crdt_type,
128 delta,
129 timestamp: SystemTime::now(),
130 replica_id: self.websocket_client.replica_id(),
131 };
132
133 self.websocket_client.send_message(message).await?;
134 Ok(())
135 }
136
137 pub fn sync_engine(&self) -> &Arc<SyncEngine<WebSocketClient>> {
139 &self.sync_engine
140 }
141
142 pub fn websocket_client(&self) -> &Arc<WebSocketClient> {
144 &self.websocket_client
145 }
146
147 pub async fn is_running(&self) -> bool {
149 *self.is_running.read().await
150 }
151
152 pub async fn is_connected(&self) -> bool {
154 self.websocket_client.is_connected().await
155 }
156
157 async fn start_sync_task(&self) {
160 let sync_engine = self.sync_engine.clone();
161 let websocket_client = self.websocket_client.clone();
162 let config = self.config.clone();
163 let is_running = self.is_running.clone();
164
165 let sync_task = tokio::spawn(async move {
166 let mut interval = interval(config.sync_interval);
167 let mut delta_buffer = Vec::new();
168
169 loop {
170 interval.tick().await;
171
172 if !*is_running.read().await {
174 break;
175 }
176
177 if let Ok(Some(message)) = websocket_client.receive_message().await {
179 if let Err(e) = Self::handle_incoming_message(&sync_engine, message).await {
180 tracing::error!("Error handling incoming message: {}", e);
181 }
182 }
183
184 if delta_buffer.len() >= config.delta_batch_size {
188 Self::send_delta_batch(&websocket_client, &mut delta_buffer).await;
189 }
190 }
191 });
192
193 let mut task_handle = self.sync_task.write().await;
194 *task_handle = Some(sync_task);
195 }
196
197 async fn stop_sync_task(&self) {
198 let mut task_handle = self.sync_task.write().await;
199 if let Some(task) = task_handle.take() {
200 task.abort();
201 }
202 }
203
204 async fn handle_incoming_message(
205 sync_engine: &Arc<SyncEngine<WebSocketClient>>,
206 message: SyncMessage,
207 ) -> Result<(), WebSocketIntegrationError> {
208 match message {
209 SyncMessage::Delta {
210 collection_id,
211 crdt_type,
212 delta,
213 replica_id,
214 ..
215 } => {
216 tracing::debug!(
218 "Received delta for collection {} from replica {:?}",
219 collection_id,
220 replica_id
221 );
222
223 }
226 SyncMessage::Heartbeat { replica_id, .. } => {
227 tracing::debug!("Received heartbeat from replica {:?}", replica_id);
228 }
230 SyncMessage::PeerJoin {
231 replica_id,
232 user_info,
233 } => {
234 tracing::info!(
235 "Peer joined: {:?} with user info: {:?}",
236 replica_id,
237 user_info
238 );
239 }
241 SyncMessage::PeerLeave { replica_id } => {
242 tracing::info!("Peer left: {:?}", replica_id);
243 }
245 _ => {
246 tracing::debug!("Received message: {:?}", message);
247 }
248 }
249
250 Ok(())
251 }
252
253 async fn send_delta_batch(
254 websocket_client: &Arc<WebSocketClient>,
255 delta_buffer: &mut Vec<(String, CrdtType, Vec<u8>)>,
256 ) {
257 for (collection_id, crdt_type, delta) in delta_buffer.drain(..) {
258 let message = SyncMessage::Delta {
259 collection_id,
260 crdt_type,
261 delta,
262 timestamp: SystemTime::now(),
263 replica_id: websocket_client.replica_id(),
264 };
265
266 if let Err(e) = websocket_client.send_message(message).await {
267 tracing::error!("Failed to send delta: {}", e);
268 }
269 }
270 }
271}
272
273pub struct WebSocketSyncEngineBuilder {
275 config: WebSocketIntegrationConfig,
276 replica_id: Option<ReplicaId>,
277}
278
279impl WebSocketSyncEngineBuilder {
280 pub fn new() -> Self {
281 Self {
282 config: WebSocketIntegrationConfig::default(),
283 replica_id: None,
284 }
285 }
286
287 pub fn with_config(mut self, config: WebSocketIntegrationConfig) -> Self {
288 self.config = config;
289 self
290 }
291
292 pub fn with_replica_id(mut self, replica_id: ReplicaId) -> Self {
293 self.replica_id = Some(replica_id);
294 self
295 }
296
297 pub fn with_url(mut self, url: String) -> Self {
298 self.config.client_config.url = url;
299 self
300 }
301
302 pub fn build(self, storage: Storage) -> WebSocketSyncEngine {
303 let replica_id = self
304 .replica_id
305 .unwrap_or_else(|| ReplicaId::from(uuid::Uuid::new_v4()));
306 WebSocketSyncEngine::new(storage, self.config, replica_id)
307 }
308}
309
310impl Default for WebSocketSyncEngineBuilder {
311 fn default() -> Self {
312 Self::new()
313 }
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt::ReplicaId;
    use crate::storage::memory::MemoryStorage;

    /// Generates a fresh random replica id.
    fn create_test_replica_id() -> ReplicaId {
        ReplicaId::from(uuid::Uuid::new_v4())
    }

    /// Builds an engine over in-memory storage with the default configuration.
    fn engine_with_defaults(replica_id: ReplicaId) -> WebSocketSyncEngine {
        WebSocketSyncEngine::new(
            crate::storage::Storage::Memory(MemoryStorage::new()),
            WebSocketIntegrationConfig::default(),
            replica_id,
        )
    }

    /// A new engine keeps its replica id and starts out stopped.
    #[tokio::test]
    async fn test_websocket_sync_engine_creation() {
        let replica_id = create_test_replica_id();
        let engine = engine_with_defaults(replica_id);

        assert_eq!(engine.websocket_client().replica_id(), replica_id);
        assert!(!engine.is_running().await);
    }

    /// The builder forwards the replica id and URL to the engine.
    #[tokio::test]
    async fn test_websocket_sync_engine_builder() {
        let replica_id = create_test_replica_id();
        let storage = crate::storage::Storage::Memory(MemoryStorage::new());

        let builder = WebSocketSyncEngineBuilder::new()
            .with_replica_id(replica_id)
            .with_url("ws://test.example.com".to_string());
        let engine = builder.build(storage);

        assert_eq!(engine.websocket_client().replica_id(), replica_id);
        assert_eq!(engine.config.client_config.url, "ws://test.example.com");
    }

    /// Sending a delta on a freshly created engine succeeds.
    #[tokio::test]
    async fn test_send_delta() {
        let engine = engine_with_defaults(create_test_replica_id());

        let collection = "test_collection".to_string();
        let payload = b"test delta".to_vec();
        let result = engine
            .send_delta(collection, CrdtType::LwwRegister, payload)
            .await;

        assert!(result.is_ok());
    }

    /// `start` followed by `stop` toggles the running flag and both succeed.
    #[tokio::test]
    async fn test_start_stop_cycle() {
        let engine = engine_with_defaults(create_test_replica_id());

        assert!(!engine.is_running().await);

        let started = engine.start().await;
        assert!(started.is_ok());
        assert!(engine.is_running().await);

        let stopped = engine.stop().await;
        assert!(stopped.is_ok());
        assert!(!engine.is_running().await);
    }
}