1use chrono::Utc;
40use serde::{Deserialize, Serialize};
41use serde_json::json;
42
43use crate::events::{
44 EVENT_SCHEMA, NULL_HASH, StateActor, StateEvent, StateTarget, compute_event_id, snapshot_hash,
45};
46use crate::project::Project;
47
48#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
53pub struct PeerHub {
54 pub id: String,
58 pub url: String,
62 pub public_key: String,
66 pub added_at: String,
69 #[serde(default, skip_serializing_if = "String::is_empty")]
72 pub note: String,
73}
74
75impl PeerHub {
76 pub fn validate(&self) -> Result<(), String> {
80 if self.id.trim().is_empty() {
81 return Err("peer id must be non-empty".into());
82 }
83 if !self.url.starts_with("https://") {
84 return Err(format!(
85 "peer url must start with `https://` (got `{}`)",
86 self.url
87 ));
88 }
89 let trimmed = self.public_key.trim();
90 if trimmed.len() != 64 {
91 return Err(format!(
92 "peer public_key must be 64 hex chars (got {})",
93 trimmed.len()
94 ));
95 }
96 if hex::decode(trimmed).is_err() {
97 return Err("peer public_key must be valid hex".into());
98 }
99 Ok(())
100 }
101}
102
103#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
108#[serde(rename_all = "snake_case")]
109pub enum ConflictKind {
110 MissingInPeer,
112 MissingLocally,
114 ConfidenceDiverged,
117 RetractedDiverged,
119 ReviewStateDiverged,
121 SupersededDiverged,
123 AssertionTextDiverged,
128 BrokenLocator,
137 UnverifiedPeerEntry,
143}
144
145impl ConflictKind {
146 pub fn as_str(self) -> &'static str {
147 match self {
148 ConflictKind::MissingInPeer => "missing_in_peer",
149 ConflictKind::MissingLocally => "missing_locally",
150 ConflictKind::ConfidenceDiverged => "confidence_diverged",
151 ConflictKind::RetractedDiverged => "retracted_diverged",
152 ConflictKind::ReviewStateDiverged => "review_state_diverged",
153 ConflictKind::SupersededDiverged => "superseded_diverged",
154 ConflictKind::AssertionTextDiverged => "assertion_text_diverged",
155 ConflictKind::BrokenLocator => "broken_locator",
156 ConflictKind::UnverifiedPeerEntry => "unverified_peer_entry",
157 }
158 }
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
163pub struct Conflict {
164 pub finding_id: String,
165 pub kind: ConflictKind,
166 pub detail: String,
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct SyncReport {
174 pub peer_id: String,
175 pub our_snapshot_hash: String,
176 pub peer_snapshot_hash: String,
177 pub conflicts: Vec<Conflict>,
178 pub events_appended: usize,
181}
182
183#[must_use]
190pub fn diff_frontiers(ours: &Project, theirs: &Project) -> Vec<Conflict> {
191 use std::collections::HashMap;
192
193 let our_by_id: HashMap<&str, &crate::bundle::FindingBundle> =
194 ours.findings.iter().map(|f| (f.id.as_str(), f)).collect();
195 let their_by_id: HashMap<&str, &crate::bundle::FindingBundle> =
196 theirs.findings.iter().map(|f| (f.id.as_str(), f)).collect();
197
198 let mut conflicts = Vec::new();
199
200 for id in our_by_id.keys() {
202 if !their_by_id.contains_key(id) {
203 conflicts.push(Conflict {
204 finding_id: (*id).to_string(),
205 kind: ConflictKind::MissingInPeer,
206 detail: "present locally, absent in peer".to_string(),
207 });
208 }
209 }
210 for id in their_by_id.keys() {
212 if !our_by_id.contains_key(id) {
213 conflicts.push(Conflict {
214 finding_id: (*id).to_string(),
215 kind: ConflictKind::MissingLocally,
216 detail: "present in peer, absent locally".to_string(),
217 });
218 }
219 }
220 for (id, ours_f) in &our_by_id {
222 let Some(theirs_f) = their_by_id.get(id) else {
223 continue;
224 };
225 if (ours_f.confidence.score - theirs_f.confidence.score).abs() > 0.05 {
226 conflicts.push(Conflict {
227 finding_id: (*id).to_string(),
228 kind: ConflictKind::ConfidenceDiverged,
229 detail: format!(
230 "ours: {:.3}, peer: {:.3}",
231 ours_f.confidence.score, theirs_f.confidence.score
232 ),
233 });
234 }
235 if ours_f.flags.retracted != theirs_f.flags.retracted {
236 conflicts.push(Conflict {
237 finding_id: (*id).to_string(),
238 kind: ConflictKind::RetractedDiverged,
239 detail: format!(
240 "ours: {}, peer: {}",
241 ours_f.flags.retracted, theirs_f.flags.retracted
242 ),
243 });
244 }
245 if ours_f.flags.review_state != theirs_f.flags.review_state {
246 conflicts.push(Conflict {
247 finding_id: (*id).to_string(),
248 kind: ConflictKind::ReviewStateDiverged,
249 detail: format!(
250 "ours: {:?}, peer: {:?}",
251 ours_f.flags.review_state, theirs_f.flags.review_state
252 ),
253 });
254 }
255 if ours_f.flags.superseded != theirs_f.flags.superseded {
256 conflicts.push(Conflict {
257 finding_id: (*id).to_string(),
258 kind: ConflictKind::SupersededDiverged,
259 detail: format!(
260 "ours: {}, peer: {}",
261 ours_f.flags.superseded, theirs_f.flags.superseded
262 ),
263 });
264 }
265 if ours_f.assertion.text.trim() != theirs_f.assertion.text.trim() {
266 conflicts.push(Conflict {
267 finding_id: (*id).to_string(),
268 kind: ConflictKind::AssertionTextDiverged,
269 detail:
270 "matching id but diverging assertion text — possible content-address collision"
271 .to_string(),
272 });
273 }
274 }
275
276 conflicts.sort_by(|a, b| {
277 a.finding_id
278 .cmp(&b.finding_id)
279 .then_with(|| a.kind.as_str().cmp(b.kind.as_str()))
280 });
281 conflicts
282}
283
284pub fn record_locator_failure(
292 project: &mut Project,
293 peer_id: &str,
294 vfr_id: &str,
295 locator: &str,
296 status: u16,
297) -> SyncReport {
298 let now = Utc::now().to_rfc3339();
299 let our_hash = snapshot_hash(project);
300 let frontier_id = project.frontier_id();
301 let detail = format!("locator {locator} returned HTTP {status}");
302
303 let synced_event = StateEvent {
304 schema: EVENT_SCHEMA.to_string(),
305 id: String::new(),
306 kind: "frontier.synced_with_peer".to_string(),
307 target: StateTarget {
308 r#type: "frontier_observation".to_string(),
309 id: frontier_id.clone(),
310 },
311 actor: StateActor {
312 id: "federation".to_string(),
313 r#type: "system".to_string(),
314 },
315 timestamp: now.clone(),
316 reason: format!("synced with peer {peer_id} (broken locator)"),
317 before_hash: NULL_HASH.to_string(),
318 after_hash: NULL_HASH.to_string(),
319 payload: json!({
320 "peer_id": peer_id,
321 "peer_snapshot_hash": "",
322 "our_snapshot_hash": our_hash,
323 "divergence_count": 1,
324 }),
325 caveats: Vec::new(),
326 signature: None,
327 };
328 let mut sync_ev = synced_event;
329 sync_ev.id = compute_event_id(&sync_ev);
330
331 let conflict_ev = StateEvent {
332 schema: EVENT_SCHEMA.to_string(),
333 id: String::new(),
334 kind: "frontier.conflict_detected".to_string(),
335 target: StateTarget {
336 r#type: "frontier_observation".to_string(),
337 id: frontier_id.clone(),
338 },
339 actor: StateActor {
340 id: "federation".to_string(),
341 r#type: "system".to_string(),
342 },
343 timestamp: now.clone(),
344 reason: format!("peer={peer_id} kind=broken_locator {detail}"),
345 before_hash: NULL_HASH.to_string(),
346 after_hash: NULL_HASH.to_string(),
347 payload: json!({
348 "peer_id": peer_id,
349 "finding_id": vfr_id,
350 "kind": "broken_locator",
351 "detail": detail,
352 }),
353 caveats: Vec::new(),
354 signature: None,
355 };
356 let mut conflict_ev = conflict_ev;
357 conflict_ev.id = compute_event_id(&conflict_ev);
358
359 project.events.push(sync_ev);
360 project.events.push(conflict_ev);
361
362 SyncReport {
363 peer_id: peer_id.to_string(),
364 our_snapshot_hash: our_hash,
365 peer_snapshot_hash: String::new(),
366 conflicts: vec![Conflict {
367 finding_id: vfr_id.to_string(),
368 kind: ConflictKind::BrokenLocator,
369 detail,
370 }],
371 events_appended: 2,
372 }
373}
374
375pub fn record_unverified_entry(
381 project: &mut Project,
382 peer_id: &str,
383 vfr_id: &str,
384 reason: &str,
385) -> SyncReport {
386 let now = Utc::now().to_rfc3339();
387 let our_hash = snapshot_hash(project);
388 let frontier_id = project.frontier_id();
389
390 let mut sync_ev = StateEvent {
391 schema: EVENT_SCHEMA.to_string(),
392 id: String::new(),
393 kind: "frontier.synced_with_peer".to_string(),
394 target: StateTarget {
395 r#type: "frontier_observation".to_string(),
396 id: frontier_id.clone(),
397 },
398 actor: StateActor {
399 id: "federation".to_string(),
400 r#type: "system".to_string(),
401 },
402 timestamp: now.clone(),
403 reason: format!("synced with peer {peer_id} (unverified entry; halted)"),
404 before_hash: NULL_HASH.to_string(),
405 after_hash: NULL_HASH.to_string(),
406 payload: json!({
407 "peer_id": peer_id,
408 "peer_snapshot_hash": "",
409 "our_snapshot_hash": our_hash,
410 "divergence_count": 1,
411 }),
412 caveats: Vec::new(),
413 signature: None,
414 };
415 sync_ev.id = compute_event_id(&sync_ev);
416
417 let mut conflict_ev = StateEvent {
418 schema: EVENT_SCHEMA.to_string(),
419 id: String::new(),
420 kind: "frontier.conflict_detected".to_string(),
421 target: StateTarget {
422 r#type: "frontier_observation".to_string(),
423 id: frontier_id.clone(),
424 },
425 actor: StateActor {
426 id: "federation".to_string(),
427 r#type: "system".to_string(),
428 },
429 timestamp: now.clone(),
430 reason: format!("peer={peer_id} kind=unverified_peer_entry {reason}"),
431 before_hash: NULL_HASH.to_string(),
432 after_hash: NULL_HASH.to_string(),
433 payload: json!({
434 "peer_id": peer_id,
435 "finding_id": vfr_id,
436 "kind": "unverified_peer_entry",
437 "detail": reason,
438 }),
439 caveats: Vec::new(),
440 signature: None,
441 };
442 conflict_ev.id = compute_event_id(&conflict_ev);
443
444 project.events.push(sync_ev);
445 project.events.push(conflict_ev);
446
447 SyncReport {
448 peer_id: peer_id.to_string(),
449 our_snapshot_hash: our_hash,
450 peer_snapshot_hash: String::new(),
451 conflicts: vec![Conflict {
452 finding_id: vfr_id.to_string(),
453 kind: ConflictKind::UnverifiedPeerEntry,
454 detail: reason.to_string(),
455 }],
456 events_appended: 2,
457 }
458}
459
460pub fn sync_with_peer(project: &mut Project, peer_id: &str, peer: &Project) -> SyncReport {
470 let our_hash = snapshot_hash(project);
471 let peer_hash = snapshot_hash(peer);
472 let conflicts = diff_frontiers(project, peer);
473
474 let now = Utc::now().to_rfc3339();
475 let frontier_id = project.frontier_id().clone();
476
477 let synced_reason = format!("synced with peer {peer_id}");
488 let mut synced_event = StateEvent {
489 schema: EVENT_SCHEMA.to_string(),
490 id: String::new(),
491 kind: "frontier.synced_with_peer".to_string(),
492 target: StateTarget {
493 r#type: "frontier_observation".to_string(),
494 id: frontier_id.clone(),
495 },
496 actor: StateActor {
497 id: "federation".to_string(),
498 r#type: "system".to_string(),
499 },
500 timestamp: now.clone(),
501 reason: synced_reason,
502 before_hash: NULL_HASH.to_string(),
503 after_hash: NULL_HASH.to_string(),
504 payload: json!({
505 "peer_id": peer_id,
506 "peer_snapshot_hash": peer_hash,
507 "our_snapshot_hash": our_hash,
508 "divergence_count": conflicts.len(),
509 }),
510 caveats: Vec::new(),
511 signature: None,
512 };
513 synced_event.id = compute_event_id(&synced_event);
514
515 let mut conflict_events: Vec<StateEvent> = Vec::with_capacity(conflicts.len());
516 for c in &conflicts {
517 let reason = format!("peer={peer_id} kind={} {}", c.kind.as_str(), c.detail);
518 let mut ev = StateEvent {
519 schema: EVENT_SCHEMA.to_string(),
520 id: String::new(),
521 kind: "frontier.conflict_detected".to_string(),
522 target: StateTarget {
523 r#type: "frontier_observation".to_string(),
524 id: frontier_id.clone(),
525 },
526 actor: StateActor {
527 id: "federation".to_string(),
528 r#type: "system".to_string(),
529 },
530 timestamp: now.clone(),
531 reason,
532 before_hash: NULL_HASH.to_string(),
533 after_hash: NULL_HASH.to_string(),
534 payload: json!({
535 "peer_id": peer_id,
536 "finding_id": c.finding_id,
537 "kind": c.kind.as_str(),
538 "detail": c.detail,
539 }),
540 caveats: Vec::new(),
541 signature: None,
542 };
543 ev.id = compute_event_id(&ev);
544 conflict_events.push(ev);
545 }
546
547 let events_appended = 1 + conflict_events.len();
548 project.events.push(synced_event);
549 project.events.extend(conflict_events);
550
551 SyncReport {
552 peer_id: peer_id.to_string(),
553 our_snapshot_hash: our_hash,
554 peer_snapshot_hash: peer_hash,
555 conflicts,
556 events_appended,
557 }
558}
559
560#[derive(Debug)]
566pub enum DiscoveryResult {
567 Resolved(Project),
571 EntryNotFound { vfr_id: String, status: u16 },
574 UnverifiedEntry { vfr_id: String, reason: String },
577 BrokenLocator {
580 vfr_id: String,
581 locator: String,
582 status: u16,
583 },
584 Unreachable { url: String, error: String },
586}
587
588pub fn discover_peer_frontier(
600 hub_url: &str,
601 vfr_id: &str,
602 expected_owner_pubkey: Option<&str>,
603) -> DiscoveryResult {
604 let hub = hub_url.trim_end_matches('/').to_string();
605 let entries_url = format!("{hub}/entries/{vfr_id}");
606 let vfr_owned = vfr_id.to_string();
607 let expected = expected_owner_pubkey.map(|s| s.to_string());
608
609 std::thread::spawn(move || -> DiscoveryResult {
610 let resp = match reqwest::blocking::get(&entries_url) {
611 Ok(r) => r,
612 Err(e) => {
613 return DiscoveryResult::Unreachable {
614 url: entries_url.clone(),
615 error: e.to_string(),
616 };
617 }
618 };
619 let status = resp.status();
620 if status.as_u16() == 404 {
621 return DiscoveryResult::EntryNotFound {
622 vfr_id: vfr_owned,
623 status: status.as_u16(),
624 };
625 }
626 if !status.is_success() {
627 return DiscoveryResult::Unreachable {
628 url: entries_url.clone(),
629 error: format!("hub returned HTTP {status}"),
630 };
631 }
632 let body = match resp.text() {
633 Ok(b) => b,
634 Err(e) => {
635 return DiscoveryResult::Unreachable {
636 url: entries_url.clone(),
637 error: format!("read body: {e}"),
638 };
639 }
640 };
641 let entry: crate::registry::RegistryEntry = match serde_json::from_str(&body) {
642 Ok(e) => e,
643 Err(e) => {
644 return DiscoveryResult::UnverifiedEntry {
645 vfr_id: vfr_owned,
646 reason: format!("parse registry entry: {e}"),
647 };
648 }
649 };
650
651 match crate::registry::verify_entry(&entry) {
653 Ok(true) => {}
654 Ok(false) => {
655 return DiscoveryResult::UnverifiedEntry {
656 vfr_id: vfr_owned,
657 reason: "registry entry signature does not verify against entry.owner_pubkey"
658 .to_string(),
659 };
660 }
661 Err(e) => {
662 return DiscoveryResult::UnverifiedEntry {
663 vfr_id: vfr_owned,
664 reason: format!("signature verification error: {e}"),
665 };
666 }
667 }
668 if let Some(want) = expected.as_deref()
670 && entry.owner_pubkey != want
671 {
672 return DiscoveryResult::UnverifiedEntry {
673 vfr_id: vfr_owned,
674 reason: format!(
675 "entry owner_pubkey {} != expected peer pubkey {}",
676 &entry.owner_pubkey[..16],
677 &want[..16]
678 ),
679 };
680 }
681
682 let locator = entry.network_locator.clone();
684 let mresp = match reqwest::blocking::get(&locator) {
685 Ok(r) => r,
686 Err(e) => {
687 return DiscoveryResult::BrokenLocator {
688 vfr_id: vfr_owned,
689 locator,
690 status: 0,
691 }
692 .with_error(e.to_string());
693 }
694 };
695 let mstatus = mresp.status();
696 if !mstatus.is_success() {
697 return DiscoveryResult::BrokenLocator {
698 vfr_id: vfr_owned,
699 locator,
700 status: mstatus.as_u16(),
701 };
702 }
703 let mbody = match mresp.text() {
704 Ok(b) => b,
705 Err(e) => {
706 return DiscoveryResult::BrokenLocator {
707 vfr_id: vfr_owned,
708 locator,
709 status: 0,
710 }
711 .with_error(e.to_string());
712 }
713 };
714 match serde_json::from_str::<Project>(&mbody) {
715 Ok(p) => DiscoveryResult::Resolved(p),
716 Err(e) => DiscoveryResult::BrokenLocator {
717 vfr_id: vfr_owned,
718 locator,
719 status: 0,
720 }
721 .with_error(format!("manifest parse: {e}")),
722 }
723 })
724 .join()
725 .unwrap_or(DiscoveryResult::Unreachable {
726 url: hub_url.to_string(),
727 error: "discovery thread panicked".to_string(),
728 })
729}
730
731impl DiscoveryResult {
732 fn with_error(self, _ctx: String) -> Self {
733 self
736 }
737}
738
739pub fn fetch_peer_frontier(url: &str) -> Result<Project, String> {
754 let url_owned = url.to_string();
755 let handle = std::thread::spawn(move || -> Result<Project, String> {
756 let resp = reqwest::blocking::get(&url_owned)
757 .map_err(|e| format!("HTTP GET {url_owned} failed: {e}"))?;
758 let status = resp.status();
759 if !status.is_success() {
760 return Err(format!("peer returned HTTP {status}"));
761 }
762 let body = resp
763 .text()
764 .map_err(|e| format!("read body from {url_owned}: {e}"))?;
765 serde_json::from_str(&body)
766 .map_err(|e| format!("parse peer frontier from {url_owned}: {e}"))
767 });
768 handle
769 .join()
770 .map_err(|_| "fetch thread panicked".to_string())?
771}
772
773#[cfg(test)]
774mod tests {
775 use super::*;
776
777 fn good() -> PeerHub {
778 PeerHub {
779 id: "hub:test".into(),
780 url: "https://example.invalid/".into(),
781 public_key: "00".repeat(32),
782 added_at: "2026-04-27T00:00:00Z".into(),
783 note: String::new(),
784 }
785 }
786
787 #[test]
788 fn validates_correct_shape() {
789 assert!(good().validate().is_ok());
790 }
791
792 #[test]
793 fn rejects_empty_id() {
794 let mut p = good();
795 p.id = " ".into();
796 assert!(p.validate().is_err());
797 }
798
799 #[test]
800 fn rejects_http_url() {
801 let mut p = good();
802 p.url = "http://insecure.example/".into();
803 assert!(p.validate().is_err());
804 }
805
806 #[test]
807 fn rejects_short_pubkey() {
808 let mut p = good();
809 p.public_key = "abcd".into();
810 assert!(p.validate().is_err());
811 }
812
813 #[test]
814 fn rejects_non_hex_pubkey() {
815 let mut p = good();
816 p.public_key = "z".repeat(64);
817 assert!(p.validate().is_err());
818 }
819
820 use crate::bundle::{
823 Assertion, Conditions, Confidence, Evidence, Extraction, FindingBundle, Flags, Provenance,
824 ReviewState,
825 };
826 use crate::project::{self, Project};
827
828 fn finding(id: &str, score: f64) -> FindingBundle {
829 let mut b = FindingBundle::new(
830 Assertion {
831 text: format!("claim {id}"),
832 assertion_type: "mechanism".into(),
833 entities: vec![],
834 relation: None,
835 direction: None,
836 causal_claim: None,
837 causal_evidence_grade: None,
838 },
839 Evidence {
840 evidence_type: "experimental".into(),
841 model_system: String::new(),
842 species: None,
843 method: String::new(),
844 sample_size: Some("n=30".into()),
845 effect_size: None,
846 p_value: None,
847 replicated: false,
848 replication_count: None,
849 evidence_spans: vec![],
850 },
851 Conditions {
852 text: String::new(),
853 species_verified: vec![],
854 species_unverified: vec![],
855 in_vitro: false,
856 in_vivo: false,
857 human_data: false,
858 clinical_trial: false,
859 concentration_range: None,
860 duration: None,
861 age_group: None,
862 cell_type: None,
863 },
864 Confidence::raw(score, "test", 0.85),
865 Provenance {
866 source_type: "published_paper".into(),
867 doi: None,
868 pmid: None,
869 pmc: None,
870 openalex_id: None,
871 url: None,
872 title: "Test".into(),
873 authors: vec![],
874 year: Some(2025),
875 journal: None,
876 license: None,
877 publisher: None,
878 funders: vec![],
879 extraction: Extraction::default(),
880 review: None,
881 citation_count: None,
882 },
883 Flags::default(),
884 );
885 b.id = id.to_string();
886 b
887 }
888
889 fn assemble(name: &str, findings: Vec<FindingBundle>) -> Project {
890 project::assemble(name, findings, 1, 0, "test")
891 }
892
893 #[test]
894 fn diff_identical_frontiers_returns_no_conflicts() {
895 let f = finding("vf_001", 0.7);
896 let ours = assemble("ours", vec![f.clone()]);
897 let theirs = assemble("theirs", vec![f]);
898 let conflicts = diff_frontiers(&ours, &theirs);
899 assert_eq!(conflicts.len(), 0);
900 }
901
902 #[test]
903 fn diff_detects_missing_in_peer_and_locally() {
904 let f1 = finding("vf_001", 0.7);
905 let f2 = finding("vf_002", 0.7);
906 let ours = assemble("ours", vec![f1.clone()]);
907 let theirs = assemble("theirs", vec![f2.clone()]);
908 let conflicts = diff_frontiers(&ours, &theirs);
909 let kinds: Vec<&str> = conflicts.iter().map(|c| c.kind.as_str()).collect();
910 assert!(kinds.contains(&"missing_in_peer"));
911 assert!(kinds.contains(&"missing_locally"));
912 }
913
914 #[test]
915 fn diff_detects_confidence_divergence_above_threshold() {
916 let mut f_ours = finding("vf_001", 0.85);
917 let mut f_theirs = finding("vf_001", 0.55);
918 f_ours.id = "vf_001".into();
920 f_theirs.id = "vf_001".into();
921 let ours = assemble("ours", vec![f_ours]);
922 let theirs = assemble("theirs", vec![f_theirs]);
923 let conflicts = diff_frontiers(&ours, &theirs);
924 assert!(
925 conflicts
926 .iter()
927 .any(|c| c.kind == ConflictKind::ConfidenceDiverged),
928 "expected confidence_diverged in {conflicts:?}"
929 );
930 }
931
932 #[test]
933 fn diff_ignores_confidence_drift_below_threshold() {
934 let mut f_ours = finding("vf_001", 0.700);
935 let mut f_theirs = finding("vf_001", 0.730);
936 f_ours.id = "vf_001".into();
937 f_theirs.id = "vf_001".into();
938 let ours = assemble("ours", vec![f_ours]);
939 let theirs = assemble("theirs", vec![f_theirs]);
940 let conflicts = diff_frontiers(&ours, &theirs);
941 assert!(
942 !conflicts
943 .iter()
944 .any(|c| c.kind == ConflictKind::ConfidenceDiverged),
945 "0.03 drift should not flag: {conflicts:?}"
946 );
947 }
948
949 #[test]
950 fn diff_detects_retracted_divergence() {
951 let mut f_ours = finding("vf_001", 0.7);
952 let mut f_theirs = finding("vf_001", 0.7);
953 f_ours.id = "vf_001".into();
954 f_theirs.id = "vf_001".into();
955 f_theirs.flags.retracted = true;
956 let ours = assemble("ours", vec![f_ours]);
957 let theirs = assemble("theirs", vec![f_theirs]);
958 let conflicts = diff_frontiers(&ours, &theirs);
959 assert!(
960 conflicts
961 .iter()
962 .any(|c| c.kind == ConflictKind::RetractedDiverged)
963 );
964 }
965
966 #[test]
967 fn diff_detects_review_state_divergence() {
968 let mut f_ours = finding("vf_001", 0.7);
969 let mut f_theirs = finding("vf_001", 0.7);
970 f_ours.id = "vf_001".into();
971 f_theirs.id = "vf_001".into();
972 f_theirs.flags.review_state = Some(ReviewState::Contested);
973 let ours = assemble("ours", vec![f_ours]);
974 let theirs = assemble("theirs", vec![f_theirs]);
975 let conflicts = diff_frontiers(&ours, &theirs);
976 assert!(
977 conflicts
978 .iter()
979 .any(|c| c.kind == ConflictKind::ReviewStateDiverged)
980 );
981 }
982
983 #[test]
984 fn diff_detects_assertion_text_divergence() {
985 let mut f_ours = finding("vf_001", 0.7);
986 let mut f_theirs = finding("vf_001", 0.7);
987 f_ours.id = "vf_001".into();
988 f_theirs.id = "vf_001".into();
989 f_theirs.assertion.text = "different claim".into();
990 let ours = assemble("ours", vec![f_ours]);
991 let theirs = assemble("theirs", vec![f_theirs]);
992 let conflicts = diff_frontiers(&ours, &theirs);
993 assert!(
994 conflicts
995 .iter()
996 .any(|c| c.kind == ConflictKind::AssertionTextDiverged)
997 );
998 }
999
1000 #[test]
1001 fn sync_appends_one_synced_event_plus_one_per_conflict() {
1002 let mut f_ours = finding("vf_001", 0.7);
1003 let mut f_theirs = finding("vf_001", 0.7);
1004 f_ours.id = "vf_001".into();
1005 f_theirs.id = "vf_001".into();
1006 f_theirs.flags.retracted = true;
1007 let mut ours = assemble("ours", vec![f_ours]);
1008 let theirs = assemble("theirs", vec![f_theirs]);
1009 let events_before = ours.events.len();
1010 let report = sync_with_peer(&mut ours, "hub:test-peer", &theirs);
1011 assert_eq!(report.conflicts.len(), 1);
1012 assert_eq!(report.events_appended, 2); assert_eq!(ours.events.len() - events_before, 2);
1014 let sync_ev = &ours.events[events_before];
1016 assert_eq!(sync_ev.kind, "frontier.synced_with_peer");
1017 assert_eq!(sync_ev.payload["divergence_count"].as_u64(), Some(1));
1018 let conf_ev = &ours.events[events_before + 1];
1020 assert_eq!(conf_ev.kind, "frontier.conflict_detected");
1021 assert_eq!(conf_ev.payload["kind"], "retracted_diverged");
1022 }
1023
1024 #[test]
1025 fn sync_with_clean_diff_emits_zero_divergence_event() {
1026 let f = finding("vf_001", 0.7);
1027 let mut ours = assemble("ours", vec![f.clone()]);
1028 let theirs = assemble("theirs", vec![f]);
1029 let report = sync_with_peer(&mut ours, "hub:test-peer", &theirs);
1030 assert_eq!(report.conflicts.len(), 0);
1031 assert_eq!(report.events_appended, 1);
1032 let last = ours.events.last().unwrap();
1033 assert_eq!(last.kind, "frontier.synced_with_peer");
1034 assert_eq!(last.payload["divergence_count"].as_u64(), Some(0));
1035 }
1036}