leptos_sync_core/transport/
compatibility_layer.rs

1//! Compatibility layer for migrating from existing WebSocket implementation to leptos-ws-pro
2//! 
3//! This module provides a bridge between the existing message protocol and leptos-ws-pro,
4//! allowing for gradual migration without breaking existing functionality.
5
6use super::{SyncTransport, TransportError};
7use crate::transport::leptos_ws_pro_transport::{LeptosWsProTransport, LeptosWsProConfig, MessageProtocolAdapter};
8use serde::{Deserialize, Serialize};
9use thiserror::Error;
10
11#[derive(Error, Debug)]
12pub enum CompatibilityError {
13    #[error("Transport error: {0}")]
14    Transport(#[from] TransportError),
15    #[error("Serialization error: {0}")]
16    Serialization(String),
17    #[error("Protocol error: {0}")]
18    Protocol(String),
19}
20
21/// Message types used in the existing leptos-sync protocol
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(tag = "type")]
24pub enum SyncMessage {
25    #[serde(rename = "sync")]
26    Sync {
27        peer_id: String,
28        data: serde_json::Value,
29        timestamp: String,
30    },
31    #[serde(rename = "presence")]
32    Presence {
33        peer_id: String,
34        action: String,
35        timestamp: String,
36    },
37    #[serde(rename = "heartbeat")]
38    Heartbeat {
39        timestamp: String,
40    },
41    #[serde(rename = "welcome")]
42    Welcome {
43        peer_id: String,
44        timestamp: String,
45        server_info: ServerInfo,
46    },
47    #[serde(rename = "binary_ack")]
48    BinaryAck {
49        peer_id: String,
50        size: usize,
51        timestamp: String,
52    },
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct ServerInfo {
57    pub version: String,
58    pub max_connections: usize,
59    pub heartbeat_interval: u64,
60}
61
62/// Compatibility wrapper that implements the existing SyncTransport trait
63/// while using leptos-ws-pro under the hood
64pub struct CompatibilityTransport {
65    leptos_ws_pro: LeptosWsProTransport,
66    adapter: MessageProtocolAdapter,
67    message_buffer: std::collections::VecDeque<Vec<u8>>,
68}
69
70impl CompatibilityTransport {
71    /// Create a new compatibility transport
72    pub fn new(config: LeptosWsProConfig) -> Self {
73        let leptos_ws_pro = LeptosWsProTransport::new(config);
74        let adapter = MessageProtocolAdapter::new(leptos_ws_pro.clone());
75        
76        Self {
77            leptos_ws_pro,
78            adapter,
79            message_buffer: std::collections::VecDeque::new(),
80        }
81    }
82
83    /// Create with a simple URL
84    pub fn with_url(url: String) -> Self {
85        let config = LeptosWsProConfig {
86            url,
87            ..Default::default()
88        };
89        Self::new(config)
90    }
91
92    /// Connect using leptos-ws-pro
93    pub async fn connect(&self) -> Result<(), CompatibilityError> {
94        self.leptos_ws_pro.connect().await
95            .map_err(|e| CompatibilityError::Transport(e.into()))
96    }
97
98    /// Disconnect using leptos-ws-pro
99    pub async fn disconnect(&self) -> Result<(), CompatibilityError> {
100        self.leptos_ws_pro.disconnect().await
101            .map_err(|e| CompatibilityError::Transport(e.into()))
102    }
103
104    /// Send a sync message using the existing protocol
105    pub async fn send_sync(&self, peer_id: &str, data: serde_json::Value) -> Result<(), CompatibilityError> {
106        self.adapter.send_sync_message(peer_id, data).await
107            .map_err(|e| CompatibilityError::Transport(e.into()))
108    }
109
110    /// Send a presence message using the existing protocol
111    pub async fn send_presence(&self, peer_id: &str, action: &str) -> Result<(), CompatibilityError> {
112        self.adapter.send_presence_message(peer_id, action).await
113            .map_err(|e| CompatibilityError::Transport(e.into()))
114    }
115
116    /// Send a heartbeat using the existing protocol
117    pub async fn send_heartbeat(&self) -> Result<(), CompatibilityError> {
118        self.adapter.send_heartbeat().await
119            .map_err(|e| CompatibilityError::Transport(e.into()))
120    }
121
122    /// Receive and parse messages using the existing protocol
123    pub async fn receive_messages(&self) -> Result<Vec<SyncMessage>, CompatibilityError> {
124        let raw_messages = self.adapter.receive_messages().await
125            .map_err(|e| CompatibilityError::Transport(e.into()))?;
126
127        let mut parsed_messages = Vec::new();
128        for raw_message in raw_messages {
129            match serde_json::from_value::<SyncMessage>(raw_message) {
130                Ok(parsed) => parsed_messages.push(parsed),
131                Err(e) => {
132                    tracing::warn!("Failed to parse sync message: {}", e);
133                    continue;
134                }
135            }
136        }
137
138        Ok(parsed_messages)
139    }
140
141    /// Get the underlying leptos-ws-pro transport for advanced usage
142    pub fn leptos_ws_pro_transport(&self) -> &LeptosWsProTransport {
143        &self.leptos_ws_pro
144    }
145
146    /// Get the message protocol adapter
147    pub fn message_adapter(&self) -> &MessageProtocolAdapter {
148        &self.adapter
149    }
150}
151
152impl SyncTransport for CompatibilityTransport {
153    type Error = TransportError;
154
155    fn send<'a>(&'a self, data: &'a [u8]) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), Self::Error>> + Send + 'a>> {
156        Box::pin(async move {
157        // Try to parse as a sync message first
158        if let Ok(message) = serde_json::from_slice::<serde_json::Value>(data) {
159            if let Some(msg_type) = message.get("type").and_then(|t| t.as_str()) {
160                match msg_type {
161                    "sync" => {
162                        if let (Some(peer_id), Some(data)) = (
163                            message.get("peer_id").and_then(|p| p.as_str()),
164                            message.get("data")
165                        ) {
166                            return self.send_sync(peer_id, data.clone()).await
167                                .map_err(|e| match e {
168                                    CompatibilityError::Transport(t) => t,
169                                    _ => TransportError::SendFailed(e.to_string()),
170                                });
171                        }
172                    }
173                    "presence" => {
174                        if let (Some(peer_id), Some(action)) = (
175                            message.get("peer_id").and_then(|p| p.as_str()),
176                            message.get("action").and_then(|a| a.as_str())
177                        ) {
178                            return self.send_presence(peer_id, action).await
179                                .map_err(|e| match e {
180                                    CompatibilityError::Transport(t) => t,
181                                    _ => TransportError::SendFailed(e.to_string()),
182                                });
183                        }
184                    }
185                    "heartbeat" => {
186                        return self.send_heartbeat().await
187                            .map_err(|e| match e {
188                                CompatibilityError::Transport(t) => t,
189                                _ => TransportError::SendFailed(e.to_string()),
190                            });
191                    }
192                    _ => {
193                        // Unknown message type, send as raw data
194                    }
195                }
196            }
197        }
198
199        // Fall back to raw data sending
200        self.leptos_ws_pro.send(data).await
201        })
202    }
203
204    fn receive(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<u8>>, Self::Error>> + Send + '_>> {
205        Box::pin(async move {
206        // Get parsed messages and convert back to raw bytes
207        let messages = self.receive_messages().await
208            .map_err(|e| match e {
209                CompatibilityError::Transport(t) => t,
210                _ => TransportError::ReceiveFailed(e.to_string()),
211            })?;
212
213        let mut raw_messages = Vec::new();
214        for message in messages {
215            match serde_json::to_vec(&message) {
216                Ok(raw) => raw_messages.push(raw),
217                Err(e) => {
218                    tracing::warn!("Failed to serialize message: {}", e);
219                    continue;
220                }
221            }
222        }
223
224        Ok(raw_messages)
225        })
226    }
227
228    fn is_connected(&self) -> bool {
229        self.leptos_ws_pro.is_connected()
230    }
231}
232
233impl Clone for CompatibilityTransport {
234    fn clone(&self) -> Self {
235        Self {
236            leptos_ws_pro: self.leptos_ws_pro.clone(),
237            adapter: MessageProtocolAdapter::new(self.leptos_ws_pro.clone()),
238            message_buffer: std::collections::VecDeque::new(),
239        }
240    }
241}
242
243/// Migration helper for existing WebSocket transport
244/// 
245/// Note: This is a simplified version that doesn't use trait objects
246/// due to the SyncTransport trait not being dyn compatible.
247pub struct MigrationHelper {
248    new_transport: CompatibilityTransport,
249    migration_complete: bool,
250}
251
252impl MigrationHelper {
253    /// Create a new migration helper
254    pub fn new(config: LeptosWsProConfig) -> Self {
255        Self {
256            new_transport: CompatibilityTransport::new(config),
257            migration_complete: false,
258        }
259    }
260
261    /// Migrate to the new transport
262    pub async fn migrate(&mut self) -> Result<(), CompatibilityError> {
263        // Connect to new transport
264        self.new_transport.connect().await?;
265        
266        // Mark migration as complete
267        self.migration_complete = true;
268        
269        Ok(())
270    }
271
272    /// Check if migration is complete
273    pub fn is_migration_complete(&self) -> bool {
274        self.migration_complete
275    }
276
277    /// Get the new transport
278    pub fn new_transport(&self) -> &CompatibilityTransport {
279        &self.new_transport
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use super::*;
286    use crate::transport::InMemoryTransport;
287
288    #[tokio::test]
289    async fn test_compatibility_transport_creation() {
290        let config = LeptosWsProConfig::default();
291        let transport = CompatibilityTransport::new(config);
292        
293        assert!(!transport.is_connected());
294    }
295
296    #[tokio::test]
297    async fn test_sync_message_parsing() {
298        let config = LeptosWsProConfig::default();
299        let transport = CompatibilityTransport::new(config);
300        
301        // Test sync message
302        let sync_data = serde_json::json!({
303            "changes": ["change1", "change2"],
304            "client_id": "test_client"
305        });
306        
307        let result = transport.send_sync("test_peer", sync_data).await;
308        assert!(result.is_err()); // Expected when not connected
309    }
310
311    #[tokio::test]
312    async fn test_presence_message_parsing() {
313        let config = LeptosWsProConfig::default();
314        let transport = CompatibilityTransport::new(config);
315        
316        let result = transport.send_presence("test_peer", "connected").await;
317        assert!(result.is_err()); // Expected when not connected
318    }
319
320    #[tokio::test]
321    async fn test_heartbeat_message_parsing() {
322        let config = LeptosWsProConfig::default();
323        let transport = CompatibilityTransport::new(config);
324        
325        let result = transport.send_heartbeat().await;
326        assert!(result.is_err()); // Expected when not connected
327    }
328
329    #[tokio::test]
330    async fn test_migration_helper() {
331        let config = LeptosWsProConfig {
332            url: "ws://invalid-url-that-does-not-exist:9999".to_string(),
333            ..Default::default()
334        };
335        let mut helper = MigrationHelper::new(config);
336        
337        assert!(!helper.is_migration_complete());
338        
339        // Migration should fail without connection, but helper should be set up
340        let result = helper.migrate().await;
341        assert!(result.is_err()); // Expected when not connected
342    }
343
344    #[tokio::test]
345    async fn test_sync_transport_trait_compliance() {
346        let config = LeptosWsProConfig::default();
347        let transport = CompatibilityTransport::new(config);
348        
349        // Should implement SyncTransport trait
350        assert!(!transport.is_connected());
351        
352        // Should handle trait methods without panicking
353        let data = b"trait compliance test";
354        let send_result = transport.send(data).await;
355        assert!(send_result.is_err()); // Expected when not connected
356        
357        let receive_result = transport.receive().await;
358        assert!(receive_result.is_ok());
359    }
360}