1use actix_web::{web, HttpResponse};
12use serde::{Deserialize, Serialize};
13use std::sync::{Arc, RwLock};
14use tokio::sync::broadcast;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
21#[serde(rename_all = "snake_case")]
22pub enum SyncEntityType {
23 Workspace,
24 Session,
25 Message,
26 Agent,
27 Swarm,
28 Provider,
29 Settings,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33#[serde(rename_all = "snake_case")]
34pub enum SyncOperation {
35 Create,
36 Update,
37 Delete,
38 Sync,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct SyncEvent {
43 pub id: String,
44 #[serde(rename = "type")]
45 pub entity_type: SyncEntityType,
46 pub operation: SyncOperation,
47 pub entity_id: String,
48 pub data: Option<serde_json::Value>,
49 pub timestamp: i64,
50 pub client_id: String,
51 pub version: u64,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct SyncSnapshot {
56 pub workspaces: Vec<serde_json::Value>,
57 pub sessions: Vec<serde_json::Value>,
58 pub agents: Vec<serde_json::Value>,
59 pub swarms: Vec<serde_json::Value>,
60 pub providers: Vec<serde_json::Value>,
61 pub timestamp: i64,
62 pub version: u64,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct SyncDelta {
67 pub created: Vec<SyncEvent>,
68 pub updated: Vec<SyncEvent>,
69 pub deleted: Vec<SyncEvent>,
70 pub timestamp: i64,
71 pub from_version: u64,
72 pub to_version: u64,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76#[serde(tag = "type", rename_all = "snake_case")]
77pub enum ServerMessage {
78 Welcome { version: u64 },
79 SyncEvent { event: SyncEvent },
80 Ack { version: u64 },
81}
82
83pub struct SyncState {
89 pub version: u64,
91 pub events: Vec<SyncEvent>,
93 pub max_history: usize,
95 pub broadcast_tx: broadcast::Sender<ServerMessage>,
97}
98
99impl SyncState {
100 pub fn new() -> Self {
101 let (broadcast_tx, _) = broadcast::channel(1000);
102 Self {
103 version: 0,
104 events: Vec::new(),
105 max_history: 10000,
106 broadcast_tx,
107 }
108 }
109
110 pub fn next_version(&mut self) -> u64 {
112 self.version += 1;
113 self.version
114 }
115
116 pub fn add_event(&mut self, mut event: SyncEvent) -> u64 {
118 let version = self.next_version();
119 event.version = version;
120 event.timestamp = chrono::Utc::now().timestamp_millis();
121
122 self.events.push(event.clone());
123
124 if self.events.len() > self.max_history {
126 let trim_count = self.events.len() - self.max_history;
127 self.events.drain(0..trim_count);
128 }
129
130 let _ = self.broadcast_tx.send(ServerMessage::SyncEvent { event });
132
133 version
134 }
135
136 pub fn get_delta(&self, from_version: u64) -> SyncDelta {
138 let events: Vec<_> = self
139 .events
140 .iter()
141 .filter(|e| e.version > from_version)
142 .cloned()
143 .collect();
144
145 let mut created = Vec::new();
146 let mut updated = Vec::new();
147 let mut deleted = Vec::new();
148
149 for event in events {
150 match event.operation {
151 SyncOperation::Create => created.push(event),
152 SyncOperation::Update | SyncOperation::Sync => updated.push(event),
153 SyncOperation::Delete => deleted.push(event),
154 }
155 }
156
157 SyncDelta {
158 created,
159 updated,
160 deleted,
161 timestamp: chrono::Utc::now().timestamp_millis(),
162 from_version,
163 to_version: self.version,
164 }
165 }
166
167 pub fn subscribe(&self) -> broadcast::Receiver<ServerMessage> {
169 self.broadcast_tx.subscribe()
170 }
171}
172
173impl Default for SyncState {
174 fn default() -> Self {
175 Self::new()
176 }
177}
178
179pub type SharedSyncState = Arc<RwLock<SyncState>>;
180
181pub fn create_sync_state() -> SharedSyncState {
183 Arc::new(RwLock::new(SyncState::new()))
184}
185
186#[derive(Debug, Deserialize)]
192pub struct DeltaQuery {
193 pub from: Option<u64>,
194}
195
196pub async fn get_sync_version(sync_state: web::Data<SharedSyncState>) -> HttpResponse {
198 let state = sync_state.read().unwrap();
199 HttpResponse::Ok().json(serde_json::json!({
200 "success": true,
201 "data": {
202 "version": state.version,
203 "eventCount": state.events.len(),
204 }
205 }))
206}
207
208pub async fn get_sync_delta(
210 sync_state: web::Data<SharedSyncState>,
211 query: web::Query<DeltaQuery>,
212) -> HttpResponse {
213 let from_version = query.from.unwrap_or(0);
214
215 let state = sync_state.read().unwrap();
216 let delta = state.get_delta(from_version);
217
218 HttpResponse::Ok().json(serde_json::json!({
219 "success": true,
220 "data": delta,
221 }))
222}
223
224pub async fn post_sync_event(
226 sync_state: web::Data<SharedSyncState>,
227 body: web::Json<SyncEvent>,
228) -> HttpResponse {
229 let mut state = sync_state.write().unwrap();
230 let version = state.add_event(body.into_inner());
231
232 HttpResponse::Ok().json(serde_json::json!({
233 "success": true,
234 "data": {
235 "version": version,
236 }
237 }))
238}
239
240#[derive(Debug, Deserialize)]
242pub struct BatchSyncRequest {
243 pub events: Vec<SyncEvent>,
244}
245
246pub async fn post_sync_batch(
248 sync_state: web::Data<SharedSyncState>,
249 body: web::Json<BatchSyncRequest>,
250) -> HttpResponse {
251 let mut state = sync_state.write().unwrap();
252 let mut last_version = 0;
253
254 for event in body.into_inner().events {
255 last_version = state.add_event(event);
256 }
257
258 HttpResponse::Ok().json(serde_json::json!({
259 "success": true,
260 "data": {
261 "version": last_version,
262 }
263 }))
264}
265
266pub async fn get_sync_snapshot(
268 sync_state: web::Data<SharedSyncState>,
269 app_state: web::Data<crate::api::state::AppState>,
270) -> HttpResponse {
271 let db = app_state.db.lock().unwrap();
272 let sync = sync_state.read().unwrap();
273
274 let workspaces = db
276 .list_workspaces()
277 .unwrap_or_default()
278 .into_iter()
279 .map(|w| serde_json::to_value(w).unwrap_or_default())
280 .collect();
281
282 let sessions = db
284 .list_sessions(None, None, 1000)
285 .unwrap_or_default()
286 .into_iter()
287 .map(|s| serde_json::to_value(s).unwrap_or_default())
288 .collect();
289
290 let agents: Vec<serde_json::Value> = query_agents_from_db(&db.conn).unwrap_or_default();
292
293 let swarms: Vec<serde_json::Value> = query_swarms_from_db(&db.conn).unwrap_or_default();
295
296 let providers: Vec<serde_json::Value> = Vec::new();
299
300 let snapshot = SyncSnapshot {
301 workspaces,
302 sessions,
303 agents,
304 swarms,
305 providers,
306 timestamp: chrono::Utc::now().timestamp_millis(),
307 version: sync.version,
308 };
309
310 HttpResponse::Ok().json(serde_json::json!({
311 "success": true,
312 "data": snapshot,
313 }))
314}
315
316fn query_agents_from_db(
318 conn: &rusqlite::Connection,
319) -> Result<Vec<serde_json::Value>, rusqlite::Error> {
320 conn.execute(
322 "CREATE TABLE IF NOT EXISTS agents (
323 id TEXT PRIMARY KEY,
324 name TEXT NOT NULL,
325 description TEXT,
326 instruction TEXT NOT NULL,
327 role TEXT,
328 model TEXT,
329 provider TEXT,
330 temperature REAL DEFAULT 0.7,
331 max_tokens INTEGER,
332 tools TEXT,
333 sub_agents TEXT,
334 is_active INTEGER DEFAULT 1,
335 created_at INTEGER NOT NULL,
336 updated_at INTEGER NOT NULL,
337 metadata TEXT
338 )",
339 [],
340 )?;
341
342 let mut stmt = conn.prepare(
343 "SELECT id, name, description, instruction, role, model, provider,
344 temperature, max_tokens, tools, sub_agents, is_active,
345 created_at, updated_at, metadata
346 FROM agents ORDER BY updated_at DESC",
347 )?;
348
349 let agents: Vec<serde_json::Value> = stmt
350 .query_map([], |row| {
351 let tools_str: String = row.get::<_, Option<String>>(9)?.unwrap_or_default();
352 let tools: Vec<String> = serde_json::from_str(&tools_str).unwrap_or_default();
353 let sub_agents_str: String = row.get::<_, Option<String>>(10)?.unwrap_or_default();
354 let sub_agents: Vec<String> = serde_json::from_str(&sub_agents_str).unwrap_or_default();
355 Ok(serde_json::json!({
356 "id": row.get::<_, String>(0)?,
357 "name": row.get::<_, String>(1)?,
358 "description": row.get::<_, Option<String>>(2)?,
359 "instruction": row.get::<_, String>(3)?,
360 "role": row.get::<_, Option<String>>(4)?,
361 "model": row.get::<_, Option<String>>(5)?,
362 "provider": row.get::<_, Option<String>>(6)?,
363 "temperature": row.get::<_, f64>(7)?,
364 "maxTokens": row.get::<_, Option<i32>>(8)?,
365 "tools": tools,
366 "subAgents": sub_agents,
367 "isActive": row.get::<_, i32>(11)? == 1,
368 "createdAt": row.get::<_, i64>(12)?,
369 "updatedAt": row.get::<_, i64>(13)?,
370 "metadata": row.get::<_, Option<String>>(14)?,
371 }))
372 })?
373 .collect::<Result<Vec<_>, _>>()?;
374
375 Ok(agents)
376}
377
378fn query_swarms_from_db(
380 conn: &rusqlite::Connection,
381) -> Result<Vec<serde_json::Value>, rusqlite::Error> {
382 conn.execute(
384 "CREATE TABLE IF NOT EXISTS swarms (
385 id TEXT PRIMARY KEY,
386 name TEXT NOT NULL,
387 description TEXT,
388 agents TEXT NOT NULL,
389 orchestrator TEXT,
390 is_active INTEGER DEFAULT 1,
391 created_at INTEGER NOT NULL,
392 updated_at INTEGER NOT NULL,
393 metadata TEXT
394 )",
395 [],
396 )?;
397
398 let mut stmt = conn.prepare(
399 "SELECT id, name, description, agents, orchestrator, is_active,
400 created_at, updated_at, metadata
401 FROM swarms ORDER BY updated_at DESC",
402 )?;
403
404 let swarms: Vec<serde_json::Value> = stmt
405 .query_map([], |row| {
406 let agents_str: String = row.get::<_, String>(3)?;
407 let agents: Vec<String> = serde_json::from_str(&agents_str).unwrap_or_default();
408 Ok(serde_json::json!({
409 "id": row.get::<_, String>(0)?,
410 "name": row.get::<_, String>(1)?,
411 "description": row.get::<_, Option<String>>(2)?,
412 "agents": agents,
413 "orchestrator": row.get::<_, Option<String>>(4)?,
414 "isActive": row.get::<_, i32>(5)? == 1,
415 "createdAt": row.get::<_, i64>(6)?,
416 "updatedAt": row.get::<_, i64>(7)?,
417 "metadata": row.get::<_, Option<String>>(8)?,
418 }))
419 })?
420 .collect::<Result<Vec<_>, _>>()?;
421
422 Ok(swarms)
423}
424
425pub async fn sync_sse(sync_state: web::Data<SharedSyncState>) -> HttpResponse {
427 let state = sync_state.read().unwrap();
428 let mut rx = state.subscribe();
429 let current_version = state.version;
430 drop(state);
431
432 let stream = async_stream::stream! {
433 let welcome = ServerMessage::Welcome { version: current_version };
435 if let Ok(json) = serde_json::to_string(&welcome) {
436 yield Ok::<_, std::io::Error>(web::Bytes::from(format!("data: {}\n\n", json)));
437 }
438
439 loop {
441 match rx.recv().await {
442 Ok(msg) => {
443 if let Ok(json) = serde_json::to_string(&msg) {
444 yield Ok(web::Bytes::from(format!("data: {}\n\n", json)));
445 }
446 }
447 Err(broadcast::error::RecvError::Lagged(_)) => {
448 continue;
450 }
451 Err(broadcast::error::RecvError::Closed) => {
452 break;
453 }
454 }
455 }
456 };
457
458 HttpResponse::Ok()
459 .content_type("text/event-stream")
460 .insert_header(("Cache-Control", "no-cache"))
461 .insert_header(("Connection", "keep-alive"))
462 .streaming(stream)
463}
464
465pub fn configure_sync_routes(cfg: &mut web::ServiceConfig) {
471 cfg.service(
472 web::scope("/sync")
473 .route("/version", web::get().to(get_sync_version))
474 .route("/delta", web::get().to(get_sync_delta))
475 .route("/event", web::post().to(post_sync_event))
476 .route("/batch", web::post().to(post_sync_batch))
477 .route("/snapshot", web::get().to(get_sync_snapshot))
478 .route("/subscribe", web::get().to(sync_sse)),
479 );
480}