Skip to main content

chasm/api/
sync.rs

1// Copyright (c) 2024-2026 Nervosys LLC
2// SPDX-License-Identifier: AGPL-3.0-only
3//! Sync Handler for real-time data synchronization
4//!
5//! This module provides HTTP/SSE-based synchronization between
6//! csm-rust (backend), csm-web, and csm-app clients.
7//!
8//! Uses Server-Sent Events (SSE) for real-time push updates instead of
9//! WebSockets for better compatibility with various deployment scenarios.
10
11use actix_web::{web, HttpResponse};
12use serde::{Deserialize, Serialize};
13use std::sync::{Arc, RwLock};
14use tokio::sync::broadcast;
15
16// =============================================================================
17// Sync Message Types
18// =============================================================================
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21#[serde(rename_all = "snake_case")]
22pub enum SyncEntityType {
23    Workspace,
24    Session,
25    Message,
26    Agent,
27    Swarm,
28    Provider,
29    Settings,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33#[serde(rename_all = "snake_case")]
34pub enum SyncOperation {
35    Create,
36    Update,
37    Delete,
38    Sync,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct SyncEvent {
43    pub id: String,
44    #[serde(rename = "type")]
45    pub entity_type: SyncEntityType,
46    pub operation: SyncOperation,
47    pub entity_id: String,
48    pub data: Option<serde_json::Value>,
49    pub timestamp: i64,
50    pub client_id: String,
51    pub version: u64,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct SyncSnapshot {
56    pub workspaces: Vec<serde_json::Value>,
57    pub sessions: Vec<serde_json::Value>,
58    pub agents: Vec<serde_json::Value>,
59    pub swarms: Vec<serde_json::Value>,
60    pub providers: Vec<serde_json::Value>,
61    pub timestamp: i64,
62    pub version: u64,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct SyncDelta {
67    pub created: Vec<SyncEvent>,
68    pub updated: Vec<SyncEvent>,
69    pub deleted: Vec<SyncEvent>,
70    pub timestamp: i64,
71    pub from_version: u64,
72    pub to_version: u64,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76#[serde(tag = "type", rename_all = "snake_case")]
77pub enum ServerMessage {
78    Welcome { version: u64 },
79    SyncEvent { event: SyncEvent },
80    Ack { version: u64 },
81}
82
83// =============================================================================
84// Sync State
85// =============================================================================
86
87/// Global sync state shared across all connections
88pub struct SyncState {
89    /// Current version counter
90    pub version: u64,
91    /// Event history for delta sync
92    pub events: Vec<SyncEvent>,
93    /// Maximum events to keep in history
94    pub max_history: usize,
95    /// Broadcast channel for events
96    pub broadcast_tx: broadcast::Sender<ServerMessage>,
97}
98
99impl SyncState {
100    pub fn new() -> Self {
101        let (broadcast_tx, _) = broadcast::channel(1000);
102        Self {
103            version: 0,
104            events: Vec::new(),
105            max_history: 10000,
106            broadcast_tx,
107        }
108    }
109
110    /// Increment version and return new version
111    pub fn next_version(&mut self) -> u64 {
112        self.version += 1;
113        self.version
114    }
115
116    /// Add event to history and broadcast
117    pub fn add_event(&mut self, mut event: SyncEvent) -> u64 {
118        let version = self.next_version();
119        event.version = version;
120        event.timestamp = chrono::Utc::now().timestamp_millis();
121
122        self.events.push(event.clone());
123
124        // Trim history if needed
125        if self.events.len() > self.max_history {
126            let trim_count = self.events.len() - self.max_history;
127            self.events.drain(0..trim_count);
128        }
129
130        // Broadcast to all clients
131        let _ = self.broadcast_tx.send(ServerMessage::SyncEvent { event });
132
133        version
134    }
135
136    /// Get events since a version
137    pub fn get_delta(&self, from_version: u64) -> SyncDelta {
138        let events: Vec<_> = self
139            .events
140            .iter()
141            .filter(|e| e.version > from_version)
142            .cloned()
143            .collect();
144
145        let mut created = Vec::new();
146        let mut updated = Vec::new();
147        let mut deleted = Vec::new();
148
149        for event in events {
150            match event.operation {
151                SyncOperation::Create => created.push(event),
152                SyncOperation::Update | SyncOperation::Sync => updated.push(event),
153                SyncOperation::Delete => deleted.push(event),
154            }
155        }
156
157        SyncDelta {
158            created,
159            updated,
160            deleted,
161            timestamp: chrono::Utc::now().timestamp_millis(),
162            from_version,
163            to_version: self.version,
164        }
165    }
166
167    /// Get broadcast receiver
168    pub fn subscribe(&self) -> broadcast::Receiver<ServerMessage> {
169        self.broadcast_tx.subscribe()
170    }
171}
172
173impl Default for SyncState {
174    fn default() -> Self {
175        Self::new()
176    }
177}
178
179pub type SharedSyncState = Arc<RwLock<SyncState>>;
180
181/// Create shared sync state
182pub fn create_sync_state() -> SharedSyncState {
183    Arc::new(RwLock::new(SyncState::new()))
184}
185
186// =============================================================================
187// HTTP/REST Sync Endpoints
188// =============================================================================
189
190/// Query parameters for delta request
191#[derive(Debug, Deserialize)]
192pub struct DeltaQuery {
193    pub from: Option<u64>,
194}
195
196/// Get current sync version
197pub async fn get_sync_version(sync_state: web::Data<SharedSyncState>) -> HttpResponse {
198    let state = sync_state.read().unwrap();
199    HttpResponse::Ok().json(serde_json::json!({
200        "success": true,
201        "data": {
202            "version": state.version,
203            "eventCount": state.events.len(),
204        }
205    }))
206}
207
208/// Get sync delta since a version
209pub async fn get_sync_delta(
210    sync_state: web::Data<SharedSyncState>,
211    query: web::Query<DeltaQuery>,
212) -> HttpResponse {
213    let from_version = query.from.unwrap_or(0);
214
215    let state = sync_state.read().unwrap();
216    let delta = state.get_delta(from_version);
217
218    HttpResponse::Ok().json(serde_json::json!({
219        "success": true,
220        "data": delta,
221    }))
222}
223
224/// Post a sync event
225pub async fn post_sync_event(
226    sync_state: web::Data<SharedSyncState>,
227    body: web::Json<SyncEvent>,
228) -> HttpResponse {
229    let mut state = sync_state.write().unwrap();
230    let version = state.add_event(body.into_inner());
231
232    HttpResponse::Ok().json(serde_json::json!({
233        "success": true,
234        "data": {
235            "version": version,
236        }
237    }))
238}
239
240/// Batch sync events request
241#[derive(Debug, Deserialize)]
242pub struct BatchSyncRequest {
243    pub events: Vec<SyncEvent>,
244}
245
246/// Post multiple sync events
247pub async fn post_sync_batch(
248    sync_state: web::Data<SharedSyncState>,
249    body: web::Json<BatchSyncRequest>,
250) -> HttpResponse {
251    let mut state = sync_state.write().unwrap();
252    let mut last_version = 0;
253
254    for event in body.into_inner().events {
255        last_version = state.add_event(event);
256    }
257
258    HttpResponse::Ok().json(serde_json::json!({
259        "success": true,
260        "data": {
261            "version": last_version,
262        }
263    }))
264}
265
266/// Get full snapshot
267pub async fn get_sync_snapshot(
268    sync_state: web::Data<SharedSyncState>,
269    app_state: web::Data<crate::api::state::AppState>,
270) -> HttpResponse {
271    let db = app_state.db.lock().unwrap();
272    let sync = sync_state.read().unwrap();
273
274    // Get workspaces from database
275    let workspaces = db
276        .list_workspaces()
277        .unwrap_or_default()
278        .into_iter()
279        .map(|w| serde_json::to_value(w).unwrap_or_default())
280        .collect();
281
282    // Get sessions (3 args: workspace_id, provider, limit)
283    let sessions = db
284        .list_sessions(None, None, 1000)
285        .unwrap_or_default()
286        .into_iter()
287        .map(|s| serde_json::to_value(s).unwrap_or_default())
288        .collect();
289
290    // Query agents directly from database
291    let agents: Vec<serde_json::Value> = query_agents_from_db(&db.conn).unwrap_or_default();
292
293    // Query swarms directly from database
294    let swarms: Vec<serde_json::Value> = query_swarms_from_db(&db.conn).unwrap_or_default();
295
296    // Providers are hardcoded, return empty for snapshot
297    // (clients should call /api/providers for the full list)
298    let providers: Vec<serde_json::Value> = Vec::new();
299
300    let snapshot = SyncSnapshot {
301        workspaces,
302        sessions,
303        agents,
304        swarms,
305        providers,
306        timestamp: chrono::Utc::now().timestamp_millis(),
307        version: sync.version,
308    };
309
310    HttpResponse::Ok().json(serde_json::json!({
311        "success": true,
312        "data": snapshot,
313    }))
314}
315
316/// Query agents directly from database
317fn query_agents_from_db(
318    conn: &rusqlite::Connection,
319) -> Result<Vec<serde_json::Value>, rusqlite::Error> {
320    // Ensure table exists
321    conn.execute(
322        "CREATE TABLE IF NOT EXISTS agents (
323            id TEXT PRIMARY KEY,
324            name TEXT NOT NULL,
325            description TEXT,
326            instruction TEXT NOT NULL,
327            role TEXT,
328            model TEXT,
329            provider TEXT,
330            temperature REAL DEFAULT 0.7,
331            max_tokens INTEGER,
332            tools TEXT,
333            sub_agents TEXT,
334            is_active INTEGER DEFAULT 1,
335            created_at INTEGER NOT NULL,
336            updated_at INTEGER NOT NULL,
337            metadata TEXT
338        )",
339        [],
340    )?;
341
342    let mut stmt = conn.prepare(
343        "SELECT id, name, description, instruction, role, model, provider, 
344                temperature, max_tokens, tools, sub_agents, is_active, 
345                created_at, updated_at, metadata 
346         FROM agents ORDER BY updated_at DESC",
347    )?;
348
349    let agents: Vec<serde_json::Value> = stmt
350        .query_map([], |row| {
351            let tools_str: String = row.get::<_, Option<String>>(9)?.unwrap_or_default();
352            let tools: Vec<String> = serde_json::from_str(&tools_str).unwrap_or_default();
353            let sub_agents_str: String = row.get::<_, Option<String>>(10)?.unwrap_or_default();
354            let sub_agents: Vec<String> = serde_json::from_str(&sub_agents_str).unwrap_or_default();
355            Ok(serde_json::json!({
356                "id": row.get::<_, String>(0)?,
357                "name": row.get::<_, String>(1)?,
358                "description": row.get::<_, Option<String>>(2)?,
359                "instruction": row.get::<_, String>(3)?,
360                "role": row.get::<_, Option<String>>(4)?,
361                "model": row.get::<_, Option<String>>(5)?,
362                "provider": row.get::<_, Option<String>>(6)?,
363                "temperature": row.get::<_, f64>(7)?,
364                "maxTokens": row.get::<_, Option<i32>>(8)?,
365                "tools": tools,
366                "subAgents": sub_agents,
367                "isActive": row.get::<_, i32>(11)? == 1,
368                "createdAt": row.get::<_, i64>(12)?,
369                "updatedAt": row.get::<_, i64>(13)?,
370                "metadata": row.get::<_, Option<String>>(14)?,
371            }))
372        })?
373        .collect::<Result<Vec<_>, _>>()?;
374
375    Ok(agents)
376}
377
378/// Query swarms directly from database  
379fn query_swarms_from_db(
380    conn: &rusqlite::Connection,
381) -> Result<Vec<serde_json::Value>, rusqlite::Error> {
382    // Ensure table exists
383    conn.execute(
384        "CREATE TABLE IF NOT EXISTS swarms (
385            id TEXT PRIMARY KEY,
386            name TEXT NOT NULL,
387            description TEXT,
388            agents TEXT NOT NULL,
389            orchestrator TEXT,
390            is_active INTEGER DEFAULT 1,
391            created_at INTEGER NOT NULL,
392            updated_at INTEGER NOT NULL,
393            metadata TEXT
394        )",
395        [],
396    )?;
397
398    let mut stmt = conn.prepare(
399        "SELECT id, name, description, agents, orchestrator, is_active, 
400                created_at, updated_at, metadata 
401         FROM swarms ORDER BY updated_at DESC",
402    )?;
403
404    let swarms: Vec<serde_json::Value> = stmt
405        .query_map([], |row| {
406            let agents_str: String = row.get::<_, String>(3)?;
407            let agents: Vec<String> = serde_json::from_str(&agents_str).unwrap_or_default();
408            Ok(serde_json::json!({
409                "id": row.get::<_, String>(0)?,
410                "name": row.get::<_, String>(1)?,
411                "description": row.get::<_, Option<String>>(2)?,
412                "agents": agents,
413                "orchestrator": row.get::<_, Option<String>>(4)?,
414                "isActive": row.get::<_, i32>(5)? == 1,
415                "createdAt": row.get::<_, i64>(6)?,
416                "updatedAt": row.get::<_, i64>(7)?,
417                "metadata": row.get::<_, Option<String>>(8)?,
418            }))
419        })?
420        .collect::<Result<Vec<_>, _>>()?;
421
422    Ok(swarms)
423}
424
425/// Subscribe to sync events via Server-Sent Events (SSE)
426pub async fn sync_sse(sync_state: web::Data<SharedSyncState>) -> HttpResponse {
427    let state = sync_state.read().unwrap();
428    let mut rx = state.subscribe();
429    let current_version = state.version;
430    drop(state);
431
432    let stream = async_stream::stream! {
433        // Send initial welcome with version
434        let welcome = ServerMessage::Welcome { version: current_version };
435        if let Ok(json) = serde_json::to_string(&welcome) {
436            yield Ok::<_, std::io::Error>(web::Bytes::from(format!("data: {}\n\n", json)));
437        }
438
439        // Stream events
440        loop {
441            match rx.recv().await {
442                Ok(msg) => {
443                    if let Ok(json) = serde_json::to_string(&msg) {
444                        yield Ok(web::Bytes::from(format!("data: {}\n\n", json)));
445                    }
446                }
447                Err(broadcast::error::RecvError::Lagged(_)) => {
448                    // Client missed some messages, they should request a delta
449                    continue;
450                }
451                Err(broadcast::error::RecvError::Closed) => {
452                    break;
453                }
454            }
455        }
456    };
457
458    HttpResponse::Ok()
459        .content_type("text/event-stream")
460        .insert_header(("Cache-Control", "no-cache"))
461        .insert_header(("Connection", "keep-alive"))
462        .streaming(stream)
463}
464
465// =============================================================================
466// Route Configuration
467// =============================================================================
468
469/// Configure sync routes
470pub fn configure_sync_routes(cfg: &mut web::ServiceConfig) {
471    cfg.service(
472        web::scope("/sync")
473            .route("/version", web::get().to(get_sync_version))
474            .route("/delta", web::get().to(get_sync_delta))
475            .route("/event", web::post().to(post_sync_event))
476            .route("/batch", web::post().to(post_sync_batch))
477            .route("/snapshot", web::get().to(get_sync_snapshot))
478            .route("/subscribe", web::get().to(sync_sse)),
479    );
480}