spec_ai_api/api/
sync_handlers.rs

1use crate::api::handlers::AppState;
2use axum::extract::{Json, Path, State};
3use axum::http::StatusCode;
4use axum::response::IntoResponse;
5use serde::{Deserialize, Serialize};
6use spec_ai_core::sync::{GraphSyncPayload, SyncEngine, SyncType, VectorClock};
7
8/// Request to initiate a sync
9#[derive(Debug, Deserialize)]
10pub struct SyncRequest {
11    pub session_id: String,
12    pub graph_name: Option<String>,
13    pub requesting_instance: String,
14    pub vector_clock: Option<String>,
15}
16
17/// Response from a sync request
18#[derive(Debug, Serialize)]
19pub struct SyncResponse {
20    pub success: bool,
21    pub message: String,
22    pub payload: Option<GraphSyncPayload>,
23}
24
25/// Status of sync for a graph
26#[derive(Debug, Serialize)]
27pub struct SyncStatus {
28    pub session_id: String,
29    pub graph_name: String,
30    pub sync_enabled: bool,
31    pub vector_clock: String,
32    pub last_sync_at: Option<String>,
33    pub pending_changes: usize,
34}
35
36/// Request to enable/disable sync
37#[derive(Debug, Deserialize)]
38pub struct SyncToggleRequest {
39    pub enabled: bool,
40}
41
42/// Conflict information
43#[derive(Debug, Serialize)]
44pub struct ConflictInfo {
45    pub session_id: String,
46    pub entity_type: String,
47    pub entity_id: i64,
48    pub local_version: String,
49    pub remote_version: String,
50    pub detected_at: String,
51}
52
53/// Handle sync request from a peer
54pub async fn handle_sync_request(
55    State(state): State<AppState>,
56    Json(request): Json<SyncRequest>,
57) -> impl IntoResponse {
58    let persistence = state.persistence.clone();
59    let instance_id = crate::api::mesh::MeshClient::generate_instance_id();
60    let sync_engine = SyncEngine::new(persistence.clone(), instance_id);
61
62    // Parse their vector clock
63    let their_vc = if let Some(ref vc_str) = request.vector_clock {
64        match VectorClock::from_json(vc_str) {
65            Ok(vc) => vc,
66            Err(e) => {
67                return (
68                    StatusCode::BAD_REQUEST,
69                    Json(SyncResponse {
70                        success: false,
71                        message: format!("Invalid vector clock: {}", e),
72                        payload: None,
73                    }),
74                )
75            }
76        }
77    } else {
78        VectorClock::new()
79    };
80
81    // Decide sync strategy
82    let sync_type = match sync_engine
83        .decide_sync_strategy(
84            &request.session_id,
85            request.graph_name.as_deref().unwrap_or("default"),
86            &their_vc,
87        )
88        .await
89    {
90        Ok(st) => st,
91        Err(e) => {
92            return (
93                StatusCode::INTERNAL_SERVER_ERROR,
94                Json(SyncResponse {
95                    success: false,
96                    message: format!("Failed to determine sync strategy: {}", e),
97                    payload: None,
98                }),
99            )
100        }
101    };
102
103    // Perform sync based on strategy
104    let payload = match sync_type {
105        SyncType::Full => {
106            match sync_engine
107                .sync_full(
108                    &request.session_id,
109                    request.graph_name.as_deref().unwrap_or("default"),
110                )
111                .await
112            {
113                Ok(p) => p,
114                Err(e) => {
115                    return (
116                        StatusCode::INTERNAL_SERVER_ERROR,
117                        Json(SyncResponse {
118                            success: false,
119                            message: format!("Full sync failed: {}", e),
120                            payload: None,
121                        }),
122                    )
123                }
124            }
125        }
126        SyncType::Incremental => {
127            match sync_engine
128                .sync_incremental(
129                    &request.session_id,
130                    request.graph_name.as_deref().unwrap_or("default"),
131                    &their_vc,
132                )
133                .await
134            {
135                Ok(p) => p,
136                Err(e) => {
137                    return (
138                        StatusCode::INTERNAL_SERVER_ERROR,
139                        Json(SyncResponse {
140                            success: false,
141                            message: format!("Incremental sync failed: {}", e),
142                            payload: None,
143                        }),
144                    )
145                }
146            }
147        }
148        _ => {
149            return (
150                StatusCode::BAD_REQUEST,
151                Json(SyncResponse {
152                    success: false,
153                    message: "Unsupported sync type".to_string(),
154                    payload: None,
155                }),
156            )
157        }
158    };
159
160    (
161        StatusCode::OK,
162        Json(SyncResponse {
163            success: true,
164            message: format!("{:?} sync completed", sync_type),
165            payload: Some(payload),
166        }),
167    )
168}
169
170/// Apply incoming sync data
171pub async fn handle_sync_apply(
172    State(state): State<AppState>,
173    Json(payload): Json<GraphSyncPayload>,
174) -> impl IntoResponse {
175    let persistence = state.persistence.clone();
176    let instance_id = crate::api::mesh::MeshClient::generate_instance_id();
177    let sync_engine = SyncEngine::new(persistence.clone(), instance_id);
178
179    let graph_name = payload.graph_name.as_deref().unwrap_or("default");
180
181    match sync_engine.apply_sync(&payload, graph_name).await {
182        Ok(stats) => (
183            StatusCode::OK,
184            Json(serde_json::json!({
185                "success": true,
186                "message": "Sync applied successfully",
187                "stats": {
188                    "nodes_applied": stats.nodes_applied,
189                    "edges_applied": stats.edges_applied,
190                    "tombstones_applied": stats.tombstones_applied,
191                    "conflicts_detected": stats.conflicts_detected,
192                    "conflicts_resolved": stats.conflicts_resolved,
193                    "sync_type": stats.sync_type
194                }
195            })),
196        ),
197        Err(e) => (
198            StatusCode::INTERNAL_SERVER_ERROR,
199            Json(serde_json::json!({
200                "success": false,
201                "message": format!("Failed to apply sync: {}", e)
202            })),
203        ),
204    }
205}
206
207/// Get sync status for a graph
208pub async fn get_sync_status(
209    State(state): State<AppState>,
210    Path((session_id, graph_name)): Path<(String, String)>,
211) -> impl IntoResponse {
212    let persistence = &state.persistence;
213    let instance_id = crate::api::mesh::MeshClient::generate_instance_id();
214
215    // Check if sync is enabled
216    let sync_enabled = match persistence.graph_get_sync_enabled(&session_id, &graph_name) {
217        Ok(enabled) => enabled,
218        Err(e) => {
219            return (
220                StatusCode::INTERNAL_SERVER_ERROR,
221                Json(serde_json::json!({
222                    "error": format!("Failed to get sync status: {}", e)
223                })),
224            )
225                .into_response()
226        }
227    };
228
229    // Get vector clock
230    let vector_clock =
231        match persistence.graph_sync_state_get(&instance_id, &session_id, &graph_name) {
232            Ok(Some(vc)) => vc,
233            Ok(None) => "{}".to_string(),
234            Err(e) => {
235                return (
236                    StatusCode::INTERNAL_SERVER_ERROR,
237                    Json(serde_json::json!({
238                        "error": format!("Failed to get vector clock: {}", e)
239                    })),
240                )
241                    .into_response()
242            }
243        };
244
245    // Count pending changes (approximate)
246    let since_timestamp = chrono::Utc::now()
247        .checked_sub_signed(chrono::Duration::hours(1))
248        .unwrap()
249        .to_rfc3339();
250
251    let pending_changes = match persistence.graph_changelog_get_since(&session_id, &since_timestamp)
252    {
253        Ok(entries) => entries.len(),
254        Err(_) => 0,
255    };
256
257    (
258        StatusCode::OK,
259        Json(SyncStatus {
260            session_id,
261            graph_name,
262            sync_enabled,
263            vector_clock,
264            last_sync_at: None, // TODO: Track this
265            pending_changes,
266        }),
267    )
268        .into_response()
269}
270
271/// Enable or disable sync for a graph
272pub async fn toggle_sync(
273    State(state): State<AppState>,
274    Path((session_id, graph_name)): Path<(String, String)>,
275    Json(request): Json<SyncToggleRequest>,
276) -> impl IntoResponse {
277    let persistence = &state.persistence;
278
279    match persistence.graph_set_sync_enabled(&session_id, &graph_name, request.enabled) {
280        Ok(_) => (
281            StatusCode::OK,
282            Json(serde_json::json!({
283                "success": true,
284                "message": format!("Sync {} for graph {}/{}",
285                    if request.enabled { "enabled" } else { "disabled" },
286                    session_id, graph_name),
287                "enabled": request.enabled
288            })),
289        ),
290        Err(e) => (
291            StatusCode::INTERNAL_SERVER_ERROR,
292            Json(serde_json::json!({
293                "success": false,
294                "message": format!("Failed to toggle sync: {}", e)
295            })),
296        ),
297    }
298}
299
300/// List all graphs with their sync status
301pub async fn list_sync_configs(
302    State(state): State<AppState>,
303    Path(session_id): Path<String>,
304) -> impl IntoResponse {
305    let persistence = &state.persistence;
306
307    // Get all graphs for this session
308    match persistence.graph_list(&session_id) {
309        Ok(graphs) => {
310            let mut configs = Vec::new();
311            for graph_name in graphs {
312                let sync_enabled = persistence
313                    .graph_get_sync_enabled(&session_id, &graph_name)
314                    .unwrap_or(false);
315
316                configs.push(serde_json::json!({
317                    "graph_name": graph_name,
318                    "sync_enabled": sync_enabled,
319                }));
320            }
321
322            (
323                StatusCode::OK,
324                Json(serde_json::json!({
325                    "success": true,
326                    "session_id": session_id,
327                    "graphs": configs
328                })),
329            )
330        }
331        Err(e) => (
332            StatusCode::INTERNAL_SERVER_ERROR,
333            Json(serde_json::json!({
334                "success": false,
335                "message": format!("Failed to list sync configs: {}", e)
336            })),
337        ),
338    }
339}
340
341/// Bulk enable/disable sync for multiple graphs
342#[derive(Debug, Deserialize)]
343pub struct BulkSyncRequest {
344    pub graphs: Vec<String>,
345    pub enabled: bool,
346}
347
348pub async fn bulk_toggle_sync(
349    State(state): State<AppState>,
350    Path(session_id): Path<String>,
351    Json(request): Json<BulkSyncRequest>,
352) -> impl IntoResponse {
353    let persistence = &state.persistence;
354    let mut results = Vec::new();
355    let mut failed = Vec::new();
356
357    for graph_name in &request.graphs {
358        match persistence.graph_set_sync_enabled(&session_id, graph_name, request.enabled) {
359            Ok(_) => results.push(graph_name.clone()),
360            Err(e) => failed.push(serde_json::json!({
361                "graph": graph_name,
362                "error": e.to_string()
363            })),
364        }
365    }
366
367    (
368        StatusCode::OK,
369        Json(serde_json::json!({
370            "success": failed.is_empty(),
371            "message": format!("Sync {} for {} graphs",
372                if request.enabled { "enabled" } else { "disabled" },
373                results.len()),
374            "updated": results,
375            "failed": failed
376        })),
377    )
378}
379
380/// Configure sync parameters for a graph
381#[derive(Debug, Deserialize)]
382pub struct SyncConfig {
383    pub sync_enabled: bool,
384    pub conflict_resolution_strategy: Option<String>, // "vector_clock", "last_write_wins", "manual"
385    pub sync_interval_seconds: Option<u64>,
386}
387
388pub async fn configure_sync(
389    State(state): State<AppState>,
390    Path((session_id, graph_name)): Path<(String, String)>,
391    Json(config): Json<SyncConfig>,
392) -> impl IntoResponse {
393    let persistence = &state.persistence;
394
395    // First set the enabled status
396    match persistence.graph_set_sync_enabled(&session_id, &graph_name, config.sync_enabled) {
397        Ok(_) => {
398            // TODO: Store additional configuration parameters
399            // For now, we'll just acknowledge them
400            (
401                StatusCode::OK,
402                Json(serde_json::json!({
403                    "success": true,
404                    "message": format!("Sync configuration updated for graph {}/{}", session_id, graph_name),
405                    "config": {
406                        "sync_enabled": config.sync_enabled,
407                        "conflict_resolution_strategy": config.conflict_resolution_strategy.unwrap_or_else(|| "vector_clock".to_string()),
408                        "sync_interval_seconds": config.sync_interval_seconds.unwrap_or(60),
409                    }
410                })),
411            )
412        }
413        Err(e) => (
414            StatusCode::INTERNAL_SERVER_ERROR,
415            Json(serde_json::json!({
416                "success": false,
417                "message": format!("Failed to configure sync: {}", e)
418            })),
419        ),
420    }
421}
422
423/// List unresolved conflicts
424pub async fn list_conflicts(State(_state): State<AppState>) -> impl IntoResponse {
425    // TODO: Implement conflict tracking
426    // For now, return empty list
427    let conflicts: Vec<ConflictInfo> = Vec::new();
428
429    (StatusCode::OK, Json(conflicts))
430}