use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::embedded::{EmbeddedCore, Query};
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SyncReport {
pub pushed: usize,
pub pulled: usize,
pub conflicts: Vec<SyncConflict>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncConflict {
pub entity_id: String,
pub conflict_type: ConflictType,
pub description: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ConflictType {
NodePropertyConflict,
EdgeContradiction,
DuplicateNode,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SyncPreview {
pub will_push: usize,
pub will_pull: usize,
pub potential_conflicts: Vec<SyncConflict>,
}
pub async fn sync(local: &EmbeddedCore, peer: &EmbeddedCore) -> crate::error::Result<SyncReport> {
let local_before = count_prime_events(local).await?;
let peer_before = count_prime_events(peer).await?;
let pre_sync_states = snapshot_node_states(local).await?;
local.sync_to(peer).await?;
peer.sync_to(local).await?;
let local_after = count_prime_events(local).await?;
let peer_after = count_prime_events(peer).await?;
let pushed = peer_after.saturating_sub(peer_before);
let pulled = local_after.saturating_sub(local_before);
let conflicts = detect_conflicts(local, &pre_sync_states).await?;
Ok(SyncReport {
pushed,
pulled,
conflicts,
})
}
pub async fn sync_preview(
local: &EmbeddedCore,
peer: &EmbeddedCore,
) -> crate::error::Result<SyncPreview> {
let local_count = count_prime_events(local).await?;
let peer_count = count_prime_events(peer).await?;
Ok(SyncPreview {
will_push: local_count.saturating_sub(peer_count),
will_pull: peer_count.saturating_sub(local_count),
potential_conflicts: vec![], })
}
async fn count_prime_events(core: &EmbeddedCore) -> crate::error::Result<usize> {
let events = core.query(Query::new().event_type_prefix("prime.")).await?;
Ok(events.len())
}
async fn snapshot_node_states(
core: &EmbeddedCore,
) -> crate::error::Result<std::collections::HashMap<String, Value>> {
let events = core
.query(Query::new().event_type_prefix("prime.node."))
.await?;
let mut states: std::collections::HashMap<String, Value> = std::collections::HashMap::new();
for event in &events {
states.insert(event.entity_id.clone(), event.payload.clone());
}
Ok(states)
}
async fn detect_conflicts(
core: &EmbeddedCore,
pre_sync_states: &std::collections::HashMap<String, Value>,
) -> crate::error::Result<Vec<SyncConflict>> {
let post_events = core
.query(Query::new().event_type_prefix("prime.node."))
.await?;
let mut post_states: std::collections::HashMap<String, Value> =
std::collections::HashMap::new();
let mut update_payloads: std::collections::HashMap<String, Vec<Value>> =
std::collections::HashMap::new();
for event in &post_events {
post_states.insert(event.entity_id.clone(), event.payload.clone());
if event.event_type == crate::prime::types::event_types::NODE_UPDATED {
update_payloads
.entry(event.entity_id.clone())
.or_default()
.push(event.payload.clone());
}
}
let mut conflicts = Vec::new();
for (entity_id, pre_state) in pre_sync_states {
if let Some(post_state) = post_states.get(entity_id) {
if pre_state != post_state
&& let Some(updates) = update_payloads.get(entity_id)
{
let distinct: std::collections::HashSet<String> = updates
.iter()
.map(|v| serde_json::to_string(v).unwrap_or_default())
.collect();
if distinct.len() > 1 {
conflicts.push(SyncConflict {
entity_id: entity_id.clone(),
conflict_type: ConflictType::NodePropertyConflict,
description: format!(
"Entity {entity_id} has {} distinct updates — concurrent modification merged",
distinct.len()
),
});
}
}
}
}
Ok(conflicts)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
embedded::{Config, IngestEvent},
prime::types::event_types,
};
async fn test_core(node_id: u32) -> EmbeddedCore {
let config = Config::builder().node_id(node_id).build().unwrap();
EmbeddedCore::open(config).await.unwrap()
}
#[tokio::test]
async fn test_sync_bidirectional() {
let local = test_core(1).await;
let remote = test_core(2).await;
local
.ingest(IngestEvent {
entity_id: "node:person:alice",
event_type: event_types::NODE_CREATED,
payload: serde_json::json!({"node_type": "person", "properties": {"name": "Alice"}}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
remote
.ingest(IngestEvent {
entity_id: "node:person:bob",
event_type: event_types::NODE_CREATED,
payload: serde_json::json!({"node_type": "person", "properties": {"name": "Bob"}}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
let report = sync(&local, &remote).await.unwrap();
assert!(report.pushed > 0, "should have pushed events");
assert!(report.pulled > 0, "should have pulled events");
let local_events = local
.query(Query::new().event_type(event_types::NODE_CREATED))
.await
.unwrap();
assert_eq!(local_events.len(), 2);
let remote_events = remote
.query(Query::new().event_type(event_types::NODE_CREATED))
.await
.unwrap();
assert_eq!(remote_events.len(), 2);
local.shutdown().await.unwrap();
remote.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_sync_preview_estimates() {
let local = test_core(3).await;
let remote = test_core(4).await;
for i in 0..3 {
local
.ingest(IngestEvent {
entity_id: &format!("node:n:l{i}"),
event_type: event_types::NODE_CREATED,
payload: serde_json::json!({"node_type": "n"}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
}
remote
.ingest(IngestEvent {
entity_id: "node:n:r0",
event_type: event_types::NODE_CREATED,
payload: serde_json::json!({"node_type": "n"}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
let preview = sync_preview(&local, &remote).await.unwrap();
assert!(preview.will_push > 0);
local.shutdown().await.unwrap();
remote.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_sync_empty_cores() {
let local = test_core(5).await;
let remote = test_core(6).await;
let report = sync(&local, &remote).await.unwrap();
assert_eq!(report.pushed, 0);
assert_eq!(report.pulled, 0);
assert!(report.conflicts.is_empty());
local.shutdown().await.unwrap();
remote.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_conflict_type_serialization() {
let conflict = SyncConflict {
entity_id: "node:person:alice".to_string(),
conflict_type: ConflictType::NodePropertyConflict,
description: "test".to_string(),
};
let json = serde_json::to_string(&conflict).unwrap();
let parsed: SyncConflict = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.conflict_type, ConflictType::NodePropertyConflict);
}
}