Skip to main content

spec_ai/spec_ai_api/api/
sync_handlers.rs

1use crate::spec_ai_api::api::handlers::AppState;
2use axum::extract::{Json, Path, State};
3use axum::http::StatusCode;
4use axum::response::IntoResponse;
5use serde::{Deserialize, Serialize};
6use crate::spec_ai_core::sync::{
7    GraphSyncPayload, SyncEngine, SyncPersistenceAdapter, SyncType, VectorClock,
8};
9
/// Request body a peer sends to initiate a sync.
#[derive(Debug, Deserialize)]
pub struct SyncRequest {
    /// Session whose graph should be synchronised.
    pub session_id: String,
    /// Graph to sync; `handle_sync_request` falls back to `"default"` when absent.
    pub graph_name: Option<String>,
    /// Identifier of the requesting instance.
    /// NOTE(review): `handle_sync_request` never reads this field — confirm where it is consumed.
    pub requesting_instance: String,
    /// Requester's vector clock as JSON text (parsed with `VectorClock::from_json`);
    /// `handle_sync_request` uses `VectorClock::new()` when it is absent.
    pub vector_clock: Option<String>,
}
18
/// Response body returned by `handle_sync_request`.
#[derive(Debug, Serialize)]
pub struct SyncResponse {
    /// `true` when the sync completed, `false` on any failure.
    pub success: bool,
    /// Human-readable outcome or error description.
    pub message: String,
    /// Sync payload; `handle_sync_request` sets it only on success.
    pub payload: Option<GraphSyncPayload>,
}
26
/// Status of sync for a graph, as returned by `get_sync_status`.
#[derive(Debug, Serialize)]
pub struct SyncStatus {
    /// Session the graph belongs to.
    pub session_id: String,
    /// Name of the graph the status describes.
    pub graph_name: String,
    /// Whether sync is currently enabled for the graph.
    pub sync_enabled: bool,
    /// Stored vector clock text; `get_sync_status` reports `"{}"` when no sync state exists.
    pub vector_clock: String,
    /// Last sync time taken from the stored sync state, if any.
    pub last_sync_at: Option<String>,
    /// Approximate number of pending changes: `get_sync_status` counts the
    /// changelog entries returned for the session since one hour ago.
    pub pending_changes: usize,
}
37
/// Request body for `toggle_sync`: enable or disable sync for one graph.
#[derive(Debug, Deserialize)]
pub struct SyncToggleRequest {
    /// `true` to enable sync, `false` to disable it.
    pub enabled: bool,
}
43
/// Conflict information, one entry per conflict returned by `list_conflicts`.
#[derive(Debug, Serialize)]
pub struct ConflictInfo {
    /// Session the conflicting entity belongs to.
    pub session_id: String,
    /// Graph name read from the `graph_name` key of the conflict data, if present.
    pub graph_name: Option<String>,
    /// Type of the conflicting entity, copied from the conflict entry.
    pub entity_type: String,
    /// Identifier of the conflicting entity, copied from the conflict entry.
    pub entity_id: i64,
    /// Resolution read from the `resolution` key of the conflict data, if present.
    pub resolution: Option<String>,
    /// JSON text of the `local_version` key of the conflict data, or `"null"` when missing.
    pub local_version: String,
    /// JSON text of the `remote_version` key of the conflict data, or `"null"` when missing.
    pub remote_version: String,
    /// Creation time of the conflict entry, formatted as RFC 3339.
    pub detected_at: String,
}
56
57/// Handle sync request from a peer
58pub async fn handle_sync_request(
59    State(state): State<AppState>,
60    Json(request): Json<SyncRequest>,
61) -> impl IntoResponse {
62    let persistence = state.persistence.clone();
63    let instance_id = persistence.instance_id().to_string();
64    let adapter = SyncPersistenceAdapter::new(persistence.clone());
65    let sync_engine = SyncEngine::new(adapter, instance_id);
66
67    // Parse their vector clock
68    let their_vc = if let Some(ref vc_str) = request.vector_clock {
69        match VectorClock::from_json(vc_str) {
70            Ok(vc) => vc,
71            Err(e) => {
72                return (
73                    StatusCode::BAD_REQUEST,
74                    Json(SyncResponse {
75                        success: false,
76                        message: format!("Invalid vector clock: {}", e),
77                        payload: None,
78                    }),
79                );
80            }
81        }
82    } else {
83        VectorClock::new()
84    };
85
86    // Decide sync strategy
87    let sync_type = match sync_engine
88        .decide_sync_strategy(
89            &request.session_id,
90            request.graph_name.as_deref().unwrap_or("default"),
91            &their_vc,
92        )
93        .await
94    {
95        Ok(st) => st,
96        Err(e) => {
97            return (
98                StatusCode::INTERNAL_SERVER_ERROR,
99                Json(SyncResponse {
100                    success: false,
101                    message: format!("Failed to determine sync strategy: {}", e),
102                    payload: None,
103                }),
104            );
105        }
106    };
107
108    // Perform sync based on strategy
109    let payload = match sync_type {
110        SyncType::Full => {
111            match sync_engine
112                .sync_full(
113                    &request.session_id,
114                    request.graph_name.as_deref().unwrap_or("default"),
115                )
116                .await
117            {
118                Ok(p) => p,
119                Err(e) => {
120                    return (
121                        StatusCode::INTERNAL_SERVER_ERROR,
122                        Json(SyncResponse {
123                            success: false,
124                            message: format!("Full sync failed: {}", e),
125                            payload: None,
126                        }),
127                    );
128                }
129            }
130        }
131        SyncType::Incremental => {
132            match sync_engine
133                .sync_incremental(
134                    &request.session_id,
135                    request.graph_name.as_deref().unwrap_or("default"),
136                    &their_vc,
137                )
138                .await
139            {
140                Ok(p) => p,
141                Err(e) => {
142                    return (
143                        StatusCode::INTERNAL_SERVER_ERROR,
144                        Json(SyncResponse {
145                            success: false,
146                            message: format!("Incremental sync failed: {}", e),
147                            payload: None,
148                        }),
149                    );
150                }
151            }
152        }
153        _ => {
154            return (
155                StatusCode::BAD_REQUEST,
156                Json(SyncResponse {
157                    success: false,
158                    message: "Unsupported sync type".to_string(),
159                    payload: None,
160                }),
161            );
162        }
163    };
164
165    (
166        StatusCode::OK,
167        Json(SyncResponse {
168            success: true,
169            message: format!("{:?} sync completed", sync_type),
170            payload: Some(payload),
171        }),
172    )
173}
174
175/// Apply incoming sync data
176pub async fn handle_sync_apply(
177    State(state): State<AppState>,
178    Json(payload): Json<GraphSyncPayload>,
179) -> impl IntoResponse {
180    let persistence = state.persistence.clone();
181    let instance_id = persistence.instance_id().to_string();
182    let adapter = SyncPersistenceAdapter::new(persistence.clone());
183    let sync_engine = SyncEngine::new(adapter, instance_id);
184
185    let graph_name = payload.graph_name.as_deref().unwrap_or("default");
186
187    match sync_engine.apply_sync(&payload, graph_name).await {
188        Ok(stats) => (
189            StatusCode::OK,
190            Json(serde_json::json!({
191                "success": true,
192                "message": "Sync applied successfully",
193                "stats": {
194                    "nodes_applied": stats.nodes_applied,
195                    "edges_applied": stats.edges_applied,
196                    "tombstones_applied": stats.tombstones_applied,
197                    "conflicts_detected": stats.conflicts_detected,
198                    "conflicts_resolved": stats.conflicts_resolved,
199                    "sync_type": stats.sync_type
200                }
201            })),
202        ),
203        Err(e) => (
204            StatusCode::INTERNAL_SERVER_ERROR,
205            Json(serde_json::json!({
206                "success": false,
207                "message": format!("Failed to apply sync: {}", e)
208            })),
209        ),
210    }
211}
212
213/// Get sync status for a graph
214pub async fn get_sync_status(
215    State(state): State<AppState>,
216    Path((session_id, graph_name)): Path<(String, String)>,
217) -> impl IntoResponse {
218    let persistence = &state.persistence;
219    let instance_id = persistence.instance_id().to_string();
220
221    // Check if sync is enabled
222    let sync_enabled = match persistence.graph_get_sync_enabled(&session_id, &graph_name) {
223        Ok(enabled) => enabled,
224        Err(e) => {
225            return (
226                StatusCode::INTERNAL_SERVER_ERROR,
227                Json(serde_json::json!({
228                    "error": format!("Failed to get sync status: {}", e)
229                })),
230            )
231                .into_response();
232        }
233    };
234
235    // Get vector clock
236    let sync_state =
237        match persistence.graph_sync_state_get_metadata(&instance_id, &session_id, &graph_name) {
238            Ok(state) => state,
239            Err(e) => {
240                return (
241                    StatusCode::INTERNAL_SERVER_ERROR,
242                    Json(serde_json::json!({
243                        "error": format!("Failed to get vector clock: {}", e)
244                    })),
245                )
246                    .into_response();
247            }
248        };
249
250    let vector_clock = sync_state
251        .as_ref()
252        .map(|s| s.vector_clock.clone())
253        .unwrap_or_else(|| "{}".to_string());
254    let last_sync_at = sync_state.and_then(|s| s.last_sync_at);
255
256    // Count pending changes (approximate)
257    let since_timestamp = chrono::Utc::now()
258        .checked_sub_signed(chrono::Duration::hours(1))
259        .unwrap()
260        .to_rfc3339();
261
262    let pending_changes = match persistence.graph_changelog_get_since(&session_id, &since_timestamp)
263    {
264        Ok(entries) => entries.len(),
265        Err(_) => 0,
266    };
267
268    (
269        StatusCode::OK,
270        Json(SyncStatus {
271            session_id,
272            graph_name,
273            sync_enabled,
274            vector_clock,
275            last_sync_at,
276            pending_changes,
277        }),
278    )
279        .into_response()
280}
281
282/// Enable or disable sync for a graph
283pub async fn toggle_sync(
284    State(state): State<AppState>,
285    Path((session_id, graph_name)): Path<(String, String)>,
286    Json(request): Json<SyncToggleRequest>,
287) -> impl IntoResponse {
288    let persistence = &state.persistence;
289
290    match persistence.graph_set_sync_enabled(&session_id, &graph_name, request.enabled) {
291        Ok(_) => (
292            StatusCode::OK,
293            Json(serde_json::json!({
294                "success": true,
295                "message": format!("Sync {} for graph {}/{}",
296                    if request.enabled { "enabled" } else { "disabled" },
297                    session_id, graph_name),
298                "enabled": request.enabled
299            })),
300        ),
301        Err(e) => (
302            StatusCode::INTERNAL_SERVER_ERROR,
303            Json(serde_json::json!({
304                "success": false,
305                "message": format!("Failed to toggle sync: {}", e)
306            })),
307        ),
308    }
309}
310
311/// List all graphs with their sync status
312pub async fn list_sync_configs(
313    State(state): State<AppState>,
314    Path(session_id): Path<String>,
315) -> impl IntoResponse {
316    let persistence = &state.persistence;
317    let instance_id = persistence.instance_id().to_string();
318
319    // Get all graphs for this session
320    match persistence.graph_list(&session_id) {
321        Ok(graphs) => {
322            let mut configs = Vec::new();
323            for graph_name in graphs {
324                let config = match persistence.graph_get_sync_config(&session_id, &graph_name) {
325                    Ok(cfg) => cfg,
326                    Err(e) => {
327                        tracing::warn!(
328                            "Failed to load sync config for {}/{}: {}",
329                            session_id,
330                            graph_name,
331                            e
332                        );
333                        Default::default()
334                    }
335                };
336
337                let last_sync_at = match persistence.graph_sync_state_get_metadata(
338                    &instance_id,
339                    &session_id,
340                    &graph_name,
341                ) {
342                    Ok(state) => state.and_then(|s| s.last_sync_at),
343                    Err(e) => {
344                        tracing::warn!(
345                            "Failed to load sync state for {}/{}: {}",
346                            session_id,
347                            graph_name,
348                            e
349                        );
350                        None
351                    }
352                };
353
354                configs.push(serde_json::json!({
355                    "graph_name": graph_name,
356                    "sync_enabled": config.sync_enabled,
357                    "conflict_resolution_strategy": config.conflict_resolution_strategy.unwrap_or_else(|| "vector_clock".to_string()),
358                    "sync_interval_seconds": config.sync_interval_seconds.unwrap_or(60),
359                    "last_sync_at": last_sync_at,
360                }));
361            }
362
363            (
364                StatusCode::OK,
365                Json(serde_json::json!({
366                    "success": true,
367                    "session_id": session_id,
368                    "graphs": configs
369                })),
370            )
371        }
372        Err(e) => (
373            StatusCode::INTERNAL_SERVER_ERROR,
374            Json(serde_json::json!({
375                "success": false,
376                "message": format!("Failed to list sync configs: {}", e)
377            })),
378        ),
379    }
380}
381
/// Request body for `bulk_toggle_sync`: enable or disable sync for multiple graphs.
#[derive(Debug, Deserialize)]
pub struct BulkSyncRequest {
    /// Names of the graphs to update.
    pub graphs: Vec<String>,
    /// `true` to enable sync, `false` to disable it; applied to every listed graph.
    pub enabled: bool,
}
388
389pub async fn bulk_toggle_sync(
390    State(state): State<AppState>,
391    Path(session_id): Path<String>,
392    Json(request): Json<BulkSyncRequest>,
393) -> impl IntoResponse {
394    let persistence = &state.persistence;
395    let mut results = Vec::new();
396    let mut failed = Vec::new();
397
398    for graph_name in &request.graphs {
399        match persistence.graph_set_sync_enabled(&session_id, graph_name, request.enabled) {
400            Ok(_) => results.push(graph_name.clone()),
401            Err(e) => failed.push(serde_json::json!({
402                "graph": graph_name,
403                "error": e.to_string()
404            })),
405        }
406    }
407
408    (
409        StatusCode::OK,
410        Json(serde_json::json!({
411            "success": failed.is_empty(),
412            "message": format!("Sync {} for {} graphs",
413                if request.enabled { "enabled" } else { "disabled" },
414                results.len()),
415            "updated": results,
416            "failed": failed
417        })),
418    )
419}
420
/// Request body for `configure_sync`: sync parameters for a graph.
#[derive(Debug, Deserialize)]
pub struct SyncConfig {
    /// Whether sync should be enabled for the graph.
    pub sync_enabled: bool,
    /// Conflict resolution strategy; the handlers report `"vector_clock"` when none is stored.
    pub conflict_resolution_strategy: Option<String>, // "vector_clock", "last_write_wins", "manual"
    /// Sync interval in seconds; the handlers report `60` when none is stored.
    pub sync_interval_seconds: Option<u64>,
}
428
429pub async fn configure_sync(
430    State(state): State<AppState>,
431    Path((session_id, graph_name)): Path<(String, String)>,
432    Json(config): Json<SyncConfig>,
433) -> impl IntoResponse {
434    let persistence = &state.persistence;
435
436    match persistence.graph_set_sync_config(
437        &session_id,
438        &graph_name,
439        config.sync_enabled,
440        config.conflict_resolution_strategy.as_deref(),
441        config.sync_interval_seconds,
442    ) {
443        Ok(saved) => (
444            StatusCode::OK,
445            Json(serde_json::json!({
446                "success": true,
447                "message": format!("Sync configuration updated for graph {}/{}", session_id, graph_name),
448                "config": {
449                    "sync_enabled": saved.sync_enabled,
450                    "conflict_resolution_strategy": saved.conflict_resolution_strategy.unwrap_or_else(|| "vector_clock".to_string()),
451                    "sync_interval_seconds": saved.sync_interval_seconds.unwrap_or(60),
452                }
453            })),
454        ),
455        Err(e) => (
456            StatusCode::INTERNAL_SERVER_ERROR,
457            Json(serde_json::json!({
458                "success": false,
459                "message": format!("Failed to configure sync: {}", e)
460            })),
461        ),
462    }
463}
464
465/// List unresolved conflicts
466pub async fn list_conflicts(State(state): State<AppState>) -> impl IntoResponse {
467    match state.persistence.graph_list_conflicts(None) {
468        Ok(entries) => {
469            let conflicts: Vec<ConflictInfo> = entries
470                .into_iter()
471                .map(|entry| {
472                    let parsed_data = entry
473                        .data
474                        .as_deref()
475                        .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok());
476
477                    let local_version = parsed_data
478                        .as_ref()
479                        .and_then(|val| val.get("local_version"))
480                        .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
481                        .unwrap_or_else(|| "null".to_string());
482
483                    let remote_version = parsed_data
484                        .as_ref()
485                        .and_then(|val| val.get("remote_version"))
486                        .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
487                        .unwrap_or_else(|| "null".to_string());
488
489                    ConflictInfo {
490                        session_id: entry.session_id,
491                        graph_name: parsed_data
492                            .as_ref()
493                            .and_then(|val| val.get("graph_name"))
494                            .and_then(|v| v.as_str().map(|s| s.to_string())),
495                        entity_type: entry.entity_type,
496                        entity_id: entry.entity_id,
497                        resolution: parsed_data
498                            .as_ref()
499                            .and_then(|val| val.get("resolution"))
500                            .and_then(|v| v.as_str().map(|s| s.to_string())),
501                        local_version,
502                        remote_version,
503                        detected_at: entry.created_at.to_rfc3339(),
504                    }
505                })
506                .collect();
507
508            (StatusCode::OK, Json(conflicts)).into_response()
509        }
510        Err(e) => (
511            StatusCode::INTERNAL_SERVER_ERROR,
512            Json(serde_json::json!({
513                "error": format!("Failed to list conflicts: {}", e)
514            })),
515        )
516            .into_response(),
517    }
518}