Skip to main content

vela_protocol/
federation.rs

1//! v0.39: Hub federation — peer registry + conflict detection.
2//!
3//! Pre-v0.39, every Vela frontier had exactly one source of truth: the
4//! single hub it was published to (`vela-hub.fly.dev`). The substrate
5//! claimed the kernel was content-addressed and signed, but the
6//! distribution layer was centralized — there was no way for a second
7//! hub to mirror a frontier and detect when its view diverged from
8//! the original.
9//!
10//! v0.39.0 lands the *schema layer* of federation. A frontier can now
11//! register peer hubs (id + URL + public key) in `Project.peers`, and
12//! the kernel knows two new event kinds:
13//!
14//! - `frontier.synced_with_peer` — append-only record of a sync pass:
15//!   what we exchanged, what hash we ended up agreeing on, how many
16//!   findings differed.
17//! - `frontier.conflict_detected` — emitted per finding when our view
18//!   and the peer's view disagree on a substantive field (review
19//!   verdict, confidence, retraction, presence).
20//!
21//! The actual sync runtime (HTTP fetch, manifest verification,
22//! conflict-resolution proposal emission) ships in v0.39.1+. Same
23//! staging discipline as v0.32 (Replication object) → v0.36.1
24//! (Project.replications becomes authoritative) and v0.38.0 (causal
25//! schema) → v0.38.1 (causal math).
26//!
27//! Doctrine for v0.39.0:
28//! - The peer registry is a frontier-local declaration. Adding a peer
29//!   does not yet trust their state; it just establishes who we know
30//!   about.
31//! - Peer signatures still verify under the same Ed25519 discipline
32//!   as `actors`. A peer's `frontier.merged` event signed by their
33//!   key can be replayed locally only when their pubkey is in our
34//!   `peers` registry.
35//! - Conflicts are recorded, not auto-resolved. v0.39.1+ will surface
36//!   them through proposals so a human reviewer chooses which side
37//!   to accept.
38
39use chrono::Utc;
40use serde::{Deserialize, Serialize};
41use serde_json::json;
42
43use crate::events::{
44    EVENT_SCHEMA, NULL_HASH, StateActor, StateEvent, StateTarget, compute_event_id, snapshot_hash,
45};
46use crate::project::Project;
47
/// v0.39: A registered peer hub the local frontier knows about.
///
/// Entries live in `Project.peers` (see the module docs). The intent is
/// that an entry is identified by `(id, public_key)`, so two registry
/// entries for the same peer with different keys can be detected as a
/// material change rather than a silent overwrite.
/// NOTE(review): nothing in this file enforces that uniqueness — confirm
/// where the registry insert path compares `(id, public_key)`.
///
/// Call `PeerHub::validate` before inserting an entry.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PeerHub {
    /// Stable, namespaced identifier — the equivalent of an
    /// `actor.id` for hub-scale identities. Recommended form
    /// `hub:<short-name>` (e.g. `hub:vela-mirror-eu`). `validate`
    /// only requires it to be non-blank; the form is not enforced.
    pub id: String,
    /// HTTPS URL where the peer publishes signed manifests. The
    /// expected shape is `<base>/manifest/<vfr_id>.json` with a
    /// detached signature at `<base>/manifest/<vfr_id>.sig`.
    /// `validate` requires the `https://` scheme.
    pub url: String,
    /// Hex-encoded Ed25519 public key (64 hex chars) the peer signs
    /// their manifests with. Used to verify any
    /// `frontier.merged` event coming from them.
    /// `validate` checks the *trimmed* value, but the field itself is
    /// stored as given, so it may carry surrounding whitespace or
    /// upper-case hex digits.
    pub public_key: String,
    /// ISO 8601 timestamp of when the peer was added to this
    /// frontier's registry. Not validated.
    pub added_at: String,
    /// Optional human-readable note: "EU mirror, run by lab Z."
    /// Stored verbatim and not intended to enter any content address.
    /// Defaults to empty when absent on input and is omitted from
    /// serialized output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub note: String,
}
74
75impl PeerHub {
76    /// Validate the structural shape of a `PeerHub` before insertion.
77    /// Specifically: id must be non-empty, url must be HTTPS, and
78    /// public_key must be 64 hex chars.
79    pub fn validate(&self) -> Result<(), String> {
80        if self.id.trim().is_empty() {
81            return Err("peer id must be non-empty".into());
82        }
83        if !self.url.starts_with("https://") {
84            return Err(format!(
85                "peer url must start with `https://` (got `{}`)",
86                self.url
87            ));
88        }
89        let trimmed = self.public_key.trim();
90        if trimmed.len() != 64 {
91            return Err(format!(
92                "peer public_key must be 64 hex chars (got {})",
93                trimmed.len()
94            ));
95        }
96        if hex::decode(trimmed).is_err() {
97            return Err("peer public_key must be valid hex".into());
98        }
99        Ok(())
100    }
101}
102
/// v0.39.1: Conflict taxonomy — the kinds of disagreement two hubs can
/// have over the same `vfr_id`. v0.39.0 left `kind` as an open string;
/// v0.39.1 pins it to this closed set, derived from auditing every
/// substantive field-level disagreement we expect to see. v0.41.0 added
/// the two discovery-failure kinds (`BrokenLocator`,
/// `UnverifiedPeerEntry`).
///
/// Serialized in snake_case (`rename_all = "snake_case"`);
/// `ConflictKind::as_str` returns the same spelling, and that string is
/// what lands in the `kind` field of `frontier.conflict_detected`
/// payloads.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConflictKind {
    /// Finding present in our frontier, absent in the peer's.
    MissingInPeer,
    /// Finding present in the peer's frontier, absent in ours.
    MissingLocally,
    /// Same `vf_id`, confidence score differs by more than 0.05. Below
    /// the threshold it's noise from confidence recompute drift.
    ConfidenceDiverged,
    /// Same `vf_id`, one side has it retracted, the other doesn't.
    RetractedDiverged,
    /// Same `vf_id`, different `flags.review_state`.
    ReviewStateDiverged,
    /// Same `vf_id`, one side has it superseded, the other doesn't.
    SupersededDiverged,
    /// Same `vf_id`, different assertion text (compared after trimming
    /// leading/trailing whitespace). This is a serious signal —
    /// `vf_id` is content-addressed over the assertion, so matching id
    /// with diverging text means a content-address collision or
    /// signing-bytes mismatch between implementations.
    AssertionTextDiverged,
    /// v0.41.0: peer's registry entry resolves but its
    /// `network_locator` could not be fetched (4xx/5xx, or no HTTP
    /// status at all). The peer hub is healthy and signed the entry,
    /// but the manifest URL the entry points at is dead. Common when
    /// frontiers move repos (e.g. v0.34.1 split
    /// `vela-science/vela` → `vela-science/vela-frontiers`) and the
    /// peer's published entry was never refreshed. Surfaces the
    /// stale-locator failure mode that "peer is reachable but
    /// content isn't" produces — distinct from a missing finding.
    /// Unlike the finding-level kinds, the accompanying
    /// `Conflict::finding_id` holds the frontier's `vfr_id`.
    BrokenLocator,
    /// v0.41.0: peer's registry entry exists but its signature does
    /// not verify against the registered owner pubkey. Either the
    /// signature is corrupt or the owner pubkey we registered for
    /// this peer is wrong. Halts content sync — the kernel won't
    /// trust unsigned-or-misverified state. The accompanying
    /// `Conflict::finding_id` holds the frontier's `vfr_id`.
    /// NOTE(review): `discover_peer_frontier` also reports unparseable
    /// entries and owner-key mismatches as `UnverifiedEntry`; presumably
    /// callers record those under this kind too — confirm.
    UnverifiedPeerEntry,
}
144
145impl ConflictKind {
146    pub fn as_str(self) -> &'static str {
147        match self {
148            ConflictKind::MissingInPeer => "missing_in_peer",
149            ConflictKind::MissingLocally => "missing_locally",
150            ConflictKind::ConfidenceDiverged => "confidence_diverged",
151            ConflictKind::RetractedDiverged => "retracted_diverged",
152            ConflictKind::ReviewStateDiverged => "review_state_diverged",
153            ConflictKind::SupersededDiverged => "superseded_diverged",
154            ConflictKind::AssertionTextDiverged => "assertion_text_diverged",
155            ConflictKind::BrokenLocator => "broken_locator",
156            ConflictKind::UnverifiedPeerEntry => "unverified_peer_entry",
157        }
158    }
159}
160
/// One disagreement detected during sync.
///
/// Produced by `diff_frontiers` (one per finding per diverging field)
/// and by `record_locator_failure` / `record_unverified_entry` (one per
/// failed discovery).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Conflict {
    /// For finding-level kinds, the finding's `id`. For `BrokenLocator`
    /// and `UnverifiedPeerEntry`, the frontier's `vfr_id` instead.
    pub finding_id: String,
    /// Which field (or discovery step) disagreed.
    pub kind: ConflictKind,
    /// Free-form context for the rendering layer (e.g. "ours: 0.820,
    /// peer: 0.650").
    /// NOTE(review): `detail` is copied into the reason and payload of
    /// the `frontier.conflict_detected` event before `compute_event_id`
    /// runs, so it may influence event ids — confirm whether it should
    /// be considered part of a content address.
    pub detail: String,
}
170
/// Result of one sync pass: either `sync_with_peer`, or one of the
/// discovery-failure recorders (`record_locator_failure`,
/// `record_unverified_entry`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    /// Peer id exactly as passed by the caller.
    pub peer_id: String,
    /// `snapshot_hash` of our project, taken before this pass appended
    /// any events.
    pub our_snapshot_hash: String,
    /// `snapshot_hash` of the peer's project; empty when discovery
    /// failed and no peer state was obtained.
    pub peer_snapshot_hash: String,
    /// Detected disagreements. When produced by `diff_frontiers` they
    /// are sorted by `(finding_id, kind)`.
    pub conflicts: Vec<Conflict>,
    /// Number of `StateEvent`s appended to our project (1 sync event +
    /// N conflict events).
    pub events_appended: usize,
}
182
183/// v0.39.1: Diff two frontiers and produce a list of conflicts. Pure
184/// function, no I/O. The peer's state is passed in directly so the
185/// sync orchestrator can be unit-tested without HTTP.
186///
187/// Confidence diff threshold is 0.05 — below that it's recompute drift
188/// (the v0.36.1 formula change moved scores by < 0.001 on real data).
189#[must_use]
190pub fn diff_frontiers(ours: &Project, theirs: &Project) -> Vec<Conflict> {
191    use std::collections::HashMap;
192
193    let our_by_id: HashMap<&str, &crate::bundle::FindingBundle> =
194        ours.findings.iter().map(|f| (f.id.as_str(), f)).collect();
195    let their_by_id: HashMap<&str, &crate::bundle::FindingBundle> =
196        theirs.findings.iter().map(|f| (f.id.as_str(), f)).collect();
197
198    let mut conflicts = Vec::new();
199
200    // Findings only in ours.
201    for id in our_by_id.keys() {
202        if !their_by_id.contains_key(id) {
203            conflicts.push(Conflict {
204                finding_id: (*id).to_string(),
205                kind: ConflictKind::MissingInPeer,
206                detail: "present locally, absent in peer".to_string(),
207            });
208        }
209    }
210    // Findings only in theirs.
211    for id in their_by_id.keys() {
212        if !our_by_id.contains_key(id) {
213            conflicts.push(Conflict {
214                finding_id: (*id).to_string(),
215                kind: ConflictKind::MissingLocally,
216                detail: "present in peer, absent locally".to_string(),
217            });
218        }
219    }
220    // Findings in both — check field-level disagreements.
221    for (id, ours_f) in &our_by_id {
222        let Some(theirs_f) = their_by_id.get(id) else {
223            continue;
224        };
225        if (ours_f.confidence.score - theirs_f.confidence.score).abs() > 0.05 {
226            conflicts.push(Conflict {
227                finding_id: (*id).to_string(),
228                kind: ConflictKind::ConfidenceDiverged,
229                detail: format!(
230                    "ours: {:.3}, peer: {:.3}",
231                    ours_f.confidence.score, theirs_f.confidence.score
232                ),
233            });
234        }
235        if ours_f.flags.retracted != theirs_f.flags.retracted {
236            conflicts.push(Conflict {
237                finding_id: (*id).to_string(),
238                kind: ConflictKind::RetractedDiverged,
239                detail: format!(
240                    "ours: {}, peer: {}",
241                    ours_f.flags.retracted, theirs_f.flags.retracted
242                ),
243            });
244        }
245        if ours_f.flags.review_state != theirs_f.flags.review_state {
246            conflicts.push(Conflict {
247                finding_id: (*id).to_string(),
248                kind: ConflictKind::ReviewStateDiverged,
249                detail: format!(
250                    "ours: {:?}, peer: {:?}",
251                    ours_f.flags.review_state, theirs_f.flags.review_state
252                ),
253            });
254        }
255        if ours_f.flags.superseded != theirs_f.flags.superseded {
256            conflicts.push(Conflict {
257                finding_id: (*id).to_string(),
258                kind: ConflictKind::SupersededDiverged,
259                detail: format!(
260                    "ours: {}, peer: {}",
261                    ours_f.flags.superseded, theirs_f.flags.superseded
262                ),
263            });
264        }
265        if ours_f.assertion.text.trim() != theirs_f.assertion.text.trim() {
266            conflicts.push(Conflict {
267                finding_id: (*id).to_string(),
268                kind: ConflictKind::AssertionTextDiverged,
269                detail:
270                    "matching id but diverging assertion text — possible content-address collision"
271                        .to_string(),
272            });
273        }
274    }
275
276    conflicts.sort_by(|a, b| {
277        a.finding_id
278            .cmp(&b.finding_id)
279            .then_with(|| a.kind.as_str().cmp(b.kind.as_str()))
280    });
281    conflicts
282}
283
284/// v0.41.0: Record a single broken-locator conflict against a peer.
285/// Emits one `frontier.synced_with_peer` event with `divergence_count
286/// = 1` plus one `frontier.conflict_detected` event of kind
287/// `broken_locator`. Used when the peer hub is reachable, its
288/// registry entry signature verifies, but the locator URL the entry
289/// points at returns 4xx/5xx — common for stale published locators
290/// after a repo move.
291pub fn record_locator_failure(
292    project: &mut Project,
293    peer_id: &str,
294    vfr_id: &str,
295    locator: &str,
296    status: u16,
297) -> SyncReport {
298    let now = Utc::now().to_rfc3339();
299    let our_hash = snapshot_hash(project);
300    let frontier_id = project.frontier_id();
301    let detail = format!("locator {locator} returned HTTP {status}");
302
303    let synced_event = StateEvent {
304        schema: EVENT_SCHEMA.to_string(),
305        id: String::new(),
306        kind: "frontier.synced_with_peer".to_string(),
307        target: StateTarget {
308            r#type: "frontier_observation".to_string(),
309            id: frontier_id.clone(),
310        },
311        actor: StateActor {
312            id: "federation".to_string(),
313            r#type: "system".to_string(),
314        },
315        timestamp: now.clone(),
316        reason: format!("synced with peer {peer_id} (broken locator)"),
317        before_hash: NULL_HASH.to_string(),
318        after_hash: NULL_HASH.to_string(),
319        payload: json!({
320            "peer_id": peer_id,
321            "peer_snapshot_hash": "",
322            "our_snapshot_hash": our_hash,
323            "divergence_count": 1,
324        }),
325        caveats: Vec::new(),
326        signature: None,
327    };
328    let mut sync_ev = synced_event;
329    sync_ev.id = compute_event_id(&sync_ev);
330
331    let conflict_ev = StateEvent {
332        schema: EVENT_SCHEMA.to_string(),
333        id: String::new(),
334        kind: "frontier.conflict_detected".to_string(),
335        target: StateTarget {
336            r#type: "frontier_observation".to_string(),
337            id: frontier_id.clone(),
338        },
339        actor: StateActor {
340            id: "federation".to_string(),
341            r#type: "system".to_string(),
342        },
343        timestamp: now.clone(),
344        reason: format!("peer={peer_id} kind=broken_locator {detail}"),
345        before_hash: NULL_HASH.to_string(),
346        after_hash: NULL_HASH.to_string(),
347        payload: json!({
348            "peer_id": peer_id,
349            "finding_id": vfr_id,
350            "kind": "broken_locator",
351            "detail": detail,
352        }),
353        caveats: Vec::new(),
354        signature: None,
355    };
356    let mut conflict_ev = conflict_ev;
357    conflict_ev.id = compute_event_id(&conflict_ev);
358
359    project.events.push(sync_ev);
360    project.events.push(conflict_ev);
361
362    SyncReport {
363        peer_id: peer_id.to_string(),
364        our_snapshot_hash: our_hash,
365        peer_snapshot_hash: String::new(),
366        conflicts: vec![Conflict {
367            finding_id: vfr_id.to_string(),
368            kind: ConflictKind::BrokenLocator,
369            detail,
370        }],
371        events_appended: 2,
372    }
373}
374
375/// v0.41.0: Record an unverified-peer-entry conflict. Same shape as
376/// `record_locator_failure` but for when the peer's registry entry
377/// signature did not verify against the registered owner pubkey.
378/// Sync halts before any content is fetched — the kernel won't trust
379/// unsigned-or-misverified state.
380pub fn record_unverified_entry(
381    project: &mut Project,
382    peer_id: &str,
383    vfr_id: &str,
384    reason: &str,
385) -> SyncReport {
386    let now = Utc::now().to_rfc3339();
387    let our_hash = snapshot_hash(project);
388    let frontier_id = project.frontier_id();
389
390    let mut sync_ev = StateEvent {
391        schema: EVENT_SCHEMA.to_string(),
392        id: String::new(),
393        kind: "frontier.synced_with_peer".to_string(),
394        target: StateTarget {
395            r#type: "frontier_observation".to_string(),
396            id: frontier_id.clone(),
397        },
398        actor: StateActor {
399            id: "federation".to_string(),
400            r#type: "system".to_string(),
401        },
402        timestamp: now.clone(),
403        reason: format!("synced with peer {peer_id} (unverified entry; halted)"),
404        before_hash: NULL_HASH.to_string(),
405        after_hash: NULL_HASH.to_string(),
406        payload: json!({
407            "peer_id": peer_id,
408            "peer_snapshot_hash": "",
409            "our_snapshot_hash": our_hash,
410            "divergence_count": 1,
411        }),
412        caveats: Vec::new(),
413        signature: None,
414    };
415    sync_ev.id = compute_event_id(&sync_ev);
416
417    let mut conflict_ev = StateEvent {
418        schema: EVENT_SCHEMA.to_string(),
419        id: String::new(),
420        kind: "frontier.conflict_detected".to_string(),
421        target: StateTarget {
422            r#type: "frontier_observation".to_string(),
423            id: frontier_id.clone(),
424        },
425        actor: StateActor {
426            id: "federation".to_string(),
427            r#type: "system".to_string(),
428        },
429        timestamp: now.clone(),
430        reason: format!("peer={peer_id} kind=unverified_peer_entry {reason}"),
431        before_hash: NULL_HASH.to_string(),
432        after_hash: NULL_HASH.to_string(),
433        payload: json!({
434            "peer_id": peer_id,
435            "finding_id": vfr_id,
436            "kind": "unverified_peer_entry",
437            "detail": reason,
438        }),
439        caveats: Vec::new(),
440        signature: None,
441    };
442    conflict_ev.id = compute_event_id(&conflict_ev);
443
444    project.events.push(sync_ev);
445    project.events.push(conflict_ev);
446
447    SyncReport {
448        peer_id: peer_id.to_string(),
449        our_snapshot_hash: our_hash,
450        peer_snapshot_hash: String::new(),
451        conflicts: vec![Conflict {
452            finding_id: vfr_id.to_string(),
453            kind: ConflictKind::UnverifiedPeerEntry,
454            detail: reason.to_string(),
455        }],
456        events_appended: 2,
457    }
458}
459
460/// v0.39.1: Run a full sync pass against a peer's already-fetched
461/// frontier state. Diffs, emits one `frontier.synced_with_peer`
462/// event recording the pass, and one `frontier.conflict_detected`
463/// event per disagreement. Returns the report; caller persists the
464/// project.
465///
466/// Splitting fetch from sync this way lets the sync logic be
467/// fully unit-testable without HTTP — the CLI pipes a real fetch
468/// into this function.
469pub fn sync_with_peer(project: &mut Project, peer_id: &str, peer: &Project) -> SyncReport {
470    let our_hash = snapshot_hash(project);
471    let peer_hash = snapshot_hash(peer);
472    let conflicts = diff_frontiers(project, peer);
473
474    let now = Utc::now().to_rfc3339();
475    let frontier_id = project.frontier_id().clone();
476
477    // v0.39.1 fix: federation events are frontier-level *observations*,
478    // not finding-level state changes. Target the frontier (vfr_id)
479    // with `target.type = "frontier_observation"` so:
480    //   - replay's per-finding chain validator skips them (chain
481    //     only runs on `target.type == "finding"`);
482    //   - the orphan check skips them (orphan check only flags
483    //     finding-targeted events whose finding_id is unknown).
484    // The `finding_id` of each conflict still lives in the payload
485    // for downstream queries; only the canonical event target is the
486    // frontier.
487    let synced_reason = format!("synced with peer {peer_id}");
488    let mut synced_event = StateEvent {
489        schema: EVENT_SCHEMA.to_string(),
490        id: String::new(),
491        kind: "frontier.synced_with_peer".to_string(),
492        target: StateTarget {
493            r#type: "frontier_observation".to_string(),
494            id: frontier_id.clone(),
495        },
496        actor: StateActor {
497            id: "federation".to_string(),
498            r#type: "system".to_string(),
499        },
500        timestamp: now.clone(),
501        reason: synced_reason,
502        before_hash: NULL_HASH.to_string(),
503        after_hash: NULL_HASH.to_string(),
504        payload: json!({
505            "peer_id": peer_id,
506            "peer_snapshot_hash": peer_hash,
507            "our_snapshot_hash": our_hash,
508            "divergence_count": conflicts.len(),
509        }),
510        caveats: Vec::new(),
511        signature: None,
512    };
513    synced_event.id = compute_event_id(&synced_event);
514
515    let mut conflict_events: Vec<StateEvent> = Vec::with_capacity(conflicts.len());
516    for c in &conflicts {
517        let reason = format!("peer={peer_id} kind={} {}", c.kind.as_str(), c.detail);
518        let mut ev = StateEvent {
519            schema: EVENT_SCHEMA.to_string(),
520            id: String::new(),
521            kind: "frontier.conflict_detected".to_string(),
522            target: StateTarget {
523                r#type: "frontier_observation".to_string(),
524                id: frontier_id.clone(),
525            },
526            actor: StateActor {
527                id: "federation".to_string(),
528                r#type: "system".to_string(),
529            },
530            timestamp: now.clone(),
531            reason,
532            before_hash: NULL_HASH.to_string(),
533            after_hash: NULL_HASH.to_string(),
534            payload: json!({
535                "peer_id": peer_id,
536                "finding_id": c.finding_id,
537                "kind": c.kind.as_str(),
538                "detail": c.detail,
539            }),
540            caveats: Vec::new(),
541            signature: None,
542        };
543        ev.id = compute_event_id(&ev);
544        conflict_events.push(ev);
545    }
546
547    let events_appended = 1 + conflict_events.len();
548    project.events.push(synced_event);
549    project.events.extend(conflict_events);
550
551    SyncReport {
552        peer_id: peer_id.to_string(),
553        our_snapshot_hash: our_hash,
554        peer_snapshot_hash: peer_hash,
555        conflicts,
556        events_appended,
557    }
558}
559
/// v0.41.0: Result of trying to discover a peer's frontier through
/// the hub's `/entries/<vfr_id>` endpoint (see `discover_peer_frontier`).
/// The runtime needs to distinguish the failure modes — peer
/// unreachable, entry unknown, registry entry not trustworthy, and
/// locator URL dead — because each one has a different remediation.
#[derive(Debug)]
pub enum DiscoveryResult {
    /// Hub returned a valid entry, signature verified, locator
    /// fetched, manifest parsed. Includes the project for
    /// downstream diff.
    Resolved(Project),
    /// Hub `/entries/<vfr_id>` returned HTTP 404 — the peer doesn't
    /// claim to know this vfr_id. Other non-success hub statuses are
    /// reported as `Unreachable`, not here.
    EntryNotFound { vfr_id: String, status: u16 },
    /// Hub returned an entry that cannot be trusted: it failed to parse,
    /// its signature does not verify against the entry's
    /// `owner_pubkey`, or that key differs from the expected peer
    /// pubkey. `reason` says which. Halts content sync.
    UnverifiedEntry { vfr_id: String, reason: String },
    /// Hub entry verifies, but its `network_locator` could not be
    /// used. Stale-locator failure mode. `status` is the locator's HTTP
    /// status, or `0` when none was obtained (transport error, body
    /// read error, or a manifest that does not parse as a `Project`).
    BrokenLocator {
        vfr_id: String,
        locator: String,
        status: u16,
    },
    /// The hub itself could not be used. Covers a network error reaching
    /// it, a non-404 non-success status, a body read error, or the
    /// discovery thread panicking. Locator failures are `BrokenLocator`
    /// instead.
    Unreachable { url: String, error: String },
}
587
588/// v0.41.0: Discover a peer frontier by routing through the hub's
589/// `/entries/<vfr_id>` endpoint. Verifies the registry entry's
590/// signature against `expected_owner_pubkey`, then follows
591/// `entry.network_locator` to fetch the actual manifest.
592///
593/// This is the "real federation" path: hubs publish signed registry
594/// entries pointing at content URLs; sync fetches both, verifying the
595/// signature chain end-to-end. If any step fails, the failure mode
596/// is captured as a typed result so the calling sync runtime can
597/// emit the appropriate `Conflict` (BrokenLocator, UnverifiedEntry,
598/// etc.) rather than blackhole'ing the error.
599pub fn discover_peer_frontier(
600    hub_url: &str,
601    vfr_id: &str,
602    expected_owner_pubkey: Option<&str>,
603) -> DiscoveryResult {
604    let hub = hub_url.trim_end_matches('/').to_string();
605    let entries_url = format!("{hub}/entries/{vfr_id}");
606    let vfr_owned = vfr_id.to_string();
607    let expected = expected_owner_pubkey.map(|s| s.to_string());
608
609    std::thread::spawn(move || -> DiscoveryResult {
610        let resp = match reqwest::blocking::get(&entries_url) {
611            Ok(r) => r,
612            Err(e) => {
613                return DiscoveryResult::Unreachable {
614                    url: entries_url.clone(),
615                    error: e.to_string(),
616                };
617            }
618        };
619        let status = resp.status();
620        if status.as_u16() == 404 {
621            return DiscoveryResult::EntryNotFound {
622                vfr_id: vfr_owned,
623                status: status.as_u16(),
624            };
625        }
626        if !status.is_success() {
627            return DiscoveryResult::Unreachable {
628                url: entries_url.clone(),
629                error: format!("hub returned HTTP {status}"),
630            };
631        }
632        let body = match resp.text() {
633            Ok(b) => b,
634            Err(e) => {
635                return DiscoveryResult::Unreachable {
636                    url: entries_url.clone(),
637                    error: format!("read body: {e}"),
638                };
639            }
640        };
641        let entry: crate::registry::RegistryEntry = match serde_json::from_str(&body) {
642            Ok(e) => e,
643            Err(e) => {
644                return DiscoveryResult::UnverifiedEntry {
645                    vfr_id: vfr_owned,
646                    reason: format!("parse registry entry: {e}"),
647                };
648            }
649        };
650
651        // Verify signature.
652        match crate::registry::verify_entry(&entry) {
653            Ok(true) => {}
654            Ok(false) => {
655                return DiscoveryResult::UnverifiedEntry {
656                    vfr_id: vfr_owned,
657                    reason: "registry entry signature does not verify against entry.owner_pubkey"
658                        .to_string(),
659                };
660            }
661            Err(e) => {
662                return DiscoveryResult::UnverifiedEntry {
663                    vfr_id: vfr_owned,
664                    reason: format!("signature verification error: {e}"),
665                };
666            }
667        }
668        // Cross-check expected pubkey if the caller supplied one.
669        if let Some(want) = expected.as_deref()
670            && entry.owner_pubkey != want
671        {
672            return DiscoveryResult::UnverifiedEntry {
673                vfr_id: vfr_owned,
674                reason: format!(
675                    "entry owner_pubkey {} != expected peer pubkey {}",
676                    &entry.owner_pubkey[..16],
677                    &want[..16]
678                ),
679            };
680        }
681
682        // Follow locator to fetch the manifest.
683        let locator = entry.network_locator.clone();
684        let mresp = match reqwest::blocking::get(&locator) {
685            Ok(r) => r,
686            Err(e) => {
687                return DiscoveryResult::BrokenLocator {
688                    vfr_id: vfr_owned,
689                    locator,
690                    status: 0,
691                }
692                .with_error(e.to_string());
693            }
694        };
695        let mstatus = mresp.status();
696        if !mstatus.is_success() {
697            return DiscoveryResult::BrokenLocator {
698                vfr_id: vfr_owned,
699                locator,
700                status: mstatus.as_u16(),
701            };
702        }
703        let mbody = match mresp.text() {
704            Ok(b) => b,
705            Err(e) => {
706                return DiscoveryResult::BrokenLocator {
707                    vfr_id: vfr_owned,
708                    locator,
709                    status: 0,
710                }
711                .with_error(e.to_string());
712            }
713        };
714        match serde_json::from_str::<Project>(&mbody) {
715            Ok(p) => DiscoveryResult::Resolved(p),
716            Err(e) => DiscoveryResult::BrokenLocator {
717                vfr_id: vfr_owned,
718                locator,
719                status: 0,
720            }
721            .with_error(format!("manifest parse: {e}")),
722        }
723    })
724    .join()
725    .unwrap_or(DiscoveryResult::Unreachable {
726        url: hub_url.to_string(),
727        error: "discovery thread panicked".to_string(),
728    })
729}
730
731impl DiscoveryResult {
732    fn with_error(self, _ctx: String) -> Self {
733        // BrokenLocator already carries status; reserved hook for
734        // richer diagnostics later.
735        self
736    }
737}
738
739/// v0.39.1: Fetch a peer's frontier JSON over HTTP. The URL is
740/// expected to serve a JSON-serialized `Project`. Blocking call —
741/// `vela federation sync` is a one-shot CLI verb, not a service.
742///
743/// Implementation note: the CLI top-level dispatcher runs inside a
744/// tokio runtime, but `reqwest::blocking` panics if dropped inside
745/// an async context. We escape into a dedicated OS thread that owns
746/// its own runtime, making the call safe to issue from sync code
747/// regardless of who's calling it.
748///
749/// Verification of peer signatures (and registry entries) is a
750/// separate concern, addressed in v0.39.2+. v0.39.1 trusts the
751/// transport so the sync diff/event-emission machinery can be
752/// validated against real peer state first.
753pub fn fetch_peer_frontier(url: &str) -> Result<Project, String> {
754    let url_owned = url.to_string();
755    let handle = std::thread::spawn(move || -> Result<Project, String> {
756        let resp = reqwest::blocking::get(&url_owned)
757            .map_err(|e| format!("HTTP GET {url_owned} failed: {e}"))?;
758        let status = resp.status();
759        if !status.is_success() {
760            return Err(format!("peer returned HTTP {status}"));
761        }
762        let body = resp
763            .text()
764            .map_err(|e| format!("read body from {url_owned}: {e}"))?;
765        serde_json::from_str(&body)
766            .map_err(|e| format!("parse peer frontier from {url_owned}: {e}"))
767    });
768    handle
769        .join()
770        .map_err(|_| "fetch thread panicked".to_string())?
771}
772
#[cfg(test)]
mod tests {
    //! Tests for peer-registry validation, the v0.39.1 frontier diff and
    //! sync-event emission, and the offline error path of
    //! `fetch_peer_frontier`.

    use super::*;

    /// A `PeerHub` that passes `validate()`; each rejection test below
    /// breaks exactly one of its fields.
    fn good() -> PeerHub {
        PeerHub {
            id: "hub:test".into(),
            url: "https://example.invalid/".into(),
            public_key: "00".repeat(32),
            added_at: "2026-04-27T00:00:00Z".into(),
            note: String::new(),
        }
    }

    #[test]
    fn validates_correct_shape() {
        assert!(good().validate().is_ok());
    }

    #[test]
    fn rejects_empty_id() {
        // Whitespace-only ids count as empty.
        let mut p = good();
        p.id = "  ".into();
        assert!(p.validate().is_err());
    }

    #[test]
    fn rejects_http_url() {
        // Plain-http peer URLs are refused.
        let mut p = good();
        p.url = "http://insecure.example/".into();
        assert!(p.validate().is_err());
    }

    #[test]
    fn rejects_short_pubkey() {
        // Far shorter than the 64 hex chars `good()` uses.
        let mut p = good();
        p.public_key = "abcd".into();
        assert!(p.validate().is_err());
    }

    #[test]
    fn rejects_non_hex_pubkey() {
        // Same length as `good()`'s key, but not hex.
        let mut p = good();
        p.public_key = "z".repeat(64);
        assert!(p.validate().is_err());
    }

    #[test]
    fn fetch_reports_malformed_url_without_network() {
        // reqwest rejects an unparseable URL before any network I/O, so this
        // exercises the worker-thread + error-propagation path offline.
        let err = match fetch_peer_frontier("not a url") {
            Ok(_) => panic!("a malformed URL must not produce a frontier"),
            Err(e) => e,
        };
        assert!(err.contains("not a url"), "error should name the URL: {err}");
    }

    // ── v0.39.1 sync runtime tests ───────────────────────────────────

    use crate::bundle::{
        Assertion, Conditions, Confidence, Evidence, Extraction, FindingBundle, Flags, Provenance,
        ReviewState,
    };
    use crate::project::{self, Project};

    /// Build a minimal finding with the given `id` and confidence `score`.
    ///
    /// The id is set explicitly after construction so tests control which
    /// findings `diff_frontiers` pairs up across two frontiers.
    fn finding(id: &str, score: f64) -> FindingBundle {
        let mut b = FindingBundle::new(
            Assertion {
                text: format!("claim {id}"),
                assertion_type: "mechanism".into(),
                entities: vec![],
                relation: None,
                direction: None,
                causal_claim: None,
                causal_evidence_grade: None,
            },
            Evidence {
                evidence_type: "experimental".into(),
                model_system: String::new(),
                species: None,
                method: String::new(),
                sample_size: Some("n=30".into()),
                effect_size: None,
                p_value: None,
                replicated: false,
                replication_count: None,
                evidence_spans: vec![],
            },
            Conditions {
                text: String::new(),
                species_verified: vec![],
                species_unverified: vec![],
                in_vitro: false,
                in_vivo: false,
                human_data: false,
                clinical_trial: false,
                concentration_range: None,
                duration: None,
                age_group: None,
                cell_type: None,
            },
            Confidence::raw(score, "test", 0.85),
            Provenance {
                source_type: "published_paper".into(),
                doi: None,
                pmid: None,
                pmc: None,
                openalex_id: None,
                url: None,
                title: "Test".into(),
                authors: vec![],
                year: Some(2025),
                journal: None,
                license: None,
                publisher: None,
                funders: vec![],
                extraction: Extraction::default(),
                review: None,
                citation_count: None,
            },
            Flags::default(),
        );
        b.id = id.to_string();
        b
    }

    /// Assemble a test frontier named `name` from `findings`.
    fn assemble(name: &str, findings: Vec<FindingBundle>) -> Project {
        project::assemble(name, findings, 1, 0, "test")
    }

    #[test]
    fn diff_identical_frontiers_returns_no_conflicts() {
        let f = finding("vf_001", 0.7);
        let ours = assemble("ours", vec![f.clone()]);
        let theirs = assemble("theirs", vec![f]);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert_eq!(conflicts.len(), 0, "unexpected conflicts: {conflicts:?}");
    }

    #[test]
    fn diff_detects_missing_in_peer_and_locally() {
        // Disjoint ids: each side holds a finding the other lacks.
        let ours = assemble("ours", vec![finding("vf_001", 0.7)]);
        let theirs = assemble("theirs", vec![finding("vf_002", 0.7)]);
        let conflicts = diff_frontiers(&ours, &theirs);
        let kinds: Vec<&str> = conflicts.iter().map(|c| c.kind.as_str()).collect();
        assert!(kinds.contains(&"missing_in_peer"), "kinds: {kinds:?}");
        assert!(kinds.contains(&"missing_locally"), "kinds: {kinds:?}");
    }

    #[test]
    fn diff_detects_confidence_divergence_above_threshold() {
        // Same finding id on both sides; only the confidence score differs.
        let ours = assemble("ours", vec![finding("vf_001", 0.85)]);
        let theirs = assemble("theirs", vec![finding("vf_001", 0.55)]);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::ConfidenceDiverged),
            "expected confidence_diverged in {conflicts:?}"
        );
    }

    #[test]
    fn diff_ignores_confidence_drift_below_threshold() {
        let ours = assemble("ours", vec![finding("vf_001", 0.700)]);
        let theirs = assemble("theirs", vec![finding("vf_001", 0.730)]);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            !conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::ConfidenceDiverged),
            "0.03 drift should not flag: {conflicts:?}"
        );
    }

    #[test]
    fn diff_detects_retracted_divergence() {
        let mut f_theirs = finding("vf_001", 0.7);
        f_theirs.flags.retracted = true;
        let ours = assemble("ours", vec![finding("vf_001", 0.7)]);
        let theirs = assemble("theirs", vec![f_theirs]);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::RetractedDiverged),
            "expected retracted_diverged in {conflicts:?}"
        );
    }

    #[test]
    fn diff_detects_review_state_divergence() {
        let mut f_theirs = finding("vf_001", 0.7);
        f_theirs.flags.review_state = Some(ReviewState::Contested);
        let ours = assemble("ours", vec![finding("vf_001", 0.7)]);
        let theirs = assemble("theirs", vec![f_theirs]);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::ReviewStateDiverged),
            "expected review_state_diverged in {conflicts:?}"
        );
    }

    #[test]
    fn diff_detects_assertion_text_divergence() {
        let mut f_theirs = finding("vf_001", 0.7);
        f_theirs.assertion.text = "different claim".into();
        let ours = assemble("ours", vec![finding("vf_001", 0.7)]);
        let theirs = assemble("theirs", vec![f_theirs]);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::AssertionTextDiverged),
            "expected assertion_text_diverged in {conflicts:?}"
        );
    }

    #[test]
    fn sync_appends_one_synced_event_plus_one_per_conflict() {
        let mut f_theirs = finding("vf_001", 0.7);
        f_theirs.flags.retracted = true;
        let mut ours = assemble("ours", vec![finding("vf_001", 0.7)]);
        let theirs = assemble("theirs", vec![f_theirs]);
        let events_before = ours.events.len();
        let report = sync_with_peer(&mut ours, "hub:test-peer", &theirs);
        assert_eq!(report.conflicts.len(), 1);
        assert_eq!(report.events_appended, 2); // 1 sync + 1 conflict
        assert_eq!(ours.events.len() - events_before, 2);
        // The first appended event is the sync record.
        let sync_ev = &ours.events[events_before];
        assert_eq!(sync_ev.kind, "frontier.synced_with_peer");
        assert_eq!(sync_ev.payload["divergence_count"].as_u64(), Some(1));
        // The second is the conflict.
        let conf_ev = &ours.events[events_before + 1];
        assert_eq!(conf_ev.kind, "frontier.conflict_detected");
        assert_eq!(conf_ev.payload["kind"], "retracted_diverged");
    }

    #[test]
    fn sync_with_clean_diff_emits_zero_divergence_event() {
        let f = finding("vf_001", 0.7);
        let mut ours = assemble("ours", vec![f.clone()]);
        let theirs = assemble("theirs", vec![f]);
        let report = sync_with_peer(&mut ours, "hub:test-peer", &theirs);
        assert_eq!(report.conflicts.len(), 0);
        assert_eq!(report.events_appended, 1);
        let last = ours.events.last().expect("sync must append an event");
        assert_eq!(last.kind, "frontier.synced_with_peer");
        assert_eq!(last.payload["divergence_count"].as_u64(), Some(0));
    }
}
1036}