reasonkit-web 0.1.7

High-performance MCP server for browser automation, web capture, and content extraction. Rust-powered CDP client for AI agents.
Documentation
//! # Real-Time Synchronization Module
//!
//! WebSocket-based settings synchronization across devices.
//!
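//! ## Features
//!
//! - Real-time push notifications (types defined; WebSocket broadcast is still a TODO)
//! - Conflict resolution strategies that compare wall-clock timestamps (no vector clocks yet)
//! - Offline queue and retry settings (configuration and error variant only; no queue logic yet)
//! - Multi-device session management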

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;

/// Sync event types
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// written to a `type` field in snake_case (for example `settings_changed`)
/// next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SyncEvent {
    /// Settings changed on another device
    SettingsChanged {
        /// Settings version carried with this change set.
        /// NOTE(review): whether this is the version before or after the
        /// changes is not visible here — confirm against the producer.
        version: u64,
        /// The individual changes that make up this update.
        changes: Vec<SettingsChange>,
        /// Presumably the device the changes originated from — TODO confirm.
        device_id: String,
    },
    /// Full settings sync requested
    FullSyncRequested {
        /// Free-form explanation of why the full sync was requested.
        reason: String,
    },
    /// Conflict detected during sync
    ConflictDetected {
        /// Settings path at which the two sides disagree.
        path: String,
        /// Value held locally for `path`.
        local_value: serde_json::Value,
        /// Value held remotely for `path`.
        remote_value: serde_json::Value,
        /// Timestamp attached to the local value.
        local_timestamp: DateTime<Utc>,
        /// Timestamp attached to the remote value.
        remote_timestamp: DateTime<Utc>,
    },
    /// Session invalidated (logout from another device)
    SessionInvalidated {
        /// Device the invalidation refers to.
        /// NOTE(review): unclear from this file whether this is the device
        /// being logged out or the one that triggered it — confirm.
        device_id: String,
        /// Free-form explanation of the invalidation.
        reason: String,
    },
    /// API key rotated
    ApiKeyRotated {
        /// Identifier of the key involved in the rotation.
        /// NOTE(review): old or new key id is not established here — confirm.
        key_id: Uuid,
    },
}

/// Settings change for sync
///
/// One entry in a change set: an operation applied at a settings path,
/// with an optional JSON payload and the time of the change.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SettingsChange {
    /// Settings path the change applies to.
    /// NOTE(review): the path syntax (dotted, JSON pointer, ...) is not
    /// defined in this file — confirm with the consumer.
    pub path: String,
    /// What to do at `path`.
    pub operation: ChangeOperation,
    /// JSON payload for the operation, if any.
    /// Presumably `None` for `Delete` — TODO confirm; nothing here enforces it.
    pub value: Option<serde_json::Value>,
    /// When the change was made (UTC).
    pub timestamp: DateTime<Utc>,
}

/// Kind of operation carried by a [`SettingsChange`].
///
/// Serialized as a lowercase string: `set`, `delete` or `merge`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ChangeOperation {
    /// Set the value at the path.
    Set,
    /// Delete the value at the path.
    Delete,
    /// Merge the supplied value into the value at the path.
    /// NOTE(review): merge semantics are applied elsewhere, not in this type.
    Merge,
}

/// Sync configuration
///
/// Intervals and limits for the sync connection. All durations are whole
/// seconds. See the `Default` impl for the stock values.
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// WebSocket ping interval (seconds)
    pub ping_interval_secs: u64,
    /// Connection timeout (seconds)
    pub connection_timeout_secs: u64,
    /// Maximum offline queue size
    pub max_offline_queue: usize,
    /// Retry delay (seconds)
    pub retry_delay_secs: u64,
    /// Maximum retries
    pub max_retries: u32,
}

impl Default for SyncConfig {
    fn default() -> Self {
        Self {
            ping_interval_secs: 30,
            connection_timeout_secs: 60,
            max_offline_queue: 100,
            retry_delay_secs: 5,
            max_retries: 3,
        }
    }
}

/// Device session for sync
///
/// Describes one device taking part in synchronization. `SyncService` stores
/// these keyed by `device_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceSession {
    /// Device ID
    pub device_id: String,
    /// Device name/type
    pub device_name: String,
    /// Platform (cli, web, ide)
    // NOTE(review): free-form string; the listed values are not enforced here.
    pub platform: String,
    /// Last active timestamp
    pub last_active: DateTime<Utc>,
    /// Current settings version
    pub settings_version: u64,
    /// Connection status
    pub connected: bool,
}

/// Conflict resolution strategy
///
/// Serialized as a snake_case string (`use_local`, `use_remote`,
/// `use_newest`, `merge`, `manual`). Consumed by
/// `SyncService::resolve_conflict`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConflictResolution {
    /// Use local value
    UseLocal,
    /// Use remote value
    UseRemote,
    /// Use most recent value
    UseNewest,
    /// Merge values (for objects/arrays)
    // NOTE(review): `SyncService::resolve_conflict` only merges when both
    // sides are JSON objects; anything else (arrays included) falls back to
    // the newest-wins comparison.
    Merge,
    /// Prompt user to resolve
    // `SyncService::resolve_conflict` returns the local value for this
    // strategy and leaves the prompting to the client.
    Manual,
}

/// Sync service for managing real-time synchronization
///
/// Holds the sync configuration and the set of registered device sessions.
pub struct SyncService {
    // Stored at construction but not read by any method visible in this
    // file, hence the lint suppression.
    #[allow(dead_code)]
    config: SyncConfig,
    /// Registered sessions, keyed by `DeviceSession::device_id`.
    sessions: HashMap<String, DeviceSession>,
}

impl SyncService {
    /// Creates a service with the given configuration and no registered sessions.
    pub fn new(config: SyncConfig) -> Self {
        Self {
            config,
            sessions: HashMap::new(),
        }
    }

    /// Register a device session
    ///
    /// The session is stored under its `device_id`; registering the same
    /// device again replaces the previous entry.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`.
    pub fn register_session(&mut self, session: DeviceSession) -> Result<(), SyncError> {
        self.sessions.insert(session.device_id.clone(), session);
        Ok(())
    }

    /// Remove a device session
    ///
    /// Removing a device that is not registered is a no-op, so the call is
    /// safe to repeat.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`, including when no session existed.
    pub fn remove_session(&mut self, device_id: &str) -> Result<(), SyncError> {
        self.sessions.remove(device_id);
        Ok(())
    }

    /// Get all active sessions for a user
    ///
    /// At present this returns every registered session: `_user_id` is
    /// ignored and the `connected` flag is not consulted.
    pub fn get_active_sessions(&self, _user_id: Uuid) -> Vec<&DeviceSession> {
        // TODO: Filter by user_id when sessions store user association
        self.sessions.values().collect()
    }

    /// Broadcast sync event to all devices except sender
    ///
    /// Not implemented yet: all arguments are ignored and nothing is sent.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`.
    pub async fn broadcast_event(
        &self,
        _user_id: Uuid,
        _event: SyncEvent,
        _exclude_device: Option<&str>,
    ) -> Result<(), SyncError> {
        // TODO: Implement WebSocket broadcast
        Ok(())
    }

    /// Resolve conflict using specified strategy
    ///
    /// Returns the value that should win, as a fresh clone:
    ///
    /// - `UseLocal` / `UseRemote`: the corresponding input.
    /// - `UseNewest`: the value with the later timestamp; on a tie the
    ///   remote value wins.
    /// - `Merge`: when both inputs are JSON objects they are merged
    ///   recursively. Keys present on only one side are kept, nested objects
    ///   are merged key by key, and for any other clash the remote value
    ///   wins. When either input is not an object, falls back to the
    ///   `UseNewest` rule.
    /// - `Manual`: the local value; the caller is expected to surface the
    ///   conflict to the user.
    pub fn resolve_conflict(
        &self,
        local: &serde_json::Value,
        remote: &serde_json::Value,
        local_timestamp: DateTime<Utc>,
        remote_timestamp: DateTime<Utc>,
        strategy: ConflictResolution,
    ) -> serde_json::Value {
        match strategy {
            ConflictResolution::UseLocal => local.clone(),
            ConflictResolution::UseRemote => remote.clone(),
            ConflictResolution::UseNewest => {
                Self::pick_newest(local, remote, local_timestamp, remote_timestamp)
            }
            ConflictResolution::Merge => match (local.as_object(), remote.as_object()) {
                (Some(local_obj), Some(remote_obj)) => {
                    serde_json::Value::Object(Self::deep_merge_objects(local_obj, remote_obj))
                }
                // Fall back to newest for non-objects
                _ => Self::pick_newest(local, remote, local_timestamp, remote_timestamp),
            },
            // Return local and let client handle
            ConflictResolution::Manual => local.clone(),
        }
    }

    /// Clones whichever value has the later timestamp; ties go to `remote`.
    fn pick_newest(
        local: &serde_json::Value,
        remote: &serde_json::Value,
        local_timestamp: DateTime<Utc>,
        remote_timestamp: DateTime<Utc>,
    ) -> serde_json::Value {
        if local_timestamp > remote_timestamp {
            local.clone()
        } else {
            remote.clone()
        }
    }

    /// Recursively merges `remote` into a copy of `local`.
    ///
    /// A shallow merge would let a nested remote object replace the local
    /// one wholesale and drop local-only nested keys, so nested objects are
    /// merged key by key. Any other clash is resolved in favour of `remote`.
    fn deep_merge_objects(
        local: &serde_json::Map<String, serde_json::Value>,
        remote: &serde_json::Map<String, serde_json::Value>,
    ) -> serde_json::Map<String, serde_json::Value> {
        let mut merged = local.clone();
        for (key, remote_value) in remote {
            let local_child = local.get(key).and_then(|value| value.as_object());
            let combined = match (local_child, remote_value.as_object()) {
                (Some(local_child), Some(remote_child)) => {
                    serde_json::Value::Object(Self::deep_merge_objects(local_child, remote_child))
                }
                _ => remote_value.clone(),
            };
            merged.insert(key.clone(), combined);
        }
        merged
    }
}

impl Default for SyncService {
    fn default() -> Self {
        Self::new(SyncConfig::default())
    }
}

/// Sync errors
///
/// The `#[error(...)]` attributes supply each variant's `Display` text.
/// NOTE(review): no method visible in this file constructs any of these
/// variants yet; the per-variant notes below are read off the names and
/// messages only.
#[derive(Debug, thiserror::Error)]
pub enum SyncError {
    /// No session is registered for the requested device.
    #[error("Session not found")]
    SessionNotFound,
    /// The connection could not be established; the payload is the detail text.
    #[error("Connection failed: {0}")]
    ConnectionFailed(String),
    /// A sync conflict occurred; the payload is the detail text.
    #[error("Sync conflict: {0}")]
    Conflict(String),
    /// The offline queue has no room for more entries.
    #[error("Offline queue full")]
    QueueFull,
    /// Broadcasting an event failed; the payload is the detail text.
    #[error("Broadcast failed: {0}")]
    BroadcastError(String),
}

/// WebSocket message types
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// written to a `type` field in snake_case (for example `push_changes`)
/// next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WsMessage {
    /// Client -> Server: Subscribe to sync events
    Subscribe {
        /// Device asking to subscribe.
        device_id: String,
        /// Credential presented by the client.
        /// NOTE(review): token format and validation are not in this file.
        token: String,
    },
    /// Server -> Client: Subscription confirmed
    Subscribed {
        /// Identifier assigned to the confirmed subscription.
        session_id: String,
    },
    /// Client -> Server: Push local changes
    PushChanges {
        /// Settings version sent with the changes.
        /// NOTE(review): base version vs. resulting version — confirm.
        version: u64,
        /// The changes being pushed.
        changes: Vec<SettingsChange>,
    },
    /// Server -> Client: Changes accepted
    ChangesAccepted {
        /// Settings version after the changes were accepted.
        new_version: u64,
    },
    /// Server -> Client: Sync event
    ///
    /// NOTE(review): `WsMessage` and `SyncEvent` are both internally tagged
    /// on the same `type` key. Wrapping one in a newtype variant of the other
    /// looks like it would emit two `type` entries when serialized and be
    /// rejected as a duplicate field when deserialized — confirm with a
    /// round-trip test before relying on this variant on the wire.
    Event(SyncEvent),
    /// Client <-> Server: Heartbeat
    Ping,
    /// Client <-> Server: Heartbeat reply
    Pong,
    /// Error response
    Error {
        /// Machine-readable error code.
        code: String,
        /// Human-readable description of the error.
        message: String,
    },
}