use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::spec_ai_knowledge_graph::{EdgeType, NodeType, VectorClock};
/// Kind of message carried by a [`GraphSyncPayload`].
///
/// Variants are serialized in `snake_case` (for example `RequestFull` is
/// written as `"request_full"`), as configured by `rename_all` below.
///
/// `Eq` and `Hash` are derived alongside `PartialEq`: the enum has no fields,
/// so equality is total and the value can be used as a map/set key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum SyncType {
    /// Tag set by [`GraphSyncPayload::request_full`].
    RequestFull,
    /// Tag set by [`GraphSyncPayload::request_incremental`].
    RequestIncremental,
    /// Tag set by [`GraphSyncPayload::response_full`].
    Full,
    /// Tag set by [`GraphSyncPayload::response_incremental`].
    Incremental,
    /// Tag set by [`GraphSyncPayload::ack`].
    Ack,
    /// Tag set by [`GraphSyncPayload::conflict`].
    Conflict,
}
/// Envelope used for every graph-sync message; `sync_type` says which kind it is.
///
/// The associated constructors (`request_full`, `request_incremental`,
/// `response_full`, `response_incremental`, `ack`, `conflict`) each produce a
/// payload tagged with one [`SyncType`] variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphSyncPayload {
/// Which kind of sync message this payload is.
pub sync_type: SyncType,
/// Session identifier, stored exactly as passed to the constructor.
pub session_id: String,
/// Optional graph name, stored exactly as passed to the constructor.
pub graph_name: Option<String>,
/// Vector clock attached to the message; how it is derived depends on the
/// constructor used.
pub vector_clock: VectorClock,
/// Nodes carried by the message. Deserializes to an empty list when the key
/// is absent (`#[serde(default)]`).
#[serde(default)]
pub nodes: Vec<SyncedNode>,
/// Edges carried by the message. Deserializes to an empty list when the key
/// is absent (`#[serde(default)]`).
#[serde(default)]
pub edges: Vec<SyncedEdge>,
/// Tombstones carried by the message. Deserializes to an empty list when the
/// key is absent (`#[serde(default)]`).
#[serde(default)]
pub tombstones: Vec<Tombstone>,
/// Correlation id. The two request constructors generate a fresh UUID v4;
/// the other constructors store whatever the caller passes.
pub correlation_id: Option<String>,
/// Free-form text. `ack` stores an applied-counts summary here and `conflict`
/// stores a conflict description; the request/response constructors leave it `None`.
pub conflict_info: Option<String>,
}
/// Serializable node record carried in the `nodes` list of
/// [`GraphSyncPayload`] and [`SyncResponse`].
///
/// NOTE(review): nothing in this file reads these fields, so the per-field
/// notes below are inferred from names and types — confirm against the
/// knowledge-graph module before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncedNode {
/// Numeric node identifier.
pub id: i64,
/// Session identifier string.
pub session_id: String,
/// Node type, as defined by the knowledge-graph module.
pub node_type: NodeType,
/// Node label text.
pub label: String,
/// Arbitrary JSON value attached to the node.
pub properties: serde_json::Value,
/// Optional numeric embedding reference — presumably an id in an embedding
/// store; TODO confirm.
pub embedding_id: Option<i64>,
/// Creation timestamp (UTC).
pub created_at: DateTime<Utc>,
/// Last-update timestamp (UTC).
pub updated_at: DateTime<Utc>,
/// Vector clock recorded for this node.
pub vector_clock: VectorClock,
/// Optional identifier of the last modifier — presumably an instance id;
/// TODO confirm.
pub last_modified_by: Option<String>,
/// Deletion flag — presumably a soft-delete marker; TODO confirm.
pub is_deleted: bool,
/// Sync flag — presumably whether this node participates in sync; TODO confirm.
pub sync_enabled: bool,
}
/// Serializable edge record carried in the `edges` list of
/// [`GraphSyncPayload`] and [`SyncResponse`].
///
/// NOTE(review): nothing in this file reads these fields, so the per-field
/// notes below are inferred from names and types — confirm against the
/// knowledge-graph module before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncedEdge {
/// Numeric edge identifier.
pub id: i64,
/// Session identifier string.
pub session_id: String,
/// Numeric id of the source endpoint — presumably a node id; TODO confirm.
pub source_id: i64,
/// Numeric id of the target endpoint — presumably a node id; TODO confirm.
pub target_id: i64,
/// Edge type, as defined by the knowledge-graph module.
pub edge_type: EdgeType,
/// Optional predicate text.
pub predicate: Option<String>,
/// Optional arbitrary JSON value attached to the edge.
pub properties: Option<serde_json::Value>,
/// Edge weight; valid range is not established in this file.
pub weight: f32,
/// Optional start timestamp (UTC) of the edge's temporal span.
pub temporal_start: Option<DateTime<Utc>>,
/// Optional end timestamp (UTC) of the edge's temporal span.
pub temporal_end: Option<DateTime<Utc>>,
/// Creation timestamp (UTC).
pub created_at: DateTime<Utc>,
/// Vector clock recorded for this edge.
pub vector_clock: VectorClock,
/// Optional identifier of the last modifier — presumably an instance id;
/// TODO confirm.
pub last_modified_by: Option<String>,
/// Deletion flag — presumably a soft-delete marker; TODO confirm.
pub is_deleted: bool,
/// Sync flag — presumably whether this edge participates in sync; TODO confirm.
pub sync_enabled: bool,
}
/// Record of a deleted entity, carried in the `tombstones` list of
/// [`GraphSyncPayload`] and [`SyncResponse`].
///
/// [`Tombstone::new`] fills `deleted_at` with the current UTC time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Tombstone {
/// Free-form entity kind. NOTE(review): the set of expected values is not
/// visible in this file — confirm with the producer/consumer code.
pub entity_type: String,
/// Numeric identifier of the deleted entity.
pub entity_id: i64,
/// Vector clock supplied when the tombstone was created.
pub vector_clock: VectorClock,
/// Identifier of whoever performed the deletion, as supplied by the caller.
pub deleted_by: String,
/// Deletion timestamp (UTC).
pub deleted_at: DateTime<Utc>,
}
/// Standalone record holding the three values of a full-sync request.
///
/// NOTE(review): not constructed anywhere in this file;
/// `GraphSyncPayload::request_full` takes the same three values as arguments.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncFullRequest {
/// Session identifier string.
pub session_id: String,
/// Optional graph name.
pub graph_name: Option<String>,
/// Identifier of the instance making the request.
pub requesting_instance: String,
}
/// Standalone record holding the values of an incremental-sync request.
///
/// NOTE(review): not constructed anywhere in this file;
/// `GraphSyncPayload::request_incremental` takes the same four values as arguments.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncIncrementalRequest {
/// Session identifier string.
pub session_id: String,
/// Optional graph name.
pub graph_name: Option<String>,
/// Identifier of the instance making the request.
pub requesting_instance: String,
/// Vector clock marking the point the request is relative to.
pub since_vector_clock: VectorClock,
}
/// Standalone sync response record carrying nodes, edges and tombstones.
///
/// Unlike [`GraphSyncPayload`], the three lists here have no
/// `#[serde(default)]`, so all three keys must be present when deserializing.
///
/// NOTE(review): not constructed anywhere in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncResponse {
/// Session identifier string.
pub session_id: String,
/// Optional graph name.
pub graph_name: Option<String>,
/// Vector clock attached to the response.
pub vector_clock: VectorClock,
/// Nodes included in the response.
pub nodes: Vec<SyncedNode>,
/// Edges included in the response.
pub edges: Vec<SyncedEdge>,
/// Tombstones included in the response.
pub tombstones: Vec<Tombstone>,
/// Flag distinguishing incremental from non-incremental responses.
pub is_incremental: bool,
}
/// Standalone acknowledgement record with structured applied/conflict counts.
///
/// NOTE(review): not constructed anywhere in this file; `GraphSyncPayload::ack`
/// takes the same four counts but folds them into a text summary instead of
/// building this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncAck {
/// Session identifier string.
pub session_id: String,
/// Optional graph name.
pub graph_name: Option<String>,
/// Vector clock attached to the acknowledgement.
pub vector_clock: VectorClock,
/// Count of nodes applied.
pub nodes_applied: usize,
/// Count of edges applied.
pub edges_applied: usize,
/// Count of tombstones applied.
pub tombstones_applied: usize,
/// Count of conflicts detected.
pub conflicts_detected: usize,
}
/// Standalone conflict record holding both vector clocks for one entity.
///
/// NOTE(review): not constructed anywhere in this file;
/// `GraphSyncPayload::conflict` takes the same entity and clock values but
/// renders them into a text message instead of building this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncConflict {
/// Session identifier string.
pub session_id: String,
/// Optional graph name.
pub graph_name: Option<String>,
/// Free-form entity kind. NOTE(review): expected values are not visible in
/// this file.
pub entity_type: String,
/// Numeric identifier of the conflicting entity.
pub entity_id: i64,
/// Vector clock on the local side of the conflict.
pub local_vector_clock: VectorClock,
/// Vector clock on the remote side of the conflict.
pub remote_vector_clock: VectorClock,
/// Free-form description of the conflict.
pub description: String,
}
impl GraphSyncPayload {
    /// Builds a payload tagged [`SyncType::RequestFull`].
    ///
    /// The attached vector clock is a fresh `VectorClock::new()` on which
    /// `increment` is called once for `requesting_instance`. The node, edge
    /// and tombstone lists are empty, `conflict_info` is `None`, and a newly
    /// generated UUID v4 string is stored as the correlation id.
    pub fn request_full(
        session_id: String,
        graph_name: Option<String>,
        requesting_instance: String,
    ) -> Self {
        let mut vector_clock = VectorClock::new();
        vector_clock.increment(&requesting_instance);
        Self {
            sync_type: SyncType::RequestFull,
            session_id,
            graph_name,
            vector_clock,
            nodes: Vec::new(),
            edges: Vec::new(),
            tombstones: Vec::new(),
            correlation_id: Some(uuid::Uuid::new_v4().to_string()),
            conflict_info: None,
        }
    }

    /// Builds a payload tagged [`SyncType::RequestIncremental`].
    ///
    /// The attached vector clock is `since_vector_clock` with `increment`
    /// called once for `requesting_instance`. The node, edge and tombstone
    /// lists are empty, `conflict_info` is `None`, and a newly generated
    /// UUID v4 string is stored as the correlation id.
    pub fn request_incremental(
        session_id: String,
        graph_name: Option<String>,
        requesting_instance: String,
        since_vector_clock: VectorClock,
    ) -> Self {
        // The clock is passed by value and not used again, so it is moved
        // rather than cloned.
        let mut vector_clock = since_vector_clock;
        vector_clock.increment(&requesting_instance);
        Self {
            sync_type: SyncType::RequestIncremental,
            session_id,
            graph_name,
            vector_clock,
            nodes: Vec::new(),
            edges: Vec::new(),
            tombstones: Vec::new(),
            correlation_id: Some(uuid::Uuid::new_v4().to_string()),
            conflict_info: None,
        }
    }

    /// Builds a payload tagged [`SyncType::Full`].
    ///
    /// Every argument is stored unchanged in the field of the same name;
    /// `conflict_info` is `None`.
    pub fn response_full(
        session_id: String,
        graph_name: Option<String>,
        vector_clock: VectorClock,
        nodes: Vec<SyncedNode>,
        edges: Vec<SyncedEdge>,
        tombstones: Vec<Tombstone>,
        correlation_id: Option<String>,
    ) -> Self {
        Self {
            sync_type: SyncType::Full,
            session_id,
            graph_name,
            vector_clock,
            nodes,
            edges,
            tombstones,
            correlation_id,
            conflict_info: None,
        }
    }

    /// Builds a payload tagged [`SyncType::Incremental`].
    ///
    /// Every argument is stored unchanged in the field of the same name;
    /// `conflict_info` is `None`.
    pub fn response_incremental(
        session_id: String,
        graph_name: Option<String>,
        vector_clock: VectorClock,
        nodes: Vec<SyncedNode>,
        edges: Vec<SyncedEdge>,
        tombstones: Vec<Tombstone>,
        correlation_id: Option<String>,
    ) -> Self {
        Self {
            sync_type: SyncType::Incremental,
            session_id,
            graph_name,
            vector_clock,
            nodes,
            edges,
            tombstones,
            correlation_id,
            conflict_info: None,
        }
    }

    /// Builds a payload tagged [`SyncType::Ack`].
    ///
    /// The four counts are not stored as separate fields; they are rendered
    /// into `conflict_info` as
    /// `"Applied N/E/T (nodes/edges/tombstones), C conflicts"`.
    /// The node, edge and tombstone lists are empty.
    // Flat argument list kept for caller compatibility: the signature is part
    // of the public API, so the lint is silenced instead of regrouping the
    // parameters into a struct.
    #[allow(clippy::too_many_arguments)]
    pub fn ack(
        session_id: String,
        graph_name: Option<String>,
        vector_clock: VectorClock,
        nodes_applied: usize,
        edges_applied: usize,
        tombstones_applied: usize,
        conflicts_detected: usize,
        correlation_id: Option<String>,
    ) -> Self {
        let summary = format!(
            "Applied {}/{}/{} (nodes/edges/tombstones), {} conflicts",
            nodes_applied, edges_applied, tombstones_applied, conflicts_detected
        );
        Self {
            sync_type: SyncType::Ack,
            session_id,
            graph_name,
            vector_clock,
            nodes: Vec::new(),
            edges: Vec::new(),
            tombstones: Vec::new(),
            correlation_id,
            conflict_info: Some(summary),
        }
    }

    /// Builds a payload tagged [`SyncType::Conflict`].
    ///
    /// The payload's vector clock is `local_vector_clock`. The entity type,
    /// entity id and both clocks are rendered into `conflict_info` as
    /// `"Conflict detected for TYPE ID: local=..., remote=..."`;
    /// `remote_vector_clock` appears only in that text. The node, edge and
    /// tombstone lists are empty.
    pub fn conflict(
        session_id: String,
        graph_name: Option<String>,
        entity_type: String,
        entity_id: i64,
        local_vector_clock: VectorClock,
        remote_vector_clock: VectorClock,
        correlation_id: Option<String>,
    ) -> Self {
        // Format the message while the clocks are still borrowable, so the
        // local clock can then be moved into the payload without a clone.
        let description = format!(
            "Conflict detected for {} {}: local={}, remote={}",
            entity_type, entity_id, local_vector_clock, remote_vector_clock
        );
        Self {
            sync_type: SyncType::Conflict,
            session_id,
            graph_name,
            vector_clock: local_vector_clock,
            nodes: Vec::new(),
            edges: Vec::new(),
            tombstones: Vec::new(),
            correlation_id,
            conflict_info: Some(description),
        }
    }
}
impl Tombstone {
pub fn new(
entity_type: String,
entity_id: i64,
vector_clock: VectorClock,
deleted_by: String,
) -> Self {
Self {
entity_type,
entity_id,
vector_clock,
deleted_by,
deleted_at: Utc::now(),
}
}
}