1use chrono::Utc;
40use serde::{Deserialize, Serialize};
41use serde_json::json;
42
43use crate::events::{
44 EVENT_SCHEMA, NULL_HASH, StateActor, StateEvent, StateTarget, compute_event_id, snapshot_hash,
45};
46use crate::project::Project;
47
48#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
53pub struct PeerHub {
54 pub id: String,
58 pub url: String,
62 pub public_key: String,
66 pub added_at: String,
69 #[serde(default, skip_serializing_if = "String::is_empty")]
72 pub note: String,
73}
74
75impl PeerHub {
76 pub fn validate(&self) -> Result<(), String> {
80 if self.id.trim().is_empty() {
81 return Err("peer id must be non-empty".into());
82 }
83 if !self.url.starts_with("https://") {
84 return Err(format!(
85 "peer url must start with `https://` (got `{}`)",
86 self.url
87 ));
88 }
89 let trimmed = self.public_key.trim();
90 if trimmed.len() != 64 {
91 return Err(format!(
92 "peer public_key must be 64 hex chars (got {})",
93 trimmed.len()
94 ));
95 }
96 if hex::decode(trimmed).is_err() {
97 return Err("peer public_key must be valid hex".into());
98 }
99 Ok(())
100 }
101}
102
103#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
108#[serde(rename_all = "snake_case")]
109pub enum ConflictKind {
110 MissingInPeer,
112 MissingLocally,
114 ConfidenceDiverged,
117 RetractedDiverged,
119 ReviewStateDiverged,
121 SupersededDiverged,
123 AssertionTextDiverged,
128 BrokenLocator,
137 UnverifiedPeerEntry,
143}
144
145impl ConflictKind {
146 pub fn as_str(self) -> &'static str {
147 match self {
148 ConflictKind::MissingInPeer => "missing_in_peer",
149 ConflictKind::MissingLocally => "missing_locally",
150 ConflictKind::ConfidenceDiverged => "confidence_diverged",
151 ConflictKind::RetractedDiverged => "retracted_diverged",
152 ConflictKind::ReviewStateDiverged => "review_state_diverged",
153 ConflictKind::SupersededDiverged => "superseded_diverged",
154 ConflictKind::AssertionTextDiverged => "assertion_text_diverged",
155 ConflictKind::BrokenLocator => "broken_locator",
156 ConflictKind::UnverifiedPeerEntry => "unverified_peer_entry",
157 }
158 }
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
163pub struct Conflict {
164 pub finding_id: String,
165 pub kind: ConflictKind,
166 pub detail: String,
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct SyncReport {
174 pub peer_id: String,
175 pub our_snapshot_hash: String,
176 pub peer_snapshot_hash: String,
177 pub conflicts: Vec<Conflict>,
178 pub events_appended: usize,
181}
182
183#[must_use]
190pub fn diff_frontiers(ours: &Project, theirs: &Project) -> Vec<Conflict> {
191 use std::collections::HashMap;
192
193 let our_by_id: HashMap<&str, &crate::bundle::FindingBundle> =
194 ours.findings.iter().map(|f| (f.id.as_str(), f)).collect();
195 let their_by_id: HashMap<&str, &crate::bundle::FindingBundle> =
196 theirs.findings.iter().map(|f| (f.id.as_str(), f)).collect();
197
198 let mut conflicts = Vec::new();
199
200 for id in our_by_id.keys() {
202 if !their_by_id.contains_key(id) {
203 conflicts.push(Conflict {
204 finding_id: (*id).to_string(),
205 kind: ConflictKind::MissingInPeer,
206 detail: "present locally, absent in peer".to_string(),
207 });
208 }
209 }
210 for id in their_by_id.keys() {
212 if !our_by_id.contains_key(id) {
213 conflicts.push(Conflict {
214 finding_id: (*id).to_string(),
215 kind: ConflictKind::MissingLocally,
216 detail: "present in peer, absent locally".to_string(),
217 });
218 }
219 }
220 for (id, ours_f) in &our_by_id {
222 let Some(theirs_f) = their_by_id.get(id) else {
223 continue;
224 };
225 if (ours_f.confidence.score - theirs_f.confidence.score).abs() > 0.05 {
226 conflicts.push(Conflict {
227 finding_id: (*id).to_string(),
228 kind: ConflictKind::ConfidenceDiverged,
229 detail: format!(
230 "ours: {:.3}, peer: {:.3}",
231 ours_f.confidence.score, theirs_f.confidence.score
232 ),
233 });
234 }
235 if ours_f.flags.retracted != theirs_f.flags.retracted {
236 conflicts.push(Conflict {
237 finding_id: (*id).to_string(),
238 kind: ConflictKind::RetractedDiverged,
239 detail: format!(
240 "ours: {}, peer: {}",
241 ours_f.flags.retracted, theirs_f.flags.retracted
242 ),
243 });
244 }
245 if ours_f.flags.review_state != theirs_f.flags.review_state {
246 conflicts.push(Conflict {
247 finding_id: (*id).to_string(),
248 kind: ConflictKind::ReviewStateDiverged,
249 detail: format!(
250 "ours: {:?}, peer: {:?}",
251 ours_f.flags.review_state, theirs_f.flags.review_state
252 ),
253 });
254 }
255 if ours_f.flags.superseded != theirs_f.flags.superseded {
256 conflicts.push(Conflict {
257 finding_id: (*id).to_string(),
258 kind: ConflictKind::SupersededDiverged,
259 detail: format!(
260 "ours: {}, peer: {}",
261 ours_f.flags.superseded, theirs_f.flags.superseded
262 ),
263 });
264 }
265 if ours_f.assertion.text.trim() != theirs_f.assertion.text.trim() {
266 conflicts.push(Conflict {
267 finding_id: (*id).to_string(),
268 kind: ConflictKind::AssertionTextDiverged,
269 detail:
270 "matching id but diverging assertion text — possible content-address collision"
271 .to_string(),
272 });
273 }
274 }
275
276 conflicts.sort_by(|a, b| {
277 a.finding_id
278 .cmp(&b.finding_id)
279 .then_with(|| a.kind.as_str().cmp(b.kind.as_str()))
280 });
281 conflicts
282}
283
284pub fn record_locator_failure(
292 project: &mut Project,
293 peer_id: &str,
294 vfr_id: &str,
295 locator: &str,
296 status: u16,
297) -> SyncReport {
298 let now = Utc::now().to_rfc3339();
299 let our_hash = snapshot_hash(project);
300 let frontier_id = project.frontier_id();
301 let detail = format!("locator {locator} returned HTTP {status}");
302
303 let synced_event = StateEvent {
304 schema: EVENT_SCHEMA.to_string(),
305 id: String::new(),
306 kind: "frontier.synced_with_peer".to_string(),
307 target: StateTarget {
308 r#type: "frontier_observation".to_string(),
309 id: frontier_id.clone(),
310 },
311 actor: StateActor {
312 id: "federation".to_string(),
313 r#type: "system".to_string(),
314 },
315 timestamp: now.clone(),
316 reason: format!("synced with peer {peer_id} (broken locator)"),
317 before_hash: NULL_HASH.to_string(),
318 after_hash: NULL_HASH.to_string(),
319 payload: json!({
320 "peer_id": peer_id,
321 "peer_snapshot_hash": "",
322 "our_snapshot_hash": our_hash,
323 "divergence_count": 1,
324 }),
325 caveats: Vec::new(),
326 signature: None,
327 schema_artifact_id: None,
328 };
329 let mut sync_ev = synced_event;
330 sync_ev.id = compute_event_id(&sync_ev);
331
332 let conflict_ev = StateEvent {
333 schema: EVENT_SCHEMA.to_string(),
334 id: String::new(),
335 kind: "frontier.conflict_detected".to_string(),
336 target: StateTarget {
337 r#type: "frontier_observation".to_string(),
338 id: frontier_id.clone(),
339 },
340 actor: StateActor {
341 id: "federation".to_string(),
342 r#type: "system".to_string(),
343 },
344 timestamp: now.clone(),
345 reason: format!("peer={peer_id} kind=broken_locator {detail}"),
346 before_hash: NULL_HASH.to_string(),
347 after_hash: NULL_HASH.to_string(),
348 payload: json!({
349 "peer_id": peer_id,
350 "finding_id": vfr_id,
351 "kind": "broken_locator",
352 "detail": detail,
353 }),
354 caveats: Vec::new(),
355 signature: None,
356 schema_artifact_id: None,
357 };
358 let mut conflict_ev = conflict_ev;
359 conflict_ev.id = compute_event_id(&conflict_ev);
360
361 project.events.push(sync_ev);
362 project.events.push(conflict_ev);
363
364 SyncReport {
365 peer_id: peer_id.to_string(),
366 our_snapshot_hash: our_hash,
367 peer_snapshot_hash: String::new(),
368 conflicts: vec![Conflict {
369 finding_id: vfr_id.to_string(),
370 kind: ConflictKind::BrokenLocator,
371 detail,
372 }],
373 events_appended: 2,
374 }
375}
376
377pub fn record_unverified_entry(
383 project: &mut Project,
384 peer_id: &str,
385 vfr_id: &str,
386 reason: &str,
387) -> SyncReport {
388 let now = Utc::now().to_rfc3339();
389 let our_hash = snapshot_hash(project);
390 let frontier_id = project.frontier_id();
391
392 let mut sync_ev = StateEvent {
393 schema: EVENT_SCHEMA.to_string(),
394 id: String::new(),
395 kind: "frontier.synced_with_peer".to_string(),
396 target: StateTarget {
397 r#type: "frontier_observation".to_string(),
398 id: frontier_id.clone(),
399 },
400 actor: StateActor {
401 id: "federation".to_string(),
402 r#type: "system".to_string(),
403 },
404 timestamp: now.clone(),
405 reason: format!("synced with peer {peer_id} (unverified entry; halted)"),
406 before_hash: NULL_HASH.to_string(),
407 after_hash: NULL_HASH.to_string(),
408 payload: json!({
409 "peer_id": peer_id,
410 "peer_snapshot_hash": "",
411 "our_snapshot_hash": our_hash,
412 "divergence_count": 1,
413 }),
414 caveats: Vec::new(),
415 signature: None,
416 schema_artifact_id: None,
417 };
418 sync_ev.id = compute_event_id(&sync_ev);
419
420 let mut conflict_ev = StateEvent {
421 schema: EVENT_SCHEMA.to_string(),
422 id: String::new(),
423 kind: "frontier.conflict_detected".to_string(),
424 target: StateTarget {
425 r#type: "frontier_observation".to_string(),
426 id: frontier_id.clone(),
427 },
428 actor: StateActor {
429 id: "federation".to_string(),
430 r#type: "system".to_string(),
431 },
432 timestamp: now.clone(),
433 reason: format!("peer={peer_id} kind=unverified_peer_entry {reason}"),
434 before_hash: NULL_HASH.to_string(),
435 after_hash: NULL_HASH.to_string(),
436 payload: json!({
437 "peer_id": peer_id,
438 "finding_id": vfr_id,
439 "kind": "unverified_peer_entry",
440 "detail": reason,
441 }),
442 caveats: Vec::new(),
443 signature: None,
444 schema_artifact_id: None,
445 };
446 conflict_ev.id = compute_event_id(&conflict_ev);
447
448 project.events.push(sync_ev);
449 project.events.push(conflict_ev);
450
451 SyncReport {
452 peer_id: peer_id.to_string(),
453 our_snapshot_hash: our_hash,
454 peer_snapshot_hash: String::new(),
455 conflicts: vec![Conflict {
456 finding_id: vfr_id.to_string(),
457 kind: ConflictKind::UnverifiedPeerEntry,
458 detail: reason.to_string(),
459 }],
460 events_appended: 2,
461 }
462}
463
464pub fn classify_peer_event_set(
488 our_events: &[StateEvent],
489 peer_events: &[StateEvent],
490) -> crate::ancestor_closure::AncestorAction {
491 use crate::ancestor_closure::classify_ancestor_action;
492 let combined: Vec<(String, Vec<String>)> = our_events
500 .iter()
501 .chain(peer_events.iter())
502 .map(|e| (e.id.clone(), event_parents(e)))
503 .collect();
504 classify_ancestor_action(combined)
505}
506
507fn event_parents(e: &StateEvent) -> Vec<String> {
513 e.payload
514 .get("parents")
515 .and_then(|v| v.as_array())
516 .map(|arr| {
517 arr.iter()
518 .filter_map(|p| p.as_str().map(String::from))
519 .collect()
520 })
521 .unwrap_or_default()
522}
523
524pub fn sync_with_peer(project: &mut Project, peer_id: &str, peer: &Project) -> SyncReport {
534 let our_hash = snapshot_hash(project);
535 let peer_hash = snapshot_hash(peer);
536 let conflicts = diff_frontiers(project, peer);
537
538 let now = Utc::now().to_rfc3339();
539 let frontier_id = project.frontier_id().clone();
540
541 let synced_reason = format!("synced with peer {peer_id}");
552 let mut synced_event = StateEvent {
553 schema: EVENT_SCHEMA.to_string(),
554 id: String::new(),
555 kind: "frontier.synced_with_peer".to_string(),
556 target: StateTarget {
557 r#type: "frontier_observation".to_string(),
558 id: frontier_id.clone(),
559 },
560 actor: StateActor {
561 id: "federation".to_string(),
562 r#type: "system".to_string(),
563 },
564 timestamp: now.clone(),
565 reason: synced_reason,
566 before_hash: NULL_HASH.to_string(),
567 after_hash: NULL_HASH.to_string(),
568 payload: json!({
569 "peer_id": peer_id,
570 "peer_snapshot_hash": peer_hash,
571 "our_snapshot_hash": our_hash,
572 "divergence_count": conflicts.len(),
573 }),
574 caveats: Vec::new(),
575 signature: None,
576 schema_artifact_id: None,
577 };
578 synced_event.id = compute_event_id(&synced_event);
579
580 let mut conflict_events: Vec<StateEvent> = Vec::with_capacity(conflicts.len());
581 for c in &conflicts {
582 let reason = format!("peer={peer_id} kind={} {}", c.kind.as_str(), c.detail);
583 let mut ev = StateEvent {
584 schema: EVENT_SCHEMA.to_string(),
585 id: String::new(),
586 kind: "frontier.conflict_detected".to_string(),
587 target: StateTarget {
588 r#type: "frontier_observation".to_string(),
589 id: frontier_id.clone(),
590 },
591 actor: StateActor {
592 id: "federation".to_string(),
593 r#type: "system".to_string(),
594 },
595 timestamp: now.clone(),
596 reason,
597 before_hash: NULL_HASH.to_string(),
598 after_hash: NULL_HASH.to_string(),
599 payload: json!({
600 "peer_id": peer_id,
601 "finding_id": c.finding_id,
602 "kind": c.kind.as_str(),
603 "detail": c.detail,
604 }),
605 caveats: Vec::new(),
606 signature: None,
607 schema_artifact_id: None,
608 };
609 ev.id = compute_event_id(&ev);
610 conflict_events.push(ev);
611 }
612
613 let events_appended = 1 + conflict_events.len();
614 project.events.push(synced_event);
615 project.events.extend(conflict_events);
616
617 SyncReport {
618 peer_id: peer_id.to_string(),
619 our_snapshot_hash: our_hash,
620 peer_snapshot_hash: peer_hash,
621 conflicts,
622 events_appended,
623 }
624}
625
626#[derive(Debug)]
632pub enum DiscoveryResult {
633 Resolved(Project),
637 EntryNotFound { vfr_id: String, status: u16 },
640 UnverifiedEntry { vfr_id: String, reason: String },
643 BrokenLocator {
646 vfr_id: String,
647 locator: String,
648 status: u16,
649 },
650 Unreachable { url: String, error: String },
652}
653
654pub fn discover_peer_frontier(
666 hub_url: &str,
667 vfr_id: &str,
668 expected_owner_pubkey: Option<&str>,
669) -> DiscoveryResult {
670 let hub = hub_url.trim_end_matches('/').to_string();
671 let entries_url = format!("{hub}/entries/{vfr_id}");
672 let vfr_owned = vfr_id.to_string();
673 let expected = expected_owner_pubkey.map(|s| s.to_string());
674
675 std::thread::spawn(move || -> DiscoveryResult {
676 let resp = match reqwest::blocking::get(&entries_url) {
677 Ok(r) => r,
678 Err(e) => {
679 return DiscoveryResult::Unreachable {
680 url: entries_url.clone(),
681 error: e.to_string(),
682 };
683 }
684 };
685 let status = resp.status();
686 if status.as_u16() == 404 {
687 return DiscoveryResult::EntryNotFound {
688 vfr_id: vfr_owned,
689 status: status.as_u16(),
690 };
691 }
692 if !status.is_success() {
693 return DiscoveryResult::Unreachable {
694 url: entries_url.clone(),
695 error: format!("hub returned HTTP {status}"),
696 };
697 }
698 let body = match resp.text() {
699 Ok(b) => b,
700 Err(e) => {
701 return DiscoveryResult::Unreachable {
702 url: entries_url.clone(),
703 error: format!("read body: {e}"),
704 };
705 }
706 };
707 let entry: crate::registry::RegistryEntry = match serde_json::from_str(&body) {
708 Ok(e) => e,
709 Err(e) => {
710 return DiscoveryResult::UnverifiedEntry {
711 vfr_id: vfr_owned,
712 reason: format!("parse registry entry: {e}"),
713 };
714 }
715 };
716
717 match crate::registry::verify_entry(&entry) {
719 Ok(true) => {}
720 Ok(false) => {
721 return DiscoveryResult::UnverifiedEntry {
722 vfr_id: vfr_owned,
723 reason: "registry entry signature does not verify against entry.owner_pubkey"
724 .to_string(),
725 };
726 }
727 Err(e) => {
728 return DiscoveryResult::UnverifiedEntry {
729 vfr_id: vfr_owned,
730 reason: format!("signature verification error: {e}"),
731 };
732 }
733 }
734 if let Some(want) = expected.as_deref()
736 && entry.owner_pubkey != want
737 {
738 return DiscoveryResult::UnverifiedEntry {
739 vfr_id: vfr_owned,
740 reason: format!(
741 "entry owner_pubkey {} != expected peer pubkey {}",
742 &entry.owner_pubkey[..16],
743 &want[..16]
744 ),
745 };
746 }
747
748 let locator = entry.network_locator.clone();
750 let mresp = match reqwest::blocking::get(&locator) {
751 Ok(r) => r,
752 Err(e) => {
753 return DiscoveryResult::BrokenLocator {
754 vfr_id: vfr_owned,
755 locator,
756 status: 0,
757 }
758 .with_error(e.to_string());
759 }
760 };
761 let mstatus = mresp.status();
762 if !mstatus.is_success() {
763 return DiscoveryResult::BrokenLocator {
764 vfr_id: vfr_owned,
765 locator,
766 status: mstatus.as_u16(),
767 };
768 }
769 let mbody = match mresp.text() {
770 Ok(b) => b,
771 Err(e) => {
772 return DiscoveryResult::BrokenLocator {
773 vfr_id: vfr_owned,
774 locator,
775 status: 0,
776 }
777 .with_error(e.to_string());
778 }
779 };
780 match serde_json::from_str::<Project>(&mbody) {
781 Ok(p) => DiscoveryResult::Resolved(p),
782 Err(e) => DiscoveryResult::BrokenLocator {
783 vfr_id: vfr_owned,
784 locator,
785 status: 0,
786 }
787 .with_error(format!("manifest parse: {e}")),
788 }
789 })
790 .join()
791 .unwrap_or(DiscoveryResult::Unreachable {
792 url: hub_url.to_string(),
793 error: "discovery thread panicked".to_string(),
794 })
795}
796
797impl DiscoveryResult {
798 fn with_error(self, _ctx: String) -> Self {
799 self
802 }
803}
804
805pub fn fetch_peer_frontier(url: &str) -> Result<Project, String> {
820 let url_owned = url.to_string();
821 let handle = std::thread::spawn(move || -> Result<Project, String> {
822 let resp = reqwest::blocking::get(&url_owned)
823 .map_err(|e| format!("HTTP GET {url_owned} failed: {e}"))?;
824 let status = resp.status();
825 if !status.is_success() {
826 return Err(format!("peer returned HTTP {status}"));
827 }
828 let body = resp
829 .text()
830 .map_err(|e| format!("read body from {url_owned}: {e}"))?;
831 serde_json::from_str(&body)
832 .map_err(|e| format!("parse peer frontier from {url_owned}: {e}"))
833 });
834 handle
835 .join()
836 .map_err(|_| "fetch thread panicked".to_string())?
837}
838
839#[cfg(test)]
840mod tests {
841 use super::*;
842
843 fn good() -> PeerHub {
844 PeerHub {
845 id: "hub:test".into(),
846 url: "https://example.invalid/".into(),
847 public_key: "00".repeat(32),
848 added_at: "2026-04-27T00:00:00Z".into(),
849 note: String::new(),
850 }
851 }
852
853 #[test]
854 fn validates_correct_shape() {
855 assert!(good().validate().is_ok());
856 }
857
858 #[test]
859 fn rejects_empty_id() {
860 let mut p = good();
861 p.id = " ".into();
862 assert!(p.validate().is_err());
863 }
864
865 #[test]
866 fn rejects_http_url() {
867 let mut p = good();
868 p.url = "http://insecure.example/".into();
869 assert!(p.validate().is_err());
870 }
871
872 #[test]
873 fn rejects_short_pubkey() {
874 let mut p = good();
875 p.public_key = "abcd".into();
876 assert!(p.validate().is_err());
877 }
878
879 #[test]
880 fn rejects_non_hex_pubkey() {
881 let mut p = good();
882 p.public_key = "z".repeat(64);
883 assert!(p.validate().is_err());
884 }
885
886 use crate::bundle::{
889 Assertion, Conditions, Confidence, Evidence, Extraction, FindingBundle, Flags, Provenance,
890 ReviewState,
891 };
892 use crate::project::{self, Project};
893
894 fn finding(id: &str, score: f64) -> FindingBundle {
895 let mut b = FindingBundle::new(
896 Assertion {
897 text: format!("claim {id}"),
898 assertion_type: "mechanism".into(),
899 entities: vec![],
900 relation: None,
901 direction: None,
902 causal_claim: None,
903 causal_evidence_grade: None,
904 },
905 Evidence {
906 evidence_type: "experimental".into(),
907 model_system: String::new(),
908 species: None,
909 method: String::new(),
910 sample_size: Some("n=30".into()),
911 effect_size: None,
912 p_value: None,
913 replicated: false,
914 replication_count: None,
915 evidence_spans: vec![],
916 },
917 Conditions {
918 text: String::new(),
919 species_verified: vec![],
920 species_unverified: vec![],
921 in_vitro: false,
922 in_vivo: false,
923 human_data: false,
924 clinical_trial: false,
925 concentration_range: None,
926 duration: None,
927 age_group: None,
928 cell_type: None,
929 },
930 Confidence::raw(score, "test", 0.85),
931 Provenance {
932 source_type: "published_paper".into(),
933 doi: None,
934 pmid: None,
935 pmc: None,
936 openalex_id: None,
937 url: None,
938 title: "Test".into(),
939 authors: vec![],
940 year: Some(2025),
941 journal: None,
942 license: None,
943 publisher: None,
944 funders: vec![],
945 extraction: Extraction::default(),
946 review: None,
947 citation_count: None,
948 },
949 Flags::default(),
950 );
951 b.id = id.to_string();
952 b
953 }
954
955 fn assemble(name: &str, findings: Vec<FindingBundle>) -> Project {
956 project::assemble(name, findings, 1, 0, "test")
957 }
958
959 #[test]
960 fn diff_identical_frontiers_returns_no_conflicts() {
961 let f = finding("vf_001", 0.7);
962 let ours = assemble("ours", vec![f.clone()]);
963 let theirs = assemble("theirs", vec![f]);
964 let conflicts = diff_frontiers(&ours, &theirs);
965 assert_eq!(conflicts.len(), 0);
966 }
967
968 #[test]
969 fn diff_detects_missing_in_peer_and_locally() {
970 let f1 = finding("vf_001", 0.7);
971 let f2 = finding("vf_002", 0.7);
972 let ours = assemble("ours", vec![f1.clone()]);
973 let theirs = assemble("theirs", vec![f2.clone()]);
974 let conflicts = diff_frontiers(&ours, &theirs);
975 let kinds: Vec<&str> = conflicts.iter().map(|c| c.kind.as_str()).collect();
976 assert!(kinds.contains(&"missing_in_peer"));
977 assert!(kinds.contains(&"missing_locally"));
978 }
979
980 #[test]
981 fn diff_detects_confidence_divergence_above_threshold() {
982 let mut f_ours = finding("vf_001", 0.85);
983 let mut f_theirs = finding("vf_001", 0.55);
984 f_ours.id = "vf_001".into();
986 f_theirs.id = "vf_001".into();
987 let ours = assemble("ours", vec![f_ours]);
988 let theirs = assemble("theirs", vec![f_theirs]);
989 let conflicts = diff_frontiers(&ours, &theirs);
990 assert!(
991 conflicts
992 .iter()
993 .any(|c| c.kind == ConflictKind::ConfidenceDiverged),
994 "expected confidence_diverged in {conflicts:?}"
995 );
996 }
997
998 #[test]
999 fn diff_ignores_confidence_drift_below_threshold() {
1000 let mut f_ours = finding("vf_001", 0.700);
1001 let mut f_theirs = finding("vf_001", 0.730);
1002 f_ours.id = "vf_001".into();
1003 f_theirs.id = "vf_001".into();
1004 let ours = assemble("ours", vec![f_ours]);
1005 let theirs = assemble("theirs", vec![f_theirs]);
1006 let conflicts = diff_frontiers(&ours, &theirs);
1007 assert!(
1008 !conflicts
1009 .iter()
1010 .any(|c| c.kind == ConflictKind::ConfidenceDiverged),
1011 "0.03 drift should not flag: {conflicts:?}"
1012 );
1013 }
1014
1015 #[test]
1016 fn diff_detects_retracted_divergence() {
1017 let mut f_ours = finding("vf_001", 0.7);
1018 let mut f_theirs = finding("vf_001", 0.7);
1019 f_ours.id = "vf_001".into();
1020 f_theirs.id = "vf_001".into();
1021 f_theirs.flags.retracted = true;
1022 let ours = assemble("ours", vec![f_ours]);
1023 let theirs = assemble("theirs", vec![f_theirs]);
1024 let conflicts = diff_frontiers(&ours, &theirs);
1025 assert!(
1026 conflicts
1027 .iter()
1028 .any(|c| c.kind == ConflictKind::RetractedDiverged)
1029 );
1030 }
1031
1032 #[test]
1033 fn diff_detects_review_state_divergence() {
1034 let mut f_ours = finding("vf_001", 0.7);
1035 let mut f_theirs = finding("vf_001", 0.7);
1036 f_ours.id = "vf_001".into();
1037 f_theirs.id = "vf_001".into();
1038 f_theirs.flags.review_state = Some(ReviewState::Contested);
1039 let ours = assemble("ours", vec![f_ours]);
1040 let theirs = assemble("theirs", vec![f_theirs]);
1041 let conflicts = diff_frontiers(&ours, &theirs);
1042 assert!(
1043 conflicts
1044 .iter()
1045 .any(|c| c.kind == ConflictKind::ReviewStateDiverged)
1046 );
1047 }
1048
1049 #[test]
1050 fn diff_detects_assertion_text_divergence() {
1051 let mut f_ours = finding("vf_001", 0.7);
1052 let mut f_theirs = finding("vf_001", 0.7);
1053 f_ours.id = "vf_001".into();
1054 f_theirs.id = "vf_001".into();
1055 f_theirs.assertion.text = "different claim".into();
1056 let ours = assemble("ours", vec![f_ours]);
1057 let theirs = assemble("theirs", vec![f_theirs]);
1058 let conflicts = diff_frontiers(&ours, &theirs);
1059 assert!(
1060 conflicts
1061 .iter()
1062 .any(|c| c.kind == ConflictKind::AssertionTextDiverged)
1063 );
1064 }
1065
1066 #[test]
1067 fn sync_appends_one_synced_event_plus_one_per_conflict() {
1068 let mut f_ours = finding("vf_001", 0.7);
1069 let mut f_theirs = finding("vf_001", 0.7);
1070 f_ours.id = "vf_001".into();
1071 f_theirs.id = "vf_001".into();
1072 f_theirs.flags.retracted = true;
1073 let mut ours = assemble("ours", vec![f_ours]);
1074 let theirs = assemble("theirs", vec![f_theirs]);
1075 let events_before = ours.events.len();
1076 let report = sync_with_peer(&mut ours, "hub:test-peer", &theirs);
1077 assert_eq!(report.conflicts.len(), 1);
1078 assert_eq!(report.events_appended, 2); assert_eq!(ours.events.len() - events_before, 2);
1080 let sync_ev = &ours.events[events_before];
1082 assert_eq!(sync_ev.kind, "frontier.synced_with_peer");
1083 assert_eq!(sync_ev.payload["divergence_count"].as_u64(), Some(1));
1084 let conf_ev = &ours.events[events_before + 1];
1086 assert_eq!(conf_ev.kind, "frontier.conflict_detected");
1087 assert_eq!(conf_ev.payload["kind"], "retracted_diverged");
1088 }
1089
1090 #[test]
1091 fn sync_with_clean_diff_emits_zero_divergence_event() {
1092 let f = finding("vf_001", 0.7);
1093 let mut ours = assemble("ours", vec![f.clone()]);
1094 let theirs = assemble("theirs", vec![f]);
1095 let report = sync_with_peer(&mut ours, "hub:test-peer", &theirs);
1096 assert_eq!(report.conflicts.len(), 0);
1097 assert_eq!(report.events_appended, 1);
1098 let last = ours.events.last().unwrap();
1099 assert_eq!(last.kind, "frontier.synced_with_peer");
1100 assert_eq!(last.payload["divergence_count"].as_u64(), Some(0));
1101 }
1102
1103 fn make_event(id: &str, parents: &[&str]) -> StateEvent {
1104 StateEvent {
1105 schema: EVENT_SCHEMA.to_string(),
1106 id: id.to_string(),
1107 kind: "test.event".to_string(),
1108 target: StateTarget {
1109 r#type: "frontier_observation".to_string(),
1110 id: "vfr_test".to_string(),
1111 },
1112 actor: StateActor {
1113 id: "test".to_string(),
1114 r#type: "system".to_string(),
1115 },
1116 timestamp: "2026-05-09T00:00:00Z".to_string(),
1117 reason: "test".to_string(),
1118 before_hash: NULL_HASH.to_string(),
1119 after_hash: NULL_HASH.to_string(),
1120 payload: json!({"parents": parents}),
1121 caveats: vec![],
1122 signature: None,
1123 schema_artifact_id: None,
1124 }
1125 }
1126
1127 #[test]
1128 fn classify_returns_proceed_when_peer_set_is_complete() {
1129 let our_events = vec![make_event("e1", &[])];
1132 let peer_events = vec![make_event("e2", &["e1"])];
1133 let action = classify_peer_event_set(&our_events, &peer_events);
1134 assert!(matches!(
1135 action,
1136 crate::ancestor_closure::AncestorAction::Proceed
1137 ));
1138 }
1139
1140 #[test]
1141 fn classify_returns_fetch_when_peer_references_missing_ancestor() {
1142 let our_events = vec![make_event("e1", &[])];
1145 let peer_events = vec![make_event("e3", &["e2"])];
1146 let action = classify_peer_event_set(&our_events, &peer_events);
1147 match action {
1148 crate::ancestor_closure::AncestorAction::Fetch { missing } => {
1149 assert_eq!(missing, vec!["e2"]);
1150 }
1151 other => panic!("expected Fetch, got {other:?}"),
1152 }
1153 }
1154
1155 #[test]
1156 fn classify_handles_events_without_explicit_parents() {
1157 let our_events = vec![StateEvent {
1160 schema: EVENT_SCHEMA.to_string(),
1161 id: "e1".to_string(),
1162 kind: "test.event".to_string(),
1163 target: StateTarget {
1164 r#type: "frontier_observation".to_string(),
1165 id: "vfr_test".to_string(),
1166 },
1167 actor: StateActor {
1168 id: "test".to_string(),
1169 r#type: "system".to_string(),
1170 },
1171 timestamp: "2026-05-09T00:00:00Z".to_string(),
1172 reason: "test".to_string(),
1173 before_hash: NULL_HASH.to_string(),
1174 after_hash: NULL_HASH.to_string(),
1175 payload: json!({}),
1176 caveats: vec![],
1177 signature: None,
1178 schema_artifact_id: None,
1179 }];
1180 let peer_events = vec![];
1181 let action = classify_peer_event_set(&our_events, &peer_events);
1182 assert!(matches!(
1183 action,
1184 crate::ancestor_closure::AncestorAction::Proceed
1185 ));
1186 }
1187}