use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::{debug, error, info, warn};
use crate::api::mesh::{MeshClient, MeshRegistry};
use crate::persistence::Persistence;
use spec_ai_core::sync::{GraphSyncPayload, SyncEngine};
/// Tuning knobs for the background sync coordinator.
///
/// All fields are plain integers, so the type is `Copy` and cheap to pass by
/// value; `PartialEq`/`Eq` make it usable in tests and config comparisons.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SyncCoordinatorConfig {
    /// Seconds between sync cycles; also the lookback window used to decide
    /// whether a graph has "recent" changes worth syncing.
    pub sync_interval_secs: u64,
    /// Maximum number of peer syncs allowed in flight at once.
    pub max_concurrent_syncs: usize,
    /// Seconds to wait before retrying a failed sync.
    pub retry_interval_secs: u64,
    /// Maximum retry attempts before giving up on a sync.
    pub max_retries: usize,
}
impl Default for SyncCoordinatorConfig {
fn default() -> Self {
Self {
sync_interval_secs: 60, max_concurrent_syncs: 3, retry_interval_secs: 300, max_retries: 3, }
}
}
/// Periodically synchronizes sync-enabled graphs with peers discovered in the
/// mesh. Cheaply cloneable: all heavy state sits behind `Arc`s, so clones
/// share the same persistence layer and registry.
#[derive(Clone)]
pub struct SyncCoordinator {
// Local storage: graph data, changelogs, and per-instance sync state.
persistence: Arc<Persistence>,
// Source of the current set of known mesh peers.
mesh_registry: Arc<MeshRegistry>,
// NOTE(review): not used by any method in this impl — peer requests are made
// with an ad-hoc reqwest::Client. Confirm whether this client should be used.
mesh_client: Arc<MeshClient>,
// Interval / concurrency / retry tuning.
config: SyncCoordinatorConfig,
// Unique id for this node; used to skip self-sync and to key sync state.
instance_id: String,
}
impl SyncCoordinator {
    /// Builds a coordinator and generates a fresh instance id for this node.
    /// The id is used to skip self-sync and to key per-graph sync state.
    pub fn new(
        persistence: Arc<Persistence>,
        mesh_registry: Arc<MeshRegistry>,
        mesh_client: Arc<MeshClient>,
        config: SyncCoordinatorConfig,
    ) -> Self {
        let instance_id = MeshClient::generate_instance_id();
        Self {
            persistence,
            mesh_registry,
            mesh_client,
            config,
            instance_id,
        }
    }

    /// Drives the periodic sync loop forever; intended to run on its own task.
    ///
    /// Missed ticks are skipped (not bursted), so a cycle that overruns the
    /// interval does not trigger back-to-back catch-up cycles.
    pub async fn start(self: Arc<Self>) {
        info!(
            "Starting sync coordinator with interval {} seconds",
            self.config.sync_interval_secs
        );
        let mut interval = time::interval(Duration::from_secs(self.config.sync_interval_secs));
        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
        loop {
            interval.tick().await;
            // A failed cycle is logged and retried on the next tick rather than
            // tearing down the coordinator.
            if let Err(e) = self.run_sync_cycle().await {
                error!("Sync cycle failed: {}", e);
            }
        }
    }

    /// Runs one sync cycle: every sync-enabled graph with recent changes is
    /// synced with every known peer (excluding ourselves), with at most
    /// `max_concurrent_syncs` peer syncs in flight at any time.
    async fn run_sync_cycle(&self) -> Result<()> {
        debug!("Starting sync cycle");
        let sessions = self.get_sync_enabled_sessions()?;
        if sessions.is_empty() {
            debug!("No sync-enabled graphs found");
            return Ok(());
        }
        let peers = self.mesh_registry.list().await;
        if peers.is_empty() {
            debug!("No active peers found in mesh");
            return Ok(());
        }
        // Permits are acquired before spawning, so at most
        // `max_concurrent_syncs` sync tasks run concurrently.
        let semaphore = Arc::new(tokio::sync::Semaphore::new(
            self.config.max_concurrent_syncs,
        ));
        let mut sync_tasks = Vec::new();
        for (session_id, graph_name) in sessions {
            // Fix: a persistence error while evaluating one graph used to abort
            // the whole cycle via `?`; now it is logged and the remaining
            // graphs still get their chance to sync.
            match self.should_sync(&session_id, &graph_name) {
                Ok(true) => {}
                Ok(false) => continue,
                Err(e) => {
                    warn!(
                        "Skipping {}/{}: failed to evaluate sync eligibility: {}",
                        session_id, graph_name, e
                    );
                    continue;
                }
            }
            for peer in &peers {
                // Never sync with ourselves.
                if peer.instance_id == self.instance_id {
                    continue;
                }
                let permit = semaphore.clone().acquire_owned().await?;
                let self_clone = self.clone();
                let session_id = session_id.clone();
                let graph_name = graph_name.clone();
                let peer_id = peer.instance_id.clone();
                let peer_url = format!("http://{}:{}", peer.hostname, peer.port);
                let task = tokio::spawn(async move {
                    // Hold the semaphore permit for the whole sync.
                    let _permit = permit;
                    match self_clone
                        .sync_with_peer(&session_id, &graph_name, &peer_id, &peer_url)
                        .await
                    {
                        Ok(_) => {
                            info!(
                                "Successfully synced {}/{} with peer {}",
                                session_id, graph_name, peer_id
                            );
                        }
                        Err(e) => {
                            warn!(
                                "Failed to sync {}/{} with peer {}: {}",
                                session_id, graph_name, peer_id, e
                            );
                        }
                    }
                });
                sync_tasks.push(task);
            }
        }
        // Wait for all spawned syncs; individual failures were already logged
        // inside the tasks, so join errors are deliberately ignored.
        for task in sync_tasks {
            let _ = task.await;
        }
        debug!("Sync cycle completed");
        Ok(())
    }

    /// Returns the `(session_id, graph_name)` pairs eligible for syncing.
    ///
    /// NOTE(review): hard-coded to the default session/graph — looks like a
    /// placeholder until session enumeration is wired up; confirm.
    fn get_sync_enabled_sessions(&self) -> Result<Vec<(String, String)>> {
        Ok(vec![("default_session".to_string(), "default".to_string())])
    }

    /// Decides whether a graph should sync this cycle: sync must be enabled
    /// AND the changelog must contain entries newer than one interval ago.
    fn should_sync(&self, session_id: &str, graph_name: &str) -> Result<bool> {
        let sync_enabled = self
            .persistence
            .graph_get_sync_enabled(session_id, graph_name)?;
        if !sync_enabled {
            return Ok(false);
        }
        // Fix: the previous `checked_sub_signed(..).unwrap()` could panic on
        // datetime underflow (absurdly large interval). Fall back to the
        // minimum timestamp instead, which treats every change as "recent".
        let since = chrono::Utc::now()
            .checked_sub_signed(chrono::Duration::seconds(
                self.config.sync_interval_secs as i64,
            ))
            .unwrap_or(chrono::DateTime::<chrono::Utc>::MIN_UTC)
            .to_rfc3339();
        let changes = self
            .persistence
            .graph_changelog_get_since(session_id, &since)?;
        Ok(!changes.is_empty())
    }

    /// Performs one sync exchange with a single peer: posts our stored vector
    /// clock to the peer's `/sync/request` endpoint and applies any payload
    /// the peer returns via the `SyncEngine`.
    async fn sync_with_peer(
        &self,
        session_id: &str,
        graph_name: &str,
        peer_id: &str,
        peer_url: &str,
    ) -> Result<()> {
        debug!(
            "Syncing {}/{} with peer {} at {}",
            session_id, graph_name, peer_id, peer_url
        );
        let sync_engine = SyncEngine::new((*self.persistence).clone(), self.instance_id.clone());
        // Our stored vector clock for this graph; "{}" means "never synced".
        let our_vc = self
            .persistence
            .graph_sync_state_get(&self.instance_id, session_id, graph_name)?
            .unwrap_or_else(|| "{}".to_string());
        let sync_request = serde_json::json!({
            "session_id": session_id,
            "graph_name": graph_name,
            "requesting_instance": self.instance_id,
            "vector_clock": our_vc,
        });
        // NOTE(review): a fresh reqwest::Client (and connection pool) per sync
        // is wasteful — consider sharing one client across syncs.
        let client = reqwest::Client::new();
        let response = client
            .post(format!("{}/sync/request", peer_url))
            .json(&sync_request)
            .timeout(Duration::from_secs(30))
            .send()
            .await?;
        if !response.status().is_success() {
            // Fix: capture the HTTP status before consuming the response so
            // failures are diagnosable even when the body is empty/unreadable.
            let status = response.status();
            let error_text = response
                .text()
                .await
                .unwrap_or_else(|_| "Unknown error".to_string());
            return Err(anyhow::anyhow!(
                "Sync request failed ({}): {}",
                status,
                error_text
            ));
        }
        let sync_response: serde_json::Value = response.json().await?;
        // NOTE(review): a missing "payload" key is treated as nothing-to-apply
        // — confirm this matches the peer protocol.
        if let Some(payload) = sync_response.get("payload") {
            let sync_payload: GraphSyncPayload = serde_json::from_value(payload.clone())?;
            let stats = sync_engine.apply_sync(&sync_payload, graph_name).await?;
            info!(
                "Applied sync from peer {}: {} nodes, {} edges, {} conflicts",
                peer_id, stats.nodes_applied, stats.edges_applied, stats.conflicts_detected
            );
        }
        Ok(())
    }

    /// Logs shutdown intent. The loop itself is stopped by aborting or
    /// dropping the task running `start`; no other cleanup is performed here.
    pub async fn shutdown(&self) {
        info!("Shutting down sync coordinator");
    }
}
/// Convenience entry point: constructs a `SyncCoordinator` and launches its
/// periodic loop on a freshly spawned tokio task.
///
/// The returned `JoinHandle` can be used to abort the loop; awaiting it will
/// block forever since the loop never exits on its own.
pub async fn start_sync_coordinator(
    persistence: Arc<Persistence>,
    mesh_registry: Arc<MeshRegistry>,
    mesh_client: Arc<MeshClient>,
    config: SyncCoordinatorConfig,
) -> tokio::task::JoinHandle<()> {
    let coordinator = SyncCoordinator::new(persistence, mesh_registry, mesh_client, config);
    // `start` takes `self: Arc<Self>` and returns a future we can spawn directly.
    tokio::spawn(Arc::new(coordinator).start())
}