Skip to main content

vela_protocol/
federation.rs

1//! v0.39: Hub federation — peer registry + conflict detection.
2//!
3//! Pre-v0.39, every Vela frontier had exactly one source of truth: the
4//! single hub it was published to (`vela-hub.fly.dev`). The substrate
5//! claimed the kernel was content-addressed and signed, but the
6//! distribution layer was centralized — there was no way for a second
7//! hub to mirror a frontier and detect when its view diverged from
8//! the original.
9//!
10//! v0.39.0 lands the *schema layer* of federation. A frontier can now
11//! register peer hubs (id + URL + public key) in `Project.peers`, and
12//! the kernel knows two new event kinds:
13//!
14//! - `frontier.synced_with_peer` — append-only record of a sync pass:
15//!   what we exchanged, what hash we ended up agreeing on, how many
16//!   findings differed.
17//! - `frontier.conflict_detected` — emitted per finding when our view
18//!   and the peer's view disagree on a substantive field (review
19//!   verdict, confidence, retraction, presence).
20//!
21//! The actual sync runtime (HTTP fetch, manifest verification,
22//! conflict-resolution proposal emission) ships in v0.39.1+. Same
23//! staging discipline as v0.32 (Replication object) → v0.36.1
24//! (Project.replications becomes authoritative) and v0.38.0 (causal
25//! schema) → v0.38.1 (causal math).
26//!
27//! Doctrine for v0.39.0:
28//! - The peer registry is a frontier-local declaration. Adding a peer
29//!   does not yet trust their state; it just establishes who we know
30//!   about.
31//! - Peer signatures still verify under the same Ed25519 discipline
32//!   as `actors`. A peer's `frontier.merged` event signed by their
33//!   key can be replayed locally only when their pubkey is in our
34//!   `peers` registry.
35//! - Conflicts are recorded, not auto-resolved. v0.39.1+ will surface
36//!   them through proposals so a human reviewer chooses which side
37//!   to accept.
38
39use chrono::Utc;
40use serde::{Deserialize, Serialize};
41use serde_json::json;
42
43use crate::events::{
44    EVENT_SCHEMA, NULL_HASH, StateActor, StateEvent, StateTarget, compute_event_id, snapshot_hash,
45};
46use crate::project::Project;
47
48/// v0.39: A registered peer hub the local frontier knows about.
49/// Content-addressed by `(id, public_key)` so two registry entries
50/// for the same peer with different keys can be detected as a
51/// material change rather than silent overwrite.
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
53pub struct PeerHub {
54    /// Stable, namespaced identifier — the equivalent of an
55    /// `actor.id` for hub-scale identities. Recommended form
56    /// `hub:<short-name>` (e.g. `hub:vela-mirror-eu`).
57    pub id: String,
58    /// HTTPS URL where the peer publishes signed manifests. The
59    /// expected shape is `<base>/manifest/<vfr_id>.json` with a
60    /// detached signature at `<base>/manifest/<vfr_id>.sig`.
61    pub url: String,
62    /// Hex-encoded Ed25519 public key (64 hex chars) the peer signs
63    /// their manifests with. Used to verify any
64    /// `frontier.merged` event coming from them.
65    pub public_key: String,
66    /// ISO 8601 timestamp of when the peer was added to this
67    /// frontier's registry.
68    pub added_at: String,
69    /// Optional human-readable note: "EU mirror, run by lab Z."
70    /// Doesn't enter any content address; stored verbatim.
71    #[serde(default, skip_serializing_if = "String::is_empty")]
72    pub note: String,
73}
74
75impl PeerHub {
76    /// Validate the structural shape of a `PeerHub` before insertion.
77    /// Specifically: id must be non-empty, url must be HTTPS, and
78    /// public_key must be 64 hex chars.
79    pub fn validate(&self) -> Result<(), String> {
80        if self.id.trim().is_empty() {
81            return Err("peer id must be non-empty".into());
82        }
83        if !self.url.starts_with("https://") {
84            return Err(format!(
85                "peer url must start with `https://` (got `{}`)",
86                self.url
87            ));
88        }
89        let trimmed = self.public_key.trim();
90        if trimmed.len() != 64 {
91            return Err(format!(
92                "peer public_key must be 64 hex chars (got {})",
93                trimmed.len()
94            ));
95        }
96        if hex::decode(trimmed).is_err() {
97            return Err("peer public_key must be valid hex".into());
98        }
99        Ok(())
100    }
101}
102
103/// v0.39.1: Conflict taxonomy. The kinds of disagreement two hubs can
104/// have over the same `vfr_id`. v0.39.0 left `kind` as an open string;
105/// v0.39.1 pins it to this closed set, derived from auditing every
106/// substantive field-level disagreement we expect to see.
107#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
108#[serde(rename_all = "snake_case")]
109pub enum ConflictKind {
110    /// Finding present in our frontier, absent in the peer's.
111    MissingInPeer,
112    /// Finding present in the peer's frontier, absent in ours.
113    MissingLocally,
114    /// Same `vf_id`, score differs by more than 0.05. Below the
115    /// threshold it's noise from confidence recompute drift.
116    ConfidenceDiverged,
117    /// Same `vf_id`, one side has it retracted, the other doesn't.
118    RetractedDiverged,
119    /// Same `vf_id`, different `flags.review_state`.
120    ReviewStateDiverged,
121    /// Same `vf_id`, one side has it superseded, the other doesn't.
122    SupersededDiverged,
123    /// Same `vf_id`, different assertion text. This is a serious
124    /// signal — `vf_id` is content-addressed over the assertion, so
125    /// matching id with diverging text means a content-address
126    /// collision or signing-bytes mismatch between implementations.
127    AssertionTextDiverged,
128    /// v0.41.0: peer's registry entry resolves but its
129    /// `network_locator` returns 4xx/5xx. The peer hub is healthy and
130    /// signed the entry, but the manifest URL the entry points at is
131    /// dead. Common when frontiers move repos (e.g. v0.34.1 split
132    /// `vela-science/vela` → `vela-science/vela-frontiers`) and the
133    /// peer's published entry was never refreshed. Surfaces the
134    /// stale-locator failure mode that "peer is reachable but
135    /// content isn't" produces — distinct from a missing finding.
136    BrokenLocator,
137    /// v0.41.0: peer's registry entry exists but its signature does
138    /// not verify against the registered owner pubkey. Either the
139    /// signature is corrupt or the owner pubkey we registered for
140    /// this peer is wrong. Halts content sync — the kernel won't
141    /// trust unsigned-or-misverified state.
142    UnverifiedPeerEntry,
143}
144
145impl ConflictKind {
146    pub fn as_str(self) -> &'static str {
147        match self {
148            ConflictKind::MissingInPeer => "missing_in_peer",
149            ConflictKind::MissingLocally => "missing_locally",
150            ConflictKind::ConfidenceDiverged => "confidence_diverged",
151            ConflictKind::RetractedDiverged => "retracted_diverged",
152            ConflictKind::ReviewStateDiverged => "review_state_diverged",
153            ConflictKind::SupersededDiverged => "superseded_diverged",
154            ConflictKind::AssertionTextDiverged => "assertion_text_diverged",
155            ConflictKind::BrokenLocator => "broken_locator",
156            ConflictKind::UnverifiedPeerEntry => "unverified_peer_entry",
157        }
158    }
159}
160
161/// One per-finding disagreement detected during sync.
162#[derive(Debug, Clone, Serialize, Deserialize)]
163pub struct Conflict {
164    pub finding_id: String,
165    pub kind: ConflictKind,
166    /// Free-form context for the rendering layer ("our: 0.82, peer:
167    /// 0.65"). Not part of any content address.
168    pub detail: String,
169}
170
171/// Result of one `sync_with_peer` pass.
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct SyncReport {
174    pub peer_id: String,
175    pub our_snapshot_hash: String,
176    pub peer_snapshot_hash: String,
177    pub conflicts: Vec<Conflict>,
178    /// Number of `StateEvent`s appended to our project (1 sync event +
179    /// N conflict events).
180    pub events_appended: usize,
181}
182
183/// v0.39.1: Diff two frontiers and produce a list of conflicts. Pure
184/// function, no I/O. The peer's state is passed in directly so the
185/// sync orchestrator can be unit-tested without HTTP.
186///
187/// Confidence diff threshold is 0.05 — below that it's recompute drift
188/// (the v0.36.1 formula change moved scores by < 0.001 on real data).
189#[must_use]
190pub fn diff_frontiers(ours: &Project, theirs: &Project) -> Vec<Conflict> {
191    use std::collections::HashMap;
192
193    let our_by_id: HashMap<&str, &crate::bundle::FindingBundle> =
194        ours.findings.iter().map(|f| (f.id.as_str(), f)).collect();
195    let their_by_id: HashMap<&str, &crate::bundle::FindingBundle> =
196        theirs.findings.iter().map(|f| (f.id.as_str(), f)).collect();
197
198    let mut conflicts = Vec::new();
199
200    // Findings only in ours.
201    for id in our_by_id.keys() {
202        if !their_by_id.contains_key(id) {
203            conflicts.push(Conflict {
204                finding_id: (*id).to_string(),
205                kind: ConflictKind::MissingInPeer,
206                detail: "present locally, absent in peer".to_string(),
207            });
208        }
209    }
210    // Findings only in theirs.
211    for id in their_by_id.keys() {
212        if !our_by_id.contains_key(id) {
213            conflicts.push(Conflict {
214                finding_id: (*id).to_string(),
215                kind: ConflictKind::MissingLocally,
216                detail: "present in peer, absent locally".to_string(),
217            });
218        }
219    }
220    // Findings in both — check field-level disagreements.
221    for (id, ours_f) in &our_by_id {
222        let Some(theirs_f) = their_by_id.get(id) else {
223            continue;
224        };
225        if (ours_f.confidence.score - theirs_f.confidence.score).abs() > 0.05 {
226            conflicts.push(Conflict {
227                finding_id: (*id).to_string(),
228                kind: ConflictKind::ConfidenceDiverged,
229                detail: format!(
230                    "ours: {:.3}, peer: {:.3}",
231                    ours_f.confidence.score, theirs_f.confidence.score
232                ),
233            });
234        }
235        if ours_f.flags.retracted != theirs_f.flags.retracted {
236            conflicts.push(Conflict {
237                finding_id: (*id).to_string(),
238                kind: ConflictKind::RetractedDiverged,
239                detail: format!(
240                    "ours: {}, peer: {}",
241                    ours_f.flags.retracted, theirs_f.flags.retracted
242                ),
243            });
244        }
245        if ours_f.flags.review_state != theirs_f.flags.review_state {
246            conflicts.push(Conflict {
247                finding_id: (*id).to_string(),
248                kind: ConflictKind::ReviewStateDiverged,
249                detail: format!(
250                    "ours: {:?}, peer: {:?}",
251                    ours_f.flags.review_state, theirs_f.flags.review_state
252                ),
253            });
254        }
255        if ours_f.flags.superseded != theirs_f.flags.superseded {
256            conflicts.push(Conflict {
257                finding_id: (*id).to_string(),
258                kind: ConflictKind::SupersededDiverged,
259                detail: format!(
260                    "ours: {}, peer: {}",
261                    ours_f.flags.superseded, theirs_f.flags.superseded
262                ),
263            });
264        }
265        if ours_f.assertion.text.trim() != theirs_f.assertion.text.trim() {
266            conflicts.push(Conflict {
267                finding_id: (*id).to_string(),
268                kind: ConflictKind::AssertionTextDiverged,
269                detail:
270                    "matching id but diverging assertion text — possible content-address collision"
271                        .to_string(),
272            });
273        }
274    }
275
276    conflicts.sort_by(|a, b| {
277        a.finding_id
278            .cmp(&b.finding_id)
279            .then_with(|| a.kind.as_str().cmp(b.kind.as_str()))
280    });
281    conflicts
282}
283
284/// v0.41.0: Record a single broken-locator conflict against a peer.
285/// Emits one `frontier.synced_with_peer` event with `divergence_count
286/// = 1` plus one `frontier.conflict_detected` event of kind
287/// `broken_locator`. Used when the peer hub is reachable, its
288/// registry entry signature verifies, but the locator URL the entry
289/// points at returns 4xx/5xx — common for stale published locators
290/// after a repo move.
291pub fn record_locator_failure(
292    project: &mut Project,
293    peer_id: &str,
294    vfr_id: &str,
295    locator: &str,
296    status: u16,
297) -> SyncReport {
298    let now = Utc::now().to_rfc3339();
299    let our_hash = snapshot_hash(project);
300    let frontier_id = project.frontier_id();
301    let detail = format!("locator {locator} returned HTTP {status}");
302
303    let synced_event = StateEvent {
304        schema: EVENT_SCHEMA.to_string(),
305        id: String::new(),
306        kind: "frontier.synced_with_peer".to_string(),
307        target: StateTarget {
308            r#type: "frontier_observation".to_string(),
309            id: frontier_id.clone(),
310        },
311        actor: StateActor {
312            id: "federation".to_string(),
313            r#type: "system".to_string(),
314        },
315        timestamp: now.clone(),
316        reason: format!("synced with peer {peer_id} (broken locator)"),
317        before_hash: NULL_HASH.to_string(),
318        after_hash: NULL_HASH.to_string(),
319        payload: json!({
320            "peer_id": peer_id,
321            "peer_snapshot_hash": "",
322            "our_snapshot_hash": our_hash,
323            "divergence_count": 1,
324        }),
325        caveats: Vec::new(),
326        signature: None,
327        schema_artifact_id: None,
328    };
329    let mut sync_ev = synced_event;
330    sync_ev.id = compute_event_id(&sync_ev);
331
332    let conflict_ev = StateEvent {
333        schema: EVENT_SCHEMA.to_string(),
334        id: String::new(),
335        kind: "frontier.conflict_detected".to_string(),
336        target: StateTarget {
337            r#type: "frontier_observation".to_string(),
338            id: frontier_id.clone(),
339        },
340        actor: StateActor {
341            id: "federation".to_string(),
342            r#type: "system".to_string(),
343        },
344        timestamp: now.clone(),
345        reason: format!("peer={peer_id} kind=broken_locator {detail}"),
346        before_hash: NULL_HASH.to_string(),
347        after_hash: NULL_HASH.to_string(),
348        payload: json!({
349            "peer_id": peer_id,
350            "finding_id": vfr_id,
351            "kind": "broken_locator",
352            "detail": detail,
353        }),
354        caveats: Vec::new(),
355        signature: None,
356        schema_artifact_id: None,
357    };
358    let mut conflict_ev = conflict_ev;
359    conflict_ev.id = compute_event_id(&conflict_ev);
360
361    project.events.push(sync_ev);
362    project.events.push(conflict_ev);
363
364    SyncReport {
365        peer_id: peer_id.to_string(),
366        our_snapshot_hash: our_hash,
367        peer_snapshot_hash: String::new(),
368        conflicts: vec![Conflict {
369            finding_id: vfr_id.to_string(),
370            kind: ConflictKind::BrokenLocator,
371            detail,
372        }],
373        events_appended: 2,
374    }
375}
376
377/// v0.41.0: Record an unverified-peer-entry conflict. Same shape as
378/// `record_locator_failure` but for when the peer's registry entry
379/// signature did not verify against the registered owner pubkey.
380/// Sync halts before any content is fetched — the kernel won't trust
381/// unsigned-or-misverified state.
382pub fn record_unverified_entry(
383    project: &mut Project,
384    peer_id: &str,
385    vfr_id: &str,
386    reason: &str,
387) -> SyncReport {
388    let now = Utc::now().to_rfc3339();
389    let our_hash = snapshot_hash(project);
390    let frontier_id = project.frontier_id();
391
392    let mut sync_ev = StateEvent {
393        schema: EVENT_SCHEMA.to_string(),
394        id: String::new(),
395        kind: "frontier.synced_with_peer".to_string(),
396        target: StateTarget {
397            r#type: "frontier_observation".to_string(),
398            id: frontier_id.clone(),
399        },
400        actor: StateActor {
401            id: "federation".to_string(),
402            r#type: "system".to_string(),
403        },
404        timestamp: now.clone(),
405        reason: format!("synced with peer {peer_id} (unverified entry; halted)"),
406        before_hash: NULL_HASH.to_string(),
407        after_hash: NULL_HASH.to_string(),
408        payload: json!({
409            "peer_id": peer_id,
410            "peer_snapshot_hash": "",
411            "our_snapshot_hash": our_hash,
412            "divergence_count": 1,
413        }),
414        caveats: Vec::new(),
415        signature: None,
416        schema_artifact_id: None,
417    };
418    sync_ev.id = compute_event_id(&sync_ev);
419
420    let mut conflict_ev = StateEvent {
421        schema: EVENT_SCHEMA.to_string(),
422        id: String::new(),
423        kind: "frontier.conflict_detected".to_string(),
424        target: StateTarget {
425            r#type: "frontier_observation".to_string(),
426            id: frontier_id.clone(),
427        },
428        actor: StateActor {
429            id: "federation".to_string(),
430            r#type: "system".to_string(),
431        },
432        timestamp: now.clone(),
433        reason: format!("peer={peer_id} kind=unverified_peer_entry {reason}"),
434        before_hash: NULL_HASH.to_string(),
435        after_hash: NULL_HASH.to_string(),
436        payload: json!({
437            "peer_id": peer_id,
438            "finding_id": vfr_id,
439            "kind": "unverified_peer_entry",
440            "detail": reason,
441        }),
442        caveats: Vec::new(),
443        signature: None,
444        schema_artifact_id: None,
445    };
446    conflict_ev.id = compute_event_id(&conflict_ev);
447
448    project.events.push(sync_ev);
449    project.events.push(conflict_ev);
450
451    SyncReport {
452        peer_id: peer_id.to_string(),
453        our_snapshot_hash: our_hash,
454        peer_snapshot_hash: String::new(),
455        conflicts: vec![Conflict {
456            finding_id: vfr_id.to_string(),
457            kind: ConflictKind::UnverifiedPeerEntry,
458            detail: reason.to_string(),
459        }],
460        events_appended: 2,
461    }
462}
463
464/// v0.85: Classify the merge action for a peer event set against
465/// our local event log. Pure event-level algebra: takes peer event
466/// summaries (id, parents) and our local pool, and decides whether
467/// merge can proceed or whether ancestors must be fetched first.
468///
469/// Implements the federation half of `docs/THEORY.md` §5.2 / §5.3:
470/// only causally down-closed event sets are valid replay inputs;
471/// when ancestors are missing, merge is undefined until they are
472/// fetched or an explicit fork policy is invoked.
473///
474/// Returns:
475/// - [`AncestorAction::Proceed`] if every parent referenced by the
476///   peer's events is already present in `our_events ∪ peer_events`.
477///   Caller can safely apply [`sync_with_peer`] after persisting
478///   the new events.
479/// - [`AncestorAction::Fetch`] with the list of missing ancestor
480///   ids, otherwise. Caller should fetch and re-classify before
481///   applying sync.
482///
483/// This function only reads event ids and parent links; it does
484/// not require the full event payloads. Suitable for event-summary
485/// gossip and for federation flows that stream events incrementally
486/// rather than synchronizing whole Projects.
487pub fn classify_peer_event_set(
488    our_events: &[StateEvent],
489    peer_events: &[StateEvent],
490) -> crate::ancestor_closure::AncestorAction {
491    use crate::ancestor_closure::classify_ancestor_action;
492    // Combine local and peer events into the candidate post-merge
493    // set. Each event is represented as (id, parents). For the
494    // current substrate, StateEvent does not yet carry an explicit
495    // parents field (see THEORY.md target v0.7 in the schema column
496    // of the current/target table); in v0.85 we treat empty parents
497    // as the conservative default. Hubs that adopt explicit parents
498    // wire them through here.
499    let combined: Vec<(String, Vec<String>)> = our_events
500        .iter()
501        .chain(peer_events.iter())
502        .map(|e| (e.id.clone(), event_parents(e)))
503        .collect();
504    classify_ancestor_action(combined)
505}
506
507/// Extract parent event ids from an event. v0.85: parents are
508/// optional; events without explicit parents return an empty list.
509/// When schema/reducer artifacts gain explicit parent fields
510/// (target v0.7 per docs/THEORY.md §5.1), this function reads them
511/// without changing callers.
512fn event_parents(e: &StateEvent) -> Vec<String> {
513    e.payload
514        .get("parents")
515        .and_then(|v| v.as_array())
516        .map(|arr| {
517            arr.iter()
518                .filter_map(|p| p.as_str().map(String::from))
519                .collect()
520        })
521        .unwrap_or_default()
522}
523
524/// v0.39.1: Run a full sync pass against a peer's already-fetched
525/// frontier state. Diffs, emits one `frontier.synced_with_peer`
526/// event recording the pass, and one `frontier.conflict_detected`
527/// event per disagreement. Returns the report; caller persists the
528/// project.
529///
530/// Splitting fetch from sync this way lets the sync logic be
531/// fully unit-testable without HTTP — the CLI pipes a real fetch
532/// into this function.
533pub fn sync_with_peer(project: &mut Project, peer_id: &str, peer: &Project) -> SyncReport {
534    let our_hash = snapshot_hash(project);
535    let peer_hash = snapshot_hash(peer);
536    let conflicts = diff_frontiers(project, peer);
537
538    let now = Utc::now().to_rfc3339();
539    let frontier_id = project.frontier_id().clone();
540
541    // v0.39.1 fix: federation events are frontier-level *observations*,
542    // not finding-level state changes. Target the frontier (vfr_id)
543    // with `target.type = "frontier_observation"` so:
544    //   - replay's per-finding chain validator skips them (chain
545    //     only runs on `target.type == "finding"`);
546    //   - the orphan check skips them (orphan check only flags
547    //     finding-targeted events whose finding_id is unknown).
548    // The `finding_id` of each conflict still lives in the payload
549    // for downstream queries; only the canonical event target is the
550    // frontier.
551    let synced_reason = format!("synced with peer {peer_id}");
552    let mut synced_event = StateEvent {
553        schema: EVENT_SCHEMA.to_string(),
554        id: String::new(),
555        kind: "frontier.synced_with_peer".to_string(),
556        target: StateTarget {
557            r#type: "frontier_observation".to_string(),
558            id: frontier_id.clone(),
559        },
560        actor: StateActor {
561            id: "federation".to_string(),
562            r#type: "system".to_string(),
563        },
564        timestamp: now.clone(),
565        reason: synced_reason,
566        before_hash: NULL_HASH.to_string(),
567        after_hash: NULL_HASH.to_string(),
568        payload: json!({
569            "peer_id": peer_id,
570            "peer_snapshot_hash": peer_hash,
571            "our_snapshot_hash": our_hash,
572            "divergence_count": conflicts.len(),
573        }),
574        caveats: Vec::new(),
575        signature: None,
576        schema_artifact_id: None,
577    };
578    synced_event.id = compute_event_id(&synced_event);
579
580    let mut conflict_events: Vec<StateEvent> = Vec::with_capacity(conflicts.len());
581    for c in &conflicts {
582        let reason = format!("peer={peer_id} kind={} {}", c.kind.as_str(), c.detail);
583        let mut ev = StateEvent {
584            schema: EVENT_SCHEMA.to_string(),
585            id: String::new(),
586            kind: "frontier.conflict_detected".to_string(),
587            target: StateTarget {
588                r#type: "frontier_observation".to_string(),
589                id: frontier_id.clone(),
590            },
591            actor: StateActor {
592                id: "federation".to_string(),
593                r#type: "system".to_string(),
594            },
595            timestamp: now.clone(),
596            reason,
597            before_hash: NULL_HASH.to_string(),
598            after_hash: NULL_HASH.to_string(),
599            payload: json!({
600                "peer_id": peer_id,
601                "finding_id": c.finding_id,
602                "kind": c.kind.as_str(),
603                "detail": c.detail,
604            }),
605            caveats: Vec::new(),
606            signature: None,
607            schema_artifact_id: None,
608        };
609        ev.id = compute_event_id(&ev);
610        conflict_events.push(ev);
611    }
612
613    let events_appended = 1 + conflict_events.len();
614    project.events.push(synced_event);
615    project.events.extend(conflict_events);
616
617    SyncReport {
618        peer_id: peer_id.to_string(),
619        our_snapshot_hash: our_hash,
620        peer_snapshot_hash: peer_hash,
621        conflicts,
622        events_appended,
623    }
624}
625
626/// v0.41.0: Result of trying to discover a peer's frontier through
627/// the hub's `/entries/<vfr_id>` endpoint. The runtime needs to
628/// distinguish three failure modes — peer unreachable, registry
629/// entry signature invalid, and locator URL dead — because each one
630/// has a different remediation.
631#[derive(Debug)]
632pub enum DiscoveryResult {
633    /// Hub returned a valid entry, signature verified, locator
634    /// fetched, manifest parsed. Includes the project for
635    /// downstream diff.
636    Resolved(Project),
637    /// Hub /entries/<vfr_id> returned 4xx/5xx — peer doesn't claim
638    /// to know this vfr_id.
639    EntryNotFound { vfr_id: String, status: u16 },
640    /// Hub returned an entry but its signature does not verify
641    /// against the registered peer pubkey. Halts content sync.
642    UnverifiedEntry { vfr_id: String, reason: String },
643    /// Hub entry verifies, but its `network_locator` URL returns
644    /// 4xx/5xx. Stale-locator failure mode.
645    BrokenLocator {
646        vfr_id: String,
647        locator: String,
648        status: u16,
649    },
650    /// Network error to the hub itself or to the locator.
651    Unreachable { url: String, error: String },
652}
653
654/// v0.41.0: Discover a peer frontier by routing through the hub's
655/// `/entries/<vfr_id>` endpoint. Verifies the registry entry's
656/// signature against `expected_owner_pubkey`, then follows
657/// `entry.network_locator` to fetch the actual manifest.
658///
659/// This is the "real federation" path: hubs publish signed registry
660/// entries pointing at content URLs; sync fetches both, verifying the
661/// signature chain end-to-end. If any step fails, the failure mode
662/// is captured as a typed result so the calling sync runtime can
663/// emit the appropriate `Conflict` (BrokenLocator, UnverifiedEntry,
664/// etc.) rather than blackhole'ing the error.
665pub fn discover_peer_frontier(
666    hub_url: &str,
667    vfr_id: &str,
668    expected_owner_pubkey: Option<&str>,
669) -> DiscoveryResult {
670    let hub = hub_url.trim_end_matches('/').to_string();
671    let entries_url = format!("{hub}/entries/{vfr_id}");
672    let vfr_owned = vfr_id.to_string();
673    let expected = expected_owner_pubkey.map(|s| s.to_string());
674
675    std::thread::spawn(move || -> DiscoveryResult {
676        let resp = match reqwest::blocking::get(&entries_url) {
677            Ok(r) => r,
678            Err(e) => {
679                return DiscoveryResult::Unreachable {
680                    url: entries_url.clone(),
681                    error: e.to_string(),
682                };
683            }
684        };
685        let status = resp.status();
686        if status.as_u16() == 404 {
687            return DiscoveryResult::EntryNotFound {
688                vfr_id: vfr_owned,
689                status: status.as_u16(),
690            };
691        }
692        if !status.is_success() {
693            return DiscoveryResult::Unreachable {
694                url: entries_url.clone(),
695                error: format!("hub returned HTTP {status}"),
696            };
697        }
698        let body = match resp.text() {
699            Ok(b) => b,
700            Err(e) => {
701                return DiscoveryResult::Unreachable {
702                    url: entries_url.clone(),
703                    error: format!("read body: {e}"),
704                };
705            }
706        };
707        let entry: crate::registry::RegistryEntry = match serde_json::from_str(&body) {
708            Ok(e) => e,
709            Err(e) => {
710                return DiscoveryResult::UnverifiedEntry {
711                    vfr_id: vfr_owned,
712                    reason: format!("parse registry entry: {e}"),
713                };
714            }
715        };
716
717        // Verify signature.
718        match crate::registry::verify_entry(&entry) {
719            Ok(true) => {}
720            Ok(false) => {
721                return DiscoveryResult::UnverifiedEntry {
722                    vfr_id: vfr_owned,
723                    reason: "registry entry signature does not verify against entry.owner_pubkey"
724                        .to_string(),
725                };
726            }
727            Err(e) => {
728                return DiscoveryResult::UnverifiedEntry {
729                    vfr_id: vfr_owned,
730                    reason: format!("signature verification error: {e}"),
731                };
732            }
733        }
734        // Cross-check expected pubkey if the caller supplied one.
735        if let Some(want) = expected.as_deref()
736            && entry.owner_pubkey != want
737        {
738            return DiscoveryResult::UnverifiedEntry {
739                vfr_id: vfr_owned,
740                reason: format!(
741                    "entry owner_pubkey {} != expected peer pubkey {}",
742                    &entry.owner_pubkey[..16],
743                    &want[..16]
744                ),
745            };
746        }
747
748        // Follow locator to fetch the manifest.
749        let locator = entry.network_locator.clone();
750        let mresp = match reqwest::blocking::get(&locator) {
751            Ok(r) => r,
752            Err(e) => {
753                return DiscoveryResult::BrokenLocator {
754                    vfr_id: vfr_owned,
755                    locator,
756                    status: 0,
757                }
758                .with_error(e.to_string());
759            }
760        };
761        let mstatus = mresp.status();
762        if !mstatus.is_success() {
763            return DiscoveryResult::BrokenLocator {
764                vfr_id: vfr_owned,
765                locator,
766                status: mstatus.as_u16(),
767            };
768        }
769        let mbody = match mresp.text() {
770            Ok(b) => b,
771            Err(e) => {
772                return DiscoveryResult::BrokenLocator {
773                    vfr_id: vfr_owned,
774                    locator,
775                    status: 0,
776                }
777                .with_error(e.to_string());
778            }
779        };
780        match serde_json::from_str::<Project>(&mbody) {
781            Ok(p) => DiscoveryResult::Resolved(p),
782            Err(e) => DiscoveryResult::BrokenLocator {
783                vfr_id: vfr_owned,
784                locator,
785                status: 0,
786            }
787            .with_error(format!("manifest parse: {e}")),
788        }
789    })
790    .join()
791    .unwrap_or(DiscoveryResult::Unreachable {
792        url: hub_url.to_string(),
793        error: "discovery thread panicked".to_string(),
794    })
795}
796
797impl DiscoveryResult {
798    fn with_error(self, _ctx: String) -> Self {
799        // BrokenLocator already carries status; reserved hook for
800        // richer diagnostics later.
801        self
802    }
803}
804
805/// v0.39.1: Fetch a peer's frontier JSON over HTTP. The URL is
806/// expected to serve a JSON-serialized `Project`. Blocking call —
807/// `vela federation sync` is a one-shot CLI verb, not a service.
808///
809/// Implementation note: the CLI top-level dispatcher runs inside a
810/// tokio runtime, but `reqwest::blocking` panics if dropped inside
811/// an async context. We escape into a dedicated OS thread that owns
812/// its own runtime, making the call safe to issue from sync code
813/// regardless of who's calling it.
814///
815/// Verification of peer signatures (and registry entries) is a
816/// separate concern, addressed in v0.39.2+. v0.39.1 trusts the
817/// transport so the sync diff/event-emission machinery can be
818/// validated against real peer state first.
819pub fn fetch_peer_frontier(url: &str) -> Result<Project, String> {
820    let url_owned = url.to_string();
821    let handle = std::thread::spawn(move || -> Result<Project, String> {
822        let resp = reqwest::blocking::get(&url_owned)
823            .map_err(|e| format!("HTTP GET {url_owned} failed: {e}"))?;
824        let status = resp.status();
825        if !status.is_success() {
826            return Err(format!("peer returned HTTP {status}"));
827        }
828        let body = resp
829            .text()
830            .map_err(|e| format!("read body from {url_owned}: {e}"))?;
831        serde_json::from_str(&body)
832            .map_err(|e| format!("parse peer frontier from {url_owned}: {e}"))
833    });
834    handle
835        .join()
836        .map_err(|_| "fetch thread panicked".to_string())?
837}
838
#[cfg(test)]
mod tests {
    use super::*;

    use crate::ancestor_closure::AncestorAction;
    use crate::bundle::{
        Assertion, Conditions, Confidence, Evidence, Extraction, FindingBundle, Flags, Provenance,
        ReviewState,
    };
    use crate::project::{self, Project};

    // ── v0.39.0 peer registry validation tests ───────────────────────

    /// A `PeerHub` that `validate()` is expected to accept. Each
    /// rejection test overrides exactly one field of it.
    fn good() -> PeerHub {
        PeerHub {
            id: "hub:test".into(),
            url: "https://example.invalid/".into(),
            public_key: "00".repeat(32),
            added_at: "2026-04-27T00:00:00Z".into(),
            note: String::new(),
        }
    }

    #[test]
    fn validates_correct_shape() {
        assert!(good().validate().is_ok());
    }

    #[test]
    fn rejects_empty_id() {
        let peer = PeerHub {
            id: "  ".into(),
            ..good()
        };
        assert!(peer.validate().is_err());
    }

    #[test]
    fn rejects_http_url() {
        let peer = PeerHub {
            url: "http://insecure.example/".into(),
            ..good()
        };
        assert!(peer.validate().is_err());
    }

    #[test]
    fn rejects_short_pubkey() {
        let peer = PeerHub {
            public_key: "abcd".into(),
            ..good()
        };
        assert!(peer.validate().is_err());
    }

    #[test]
    fn rejects_non_hex_pubkey() {
        let peer = PeerHub {
            public_key: "z".repeat(64),
            ..good()
        };
        assert!(peer.validate().is_err());
    }

    // ── v0.39.1 sync runtime tests ───────────────────────────────────

    /// Builds a minimal finding whose `id` is forced to `id` and whose
    /// confidence is `Confidence::raw(score, "test", 0.85)`.
    ///
    /// Two findings built with the same `id` therefore always share an
    /// id, whatever their other fields look like; the diff tests rely
    /// on that to compare "the same" finding across two frontiers.
    fn finding(id: &str, score: f64) -> FindingBundle {
        let assertion = Assertion {
            text: format!("claim {id}"),
            assertion_type: "mechanism".into(),
            entities: vec![],
            relation: None,
            direction: None,
            causal_claim: None,
            causal_evidence_grade: None,
        };
        let evidence = Evidence {
            evidence_type: "experimental".into(),
            model_system: String::new(),
            species: None,
            method: String::new(),
            sample_size: Some("n=30".into()),
            effect_size: None,
            p_value: None,
            replicated: false,
            replication_count: None,
            evidence_spans: vec![],
        };
        let conditions = Conditions {
            text: String::new(),
            species_verified: vec![],
            species_unverified: vec![],
            in_vitro: false,
            in_vivo: false,
            human_data: false,
            clinical_trial: false,
            concentration_range: None,
            duration: None,
            age_group: None,
            cell_type: None,
        };
        let provenance = Provenance {
            source_type: "published_paper".into(),
            doi: None,
            pmid: None,
            pmc: None,
            openalex_id: None,
            url: None,
            title: "Test".into(),
            authors: vec![],
            year: Some(2025),
            journal: None,
            license: None,
            publisher: None,
            funders: vec![],
            extraction: Extraction::default(),
            review: None,
            citation_count: None,
        };

        let mut bundle = FindingBundle::new(
            assertion,
            evidence,
            conditions,
            Confidence::raw(score, "test", 0.85),
            provenance,
            Flags::default(),
        );
        // Overwrite whatever id the constructor assigned so tests can
        // pick ids directly.
        bundle.id = id.to_string();
        bundle
    }

    /// Thin wrapper over `project::assemble` with fixed test arguments.
    fn assemble(name: &str, findings: Vec<FindingBundle>) -> Project {
        project::assemble(name, findings, 1, 0, "test")
    }

    /// Wraps one finding per side into the `"ours"` / `"theirs"`
    /// single-finding frontiers used by the diff and sync tests.
    fn frontier_pair(ours: FindingBundle, theirs: FindingBundle) -> (Project, Project) {
        (
            assemble("ours", vec![ours]),
            assemble("theirs", vec![theirs]),
        )
    }

    #[test]
    fn diff_identical_frontiers_returns_no_conflicts() {
        let shared = finding("vf_001", 0.7);
        let (ours, theirs) = frontier_pair(shared.clone(), shared);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert_eq!(conflicts.len(), 0);
    }

    #[test]
    fn diff_detects_missing_in_peer_and_locally() {
        // Disjoint ids: each side holds a finding the other lacks.
        let (ours, theirs) = frontier_pair(finding("vf_001", 0.7), finding("vf_002", 0.7));
        let conflicts = diff_frontiers(&ours, &theirs);
        let kinds: Vec<&str> = conflicts.iter().map(|c| c.kind.as_str()).collect();
        assert!(
            kinds.contains(&"missing_in_peer"),
            "expected missing_in_peer in {kinds:?}"
        );
        assert!(
            kinds.contains(&"missing_locally"),
            "expected missing_locally in {kinds:?}"
        );
    }

    #[test]
    fn diff_detects_confidence_divergence_above_threshold() {
        // Same id on both sides (set by `finding`), scores 0.30 apart.
        let (ours, theirs) = frontier_pair(finding("vf_001", 0.85), finding("vf_001", 0.55));
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::ConfidenceDiverged),
            "expected confidence_diverged in {conflicts:?}"
        );
    }

    #[test]
    fn diff_ignores_confidence_drift_below_threshold() {
        let (ours, theirs) = frontier_pair(finding("vf_001", 0.700), finding("vf_001", 0.730));
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            !conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::ConfidenceDiverged),
            "0.03 drift should not flag: {conflicts:?}"
        );
    }

    #[test]
    fn diff_detects_retracted_divergence() {
        let mut peer_side = finding("vf_001", 0.7);
        peer_side.flags.retracted = true;
        let (ours, theirs) = frontier_pair(finding("vf_001", 0.7), peer_side);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::RetractedDiverged)
        );
    }

    #[test]
    fn diff_detects_review_state_divergence() {
        let mut peer_side = finding("vf_001", 0.7);
        peer_side.flags.review_state = Some(ReviewState::Contested);
        let (ours, theirs) = frontier_pair(finding("vf_001", 0.7), peer_side);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::ReviewStateDiverged)
        );
    }

    #[test]
    fn diff_detects_assertion_text_divergence() {
        let mut peer_side = finding("vf_001", 0.7);
        peer_side.assertion.text = "different claim".into();
        let (ours, theirs) = frontier_pair(finding("vf_001", 0.7), peer_side);
        let conflicts = diff_frontiers(&ours, &theirs);
        assert!(
            conflicts
                .iter()
                .any(|c| c.kind == ConflictKind::AssertionTextDiverged)
        );
    }

    #[test]
    fn sync_appends_one_synced_event_plus_one_per_conflict() {
        let mut peer_side = finding("vf_001", 0.7);
        peer_side.flags.retracted = true;
        let (mut ours, theirs) = frontier_pair(finding("vf_001", 0.7), peer_side);

        let events_before = ours.events.len();
        let report = sync_with_peer(&mut ours, "hub:test-peer", &theirs);

        assert_eq!(report.conflicts.len(), 1);
        assert_eq!(report.events_appended, 2); // 1 sync + 1 conflict
        assert_eq!(ours.events.len() - events_before, 2);

        // The first appended event is the sync record.
        let sync_ev = &ours.events[events_before];
        assert_eq!(sync_ev.kind, "frontier.synced_with_peer");
        assert_eq!(sync_ev.payload["divergence_count"].as_u64(), Some(1));

        // The second is the conflict.
        let conf_ev = &ours.events[events_before + 1];
        assert_eq!(conf_ev.kind, "frontier.conflict_detected");
        assert_eq!(conf_ev.payload["kind"], "retracted_diverged");
    }

    #[test]
    fn sync_with_clean_diff_emits_zero_divergence_event() {
        let shared = finding("vf_001", 0.7);
        let (mut ours, theirs) = frontier_pair(shared.clone(), shared);

        let report = sync_with_peer(&mut ours, "hub:test-peer", &theirs);

        assert_eq!(report.conflicts.len(), 0);
        assert_eq!(report.events_appended, 1);
        let last = ours.events.last().unwrap();
        assert_eq!(last.kind, "frontier.synced_with_peer");
        assert_eq!(last.payload["divergence_count"].as_u64(), Some(0));
    }

    // ── ancestor-closure classification tests ────────────────────────

    /// Builds a bare `test.event` with the given id whose payload is
    /// `{"parents": [...]}`; every other field is a fixed placeholder.
    fn make_event(id: &str, parents: &[&str]) -> StateEvent {
        StateEvent {
            schema: EVENT_SCHEMA.to_string(),
            id: id.to_string(),
            kind: "test.event".to_string(),
            target: StateTarget {
                r#type: "frontier_observation".to_string(),
                id: "vfr_test".to_string(),
            },
            actor: StateActor {
                id: "test".to_string(),
                r#type: "system".to_string(),
            },
            timestamp: "2026-05-09T00:00:00Z".to_string(),
            reason: "test".to_string(),
            before_hash: NULL_HASH.to_string(),
            after_hash: NULL_HASH.to_string(),
            payload: json!({"parents": parents}),
            caveats: vec![],
            signature: None,
            schema_artifact_id: None,
        }
    }

    #[test]
    fn classify_returns_proceed_when_peer_set_is_complete() {
        // Local has e1; peer has e2 with parent e1. Combined set
        // is causally down-closed: Proceed.
        let our_events = vec![make_event("e1", &[])];
        let peer_events = vec![make_event("e2", &["e1"])];
        let action = classify_peer_event_set(&our_events, &peer_events);
        assert!(matches!(action, AncestorAction::Proceed));
    }

    #[test]
    fn classify_returns_fetch_when_peer_references_missing_ancestor() {
        // Peer sends e3 with parent e2; we don't have e2.
        // Expected action: Fetch with missing=[e2].
        let our_events = vec![make_event("e1", &[])];
        let peer_events = vec![make_event("e3", &["e2"])];
        let action = classify_peer_event_set(&our_events, &peer_events);
        match action {
            AncestorAction::Fetch { missing } => {
                assert_eq!(missing, vec!["e2"]);
            }
            other => panic!("expected Fetch, got {other:?}"),
        }
    }

    #[test]
    fn classify_handles_events_without_explicit_parents() {
        // Events without `parents` payload key: treated as having
        // no parents. Trivially down-closed.
        //
        // Start from the shared fixture and swap in an empty payload so
        // the `parents` key is genuinely absent rather than an empty list.
        let our_events = vec![StateEvent {
            payload: json!({}),
            ..make_event("e1", &[])
        }];
        let peer_events: Vec<StateEvent> = Vec::new();
        let action = classify_peer_event_set(&our_events, &peer_events);
        assert!(matches!(action, AncestorAction::Proceed));
    }
}