1use crate::api::handlers::AppState;
2use axum::extract::{Json, Path, State};
3use axum::http::StatusCode;
4use axum::response::IntoResponse;
5use serde::{Deserialize, Serialize};
6use spec_ai_core::sync::{GraphSyncPayload, SyncEngine, SyncType, VectorClock};
7
8#[derive(Debug, Deserialize)]
10pub struct SyncRequest {
11 pub session_id: String,
12 pub graph_name: Option<String>,
13 pub requesting_instance: String,
14 pub vector_clock: Option<String>,
15}
16
17#[derive(Debug, Serialize)]
19pub struct SyncResponse {
20 pub success: bool,
21 pub message: String,
22 pub payload: Option<GraphSyncPayload>,
23}
24
25#[derive(Debug, Serialize)]
27pub struct SyncStatus {
28 pub session_id: String,
29 pub graph_name: String,
30 pub sync_enabled: bool,
31 pub vector_clock: String,
32 pub last_sync_at: Option<String>,
33 pub pending_changes: usize,
34}
35
36#[derive(Debug, Deserialize)]
38pub struct SyncToggleRequest {
39 pub enabled: bool,
40}
41
42#[derive(Debug, Serialize)]
44pub struct ConflictInfo {
45 pub session_id: String,
46 pub entity_type: String,
47 pub entity_id: i64,
48 pub local_version: String,
49 pub remote_version: String,
50 pub detected_at: String,
51}
52
53pub async fn handle_sync_request(
55 State(state): State<AppState>,
56 Json(request): Json<SyncRequest>,
57) -> impl IntoResponse {
58 let persistence = state.persistence.clone();
59 let instance_id = crate::api::mesh::MeshClient::generate_instance_id();
60 let sync_engine = SyncEngine::new(persistence.clone(), instance_id);
61
62 let their_vc = if let Some(ref vc_str) = request.vector_clock {
64 match VectorClock::from_json(vc_str) {
65 Ok(vc) => vc,
66 Err(e) => {
67 return (
68 StatusCode::BAD_REQUEST,
69 Json(SyncResponse {
70 success: false,
71 message: format!("Invalid vector clock: {}", e),
72 payload: None,
73 }),
74 )
75 }
76 }
77 } else {
78 VectorClock::new()
79 };
80
81 let sync_type = match sync_engine
83 .decide_sync_strategy(
84 &request.session_id,
85 request.graph_name.as_deref().unwrap_or("default"),
86 &their_vc,
87 )
88 .await
89 {
90 Ok(st) => st,
91 Err(e) => {
92 return (
93 StatusCode::INTERNAL_SERVER_ERROR,
94 Json(SyncResponse {
95 success: false,
96 message: format!("Failed to determine sync strategy: {}", e),
97 payload: None,
98 }),
99 )
100 }
101 };
102
103 let payload = match sync_type {
105 SyncType::Full => {
106 match sync_engine
107 .sync_full(
108 &request.session_id,
109 request.graph_name.as_deref().unwrap_or("default"),
110 )
111 .await
112 {
113 Ok(p) => p,
114 Err(e) => {
115 return (
116 StatusCode::INTERNAL_SERVER_ERROR,
117 Json(SyncResponse {
118 success: false,
119 message: format!("Full sync failed: {}", e),
120 payload: None,
121 }),
122 )
123 }
124 }
125 }
126 SyncType::Incremental => {
127 match sync_engine
128 .sync_incremental(
129 &request.session_id,
130 request.graph_name.as_deref().unwrap_or("default"),
131 &their_vc,
132 )
133 .await
134 {
135 Ok(p) => p,
136 Err(e) => {
137 return (
138 StatusCode::INTERNAL_SERVER_ERROR,
139 Json(SyncResponse {
140 success: false,
141 message: format!("Incremental sync failed: {}", e),
142 payload: None,
143 }),
144 )
145 }
146 }
147 }
148 _ => {
149 return (
150 StatusCode::BAD_REQUEST,
151 Json(SyncResponse {
152 success: false,
153 message: "Unsupported sync type".to_string(),
154 payload: None,
155 }),
156 )
157 }
158 };
159
160 (
161 StatusCode::OK,
162 Json(SyncResponse {
163 success: true,
164 message: format!("{:?} sync completed", sync_type),
165 payload: Some(payload),
166 }),
167 )
168}
169
170pub async fn handle_sync_apply(
172 State(state): State<AppState>,
173 Json(payload): Json<GraphSyncPayload>,
174) -> impl IntoResponse {
175 let persistence = state.persistence.clone();
176 let instance_id = crate::api::mesh::MeshClient::generate_instance_id();
177 let sync_engine = SyncEngine::new(persistence.clone(), instance_id);
178
179 let graph_name = payload.graph_name.as_deref().unwrap_or("default");
180
181 match sync_engine.apply_sync(&payload, graph_name).await {
182 Ok(stats) => (
183 StatusCode::OK,
184 Json(serde_json::json!({
185 "success": true,
186 "message": "Sync applied successfully",
187 "stats": {
188 "nodes_applied": stats.nodes_applied,
189 "edges_applied": stats.edges_applied,
190 "tombstones_applied": stats.tombstones_applied,
191 "conflicts_detected": stats.conflicts_detected,
192 "conflicts_resolved": stats.conflicts_resolved,
193 "sync_type": stats.sync_type
194 }
195 })),
196 ),
197 Err(e) => (
198 StatusCode::INTERNAL_SERVER_ERROR,
199 Json(serde_json::json!({
200 "success": false,
201 "message": format!("Failed to apply sync: {}", e)
202 })),
203 ),
204 }
205}
206
207pub async fn get_sync_status(
209 State(state): State<AppState>,
210 Path((session_id, graph_name)): Path<(String, String)>,
211) -> impl IntoResponse {
212 let persistence = &state.persistence;
213 let instance_id = crate::api::mesh::MeshClient::generate_instance_id();
214
215 let sync_enabled = match persistence.graph_get_sync_enabled(&session_id, &graph_name) {
217 Ok(enabled) => enabled,
218 Err(e) => {
219 return (
220 StatusCode::INTERNAL_SERVER_ERROR,
221 Json(serde_json::json!({
222 "error": format!("Failed to get sync status: {}", e)
223 })),
224 )
225 .into_response()
226 }
227 };
228
229 let vector_clock =
231 match persistence.graph_sync_state_get(&instance_id, &session_id, &graph_name) {
232 Ok(Some(vc)) => vc,
233 Ok(None) => "{}".to_string(),
234 Err(e) => {
235 return (
236 StatusCode::INTERNAL_SERVER_ERROR,
237 Json(serde_json::json!({
238 "error": format!("Failed to get vector clock: {}", e)
239 })),
240 )
241 .into_response()
242 }
243 };
244
245 let since_timestamp = chrono::Utc::now()
247 .checked_sub_signed(chrono::Duration::hours(1))
248 .unwrap()
249 .to_rfc3339();
250
251 let pending_changes = match persistence.graph_changelog_get_since(&session_id, &since_timestamp)
252 {
253 Ok(entries) => entries.len(),
254 Err(_) => 0,
255 };
256
257 (
258 StatusCode::OK,
259 Json(SyncStatus {
260 session_id,
261 graph_name,
262 sync_enabled,
263 vector_clock,
264 last_sync_at: None, pending_changes,
266 }),
267 )
268 .into_response()
269}
270
271pub async fn toggle_sync(
273 State(state): State<AppState>,
274 Path((session_id, graph_name)): Path<(String, String)>,
275 Json(request): Json<SyncToggleRequest>,
276) -> impl IntoResponse {
277 let persistence = &state.persistence;
278
279 match persistence.graph_set_sync_enabled(&session_id, &graph_name, request.enabled) {
280 Ok(_) => (
281 StatusCode::OK,
282 Json(serde_json::json!({
283 "success": true,
284 "message": format!("Sync {} for graph {}/{}",
285 if request.enabled { "enabled" } else { "disabled" },
286 session_id, graph_name),
287 "enabled": request.enabled
288 })),
289 ),
290 Err(e) => (
291 StatusCode::INTERNAL_SERVER_ERROR,
292 Json(serde_json::json!({
293 "success": false,
294 "message": format!("Failed to toggle sync: {}", e)
295 })),
296 ),
297 }
298}
299
300pub async fn list_sync_configs(
302 State(state): State<AppState>,
303 Path(session_id): Path<String>,
304) -> impl IntoResponse {
305 let persistence = &state.persistence;
306
307 match persistence.graph_list(&session_id) {
309 Ok(graphs) => {
310 let mut configs = Vec::new();
311 for graph_name in graphs {
312 let sync_enabled = persistence
313 .graph_get_sync_enabled(&session_id, &graph_name)
314 .unwrap_or(false);
315
316 configs.push(serde_json::json!({
317 "graph_name": graph_name,
318 "sync_enabled": sync_enabled,
319 }));
320 }
321
322 (
323 StatusCode::OK,
324 Json(serde_json::json!({
325 "success": true,
326 "session_id": session_id,
327 "graphs": configs
328 })),
329 )
330 }
331 Err(e) => (
332 StatusCode::INTERNAL_SERVER_ERROR,
333 Json(serde_json::json!({
334 "success": false,
335 "message": format!("Failed to list sync configs: {}", e)
336 })),
337 ),
338 }
339}
340
341#[derive(Debug, Deserialize)]
343pub struct BulkSyncRequest {
344 pub graphs: Vec<String>,
345 pub enabled: bool,
346}
347
348pub async fn bulk_toggle_sync(
349 State(state): State<AppState>,
350 Path(session_id): Path<String>,
351 Json(request): Json<BulkSyncRequest>,
352) -> impl IntoResponse {
353 let persistence = &state.persistence;
354 let mut results = Vec::new();
355 let mut failed = Vec::new();
356
357 for graph_name in &request.graphs {
358 match persistence.graph_set_sync_enabled(&session_id, graph_name, request.enabled) {
359 Ok(_) => results.push(graph_name.clone()),
360 Err(e) => failed.push(serde_json::json!({
361 "graph": graph_name,
362 "error": e.to_string()
363 })),
364 }
365 }
366
367 (
368 StatusCode::OK,
369 Json(serde_json::json!({
370 "success": failed.is_empty(),
371 "message": format!("Sync {} for {} graphs",
372 if request.enabled { "enabled" } else { "disabled" },
373 results.len()),
374 "updated": results,
375 "failed": failed
376 })),
377 )
378}
379
380#[derive(Debug, Deserialize)]
382pub struct SyncConfig {
383 pub sync_enabled: bool,
384 pub conflict_resolution_strategy: Option<String>, pub sync_interval_seconds: Option<u64>,
386}
387
388pub async fn configure_sync(
389 State(state): State<AppState>,
390 Path((session_id, graph_name)): Path<(String, String)>,
391 Json(config): Json<SyncConfig>,
392) -> impl IntoResponse {
393 let persistence = &state.persistence;
394
395 match persistence.graph_set_sync_enabled(&session_id, &graph_name, config.sync_enabled) {
397 Ok(_) => {
398 (
401 StatusCode::OK,
402 Json(serde_json::json!({
403 "success": true,
404 "message": format!("Sync configuration updated for graph {}/{}", session_id, graph_name),
405 "config": {
406 "sync_enabled": config.sync_enabled,
407 "conflict_resolution_strategy": config.conflict_resolution_strategy.unwrap_or_else(|| "vector_clock".to_string()),
408 "sync_interval_seconds": config.sync_interval_seconds.unwrap_or(60),
409 }
410 })),
411 )
412 }
413 Err(e) => (
414 StatusCode::INTERNAL_SERVER_ERROR,
415 Json(serde_json::json!({
416 "success": false,
417 "message": format!("Failed to configure sync: {}", e)
418 })),
419 ),
420 }
421}
422
423pub async fn list_conflicts(State(_state): State<AppState>) -> impl IntoResponse {
425 let conflicts: Vec<ConflictInfo> = Vec::new();
428
429 (StatusCode::OK, Json(conflicts))
430}