use std::io::BufReader;
use chrono::{Duration, Utc};
static DB_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
use second_brain_core::{
kuzu_store::KuzuStore,
schema::{
Conversation, Entity, Memory, MemoryType, Relation, RelationType, SyncNodeType, SyncOp,
},
store::Store,
};
use second_brain_sync::{export::export_changes, import::import_changes};
fn make_memory(content: &str, machine_id: &str) -> Memory {
Memory {
machine_id: machine_id.to_string(),
..Memory::new(
content.to_string(),
MemoryType::Semantic,
"test".to_string(),
String::new(),
)
}
}
fn export_to_string(store: &KuzuStore, after_seq: u64) -> String {
let mut buf = Vec::new();
export_changes(store, after_seq, &mut buf).unwrap();
String::from_utf8(buf).unwrap()
}
fn import_from_str(store: &KuzuStore, data: &str) -> (second_brain_sync::import::ImportStats, u64) {
let reader = BufReader::new(data.as_bytes());
import_changes(store, reader).unwrap()
}
#[test]
fn newer_updated_at_wins() {
let _db_guard = DB_LOCK.lock().unwrap();
let store_a = KuzuStore::in_memory("machine-a".to_string()).unwrap();
let store_b = KuzuStore::in_memory("machine-b".to_string()).unwrap();
let mut mem = make_memory("original content", "machine-a");
mem.updated_at = Utc::now() - Duration::hours(2);
store_a.store_memory(&mem).unwrap();
let exported = export_to_string(&store_a, 0);
import_from_str(&store_b, &exported);
let mem_on_b = store_b.get_memory(mem.id).unwrap().unwrap();
assert_eq!(mem_on_b.content, "original content");
let newer_updated_at = Utc::now();
let mut updated = mem_on_b.clone();
updated.content = "newer content wins".to_string();
updated.updated_at = newer_updated_at;
let (outcome, _) = store_b
.apply_update_memory(&updated, "machine-b", 99)
.unwrap();
assert!(matches!(
outcome,
second_brain_core::kuzu_store::ApplyOutcome::Updated
));
let after_newer = store_b.get_memory(mem.id).unwrap().unwrap();
assert_eq!(after_newer.content, "newer content wins");
let mut older = after_newer.clone();
older.content = "older content should lose".to_string();
older.updated_at = Utc::now() - Duration::hours(1);
let (outcome2, _) = store_b
.apply_update_memory(&older, "machine-a", 100)
.unwrap();
assert!(matches!(
outcome2,
second_brain_core::kuzu_store::ApplyOutcome::Skipped
));
let final_state = store_b.get_memory(mem.id).unwrap().unwrap();
assert_eq!(final_state.content, "newer content wins");
}
#[test]
fn reinforce_does_not_win_conflict() {
let _db_guard = DB_LOCK.lock().unwrap();
let store_a = KuzuStore::in_memory("machine-a".to_string()).unwrap();
let store_b = KuzuStore::in_memory("machine-b".to_string()).unwrap();
let base_time = Utc::now() - Duration::hours(3);
let mut mem = make_memory("base content", "machine-a");
mem.updated_at = base_time;
store_a.store_memory(&mem).unwrap();
let exported = export_to_string(&store_a, 0);
import_from_str(&store_b, &exported);
let mut b_copy = store_b.get_memory(mem.id).unwrap().unwrap();
b_copy.content = "b genuine edit".to_string();
b_copy.updated_at = Utc::now();
let (_, _) = store_b
.apply_update_memory(&b_copy, "machine-b", 10)
.unwrap();
let b_state_after_edit = store_b.get_memory(mem.id).unwrap().unwrap();
let b_edit_time = b_state_after_edit.updated_at;
let mut a_reinforced = mem.clone();
a_reinforced.updated_at = base_time;
a_reinforced.access_count += 5;
a_reinforced.last_accessed = Utc::now();
let (outcome, _) = store_b
.apply_update_memory(&a_reinforced, "machine-a", 20)
.unwrap();
assert!(matches!(
outcome,
second_brain_core::kuzu_store::ApplyOutcome::Skipped
));
let final_b = store_b.get_memory(mem.id).unwrap().unwrap();
assert_eq!(final_b.content, "b genuine edit");
assert_eq!(final_b.updated_at, b_edit_time);
}
#[test]
fn tombstone_blocks_resurrection() {
let _db_guard = DB_LOCK.lock().unwrap();
let store_a = KuzuStore::in_memory("machine-a".to_string()).unwrap();
let store_b = KuzuStore::in_memory("machine-b".to_string()).unwrap();
let mem = make_memory("memory to be deleted", "machine-a");
let mem_id = mem.id;
store_a.store_memory(&mem).unwrap();
let exported_create = export_to_string(&store_a, 0);
let (create_stats, _) = import_from_str(&store_b, &exported_create);
assert_eq!(create_stats.created, 1);
store_a.delete_memory(mem_id).unwrap();
let after_create_seq = {
let entries = store_a.sync_log_since(0).unwrap();
entries
.iter()
.filter(|e| e.op == SyncOp::Create)
.map(|e| e.local_seq as u64)
.max()
.unwrap_or(0)
};
let exported_delete = export_to_string(&store_a, after_create_seq);
let (delete_stats, _) = import_from_str(&store_b, &exported_delete);
assert_eq!(delete_stats.deleted, 1);
assert!(
store_b.tombstone_exists(mem_id).unwrap(),
"tombstone must be on B after delete sync"
);
assert!(
store_b.get_memory(mem_id).unwrap().is_none(),
"memory must be gone from B"
);
let create_record_jsonl = {
let entries = store_a.sync_log_since(0).unwrap();
let create_entry = entries.iter().find(|e| e.op == SyncOp::Create).unwrap();
let data: Option<serde_json::Value> = create_entry
.data
.as_ref()
.and_then(|d| serde_json::from_str(d).ok());
let record = serde_json::json!({
"local_seq": create_entry.local_seq,
"origin_machine_id": create_entry.origin_machine_id,
"origin_seq": create_entry.origin_seq,
"op": create_entry.op,
"node_type": create_entry.node_type,
"node_id": create_entry.node_id,
"timestamp": create_entry.timestamp,
"data": data,
});
format!("{}\n", serde_json::to_string(&record).unwrap())
};
let reader = BufReader::new(create_record_jsonl.as_bytes());
let (resurrection_stats, _) = import_changes(&store_b, reader).unwrap();
assert_eq!(
resurrection_stats.skipped, 1,
"create on tombstoned id must be skipped"
);
assert!(
store_b.get_memory(mem_id).unwrap().is_none(),
"tombstone must block resurrection"
);
}
#[test]
fn transitive_relay() {
let _db_guard = DB_LOCK.lock().unwrap();
let store_a = KuzuStore::in_memory("machine-a".to_string()).unwrap();
let store_b = KuzuStore::in_memory("machine-b".to_string()).unwrap();
let store_c = KuzuStore::in_memory("machine-c".to_string()).unwrap();
let mem = make_memory("transitive relay test", "machine-a");
let mem_id = mem.id;
store_a.store_memory(&mem).unwrap();
let a_export = export_to_string(&store_a, 0);
import_from_str(&store_b, &a_export);
assert!(
store_b.get_memory(mem_id).unwrap().is_some(),
"B must have the memory"
);
let b_export = export_to_string(&store_b, 0);
import_from_str(&store_c, &b_export);
assert!(
store_c.get_memory(mem_id).unwrap().is_some(),
"C must have the memory via relay"
);
let c_log = store_c.sync_log_since(0).unwrap();
let create_entry = c_log
.iter()
.find(|e| e.node_id == mem_id.to_string() && e.op == SyncOp::Create)
.unwrap();
assert_eq!(
create_entry.origin_machine_id, "machine-a",
"C's log must carry A's origin_machine_id, not B's"
);
}
#[test]
fn import_is_idempotent() {
let _db_guard = DB_LOCK.lock().unwrap();
let store = KuzuStore::in_memory("machine-a".to_string()).unwrap();
let mem = make_memory("idempotent test", "machine-a");
store.store_memory(&mem).unwrap();
let exported = export_to_string(&store, 0);
let target = KuzuStore::in_memory("machine-b".to_string()).unwrap();
let (first_stats, _) = import_from_str(&target, &exported);
let count_after_first = target.memory_count().unwrap();
let log_after_first = target.sync_log_since(0).unwrap().len();
let (second_stats, _) = import_from_str(&target, &exported);
let count_after_second = target.memory_count().unwrap();
let log_after_second = target.sync_log_since(0).unwrap().len();
assert_eq!(first_stats.created, 1);
assert_eq!(second_stats.created, 0);
assert_eq!(second_stats.errors, 0);
assert_eq!(second_stats.skipped, 1);
assert_eq!(count_after_first, count_after_second);
assert_eq!(log_after_first, log_after_second);
}
#[test]
fn entity_and_conversation_ops_apply() {
let _db_guard = DB_LOCK.lock().unwrap();
let store_src = KuzuStore::in_memory("machine-src".to_string()).unwrap();
let store_dst = KuzuStore::in_memory("machine-dst".to_string()).unwrap();
let conv = Conversation {
id: uuid::Uuid::new_v4(),
source: "test".to_string(),
machine_id: "machine-src".to_string(),
started_at: Utc::now(),
project_path: None,
};
store_src.store_conversation(&conv).unwrap();
let entity = Entity::new("TestEntity".to_string(), "person".to_string());
let entity_id = entity.id;
store_src.store_entity(&entity).unwrap();
let conv_id = conv.id;
let exported = export_to_string(&store_src, 0);
let (stats, _) = import_from_str(&store_dst, &exported);
assert_eq!(
stats.created, 2,
"conversation and entity must both be created"
);
assert!(store_dst.get_conversation(conv_id).unwrap().is_some());
assert!(store_dst.get_entity(entity_id).unwrap().is_some());
let mut updated_entity = entity.clone();
updated_entity.name = "UpdatedEntity".to_string();
let update_jsonl = {
let data = serde_json::to_value(&updated_entity).unwrap();
let record = serde_json::json!({
"local_seq": 100u64,
"origin_machine_id": "machine-src",
"origin_seq": 100u64,
"op": SyncOp::Update,
"node_type": SyncNodeType::Entity,
"node_id": entity_id.to_string(),
"timestamp": Utc::now(),
"data": data,
});
format!("{}\n", serde_json::to_string(&record).unwrap())
};
let (update_stats, _) = import_from_str(&store_dst, &update_jsonl);
assert_eq!(update_stats.updated, 1);
let delete_conv_jsonl = {
let record = serde_json::json!({
"local_seq": 101u64,
"origin_machine_id": "machine-src",
"origin_seq": 101u64,
"op": SyncOp::Delete,
"node_type": SyncNodeType::Conversation,
"node_id": conv_id.to_string(),
"timestamp": Utc::now(),
"data": null,
});
format!("{}\n", serde_json::to_string(&record).unwrap())
};
let (delete_stats, _) = import_from_str(&store_dst, &delete_conv_jsonl);
assert_eq!(delete_stats.deleted, 1);
assert!(store_dst.get_conversation(conv_id).unwrap().is_none());
}
#[test]
fn error_holds_watermark() {
let _db_guard = DB_LOCK.lock().unwrap();
let store = KuzuStore::in_memory("machine-a".to_string()).unwrap();
let mem1 = make_memory("memory one", "machine-a");
let mem3 = make_memory("memory three", "machine-a");
let record1 = serde_json::json!({
"local_seq": 1u64,
"origin_machine_id": "machine-a",
"origin_seq": 1u64,
"op": SyncOp::Create,
"node_type": SyncNodeType::Memory,
"node_id": mem1.id.to_string(),
"timestamp": Utc::now(),
"data": serde_json::to_value(&mem1).unwrap(),
});
let record2_malformed = serde_json::json!({
"local_seq": 2u64,
"origin_machine_id": "machine-a",
"origin_seq": 2u64,
"op": SyncOp::Create,
"node_type": SyncNodeType::Memory,
"node_id": "some-id",
"timestamp": Utc::now(),
"data": null,
});
let record3 = serde_json::json!({
"local_seq": 3u64,
"origin_machine_id": "machine-a",
"origin_seq": 3u64,
"op": SyncOp::Create,
"node_type": SyncNodeType::Memory,
"node_id": mem3.id.to_string(),
"timestamp": Utc::now(),
"data": serde_json::to_value(&mem3).unwrap(),
});
let jsonl = format!(
"{}\n{}\n{}\n",
serde_json::to_string(&record1).unwrap(),
serde_json::to_string(&record2_malformed).unwrap(),
serde_json::to_string(&record3).unwrap(),
);
let reader = BufReader::new(jsonl.as_bytes());
let (stats, watermark) = import_changes(&store, reader).unwrap();
assert_eq!(stats.errors, 1);
assert!(
watermark < 2,
"watermark must be below the errored record's local_seq=2, got {watermark}"
);
}
#[test]
fn delete_consolidated_raw_is_tombstoned() {
let _db_guard = DB_LOCK.lock().unwrap();
let store = KuzuStore::in_memory("machine-a".to_string()).unwrap();
let raw = make_memory("raw memory to consolidate", "machine-a");
let distilled = make_memory("distilled result", "machine-a");
let raw_id = raw.id;
store.store_memory(&raw).unwrap();
store.store_memory(&distilled).unwrap();
store
.mark_consolidated(raw_id, distilled.id, "test-model")
.unwrap();
let deleted = store.delete_consolidated_raw().unwrap();
assert_eq!(deleted, 1);
assert!(
store.get_memory(raw_id).unwrap().is_none(),
"raw memory must be deleted"
);
assert!(
store.tombstone_exists(raw_id).unwrap(),
"tombstone must exist so the delete syncs"
);
let log = store.sync_log_since(0).unwrap();
let delete_entries: Vec<_> = log
.iter()
.filter(|e| e.op == SyncOp::Delete && e.node_id == raw_id.to_string())
.collect();
assert!(
!delete_entries.is_empty(),
"sync log must have a Delete entry for the consolidated raw memory"
);
}
#[test]
fn synced_update_preserves_relationships() {
let _db_guard = DB_LOCK.lock().unwrap();
let store = KuzuStore::in_memory("machine-a".to_string()).unwrap();
let mem = make_memory("original content", "machine-a");
let mem_id = mem.id;
store.store_memory(&mem).unwrap();
let entity = Entity::new("TestEntity".to_string(), "person".to_string());
let entity_id = entity.id;
store.store_entity(&entity).unwrap();
let rel = Relation {
from_id: mem_id,
to_id: entity_id,
relation_type: RelationType::Mentions,
strength: 1.0,
context: None,
};
store.store_relation(&rel).unwrap();
let relations_before = store
.get_relations(mem_id, Some(RelationType::Mentions))
.unwrap();
assert!(
!relations_before.is_empty(),
"MENTIONS relation must exist before the update"
);
let mut updated = mem.clone();
updated.content = "updated content".to_string();
updated.updated_at = Utc::now() + Duration::seconds(1);
let (outcome, _) = store
.apply_update_memory(&updated, "machine-b", 99)
.unwrap();
assert!(
matches!(
outcome,
second_brain_core::kuzu_store::ApplyOutcome::Updated
),
"apply_update_memory must report Updated"
);
let relations_after = store
.get_relations(mem_id, Some(RelationType::Mentions))
.unwrap();
assert!(
!relations_after.is_empty(),
"MENTIONS relation must still exist after a synced update — DETACH DELETE destroys it"
);
assert_eq!(
relations_after[0].to_id, entity_id,
"MENTIONS relation must still point to the original entity"
);
}