1use crate::spec_ai_api::api::handlers::AppState;
2use axum::extract::{Json, Path, State};
3use axum::http::StatusCode;
4use axum::response::IntoResponse;
5use serde::{Deserialize, Serialize};
6use crate::spec_ai_core::sync::{
7 GraphSyncPayload, SyncEngine, SyncPersistenceAdapter, SyncType, VectorClock,
8};
9
10#[derive(Debug, Deserialize)]
12pub struct SyncRequest {
13 pub session_id: String,
14 pub graph_name: Option<String>,
15 pub requesting_instance: String,
16 pub vector_clock: Option<String>,
17}
18
19#[derive(Debug, Serialize)]
21pub struct SyncResponse {
22 pub success: bool,
23 pub message: String,
24 pub payload: Option<GraphSyncPayload>,
25}
26
27#[derive(Debug, Serialize)]
29pub struct SyncStatus {
30 pub session_id: String,
31 pub graph_name: String,
32 pub sync_enabled: bool,
33 pub vector_clock: String,
34 pub last_sync_at: Option<String>,
35 pub pending_changes: usize,
36}
37
38#[derive(Debug, Deserialize)]
40pub struct SyncToggleRequest {
41 pub enabled: bool,
42}
43
44#[derive(Debug, Serialize)]
46pub struct ConflictInfo {
47 pub session_id: String,
48 pub graph_name: Option<String>,
49 pub entity_type: String,
50 pub entity_id: i64,
51 pub resolution: Option<String>,
52 pub local_version: String,
53 pub remote_version: String,
54 pub detected_at: String,
55}
56
57pub async fn handle_sync_request(
59 State(state): State<AppState>,
60 Json(request): Json<SyncRequest>,
61) -> impl IntoResponse {
62 let persistence = state.persistence.clone();
63 let instance_id = persistence.instance_id().to_string();
64 let adapter = SyncPersistenceAdapter::new(persistence.clone());
65 let sync_engine = SyncEngine::new(adapter, instance_id);
66
67 let their_vc = if let Some(ref vc_str) = request.vector_clock {
69 match VectorClock::from_json(vc_str) {
70 Ok(vc) => vc,
71 Err(e) => {
72 return (
73 StatusCode::BAD_REQUEST,
74 Json(SyncResponse {
75 success: false,
76 message: format!("Invalid vector clock: {}", e),
77 payload: None,
78 }),
79 );
80 }
81 }
82 } else {
83 VectorClock::new()
84 };
85
86 let sync_type = match sync_engine
88 .decide_sync_strategy(
89 &request.session_id,
90 request.graph_name.as_deref().unwrap_or("default"),
91 &their_vc,
92 )
93 .await
94 {
95 Ok(st) => st,
96 Err(e) => {
97 return (
98 StatusCode::INTERNAL_SERVER_ERROR,
99 Json(SyncResponse {
100 success: false,
101 message: format!("Failed to determine sync strategy: {}", e),
102 payload: None,
103 }),
104 );
105 }
106 };
107
108 let payload = match sync_type {
110 SyncType::Full => {
111 match sync_engine
112 .sync_full(
113 &request.session_id,
114 request.graph_name.as_deref().unwrap_or("default"),
115 )
116 .await
117 {
118 Ok(p) => p,
119 Err(e) => {
120 return (
121 StatusCode::INTERNAL_SERVER_ERROR,
122 Json(SyncResponse {
123 success: false,
124 message: format!("Full sync failed: {}", e),
125 payload: None,
126 }),
127 );
128 }
129 }
130 }
131 SyncType::Incremental => {
132 match sync_engine
133 .sync_incremental(
134 &request.session_id,
135 request.graph_name.as_deref().unwrap_or("default"),
136 &their_vc,
137 )
138 .await
139 {
140 Ok(p) => p,
141 Err(e) => {
142 return (
143 StatusCode::INTERNAL_SERVER_ERROR,
144 Json(SyncResponse {
145 success: false,
146 message: format!("Incremental sync failed: {}", e),
147 payload: None,
148 }),
149 );
150 }
151 }
152 }
153 _ => {
154 return (
155 StatusCode::BAD_REQUEST,
156 Json(SyncResponse {
157 success: false,
158 message: "Unsupported sync type".to_string(),
159 payload: None,
160 }),
161 );
162 }
163 };
164
165 (
166 StatusCode::OK,
167 Json(SyncResponse {
168 success: true,
169 message: format!("{:?} sync completed", sync_type),
170 payload: Some(payload),
171 }),
172 )
173}
174
175pub async fn handle_sync_apply(
177 State(state): State<AppState>,
178 Json(payload): Json<GraphSyncPayload>,
179) -> impl IntoResponse {
180 let persistence = state.persistence.clone();
181 let instance_id = persistence.instance_id().to_string();
182 let adapter = SyncPersistenceAdapter::new(persistence.clone());
183 let sync_engine = SyncEngine::new(adapter, instance_id);
184
185 let graph_name = payload.graph_name.as_deref().unwrap_or("default");
186
187 match sync_engine.apply_sync(&payload, graph_name).await {
188 Ok(stats) => (
189 StatusCode::OK,
190 Json(serde_json::json!({
191 "success": true,
192 "message": "Sync applied successfully",
193 "stats": {
194 "nodes_applied": stats.nodes_applied,
195 "edges_applied": stats.edges_applied,
196 "tombstones_applied": stats.tombstones_applied,
197 "conflicts_detected": stats.conflicts_detected,
198 "conflicts_resolved": stats.conflicts_resolved,
199 "sync_type": stats.sync_type
200 }
201 })),
202 ),
203 Err(e) => (
204 StatusCode::INTERNAL_SERVER_ERROR,
205 Json(serde_json::json!({
206 "success": false,
207 "message": format!("Failed to apply sync: {}", e)
208 })),
209 ),
210 }
211}
212
213pub async fn get_sync_status(
215 State(state): State<AppState>,
216 Path((session_id, graph_name)): Path<(String, String)>,
217) -> impl IntoResponse {
218 let persistence = &state.persistence;
219 let instance_id = persistence.instance_id().to_string();
220
221 let sync_enabled = match persistence.graph_get_sync_enabled(&session_id, &graph_name) {
223 Ok(enabled) => enabled,
224 Err(e) => {
225 return (
226 StatusCode::INTERNAL_SERVER_ERROR,
227 Json(serde_json::json!({
228 "error": format!("Failed to get sync status: {}", e)
229 })),
230 )
231 .into_response();
232 }
233 };
234
235 let sync_state =
237 match persistence.graph_sync_state_get_metadata(&instance_id, &session_id, &graph_name) {
238 Ok(state) => state,
239 Err(e) => {
240 return (
241 StatusCode::INTERNAL_SERVER_ERROR,
242 Json(serde_json::json!({
243 "error": format!("Failed to get vector clock: {}", e)
244 })),
245 )
246 .into_response();
247 }
248 };
249
250 let vector_clock = sync_state
251 .as_ref()
252 .map(|s| s.vector_clock.clone())
253 .unwrap_or_else(|| "{}".to_string());
254 let last_sync_at = sync_state.and_then(|s| s.last_sync_at);
255
256 let since_timestamp = chrono::Utc::now()
258 .checked_sub_signed(chrono::Duration::hours(1))
259 .unwrap()
260 .to_rfc3339();
261
262 let pending_changes = match persistence.graph_changelog_get_since(&session_id, &since_timestamp)
263 {
264 Ok(entries) => entries.len(),
265 Err(_) => 0,
266 };
267
268 (
269 StatusCode::OK,
270 Json(SyncStatus {
271 session_id,
272 graph_name,
273 sync_enabled,
274 vector_clock,
275 last_sync_at,
276 pending_changes,
277 }),
278 )
279 .into_response()
280}
281
282pub async fn toggle_sync(
284 State(state): State<AppState>,
285 Path((session_id, graph_name)): Path<(String, String)>,
286 Json(request): Json<SyncToggleRequest>,
287) -> impl IntoResponse {
288 let persistence = &state.persistence;
289
290 match persistence.graph_set_sync_enabled(&session_id, &graph_name, request.enabled) {
291 Ok(_) => (
292 StatusCode::OK,
293 Json(serde_json::json!({
294 "success": true,
295 "message": format!("Sync {} for graph {}/{}",
296 if request.enabled { "enabled" } else { "disabled" },
297 session_id, graph_name),
298 "enabled": request.enabled
299 })),
300 ),
301 Err(e) => (
302 StatusCode::INTERNAL_SERVER_ERROR,
303 Json(serde_json::json!({
304 "success": false,
305 "message": format!("Failed to toggle sync: {}", e)
306 })),
307 ),
308 }
309}
310
311pub async fn list_sync_configs(
313 State(state): State<AppState>,
314 Path(session_id): Path<String>,
315) -> impl IntoResponse {
316 let persistence = &state.persistence;
317 let instance_id = persistence.instance_id().to_string();
318
319 match persistence.graph_list(&session_id) {
321 Ok(graphs) => {
322 let mut configs = Vec::new();
323 for graph_name in graphs {
324 let config = match persistence.graph_get_sync_config(&session_id, &graph_name) {
325 Ok(cfg) => cfg,
326 Err(e) => {
327 tracing::warn!(
328 "Failed to load sync config for {}/{}: {}",
329 session_id,
330 graph_name,
331 e
332 );
333 Default::default()
334 }
335 };
336
337 let last_sync_at = match persistence.graph_sync_state_get_metadata(
338 &instance_id,
339 &session_id,
340 &graph_name,
341 ) {
342 Ok(state) => state.and_then(|s| s.last_sync_at),
343 Err(e) => {
344 tracing::warn!(
345 "Failed to load sync state for {}/{}: {}",
346 session_id,
347 graph_name,
348 e
349 );
350 None
351 }
352 };
353
354 configs.push(serde_json::json!({
355 "graph_name": graph_name,
356 "sync_enabled": config.sync_enabled,
357 "conflict_resolution_strategy": config.conflict_resolution_strategy.unwrap_or_else(|| "vector_clock".to_string()),
358 "sync_interval_seconds": config.sync_interval_seconds.unwrap_or(60),
359 "last_sync_at": last_sync_at,
360 }));
361 }
362
363 (
364 StatusCode::OK,
365 Json(serde_json::json!({
366 "success": true,
367 "session_id": session_id,
368 "graphs": configs
369 })),
370 )
371 }
372 Err(e) => (
373 StatusCode::INTERNAL_SERVER_ERROR,
374 Json(serde_json::json!({
375 "success": false,
376 "message": format!("Failed to list sync configs: {}", e)
377 })),
378 ),
379 }
380}
381
382#[derive(Debug, Deserialize)]
384pub struct BulkSyncRequest {
385 pub graphs: Vec<String>,
386 pub enabled: bool,
387}
388
389pub async fn bulk_toggle_sync(
390 State(state): State<AppState>,
391 Path(session_id): Path<String>,
392 Json(request): Json<BulkSyncRequest>,
393) -> impl IntoResponse {
394 let persistence = &state.persistence;
395 let mut results = Vec::new();
396 let mut failed = Vec::new();
397
398 for graph_name in &request.graphs {
399 match persistence.graph_set_sync_enabled(&session_id, graph_name, request.enabled) {
400 Ok(_) => results.push(graph_name.clone()),
401 Err(e) => failed.push(serde_json::json!({
402 "graph": graph_name,
403 "error": e.to_string()
404 })),
405 }
406 }
407
408 (
409 StatusCode::OK,
410 Json(serde_json::json!({
411 "success": failed.is_empty(),
412 "message": format!("Sync {} for {} graphs",
413 if request.enabled { "enabled" } else { "disabled" },
414 results.len()),
415 "updated": results,
416 "failed": failed
417 })),
418 )
419}
420
421#[derive(Debug, Deserialize)]
423pub struct SyncConfig {
424 pub sync_enabled: bool,
425 pub conflict_resolution_strategy: Option<String>, pub sync_interval_seconds: Option<u64>,
427}
428
429pub async fn configure_sync(
430 State(state): State<AppState>,
431 Path((session_id, graph_name)): Path<(String, String)>,
432 Json(config): Json<SyncConfig>,
433) -> impl IntoResponse {
434 let persistence = &state.persistence;
435
436 match persistence.graph_set_sync_config(
437 &session_id,
438 &graph_name,
439 config.sync_enabled,
440 config.conflict_resolution_strategy.as_deref(),
441 config.sync_interval_seconds,
442 ) {
443 Ok(saved) => (
444 StatusCode::OK,
445 Json(serde_json::json!({
446 "success": true,
447 "message": format!("Sync configuration updated for graph {}/{}", session_id, graph_name),
448 "config": {
449 "sync_enabled": saved.sync_enabled,
450 "conflict_resolution_strategy": saved.conflict_resolution_strategy.unwrap_or_else(|| "vector_clock".to_string()),
451 "sync_interval_seconds": saved.sync_interval_seconds.unwrap_or(60),
452 }
453 })),
454 ),
455 Err(e) => (
456 StatusCode::INTERNAL_SERVER_ERROR,
457 Json(serde_json::json!({
458 "success": false,
459 "message": format!("Failed to configure sync: {}", e)
460 })),
461 ),
462 }
463}
464
465pub async fn list_conflicts(State(state): State<AppState>) -> impl IntoResponse {
467 match state.persistence.graph_list_conflicts(None) {
468 Ok(entries) => {
469 let conflicts: Vec<ConflictInfo> = entries
470 .into_iter()
471 .map(|entry| {
472 let parsed_data = entry
473 .data
474 .as_deref()
475 .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok());
476
477 let local_version = parsed_data
478 .as_ref()
479 .and_then(|val| val.get("local_version"))
480 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
481 .unwrap_or_else(|| "null".to_string());
482
483 let remote_version = parsed_data
484 .as_ref()
485 .and_then(|val| val.get("remote_version"))
486 .map(|v| serde_json::to_string(v).unwrap_or_else(|_| "null".to_string()))
487 .unwrap_or_else(|| "null".to_string());
488
489 ConflictInfo {
490 session_id: entry.session_id,
491 graph_name: parsed_data
492 .as_ref()
493 .and_then(|val| val.get("graph_name"))
494 .and_then(|v| v.as_str().map(|s| s.to_string())),
495 entity_type: entry.entity_type,
496 entity_id: entry.entity_id,
497 resolution: parsed_data
498 .as_ref()
499 .and_then(|val| val.get("resolution"))
500 .and_then(|v| v.as_str().map(|s| s.to_string())),
501 local_version,
502 remote_version,
503 detected_at: entry.created_at.to_rfc3339(),
504 }
505 })
506 .collect();
507
508 (StatusCode::OK, Json(conflicts)).into_response()
509 }
510 Err(e) => (
511 StatusCode::INTERNAL_SERVER_ERROR,
512 Json(serde_json::json!({
513 "error": format!("Failed to list conflicts: {}", e)
514 })),
515 )
516 .into_response(),
517 }
518}