leptos_sync_core/transport/
websocket_integration.rs

1//! WebSocket integration with sync engine
2
3use super::{CrdtType, SyncMessage, WebSocketClient, WebSocketClientConfig};
4use crate::crdt::{Mergeable, ReplicaId};
5use crate::storage::Storage;
6use crate::sync::{SyncEngine, SyncEngineError};
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::SystemTime;
10use thiserror::Error;
11use tokio::sync::RwLock;
12use tokio::time::{interval, Duration};
13
14#[derive(Error, Debug)]
15pub enum WebSocketIntegrationError {
16    #[error("Sync engine error: {0}")]
17    SyncEngine(#[from] SyncEngineError),
18    #[error("Transport error: {0}")]
19    Transport(#[from] super::TransportError),
20    #[error("WebSocket client error: {0}")]
21    WebSocketClient(#[from] super::WebSocketClientError),
22    #[error("Serialization error: {0}")]
23    Serialization(#[from] serde_json::Error),
24    #[error("Integration error: {0}")]
25    Integration(String),
26}
27
28/// Configuration for WebSocket integration
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct WebSocketIntegrationConfig {
31    pub client_config: WebSocketClientConfig,
32    pub sync_interval: Duration,
33    pub delta_batch_size: usize,
34    pub enable_compression: bool,
35    pub enable_heartbeat: bool,
36}
37
38impl Default for WebSocketIntegrationConfig {
39    fn default() -> Self {
40        Self {
41            client_config: WebSocketClientConfig::default(),
42            sync_interval: Duration::from_millis(100),
43            delta_batch_size: 10,
44            enable_compression: false,
45            enable_heartbeat: true,
46        }
47    }
48}
49
50/// WebSocket-integrated sync engine
51pub struct WebSocketSyncEngine {
52    sync_engine: Arc<SyncEngine<WebSocketClient>>,
53    websocket_client: Arc<WebSocketClient>,
54    config: WebSocketIntegrationConfig,
55    is_running: Arc<RwLock<bool>>,
56    sync_task: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
57}
58
59impl WebSocketSyncEngine {
60    /// Create a new WebSocket sync engine
61    pub fn new(
62        storage: Storage,
63        config: WebSocketIntegrationConfig,
64        replica_id: ReplicaId,
65    ) -> Self {
66        let websocket_client = Arc::new(WebSocketClient::new(
67            config.client_config.clone(),
68            replica_id,
69        ));
70
71        let sync_engine = Arc::new(SyncEngine::new(storage, (*websocket_client).clone()));
72
73        Self {
74            sync_engine,
75            websocket_client,
76            config,
77            is_running: Arc::new(RwLock::new(false)),
78            sync_task: Arc::new(RwLock::new(None)),
79        }
80    }
81
82    /// Start the WebSocket sync engine
83    pub async fn start(&self) -> Result<(), WebSocketIntegrationError> {
84        let mut is_running = self.is_running.write().await;
85        if *is_running {
86            return Ok(());
87        }
88
89        // Connect WebSocket client
90        self.websocket_client.connect().await?;
91
92        // Start sync task
93        self.start_sync_task().await;
94
95        *is_running = true;
96        tracing::info!("WebSocket sync engine started");
97        Ok(())
98    }
99
100    /// Stop the WebSocket sync engine
101    pub async fn stop(&self) -> Result<(), WebSocketIntegrationError> {
102        let mut is_running = self.is_running.write().await;
103        if !*is_running {
104            return Ok(());
105        }
106
107        // Stop sync task
108        self.stop_sync_task().await;
109
110        // Disconnect WebSocket client
111        self.websocket_client.disconnect().await?;
112
113        *is_running = false;
114        tracing::info!("WebSocket sync engine stopped");
115        Ok(())
116    }
117
118    /// Send a CRDT delta to peers
119    pub async fn send_delta(
120        &self,
121        collection_id: String,
122        crdt_type: CrdtType,
123        delta: Vec<u8>,
124    ) -> Result<(), WebSocketIntegrationError> {
125        let message = SyncMessage::Delta {
126            collection_id,
127            crdt_type,
128            delta,
129            timestamp: SystemTime::now(),
130            replica_id: self.websocket_client.replica_id(),
131        };
132
133        self.websocket_client.send_message(message).await?;
134        Ok(())
135    }
136
137    /// Get the underlying sync engine
138    pub fn sync_engine(&self) -> &Arc<SyncEngine<WebSocketClient>> {
139        &self.sync_engine
140    }
141
142    /// Get the WebSocket client
143    pub fn websocket_client(&self) -> &Arc<WebSocketClient> {
144        &self.websocket_client
145    }
146
147    /// Check if the engine is running
148    pub async fn is_running(&self) -> bool {
149        *self.is_running.read().await
150    }
151
152    /// Get connection status
153    pub async fn is_connected(&self) -> bool {
154        self.websocket_client.is_connected().await
155    }
156
157    // Private methods
158
159    async fn start_sync_task(&self) {
160        let sync_engine = self.sync_engine.clone();
161        let websocket_client = self.websocket_client.clone();
162        let config = self.config.clone();
163        let is_running = self.is_running.clone();
164
165        let sync_task = tokio::spawn(async move {
166            let mut interval = interval(config.sync_interval);
167            let mut delta_buffer = Vec::new();
168
169            loop {
170                interval.tick().await;
171
172                // Check if still running
173                if !*is_running.read().await {
174                    break;
175                }
176
177                // Process incoming messages
178                if let Ok(Some(message)) = websocket_client.receive_message().await {
179                    if let Err(e) = Self::handle_incoming_message(&sync_engine, message).await {
180                        tracing::error!("Error handling incoming message: {}", e);
181                    }
182                }
183
184                // Collect local deltas
185                // TODO: Implement delta collection from sync engine
186                // For now, just simulate
187                if delta_buffer.len() >= config.delta_batch_size {
188                    Self::send_delta_batch(&websocket_client, &mut delta_buffer).await;
189                }
190            }
191        });
192
193        let mut task_handle = self.sync_task.write().await;
194        *task_handle = Some(sync_task);
195    }
196
197    async fn stop_sync_task(&self) {
198        let mut task_handle = self.sync_task.write().await;
199        if let Some(task) = task_handle.take() {
200            task.abort();
201        }
202    }
203
204    async fn handle_incoming_message(
205        sync_engine: &Arc<SyncEngine<WebSocketClient>>,
206        message: SyncMessage,
207    ) -> Result<(), WebSocketIntegrationError> {
208        match message {
209            SyncMessage::Delta {
210                collection_id,
211                crdt_type,
212                delta,
213                replica_id,
214                ..
215            } => {
216                // Apply delta to local CRDT
217                tracing::debug!(
218                    "Received delta for collection {} from replica {:?}",
219                    collection_id,
220                    replica_id
221                );
222
223                // TODO: Apply delta to the appropriate CRDT in the sync engine
224                // This would involve deserializing the delta and merging it
225            }
226            SyncMessage::Heartbeat { replica_id, .. } => {
227                tracing::debug!("Received heartbeat from replica {:?}", replica_id);
228                // Update peer info in sync engine
229            }
230            SyncMessage::PeerJoin {
231                replica_id,
232                user_info,
233            } => {
234                tracing::info!(
235                    "Peer joined: {:?} with user info: {:?}",
236                    replica_id,
237                    user_info
238                );
239                // Add peer to sync engine
240            }
241            SyncMessage::PeerLeave { replica_id } => {
242                tracing::info!("Peer left: {:?}", replica_id);
243                // Remove peer from sync engine
244            }
245            _ => {
246                tracing::debug!("Received message: {:?}", message);
247            }
248        }
249
250        Ok(())
251    }
252
253    async fn send_delta_batch(
254        websocket_client: &Arc<WebSocketClient>,
255        delta_buffer: &mut Vec<(String, CrdtType, Vec<u8>)>,
256    ) {
257        for (collection_id, crdt_type, delta) in delta_buffer.drain(..) {
258            let message = SyncMessage::Delta {
259                collection_id,
260                crdt_type,
261                delta,
262                timestamp: SystemTime::now(),
263                replica_id: websocket_client.replica_id(),
264            };
265
266            if let Err(e) = websocket_client.send_message(message).await {
267                tracing::error!("Failed to send delta: {}", e);
268            }
269        }
270    }
271}
272
273/// Builder for WebSocket sync engine
274pub struct WebSocketSyncEngineBuilder {
275    config: WebSocketIntegrationConfig,
276    replica_id: Option<ReplicaId>,
277}
278
279impl WebSocketSyncEngineBuilder {
280    pub fn new() -> Self {
281        Self {
282            config: WebSocketIntegrationConfig::default(),
283            replica_id: None,
284        }
285    }
286
287    pub fn with_config(mut self, config: WebSocketIntegrationConfig) -> Self {
288        self.config = config;
289        self
290    }
291
292    pub fn with_replica_id(mut self, replica_id: ReplicaId) -> Self {
293        self.replica_id = Some(replica_id);
294        self
295    }
296
297    pub fn with_url(mut self, url: String) -> Self {
298        self.config.client_config.url = url;
299        self
300    }
301
302    pub fn build(self, storage: Storage) -> WebSocketSyncEngine {
303        let replica_id = self
304            .replica_id
305            .unwrap_or_else(|| ReplicaId::from(uuid::Uuid::new_v4()));
306        WebSocketSyncEngine::new(storage, self.config, replica_id)
307    }
308}
309
310impl Default for WebSocketSyncEngineBuilder {
311    fn default() -> Self {
312        Self::new()
313    }
314}
315
316#[cfg(test)]
317mod tests {
318    use super::*;
319    use crate::crdt::ReplicaId;
320    use crate::storage::memory::MemoryStorage;
321
322    fn create_test_replica_id() -> ReplicaId {
323        ReplicaId::from(uuid::Uuid::new_v4())
324    }
325
326    #[tokio::test]
327    async fn test_websocket_sync_engine_creation() {
328        let storage = MemoryStorage::new();
329        let config = WebSocketIntegrationConfig::default();
330        let replica_id = create_test_replica_id();
331
332        let engine =
333            WebSocketSyncEngine::new(crate::storage::Storage::Memory(storage), config, replica_id);
334
335        assert_eq!(engine.websocket_client().replica_id(), replica_id);
336        assert!(!engine.is_running().await);
337    }
338
339    #[tokio::test]
340    async fn test_websocket_sync_engine_builder() {
341        let storage = MemoryStorage::new();
342        let replica_id = create_test_replica_id();
343
344        let engine = WebSocketSyncEngineBuilder::new()
345            .with_replica_id(replica_id)
346            .with_url("ws://test.example.com".to_string())
347            .build(crate::storage::Storage::Memory(storage));
348
349        assert_eq!(engine.websocket_client().replica_id(), replica_id);
350        assert_eq!(engine.config.client_config.url, "ws://test.example.com");
351    }
352
353    #[tokio::test]
354    async fn test_send_delta() {
355        let storage = MemoryStorage::new();
356        let config = WebSocketIntegrationConfig::default();
357        let replica_id = create_test_replica_id();
358
359        let engine =
360            WebSocketSyncEngine::new(crate::storage::Storage::Memory(storage), config, replica_id);
361
362        let delta_data = b"test delta".to_vec();
363        let result = engine
364            .send_delta(
365                "test_collection".to_string(),
366                CrdtType::LwwRegister,
367                delta_data,
368            )
369            .await;
370
371        // Should succeed even without connection in test environment
372        assert!(result.is_ok());
373    }
374
375    #[tokio::test]
376    async fn test_start_stop_cycle() {
377        let storage = MemoryStorage::new();
378        let config = WebSocketIntegrationConfig::default();
379        let replica_id = create_test_replica_id();
380
381        let engine =
382            WebSocketSyncEngine::new(crate::storage::Storage::Memory(storage), config, replica_id);
383
384        // Initially not running
385        assert!(!engine.is_running().await);
386
387        // Start engine
388        let result = engine.start().await;
389        assert!(result.is_ok());
390        assert!(engine.is_running().await);
391
392        // Stop engine
393        let result = engine.stop().await;
394        assert!(result.is_ok());
395        assert!(!engine.is_running().await);
396    }
397}