use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::events::{
EVENT_SCHEMA, NULL_HASH, StateActor, StateEvent, StateTarget, compute_event_id, snapshot_hash,
};
use crate::project::Project;
/// A federation peer hub record as stored in local configuration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PeerHub {
    /// Stable identifier for the hub (tests use the form `hub:test`).
    pub id: String,
    /// Base URL of the hub; `validate` requires an `https://` scheme.
    pub url: String,
    /// Hub public key as 64 hex characters (a 32-byte key, hex-encoded).
    pub public_key: String,
    /// Timestamp string recording when this hub was added.
    pub added_at: String,
    /// Optional free-form note; omitted from serialization when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub note: String,
}
impl PeerHub {
pub fn validate(&self) -> Result<(), String> {
if self.id.trim().is_empty() {
return Err("peer id must be non-empty".into());
}
if !self.url.starts_with("https://") {
return Err(format!(
"peer url must start with `https://` (got `{}`)",
self.url
));
}
let trimmed = self.public_key.trim();
if trimmed.len() != 64 {
return Err(format!(
"peer public_key must be 64 hex chars (got {})",
trimmed.len()
));
}
if hex::decode(trimmed).is_err() {
return Err("peer public_key must be valid hex".into());
}
Ok(())
}
}
/// Classification of a divergence detected when diffing our frontier
/// against a peer's. Serialized in `snake_case`, matching `as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConflictKind {
    /// Finding exists locally but not in the peer frontier.
    MissingInPeer,
    /// Finding exists in the peer frontier but not locally.
    MissingLocally,
    /// Confidence scores differ beyond the diff tolerance (0.05).
    ConfidenceDiverged,
    /// Retraction flags disagree.
    RetractedDiverged,
    /// Review states disagree.
    ReviewStateDiverged,
    /// Superseded flags disagree.
    SupersededDiverged,
    /// Same id but different assertion text — possible content-address collision.
    AssertionTextDiverged,
    /// A peer's network locator could not be fetched successfully.
    BrokenLocator,
    /// A peer registry entry failed parsing or signature verification.
    UnverifiedPeerEntry,
}
impl ConflictKind {
pub fn as_str(self) -> &'static str {
match self {
ConflictKind::MissingInPeer => "missing_in_peer",
ConflictKind::MissingLocally => "missing_locally",
ConflictKind::ConfidenceDiverged => "confidence_diverged",
ConflictKind::RetractedDiverged => "retracted_diverged",
ConflictKind::ReviewStateDiverged => "review_state_diverged",
ConflictKind::SupersededDiverged => "superseded_diverged",
ConflictKind::AssertionTextDiverged => "assertion_text_diverged",
ConflictKind::BrokenLocator => "broken_locator",
ConflictKind::UnverifiedPeerEntry => "unverified_peer_entry",
}
}
}
/// A single divergence between two frontiers, attributed to one finding.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Conflict {
    /// Id of the finding the conflict concerns.
    pub finding_id: String,
    /// Classification of the divergence.
    pub kind: ConflictKind,
    /// Human-readable description of the divergence.
    pub detail: String,
}
/// Summary returned by the sync and record-* entry points in this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    /// Id of the peer we synced (or attempted to sync) with.
    pub peer_id: String,
    /// Snapshot hash of our project at sync time.
    pub our_snapshot_hash: String,
    /// Snapshot hash of the peer project; empty when no peer snapshot was
    /// available (locator failures, unverified entries).
    pub peer_snapshot_hash: String,
    /// Divergences detected during this sync.
    pub conflicts: Vec<Conflict>,
    /// Number of events appended to the local event log.
    pub events_appended: usize,
}
#[must_use]
pub fn diff_frontiers(ours: &Project, theirs: &Project) -> Vec<Conflict> {
use std::collections::HashMap;
let our_by_id: HashMap<&str, &crate::bundle::FindingBundle> =
ours.findings.iter().map(|f| (f.id.as_str(), f)).collect();
let their_by_id: HashMap<&str, &crate::bundle::FindingBundle> =
theirs.findings.iter().map(|f| (f.id.as_str(), f)).collect();
let mut conflicts = Vec::new();
for id in our_by_id.keys() {
if !their_by_id.contains_key(id) {
conflicts.push(Conflict {
finding_id: (*id).to_string(),
kind: ConflictKind::MissingInPeer,
detail: "present locally, absent in peer".to_string(),
});
}
}
for id in their_by_id.keys() {
if !our_by_id.contains_key(id) {
conflicts.push(Conflict {
finding_id: (*id).to_string(),
kind: ConflictKind::MissingLocally,
detail: "present in peer, absent locally".to_string(),
});
}
}
for (id, ours_f) in &our_by_id {
let Some(theirs_f) = their_by_id.get(id) else {
continue;
};
if (ours_f.confidence.score - theirs_f.confidence.score).abs() > 0.05 {
conflicts.push(Conflict {
finding_id: (*id).to_string(),
kind: ConflictKind::ConfidenceDiverged,
detail: format!(
"ours: {:.3}, peer: {:.3}",
ours_f.confidence.score, theirs_f.confidence.score
),
});
}
if ours_f.flags.retracted != theirs_f.flags.retracted {
conflicts.push(Conflict {
finding_id: (*id).to_string(),
kind: ConflictKind::RetractedDiverged,
detail: format!(
"ours: {}, peer: {}",
ours_f.flags.retracted, theirs_f.flags.retracted
),
});
}
if ours_f.flags.review_state != theirs_f.flags.review_state {
conflicts.push(Conflict {
finding_id: (*id).to_string(),
kind: ConflictKind::ReviewStateDiverged,
detail: format!(
"ours: {:?}, peer: {:?}",
ours_f.flags.review_state, theirs_f.flags.review_state
),
});
}
if ours_f.flags.superseded != theirs_f.flags.superseded {
conflicts.push(Conflict {
finding_id: (*id).to_string(),
kind: ConflictKind::SupersededDiverged,
detail: format!(
"ours: {}, peer: {}",
ours_f.flags.superseded, theirs_f.flags.superseded
),
});
}
if ours_f.assertion.text.trim() != theirs_f.assertion.text.trim() {
conflicts.push(Conflict {
finding_id: (*id).to_string(),
kind: ConflictKind::AssertionTextDiverged,
detail:
"matching id but diverging assertion text — possible content-address collision"
.to_string(),
});
}
}
conflicts.sort_by(|a, b| {
a.finding_id
.cmp(&b.finding_id)
.then_with(|| a.kind.as_str().cmp(b.kind.as_str()))
});
conflicts
}
/// Appends a sync marker plus a `broken_locator` conflict event to
/// `project`'s event log and returns the matching [`SyncReport`].
///
/// Called when a peer's registry entry resolved but its network locator
/// answered with a non-success HTTP `status`. No peer snapshot exists at
/// this point, so `peer_snapshot_hash` is left empty and the divergence
/// count is fixed at 1.
pub fn record_locator_failure(
    project: &mut Project,
    peer_id: &str,
    vfr_id: &str,
    locator: &str,
    status: u16,
) -> SyncReport {
    let now = Utc::now().to_rfc3339();
    let our_hash = snapshot_hash(project);
    let frontier_id = project.frontier_id();
    let detail = format!("locator {locator} returned HTTP {status}");
    // Construct each event as `mut` and back-fill its content-address id,
    // matching the pattern used by `record_unverified_entry` (the previous
    // bind-then-rebind dance here was equivalent but noisier).
    let mut sync_ev = StateEvent {
        schema: EVENT_SCHEMA.to_string(),
        id: String::new(),
        kind: "frontier.synced_with_peer".to_string(),
        target: StateTarget {
            r#type: "frontier_observation".to_string(),
            id: frontier_id.clone(),
        },
        actor: StateActor {
            id: "federation".to_string(),
            r#type: "system".to_string(),
        },
        timestamp: now.clone(),
        reason: format!("synced with peer {peer_id} (broken locator)"),
        before_hash: NULL_HASH.to_string(),
        after_hash: NULL_HASH.to_string(),
        payload: json!({
            "peer_id": peer_id,
            "peer_snapshot_hash": "",
            "our_snapshot_hash": our_hash,
            "divergence_count": 1,
        }),
        caveats: Vec::new(),
        signature: None,
        schema_artifact_id: None,
    };
    sync_ev.id = compute_event_id(&sync_ev);
    let mut conflict_ev = StateEvent {
        schema: EVENT_SCHEMA.to_string(),
        id: String::new(),
        kind: "frontier.conflict_detected".to_string(),
        target: StateTarget {
            r#type: "frontier_observation".to_string(),
            id: frontier_id.clone(),
        },
        actor: StateActor {
            id: "federation".to_string(),
            r#type: "system".to_string(),
        },
        timestamp: now.clone(),
        reason: format!("peer={peer_id} kind=broken_locator {detail}"),
        before_hash: NULL_HASH.to_string(),
        after_hash: NULL_HASH.to_string(),
        payload: json!({
            "peer_id": peer_id,
            "finding_id": vfr_id,
            "kind": "broken_locator",
            "detail": detail,
        }),
        caveats: Vec::new(),
        signature: None,
        schema_artifact_id: None,
    };
    conflict_ev.id = compute_event_id(&conflict_ev);
    project.events.push(sync_ev);
    project.events.push(conflict_ev);
    SyncReport {
        peer_id: peer_id.to_string(),
        our_snapshot_hash: our_hash,
        peer_snapshot_hash: String::new(),
        conflicts: vec![Conflict {
            finding_id: vfr_id.to_string(),
            kind: ConflictKind::BrokenLocator,
            detail,
        }],
        events_appended: 2,
    }
}
/// Records that a sync with `peer_id` was halted because the registry entry
/// for `vfr_id` could not be verified: appends a sync marker plus an
/// `unverified_peer_entry` conflict event, and returns the report.
pub fn record_unverified_entry(
    project: &mut Project,
    peer_id: &str,
    vfr_id: &str,
    reason: &str,
) -> SyncReport {
    let stamp = Utc::now().to_rfc3339();
    let local_hash = snapshot_hash(project);
    let target_id = project.frontier_id();
    // Marker event: a sync was attempted but halted. No peer snapshot is
    // available, so its hash stays empty and divergence_count is fixed at 1.
    let mut synced = StateEvent {
        schema: EVENT_SCHEMA.to_string(),
        id: String::new(),
        kind: "frontier.synced_with_peer".to_string(),
        target: StateTarget {
            r#type: "frontier_observation".to_string(),
            id: target_id.clone(),
        },
        actor: StateActor {
            id: "federation".to_string(),
            r#type: "system".to_string(),
        },
        timestamp: stamp.clone(),
        reason: format!("synced with peer {peer_id} (unverified entry; halted)"),
        before_hash: NULL_HASH.to_string(),
        after_hash: NULL_HASH.to_string(),
        payload: json!({
            "peer_id": peer_id,
            "peer_snapshot_hash": "",
            "our_snapshot_hash": local_hash,
            "divergence_count": 1,
        }),
        caveats: Vec::new(),
        signature: None,
        schema_artifact_id: None,
    };
    synced.id = compute_event_id(&synced);
    // Companion conflict event carrying the verification-failure detail.
    let mut flagged = StateEvent {
        schema: EVENT_SCHEMA.to_string(),
        id: String::new(),
        kind: "frontier.conflict_detected".to_string(),
        target: StateTarget {
            r#type: "frontier_observation".to_string(),
            id: target_id.clone(),
        },
        actor: StateActor {
            id: "federation".to_string(),
            r#type: "system".to_string(),
        },
        timestamp: stamp.clone(),
        reason: format!("peer={peer_id} kind=unverified_peer_entry {reason}"),
        before_hash: NULL_HASH.to_string(),
        after_hash: NULL_HASH.to_string(),
        payload: json!({
            "peer_id": peer_id,
            "finding_id": vfr_id,
            "kind": "unverified_peer_entry",
            "detail": reason,
        }),
        caveats: Vec::new(),
        signature: None,
        schema_artifact_id: None,
    };
    flagged.id = compute_event_id(&flagged);
    project.events.push(synced);
    project.events.push(flagged);
    SyncReport {
        peer_id: peer_id.to_string(),
        our_snapshot_hash: local_hash,
        peer_snapshot_hash: String::new(),
        conflicts: vec![Conflict {
            finding_id: vfr_id.to_string(),
            kind: ConflictKind::UnverifiedPeerEntry,
            detail: reason.to_string(),
        }],
        events_appended: 2,
    }
}
/// Classifies whether a peer-supplied event set can be applied, by running
/// the ancestor-closure check over the union of our events and the peer's.
pub fn classify_peer_event_set(
    our_events: &[StateEvent],
    peer_events: &[StateEvent],
) -> crate::ancestor_closure::AncestorAction {
    use crate::ancestor_closure::classify_ancestor_action;
    let mut combined = Vec::with_capacity(our_events.len() + peer_events.len());
    for ev in our_events.iter().chain(peer_events) {
        combined.push((ev.id.clone(), event_parents(ev)));
    }
    classify_ancestor_action(combined)
}
/// Extracts the `parents` id list from an event payload. Events whose
/// payload has no `parents` array (or a malformed one) are treated as
/// having no parents; non-string array members are skipped.
fn event_parents(e: &StateEvent) -> Vec<String> {
    match e.payload.get("parents").and_then(|v| v.as_array()) {
        Some(items) => items
            .iter()
            .filter_map(|p| p.as_str())
            .map(str::to_owned)
            .collect(),
        None => Vec::new(),
    }
}
/// Diffs our project against `peer`, appends one `frontier.synced_with_peer`
/// event plus one `frontier.conflict_detected` event per divergence, and
/// returns a [`SyncReport`] describing what happened.
pub fn sync_with_peer(project: &mut Project, peer_id: &str, peer: &Project) -> SyncReport {
    let our_hash = snapshot_hash(project);
    let peer_hash = snapshot_hash(peer);
    let conflicts = diff_frontiers(project, peer);
    let now = Utc::now().to_rfc3339();
    // The other call sites in this module bind `frontier_id()` directly;
    // the extra `.clone()` previously added here was redundant.
    let frontier_id = project.frontier_id();
    let synced_reason = format!("synced with peer {peer_id}");
    let mut synced_event = StateEvent {
        schema: EVENT_SCHEMA.to_string(),
        id: String::new(),
        kind: "frontier.synced_with_peer".to_string(),
        target: StateTarget {
            r#type: "frontier_observation".to_string(),
            id: frontier_id.clone(),
        },
        actor: StateActor {
            id: "federation".to_string(),
            r#type: "system".to_string(),
        },
        timestamp: now.clone(),
        reason: synced_reason,
        before_hash: NULL_HASH.to_string(),
        after_hash: NULL_HASH.to_string(),
        payload: json!({
            "peer_id": peer_id,
            "peer_snapshot_hash": peer_hash,
            "our_snapshot_hash": our_hash,
            "divergence_count": conflicts.len(),
        }),
        caveats: Vec::new(),
        signature: None,
        schema_artifact_id: None,
    };
    // Event ids are computed from content, so they are filled in only after
    // the rest of the event is fully populated.
    synced_event.id = compute_event_id(&synced_event);
    // One conflict event per detected divergence.
    let conflict_events: Vec<StateEvent> = conflicts
        .iter()
        .map(|c| {
            let mut ev = StateEvent {
                schema: EVENT_SCHEMA.to_string(),
                id: String::new(),
                kind: "frontier.conflict_detected".to_string(),
                target: StateTarget {
                    r#type: "frontier_observation".to_string(),
                    id: frontier_id.clone(),
                },
                actor: StateActor {
                    id: "federation".to_string(),
                    r#type: "system".to_string(),
                },
                timestamp: now.clone(),
                reason: format!("peer={peer_id} kind={} {}", c.kind.as_str(), c.detail),
                before_hash: NULL_HASH.to_string(),
                after_hash: NULL_HASH.to_string(),
                payload: json!({
                    "peer_id": peer_id,
                    "finding_id": c.finding_id,
                    "kind": c.kind.as_str(),
                    "detail": c.detail,
                }),
                caveats: Vec::new(),
                signature: None,
                schema_artifact_id: None,
            };
            ev.id = compute_event_id(&ev);
            ev
        })
        .collect();
    let events_appended = 1 + conflict_events.len();
    project.events.push(synced_event);
    project.events.extend(conflict_events);
    SyncReport {
        peer_id: peer_id.to_string(),
        our_snapshot_hash: our_hash,
        peer_snapshot_hash: peer_hash,
        conflicts,
        events_appended,
    }
}
/// Outcome of [`discover_peer_frontier`].
#[derive(Debug)]
pub enum DiscoveryResult {
    /// The peer frontier manifest was fetched and parsed successfully.
    Resolved(Project),
    /// The hub answered but has no entry for this VFR id (HTTP 404).
    EntryNotFound { vfr_id: String, status: u16 },
    /// The entry exists but failed parsing, signature verification, or
    /// owner-key pinning; `reason` is human-readable.
    UnverifiedEntry { vfr_id: String, reason: String },
    /// The entry verified but its network locator could not be fetched or
    /// its manifest could not be parsed. `status` is 0 when there was no
    /// HTTP status (transport or parse failure).
    BrokenLocator {
        vfr_id: String,
        locator: String,
        status: u16,
    },
    /// The hub itself could not be reached, returned a non-404 error
    /// status, or the discovery thread panicked.
    Unreachable { url: String, error: String },
}
/// Resolves a peer frontier through a registry hub.
///
/// Flow: fetch `{hub}/entries/{vfr_id}`, parse and signature-verify the
/// registry entry, optionally pin it to `expected_owner_pubkey`, then fetch
/// and parse the manifest at the entry's network locator.
///
/// The blocking HTTP work runs on a dedicated thread that is joined before
/// returning, so a panic inside the HTTP stack surfaces as `Unreachable`
/// instead of unwinding into the caller.
pub fn discover_peer_frontier(
    hub_url: &str,
    vfr_id: &str,
    expected_owner_pubkey: Option<&str>,
) -> DiscoveryResult {
    /// Display prefix of a key: its first 16 bytes, or the whole string when
    /// it is shorter (or the cut would split a UTF-8 character). The raw
    /// `[..16]` slicing used previously panicked on short, peer-controlled
    /// `owner_pubkey` values.
    fn key_prefix(s: &str) -> &str {
        s.get(..16).unwrap_or(s)
    }
    let hub = hub_url.trim_end_matches('/').to_string();
    let entries_url = format!("{hub}/entries/{vfr_id}");
    let vfr_owned = vfr_id.to_string();
    let expected = expected_owner_pubkey.map(|s| s.to_string());
    std::thread::spawn(move || -> DiscoveryResult {
        // 1. Fetch the registry entry from the hub.
        let resp = match reqwest::blocking::get(&entries_url) {
            Ok(r) => r,
            Err(e) => {
                return DiscoveryResult::Unreachable {
                    url: entries_url.clone(),
                    error: e.to_string(),
                };
            }
        };
        let status = resp.status();
        if status.as_u16() == 404 {
            return DiscoveryResult::EntryNotFound {
                vfr_id: vfr_owned,
                status: status.as_u16(),
            };
        }
        if !status.is_success() {
            return DiscoveryResult::Unreachable {
                url: entries_url.clone(),
                error: format!("hub returned HTTP {status}"),
            };
        }
        let body = match resp.text() {
            Ok(b) => b,
            Err(e) => {
                return DiscoveryResult::Unreachable {
                    url: entries_url.clone(),
                    error: format!("read body: {e}"),
                };
            }
        };
        // 2. Parse and signature-verify the entry.
        let entry: crate::registry::RegistryEntry = match serde_json::from_str(&body) {
            Ok(e) => e,
            Err(e) => {
                return DiscoveryResult::UnverifiedEntry {
                    vfr_id: vfr_owned,
                    reason: format!("parse registry entry: {e}"),
                };
            }
        };
        match crate::registry::verify_entry(&entry) {
            Ok(true) => {}
            Ok(false) => {
                return DiscoveryResult::UnverifiedEntry {
                    vfr_id: vfr_owned,
                    reason: "registry entry signature does not verify against entry.owner_pubkey"
                        .to_string(),
                };
            }
            Err(e) => {
                return DiscoveryResult::UnverifiedEntry {
                    vfr_id: vfr_owned,
                    reason: format!("signature verification error: {e}"),
                };
            }
        }
        // 3. Optional owner pinning: the entry must be owned by the key the
        // caller expects.
        if let Some(want) = expected.as_deref()
            && entry.owner_pubkey != want
        {
            return DiscoveryResult::UnverifiedEntry {
                vfr_id: vfr_owned,
                reason: format!(
                    "entry owner_pubkey {} != expected peer pubkey {}",
                    key_prefix(&entry.owner_pubkey),
                    key_prefix(want)
                ),
            };
        }
        // 4. Fetch and parse the manifest behind the entry's locator.
        let locator = entry.network_locator.clone();
        let mresp = match reqwest::blocking::get(&locator) {
            Ok(r) => r,
            Err(e) => {
                // NOTE(review): `with_error` currently discards its context
                // (see its impl), so the transport error is not surfaced.
                return DiscoveryResult::BrokenLocator {
                    vfr_id: vfr_owned,
                    locator,
                    status: 0,
                }
                .with_error(e.to_string());
            }
        };
        let mstatus = mresp.status();
        if !mstatus.is_success() {
            return DiscoveryResult::BrokenLocator {
                vfr_id: vfr_owned,
                locator,
                status: mstatus.as_u16(),
            };
        }
        let mbody = match mresp.text() {
            Ok(b) => b,
            Err(e) => {
                return DiscoveryResult::BrokenLocator {
                    vfr_id: vfr_owned,
                    locator,
                    status: 0,
                }
                .with_error(e.to_string());
            }
        };
        match serde_json::from_str::<Project>(&mbody) {
            Ok(p) => DiscoveryResult::Resolved(p),
            Err(e) => DiscoveryResult::BrokenLocator {
                vfr_id: vfr_owned,
                locator,
                status: 0,
            }
            .with_error(format!("manifest parse: {e}")),
        }
    })
    .join()
    .unwrap_or(DiscoveryResult::Unreachable {
        url: hub_url.to_string(),
        error: "discovery thread panicked".to_string(),
    })
}
impl DiscoveryResult {
    /// Attaches error context to a discovery result.
    ///
    /// NOTE(review): `_ctx` is currently discarded — the `BrokenLocator`
    /// variant has no field to carry it, so the callers' detailed error
    /// strings (transport errors, manifest-parse failures) are lost.
    /// Consider adding an `error: Option<String>` field to the variant in a
    /// follow-up so this context can be surfaced.
    fn with_error(self, _ctx: String) -> Self {
        self
    }
}
/// Fetches and parses a peer frontier from `url`.
///
/// The blocking HTTP fetch runs on a helper thread that is joined before
/// returning, so a panic inside the HTTP stack is reported as an error
/// string instead of unwinding into the caller.
///
/// # Errors
/// Returns a human-readable message for transport failures, non-success
/// HTTP statuses, body-read failures, and JSON parse failures.
pub fn fetch_peer_frontier(url: &str) -> Result<Project, String> {
    let target = url.to_string();
    std::thread::spawn(move || -> Result<Project, String> {
        let response = match reqwest::blocking::get(&target) {
            Ok(r) => r,
            Err(e) => return Err(format!("HTTP GET {target} failed: {e}")),
        };
        let status = response.status();
        if !status.is_success() {
            return Err(format!("peer returned HTTP {status}"));
        }
        match response.text() {
            Ok(body) => serde_json::from_str(&body)
                .map_err(|e| format!("parse peer frontier from {target}: {e}")),
            Err(e) => Err(format!("read body from {target}: {e}")),
        }
    })
    .join()
    .map_err(|_| "fetch thread panicked".to_string())?
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Canonical, fully valid hub record; each validation test below breaks
    /// exactly one field to probe a single rule.
    fn good() -> PeerHub {
        PeerHub {
            id: "hub:test".into(),
            url: "https://example.invalid/".into(),
            public_key: "00".repeat(32),
            added_at: "2026-04-27T00:00:00Z".into(),
            note: String::new(),
        }
    }
    #[test]
    fn validates_correct_shape() {
        assert!(good().validate().is_ok());
    }
    #[test]
    fn rejects_empty_id() {
        let mut p = good();
        // Whitespace-only id must trip the trim-based emptiness check.
        p.id = " ".into();
        assert!(p.validate().is_err());
    }
    #[test]
    fn rejects_http_url() {
        let mut p = good();
        p.url = "http://insecure.example/".into();
        assert!(p.validate().is_err());
    }
    #[test]
    fn rejects_short_pubkey() {
        let mut p = good();
        p.public_key = "abcd".into();
        assert!(p.validate().is_err());
    }
    #[test]
    fn rejects_non_hex_pubkey() {
        let mut p = good();
        // Correct length (64), wrong alphabet.
        p.public_key = "z".repeat(64);
        assert!(p.validate().is_err());
    }
    use crate::bundle::{
        Assertion, Conditions, Confidence, Evidence, Extraction, FindingBundle, Flags, Provenance,
        ReviewState,
    };
    use crate::project::{self, Project};
    /// Builds a minimal finding with the given id and confidence score; all
    /// other fields are neutral defaults.
    fn finding(id: &str, score: f64) -> FindingBundle {
        let mut b = FindingBundle::new(
            Assertion {
                text: format!("claim {id}"),
                assertion_type: "mechanism".into(),
                entities: vec![],
                relation: None,
                direction: None,
                causal_claim: None,
                causal_evidence_grade: None,
            },
            Evidence {
                evidence_type: "experimental".into(),
                model_system: String::new(),
                species: None,
                method: String::new(),
                sample_size: Some("n=30".into()),
                effect_size: None,
                p_value: None,
                replicated: false,
                replication_count: None,
                evidence_spans: vec![],
            },
            Conditions {
                text: String::new(),
                species_verified: vec![],
                species_unverified: vec![],
                in_vitro: false,
                in_vivo: false,
                human_data: false,
                clinical_trial: false,
                concentration_range: None,
                duration: None,
                age_group: None,
                cell_type: None,
            },
            Confidence::raw(score, "test", 0.85),
            Provenance {
                source_type: "published_paper".into(),
                doi: None,
                pmid: None,
                pmc: None,
                openalex_id: None,
                url: None,
                title: "Test".into(),
                authors: vec![],
                year: Some(2025),
                journal: None,
                license: None,
                publisher: None,
                funders: vec![],
                extraction: Extraction::default(),
                review: None,
                citation_count: None,
            },
            Flags::default(),
        );
        // Override whatever id `new` assigned so tests control identity and
        // can force matching ids across "ours"/"theirs" frontiers.
        b.id = id.to_string();
        b
    }
    /// Thin wrapper over `project::assemble` with fixed version/build args.
    fn assemble(name: &str, findings: Vec<FindingBundle>) -> Project {
        project::assemble(name, findings, 1, 0, "test")
    }
    #[test]
    fn diff_identical_frontiers_returns_no_conflicts() {
        let f = finding("vf_001", 0.7);
        let ours = assemble("ours", vec![f.clone()]);
        let theirs = assemble("theirs", vec![f]);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert_eq!(conflicts.len(), 0);
    }
    #[test]
    fn diff_detects_missing_in_peer_and_locally() {
        let f1 = finding("vf_001", 0.7);
        let f2 = finding("vf_002", 0.7);
        let ours = assemble("ours", vec![f1.clone()]);
        let theirs = assemble("theirs", vec![f2.clone()]);
        let conflicts = diff_frontiers(&ours, &theirs);
        let kinds: Vec<&str> = conflicts.iter().map(|c| c.kind.as_str()).collect();
        assert!(kinds.contains(&"missing_in_peer"));
        assert!(kinds.contains(&"missing_locally"));
    }
    #[test]
    fn diff_detects_confidence_divergence_above_threshold() {
        // 0.30 apart — well beyond the 0.05 tolerance.
        let mut f_ours = finding("vf_001", 0.85);
        let mut f_theirs = finding("vf_001", 0.55);
        f_ours.id = "vf_001".into();
        f_theirs.id = "vf_001".into();
        let ours = assemble("ours", vec![f_ours]);
        let theirs = assemble("theirs", vec![f_theirs]);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::ConfidenceDiverged),
            "expected confidence_diverged in {conflicts:?}"
        );
    }
    #[test]
    fn diff_ignores_confidence_drift_below_threshold() {
        // 0.03 apart — inside the 0.05 tolerance, so no conflict.
        let mut f_ours = finding("vf_001", 0.700);
        let mut f_theirs = finding("vf_001", 0.730);
        f_ours.id = "vf_001".into();
        f_theirs.id = "vf_001".into();
        let ours = assemble("ours", vec![f_ours]);
        let theirs = assemble("theirs", vec![f_theirs]);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            !conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::ConfidenceDiverged),
            "0.03 drift should not flag: {conflicts:?}"
        );
    }
    #[test]
    fn diff_detects_retracted_divergence() {
        let mut f_ours = finding("vf_001", 0.7);
        let mut f_theirs = finding("vf_001", 0.7);
        f_ours.id = "vf_001".into();
        f_theirs.id = "vf_001".into();
        f_theirs.flags.retracted = true;
        let ours = assemble("ours", vec![f_ours]);
        let theirs = assemble("theirs", vec![f_theirs]);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::RetractedDiverged)
        );
    }
    #[test]
    fn diff_detects_review_state_divergence() {
        let mut f_ours = finding("vf_001", 0.7);
        let mut f_theirs = finding("vf_001", 0.7);
        f_ours.id = "vf_001".into();
        f_theirs.id = "vf_001".into();
        f_theirs.flags.review_state = Some(ReviewState::Contested);
        let ours = assemble("ours", vec![f_ours]);
        let theirs = assemble("theirs", vec![f_theirs]);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::ReviewStateDiverged)
        );
    }
    #[test]
    fn diff_detects_assertion_text_divergence() {
        // Same id, different text — the content-address collision case.
        let mut f_ours = finding("vf_001", 0.7);
        let mut f_theirs = finding("vf_001", 0.7);
        f_ours.id = "vf_001".into();
        f_theirs.id = "vf_001".into();
        f_theirs.assertion.text = "different claim".into();
        let ours = assemble("ours", vec![f_ours]);
        let theirs = assemble("theirs", vec![f_theirs]);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::AssertionTextDiverged)
        );
    }
    #[test]
    fn sync_appends_one_synced_event_plus_one_per_conflict() {
        let mut f_ours = finding("vf_001", 0.7);
        let mut f_theirs = finding("vf_001", 0.7);
        f_ours.id = "vf_001".into();
        f_theirs.id = "vf_001".into();
        f_theirs.flags.retracted = true;
        let mut ours = assemble("ours", vec![f_ours]);
        let theirs = assemble("theirs", vec![f_theirs]);
        let events_before = ours.events.len();
        let report = sync_with_peer(&mut ours, "hub:test-peer", &theirs);
        assert_eq!(report.conflicts.len(), 1);
        // One retracted divergence → one sync event + one conflict event.
        assert_eq!(report.events_appended, 2);
        assert_eq!(ours.events.len() - events_before, 2);
        let sync_ev = &ours.events[events_before];
        assert_eq!(sync_ev.kind, "frontier.synced_with_peer");
        assert_eq!(sync_ev.payload["divergence_count"].as_u64(), Some(1));
        let conf_ev = &ours.events[events_before + 1];
        assert_eq!(conf_ev.kind, "frontier.conflict_detected");
        assert_eq!(conf_ev.payload["kind"], "retracted_diverged");
    }
    #[test]
    fn sync_with_clean_diff_emits_zero_divergence_event() {
        let f = finding("vf_001", 0.7);
        let mut ours = assemble("ours", vec![f.clone()]);
        let theirs = assemble("theirs", vec![f]);
        let report = sync_with_peer(&mut ours, "hub:test-peer", &theirs);
        assert_eq!(report.conflicts.len(), 0);
        assert_eq!(report.events_appended, 1);
        let last = ours.events.last().unwrap();
        assert_eq!(last.kind, "frontier.synced_with_peer");
        assert_eq!(last.payload["divergence_count"].as_u64(), Some(0));
    }
    /// Builds a bare test event whose payload carries an explicit `parents`
    /// id list for the ancestor-closure tests.
    fn make_event(id: &str, parents: &[&str]) -> StateEvent {
        StateEvent {
            schema: EVENT_SCHEMA.to_string(),
            id: id.to_string(),
            kind: "test.event".to_string(),
            target: StateTarget {
                r#type: "frontier_observation".to_string(),
                id: "vfr_test".to_string(),
            },
            actor: StateActor {
                id: "test".to_string(),
                r#type: "system".to_string(),
            },
            timestamp: "2026-05-09T00:00:00Z".to_string(),
            reason: "test".to_string(),
            before_hash: NULL_HASH.to_string(),
            after_hash: NULL_HASH.to_string(),
            payload: json!({"parents": parents}),
            caveats: vec![],
            signature: None,
            schema_artifact_id: None,
        }
    }
    #[test]
    fn classify_returns_proceed_when_peer_set_is_complete() {
        // e2's parent e1 is in our set, so the closure is complete.
        let our_events = vec![make_event("e1", &[])];
        let peer_events = vec![make_event("e2", &["e1"])];
        let action = classify_peer_event_set(&our_events, &peer_events);
        assert!(matches!(
            action,
            crate::ancestor_closure::AncestorAction::Proceed
        ));
    }
    #[test]
    fn classify_returns_fetch_when_peer_references_missing_ancestor() {
        // e3's parent e2 is in neither set → it must be fetched.
        let our_events = vec![make_event("e1", &[])];
        let peer_events = vec![make_event("e3", &["e2"])];
        let action = classify_peer_event_set(&our_events, &peer_events);
        match action {
            crate::ancestor_closure::AncestorAction::Fetch { missing } => {
                assert_eq!(missing, vec!["e2"]);
            }
            other => panic!("expected Fetch, got {other:?}"),
        }
    }
    #[test]
    fn classify_handles_events_without_explicit_parents() {
        // Payload has no "parents" key at all — treated as parentless.
        let our_events = vec![StateEvent {
            schema: EVENT_SCHEMA.to_string(),
            id: "e1".to_string(),
            kind: "test.event".to_string(),
            target: StateTarget {
                r#type: "frontier_observation".to_string(),
                id: "vfr_test".to_string(),
            },
            actor: StateActor {
                id: "test".to_string(),
                r#type: "system".to_string(),
            },
            timestamp: "2026-05-09T00:00:00Z".to_string(),
            reason: "test".to_string(),
            before_hash: NULL_HASH.to_string(),
            after_hash: NULL_HASH.to_string(),
            payload: json!({}),
            caveats: vec![],
            signature: None,
            schema_artifact_id: None,
        }];
        let peer_events = vec![];
        let action = classify_peer_event_set(&our_events, &peer_events);
        assert!(matches!(
            action,
            crate::ancestor_closure::AncestorAction::Proceed
        ));
    }
}