Skip to main content

reasonkit_web/portal/
sync.rs

1//! # Real-Time Synchronization Module
2//!
3//! WebSocket-based settings synchronization across devices.
4//!
5//! ## Features
6//!
7//! - Real-time push notifications
8//! - Conflict resolution with vector clocks
9//! - Offline queue with automatic retry
10//! - Multi-device session management
11
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use uuid::Uuid;
16
17/// Sync event types
18#[derive(Debug, Clone, Serialize, Deserialize)]
19#[serde(tag = "type", rename_all = "snake_case")]
20pub enum SyncEvent {
21    /// Settings changed on another device
22    SettingsChanged {
23        version: u64,
24        changes: Vec<SettingsChange>,
25        device_id: String,
26    },
27    /// Full settings sync requested
28    FullSyncRequested { reason: String },
29    /// Conflict detected during sync
30    ConflictDetected {
31        path: String,
32        local_value: serde_json::Value,
33        remote_value: serde_json::Value,
34        local_timestamp: DateTime<Utc>,
35        remote_timestamp: DateTime<Utc>,
36    },
37    /// Session invalidated (logout from another device)
38    SessionInvalidated { device_id: String, reason: String },
39    /// API key rotated
40    ApiKeyRotated { key_id: Uuid },
41}
42
43/// Settings change for sync
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct SettingsChange {
46    pub path: String,
47    pub operation: ChangeOperation,
48    pub value: Option<serde_json::Value>,
49    pub timestamp: DateTime<Utc>,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53#[serde(rename_all = "lowercase")]
54pub enum ChangeOperation {
55    Set,
56    Delete,
57    Merge,
58}
59
60/// Sync configuration
61#[derive(Debug, Clone)]
62pub struct SyncConfig {
63    /// WebSocket ping interval (seconds)
64    pub ping_interval_secs: u64,
65    /// Connection timeout (seconds)
66    pub connection_timeout_secs: u64,
67    /// Maximum offline queue size
68    pub max_offline_queue: usize,
69    /// Retry delay (seconds)
70    pub retry_delay_secs: u64,
71    /// Maximum retries
72    pub max_retries: u32,
73}
74
75impl Default for SyncConfig {
76    fn default() -> Self {
77        Self {
78            ping_interval_secs: 30,
79            connection_timeout_secs: 60,
80            max_offline_queue: 100,
81            retry_delay_secs: 5,
82            max_retries: 3,
83        }
84    }
85}
86
87/// Device session for sync
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct DeviceSession {
90    /// Device ID
91    pub device_id: String,
92    /// Device name/type
93    pub device_name: String,
94    /// Platform (cli, web, ide)
95    pub platform: String,
96    /// Last active timestamp
97    pub last_active: DateTime<Utc>,
98    /// Current settings version
99    pub settings_version: u64,
100    /// Connection status
101    pub connected: bool,
102}
103
104/// Conflict resolution strategy
105#[derive(Debug, Clone, Serialize, Deserialize)]
106#[serde(rename_all = "snake_case")]
107pub enum ConflictResolution {
108    /// Use local value
109    UseLocal,
110    /// Use remote value
111    UseRemote,
112    /// Use most recent value
113    UseNewest,
114    /// Merge values (for objects/arrays)
115    Merge,
116    /// Prompt user to resolve
117    Manual,
118}
119
120/// Sync service for managing real-time synchronization
121pub struct SyncService {
122    #[allow(dead_code)]
123    config: SyncConfig,
124    sessions: HashMap<String, DeviceSession>,
125}
126
127impl SyncService {
128    pub fn new(config: SyncConfig) -> Self {
129        Self {
130            config,
131            sessions: HashMap::new(),
132        }
133    }
134
135    /// Register a device session
136    pub fn register_session(&mut self, session: DeviceSession) -> Result<(), SyncError> {
137        self.sessions.insert(session.device_id.clone(), session);
138        Ok(())
139    }
140
141    /// Remove a device session
142    pub fn remove_session(&mut self, device_id: &str) -> Result<(), SyncError> {
143        self.sessions.remove(device_id);
144        Ok(())
145    }
146
147    /// Get all active sessions for a user
148    pub fn get_active_sessions(&self, _user_id: Uuid) -> Vec<&DeviceSession> {
149        // TODO: Filter by user_id when sessions store user association
150        self.sessions.values().collect()
151    }
152
153    /// Broadcast sync event to all devices except sender
154    pub async fn broadcast_event(
155        &self,
156        _user_id: Uuid,
157        _event: SyncEvent,
158        _exclude_device: Option<&str>,
159    ) -> Result<(), SyncError> {
160        // TODO: Implement WebSocket broadcast
161        Ok(())
162    }
163
164    /// Resolve conflict using specified strategy
165    pub fn resolve_conflict(
166        &self,
167        local: &serde_json::Value,
168        remote: &serde_json::Value,
169        local_timestamp: DateTime<Utc>,
170        remote_timestamp: DateTime<Utc>,
171        strategy: ConflictResolution,
172    ) -> serde_json::Value {
173        match strategy {
174            ConflictResolution::UseLocal => local.clone(),
175            ConflictResolution::UseRemote => remote.clone(),
176            ConflictResolution::UseNewest => {
177                if local_timestamp > remote_timestamp {
178                    local.clone()
179                } else {
180                    remote.clone()
181                }
182            }
183            ConflictResolution::Merge => {
184                // Deep merge for objects
185                if let (Some(local_obj), Some(remote_obj)) = (local.as_object(), remote.as_object())
186                {
187                    let mut merged = local_obj.clone();
188                    for (key, value) in remote_obj {
189                        merged.insert(key.clone(), value.clone());
190                    }
191                    serde_json::Value::Object(merged)
192                } else {
193                    // Fall back to newest for non-objects
194                    if local_timestamp > remote_timestamp {
195                        local.clone()
196                    } else {
197                        remote.clone()
198                    }
199                }
200            }
201            ConflictResolution::Manual => {
202                // Return local and let client handle
203                local.clone()
204            }
205        }
206    }
207}
208
209impl Default for SyncService {
210    fn default() -> Self {
211        Self::new(SyncConfig::default())
212    }
213}
214
215/// Sync errors
216#[derive(Debug, thiserror::Error)]
217pub enum SyncError {
218    #[error("Session not found")]
219    SessionNotFound,
220    #[error("Connection failed: {0}")]
221    ConnectionFailed(String),
222    #[error("Sync conflict: {0}")]
223    Conflict(String),
224    #[error("Offline queue full")]
225    QueueFull,
226    #[error("Broadcast failed: {0}")]
227    BroadcastError(String),
228}
229
230/// WebSocket message types
231#[derive(Debug, Clone, Serialize, Deserialize)]
232#[serde(tag = "type", rename_all = "snake_case")]
233pub enum WsMessage {
234    /// Client -> Server: Subscribe to sync events
235    Subscribe {
236        device_id: String,
237        token: String,
238    },
239    /// Server -> Client: Subscription confirmed
240    Subscribed {
241        session_id: String,
242    },
243    /// Client -> Server: Push local changes
244    PushChanges {
245        version: u64,
246        changes: Vec<SettingsChange>,
247    },
248    /// Server -> Client: Changes accepted
249    ChangesAccepted {
250        new_version: u64,
251    },
252    /// Server -> Client: Sync event
253    Event(SyncEvent),
254    /// Client <-> Server: Heartbeat
255    Ping,
256    Pong,
257    /// Error response
258    Error {
259        code: String,
260        message: String,
261    },
262}