Skip to main content

chasm/sync/
bidirectional.rs

1// Copyright (c) 2024-2026 Nervosys LLC
2// SPDX-License-Identifier: AGPL-3.0-only
3//! Bidirectional Sync Module
4//!
5//! Provides two-way synchronization between CSM and provider-native storage.
6//! Handles conflict resolution, change tracking, and state reconciliation.
7
8use crate::models::ChatSession;
9use crate::providers::ProviderType;
10use anyhow::{anyhow, Result};
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use sha2::{Digest, Sha256};
14use std::collections::HashMap;
15use std::path::PathBuf;
16
17// =============================================================================
18// Sync State and Tracking
19// =============================================================================
20
21/// Sync state for a single session
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct SessionSyncState {
24    pub session_id: String,
25    pub provider: ProviderType,
26    pub last_sync: DateTime<Utc>,
27    pub local_hash: String,
28    pub remote_hash: String,
29    pub status: SyncStatus,
30    pub pending_changes: Vec<SyncChange>,
31}
32
33/// Sync status for a session
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(rename_all = "snake_case")]
36pub enum SyncStatus {
37    Synced,
38    LocalAhead,
39    RemoteAhead,
40    Conflict,
41    Unsynced,
42    Error,
43}
44
45/// Represents a single change to sync
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct SyncChange {
48    pub id: String,
49    pub change_type: ChangeType,
50    pub entity_type: EntityType,
51    pub entity_id: String,
52    pub timestamp: DateTime<Utc>,
53    pub payload: serde_json::Value,
54    pub origin: ChangeOrigin,
55}
56
57/// Type of change
58#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
59#[serde(rename_all = "snake_case")]
60pub enum ChangeType {
61    Create,
62    Update,
63    Delete,
64    Merge,
65}
66
67/// Entity type for changes
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(rename_all = "snake_case")]
70pub enum EntityType {
71    Session,
72    Message,
73    Metadata,
74    Tag,
75}
76
77/// Origin of a change
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub enum ChangeOrigin {
81    Local,
82    Remote,
83}
84
85// =============================================================================
86// Conflict Resolution
87// =============================================================================
88
89/// Conflict resolution strategy
90#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
91#[serde(rename_all = "snake_case")]
92pub enum ConflictStrategy {
93    LocalWins,
94    RemoteWins,
95    KeepBoth,
96    AutoMerge,
97    Manual,
98    MostRecent,
99}
100
101/// A sync conflict that needs resolution
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct SyncConflict {
104    pub id: String,
105    pub session_id: String,
106    pub local_version: ConflictVersion,
107    pub remote_version: ConflictVersion,
108    pub conflict_type: ConflictType,
109    pub suggested_strategy: ConflictStrategy,
110    pub created_at: DateTime<Utc>,
111    pub resolved: bool,
112}
113
114/// Version info for conflict
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct ConflictVersion {
117    pub hash: String,
118    pub timestamp: DateTime<Utc>,
119    pub message_count: usize,
120}
121
122/// Type of conflict
123#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
124#[serde(rename_all = "snake_case")]
125pub enum ConflictType {
126    MessageEdit,
127    SessionMetadata,
128    Deletion,
129    ConcurrentAdd,
130}
131
132/// Result of a sync operation
133#[derive(Debug, Clone)]
134pub enum SyncResult {
135    NoChanges,
136    Pushed,
137    Pulled,
138    Merged,
139    ConflictDetected(SyncConflict),
140}
141
142// =============================================================================
143// Hash Computation
144// =============================================================================
145
146/// Compute a content hash for a session
147pub fn compute_session_hash(session: &ChatSession) -> String {
148    let mut hasher = Sha256::new();
149
150    // Hash session metadata
151    let session_id = session.session_id.clone().unwrap_or_default();
152    hasher.update(session_id.as_bytes());
153
154    if let Some(title) = &session.custom_title {
155        hasher.update(title.as_bytes());
156    }
157
158    hasher.update(session.last_message_date.to_le_bytes());
159
160    // Hash requests content
161    for request in &session.requests {
162        if let Some(msg) = &request.message {
163            if let Some(text) = &msg.text {
164                hasher.update(text.as_bytes());
165            }
166        }
167        if let Some(resp) = &request.response {
168            if let Some(result) = resp.get("result").and_then(|v| v.as_str()) {
169                hasher.update(result.as_bytes());
170            }
171        }
172    }
173
174    format!("{:x}", hasher.finalize())
175}
176
177// =============================================================================
178// Bidirectional Sync Engine
179// =============================================================================
180
181/// Configuration for bidirectional sync
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct BidirectionalSyncConfig {
184    pub conflict_strategy: ConflictStrategy,
185    pub auto_sync_interval_secs: u64,
186    pub max_retries: u32,
187    pub retry_delay_ms: u64,
188    pub batch_size: usize,
189}
190
191impl Default for BidirectionalSyncConfig {
192    fn default() -> Self {
193        Self {
194            conflict_strategy: ConflictStrategy::MostRecent,
195            auto_sync_interval_secs: 300,
196            max_retries: 3,
197            retry_delay_ms: 1000,
198            batch_size: 50,
199        }
200    }
201}
202
203/// Bidirectional sync engine
204pub struct BidirectionalSyncEngine {
205    config: BidirectionalSyncConfig,
206    state: HashMap<String, SessionSyncState>,
207    conflicts: Vec<SyncConflict>,
208}
209
210impl BidirectionalSyncEngine {
211    pub fn new(config: BidirectionalSyncConfig) -> Self {
212        Self {
213            config,
214            state: HashMap::new(),
215            conflicts: Vec::new(),
216        }
217    }
218
219    /// Check sync status for a session
220    pub fn check_status(
221        &self,
222        session_id: &str,
223        local_session: &ChatSession,
224        remote_session: Option<&ChatSession>,
225    ) -> SyncStatus {
226        let local_hash = compute_session_hash(local_session);
227        let remote_hash = remote_session.map(compute_session_hash).unwrap_or_default();
228
229        match self.state.get(session_id) {
230            None => SyncStatus::Unsynced,
231            Some(prev_state) => {
232                let local_changed = local_hash != prev_state.local_hash;
233                let remote_changed = remote_hash != prev_state.remote_hash;
234
235                match (local_changed, remote_changed) {
236                    (false, false) => SyncStatus::Synced,
237                    (true, false) => SyncStatus::LocalAhead,
238                    (false, true) => SyncStatus::RemoteAhead,
239                    (true, true) => SyncStatus::Conflict,
240                }
241            }
242        }
243    }
244
245    /// Sync a session bidirectionally
246    pub fn sync_session(
247        &mut self,
248        local_session: &mut ChatSession,
249        remote_session: Option<&ChatSession>,
250        push_fn: impl FnOnce(&ChatSession) -> Result<()>,
251        pull_fn: impl FnOnce() -> Result<Option<ChatSession>>,
252    ) -> Result<SyncResult> {
253        let session_id = local_session.session_id.clone().unwrap_or_default();
254        let status = self.check_status(&session_id, local_session, remote_session);
255
256        match status {
257            SyncStatus::Synced => Ok(SyncResult::NoChanges),
258
259            SyncStatus::LocalAhead => {
260                push_fn(local_session)?;
261                self.update_state(&session_id, local_session, local_session);
262                Ok(SyncResult::Pushed)
263            }
264
265            SyncStatus::RemoteAhead => {
266                if let Some(remote) = pull_fn()? {
267                    *local_session = remote.clone();
268                    self.update_state(&session_id, local_session, &remote);
269                    Ok(SyncResult::Pulled)
270                } else {
271                    Ok(SyncResult::NoChanges)
272                }
273            }
274
275            SyncStatus::Conflict => {
276                let remote = remote_session
277                    .ok_or_else(|| anyhow!("Remote required for conflict resolution"))?;
278                self.resolve_conflict(local_session, remote)
279            }
280
281            SyncStatus::Unsynced => {
282                push_fn(local_session)?;
283                self.update_state(&session_id, local_session, local_session);
284                Ok(SyncResult::Pushed)
285            }
286
287            SyncStatus::Error => Err(anyhow!("Sync in error state")),
288        }
289    }
290
291    /// Update sync state after successful sync
292    fn update_state(&mut self, session_id: &str, local: &ChatSession, remote: &ChatSession) {
293        let state = SessionSyncState {
294            session_id: session_id.to_string(),
295            provider: ProviderType::Custom,
296            last_sync: Utc::now(),
297            local_hash: compute_session_hash(local),
298            remote_hash: compute_session_hash(remote),
299            status: SyncStatus::Synced,
300            pending_changes: Vec::new(),
301        };
302        self.state.insert(session_id.to_string(), state);
303    }
304
305    /// Resolve a conflict between local and remote versions
306    fn resolve_conflict(
307        &mut self,
308        local_session: &mut ChatSession,
309        remote_session: &ChatSession,
310    ) -> Result<SyncResult> {
311        let session_id = local_session.session_id.clone().unwrap_or_default();
312
313        let conflict = SyncConflict {
314            id: uuid::Uuid::new_v4().to_string(),
315            session_id: session_id.clone(),
316            local_version: ConflictVersion {
317                hash: compute_session_hash(local_session),
318                timestamp: DateTime::from_timestamp_millis(local_session.last_message_date)
319                    .unwrap_or_else(Utc::now),
320                message_count: local_session.requests.len(),
321            },
322            remote_version: ConflictVersion {
323                hash: compute_session_hash(remote_session),
324                timestamp: DateTime::from_timestamp_millis(remote_session.last_message_date)
325                    .unwrap_or_else(Utc::now),
326                message_count: remote_session.requests.len(),
327            },
328            conflict_type: ConflictType::ConcurrentAdd,
329            suggested_strategy: self.config.conflict_strategy,
330            created_at: Utc::now(),
331            resolved: false,
332        };
333
334        match self.config.conflict_strategy {
335            ConflictStrategy::LocalWins => {
336                self.update_state(&session_id, local_session, local_session);
337                Ok(SyncResult::Pushed)
338            }
339            ConflictStrategy::RemoteWins => {
340                *local_session = remote_session.clone();
341                self.update_state(&session_id, local_session, remote_session);
342                Ok(SyncResult::Pulled)
343            }
344            ConflictStrategy::MostRecent => {
345                if local_session.last_message_date >= remote_session.last_message_date {
346                    self.update_state(&session_id, local_session, local_session);
347                    Ok(SyncResult::Pushed)
348                } else {
349                    *local_session = remote_session.clone();
350                    self.update_state(&session_id, local_session, remote_session);
351                    Ok(SyncResult::Pulled)
352                }
353            }
354            ConflictStrategy::Manual => {
355                self.conflicts.push(conflict.clone());
356                Ok(SyncResult::ConflictDetected(conflict))
357            }
358            _ => {
359                self.conflicts.push(conflict.clone());
360                Ok(SyncResult::ConflictDetected(conflict))
361            }
362        }
363    }
364
365    /// Get all unresolved conflicts
366    pub fn get_conflicts(&self) -> &[SyncConflict] {
367        &self.conflicts
368    }
369
370    /// Resolve a conflict manually
371    pub fn resolve_conflict_manually(
372        &mut self,
373        conflict_id: &str,
374        _resolution: ConflictStrategy,
375        resolved_session: ChatSession,
376    ) -> Result<()> {
377        if let Some(conflict) = self.conflicts.iter_mut().find(|c| c.id == conflict_id) {
378            conflict.resolved = true;
379            let session_id = resolved_session.session_id.clone().unwrap_or_default();
380            self.update_state(&session_id, &resolved_session, &resolved_session);
381            Ok(())
382        } else {
383            Err(anyhow!("Conflict not found: {}", conflict_id))
384        }
385    }
386
387    /// Get sync state for a session
388    pub fn get_state(&self, session_id: &str) -> Option<&SessionSyncState> {
389        self.state.get(session_id)
390    }
391}
392
393// =============================================================================
394// Provider-specific Sync Adapters
395// =============================================================================
396
397/// Trait for provider-specific sync operations
398pub trait ProviderSyncAdapter: Send + Sync {
399    fn provider_type(&self) -> ProviderType;
400    fn push_session(&self, session: &ChatSession) -> Result<()>;
401    fn pull_session(&self, session_id: &str) -> Result<Option<ChatSession>>;
402    fn list_remote_sessions(&self) -> Result<Vec<String>>;
403    fn delete_remote_session(&self, session_id: &str) -> Result<()>;
404}
405
406/// VSCode Copilot Chat sync adapter
407pub struct VSCodeSyncAdapter {
408    workspace_path: PathBuf,
409}
410
411impl VSCodeSyncAdapter {
412    pub fn new(workspace_path: PathBuf) -> Self {
413        Self { workspace_path }
414    }
415
416    fn sessions_dir(&self) -> PathBuf {
417        self.workspace_path.join("chatSessions")
418    }
419}
420
421impl ProviderSyncAdapter for VSCodeSyncAdapter {
422    fn provider_type(&self) -> ProviderType {
423        ProviderType::Copilot
424    }
425
426    fn push_session(&self, session: &ChatSession) -> Result<()> {
427        let session_id = session.session_id.clone().unwrap_or_default();
428        let path = self.sessions_dir().join(format!("{}.json", session_id));
429        std::fs::create_dir_all(self.sessions_dir())?;
430        let json = serde_json::to_string_pretty(session)?;
431        std::fs::write(path, json)?;
432        Ok(())
433    }
434
435    fn pull_session(&self, session_id: &str) -> Result<Option<ChatSession>> {
436        let path = self.sessions_dir().join(format!("{}.json", session_id));
437        if !path.exists() {
438            return Ok(None);
439        }
440        let content = std::fs::read_to_string(path)?;
441        let session: ChatSession = serde_json::from_str(&content)?;
442        Ok(Some(session))
443    }
444
445    fn list_remote_sessions(&self) -> Result<Vec<String>> {
446        let dir = self.sessions_dir();
447        if !dir.exists() {
448            return Ok(Vec::new());
449        }
450        let mut sessions = Vec::new();
451        for entry in std::fs::read_dir(dir)? {
452            let entry = entry?;
453            if let Some(name) = entry.file_name().to_str() {
454                if name.ends_with(".json") {
455                    sessions.push(name.trim_end_matches(".json").to_string());
456                }
457            }
458        }
459        Ok(sessions)
460    }
461
462    fn delete_remote_session(&self, session_id: &str) -> Result<()> {
463        let path = self.sessions_dir().join(format!("{}.json", session_id));
464        if path.exists() {
465            std::fs::remove_file(path)?;
466        }
467        Ok(())
468    }
469}
470
471// =============================================================================
472// Tests
473// =============================================================================
474
475#[cfg(test)]
476mod tests {
477    use super::*;
478
479    #[test]
480    fn test_sync_status() {
481        assert_eq!(SyncStatus::Synced, SyncStatus::Synced);
482    }
483
484    #[test]
485    fn test_config_default() {
486        let config = BidirectionalSyncConfig::default();
487        assert_eq!(config.auto_sync_interval_secs, 300);
488    }
489
490    #[test]
491    fn test_engine_creation() {
492        let engine = BidirectionalSyncEngine::new(BidirectionalSyncConfig::default());
493        assert!(engine.conflicts.is_empty());
494    }
495}