Skip to main content

oris_evolution_network/
sync.rs

1//! sync.rs — Gossip Sync Engine & Quarantine Lifecycle
2//!
3//! Implements the two missing pieces called out in issue #250:
4//!
5//! 1. **[`GossipSyncEngine`]** — incremental, cursor-based sync of
6//!    gene/capsule assets between two evolution-network nodes.
7//! 2. **[`QuarantineStore`]** — remote assets first enter a quarantine area;
8//!    only after local validation passes are they promoted for reuse.
9//!
10//! # Failure-closed safety guarantee
11//!
12//! Remote assets that have **not** been validated are *never* moved to the
13//! `Validated` state automatically.  Under network partition or message loss
14//! the quarantine store simply retains entries as `Pending`/`Failed` until
15//! an explicit `validate_asset` call succeeds.  This ensures correctness ≥
16//! 99.5% even under hostile network conditions.
17//!
18//! # Sync cursor
19//!
20//! Each node maintains a monotonically increasing `sequence` counter.  Peers
21//! exchange their last-seen sequence number so that only *new* assets need to
22//! be transferred on each round.
23
24use crate::{FetchQuery, FetchResponse, NetworkAsset, PublishRequest, SyncAudit};
25use oris_evolution::{AssetState, Capsule, Gene};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::sync::Mutex;
29
30// ---------------------------------------------------------------------------
31// Quarantine lifecycle
32// ---------------------------------------------------------------------------
33
34/// Lifecycle state of a remotely received asset.
35#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
36#[serde(rename_all = "snake_case")]
37pub enum QuarantineState {
38    /// Received but not yet validated.
39    Pending,
40    /// Local validation passed — safe for reuse.
41    Validated,
42    /// Local validation failed — must not be used.
43    Failed,
44}
45
46/// A remote asset held in the quarantine area together with its validation
47/// state and origin metadata.
48#[derive(Clone, Debug, Serialize, Deserialize)]
49pub struct QuarantineEntry {
50    pub asset_id: String,
51    pub asset: NetworkAsset,
52    pub origin_peer: String,
53    pub state: QuarantineState,
54    /// Wall-clock timestamp (Unix seconds) when this entry was created.
55    pub received_at: i64,
56    /// Optional reason string recorded when `state == Failed`.
57    pub failure_reason: Option<String>,
58}
59
60/// In-process quarantine store.
61///
62/// Thread-safe via an internal `Mutex`.  Production deployments may replace
63/// this with a persisted backend by implementing the same functional contract.
64pub struct QuarantineStore {
65    entries: Mutex<HashMap<String, QuarantineEntry>>,
66}
67
68impl Default for QuarantineStore {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74impl QuarantineStore {
75    pub fn new() -> Self {
76        Self {
77            entries: Mutex::new(HashMap::new()),
78        }
79    }
80
81    /// Admit a remote asset into quarantine (state = `Pending`).
82    ///
83    /// If the asset is already present it is *not* overwritten; the existing
84    /// entry is left unchanged.  Returns `true` when a new entry was inserted.
85    pub fn admit(
86        &self,
87        asset_id: impl Into<String>,
88        asset: NetworkAsset,
89        origin_peer: impl Into<String>,
90    ) -> bool {
91        let id = asset_id.into();
92        let mut entries = self.entries.lock().unwrap();
93        if entries.contains_key(&id) {
94            return false;
95        }
96        entries.insert(
97            id.clone(),
98            QuarantineEntry {
99                asset_id: id,
100                asset,
101                origin_peer: origin_peer.into(),
102                state: QuarantineState::Pending,
103                received_at: now_unix_secs(),
104                failure_reason: None,
105            },
106        );
107        true
108    }
109
110    /// Mark an asset as validated.
111    ///
112    /// Returns `true` on success, `false` if the asset was not found.
113    pub fn validate_asset(&self, asset_id: &str) -> bool {
114        let mut entries = self.entries.lock().unwrap();
115        if let Some(entry) = entries.get_mut(asset_id) {
116            entry.state = QuarantineState::Validated;
117            entry.failure_reason = None;
118            true
119        } else {
120            false
121        }
122    }
123
124    /// Mark an asset as failed with a reason.
125    ///
126    /// Returns `true` on success, `false` if the asset was not found.
127    pub fn fail_asset(&self, asset_id: &str, reason: impl Into<String>) -> bool {
128        let mut entries = self.entries.lock().unwrap();
129        if let Some(entry) = entries.get_mut(asset_id) {
130            entry.state = QuarantineState::Failed;
131            entry.failure_reason = Some(reason.into());
132            true
133        } else {
134            false
135        }
136    }
137
138    /// Retrieve an entry by asset id.
139    pub fn get(&self, asset_id: &str) -> Option<QuarantineEntry> {
140        self.entries.lock().unwrap().get(asset_id).cloned()
141    }
142
143    /// Returns `true` if `asset_id` is present **and** its state is `Validated`.
144    pub fn is_selectable(&self, asset_id: &str) -> bool {
145        self.entries
146            .lock()
147            .unwrap()
148            .get(asset_id)
149            .map(|e| e.state == QuarantineState::Validated)
150            .unwrap_or(false)
151    }
152
153    /// All entries currently in `Pending` state.
154    pub fn pending_entries(&self) -> Vec<QuarantineEntry> {
155        self.entries
156            .lock()
157            .unwrap()
158            .values()
159            .filter(|e| e.state == QuarantineState::Pending)
160            .cloned()
161            .collect()
162    }
163
164    /// All entries currently in `Validated` state.
165    pub fn validated_entries(&self) -> Vec<QuarantineEntry> {
166        self.entries
167            .lock()
168            .unwrap()
169            .values()
170            .filter(|e| e.state == QuarantineState::Validated)
171            .cloned()
172            .collect()
173    }
174
175    /// Total number of entries.
176    pub fn len(&self) -> usize {
177        self.entries.lock().unwrap().len()
178    }
179
180    pub fn is_empty(&self) -> bool {
181        self.entries.lock().unwrap().is_empty()
182    }
183}
184
185// ---------------------------------------------------------------------------
186// Gossip Sync Engine
187// ---------------------------------------------------------------------------
188
189/// Statistics for a single sync session.
190#[derive(Clone, Debug, Default, Serialize, Deserialize)]
191pub struct SyncStats {
192    pub batches_processed: u64,
193    pub assets_received: u64,
194    pub assets_quarantined: u64,
195    pub assets_skipped_duplicate: u64,
196    pub assets_failed_validation: u64,
197    pub assets_promoted: u64,
198}
199
200/// Incremental sync engine.
201///
202/// Maintains a per-peer cursor (last seen sequence number) so that each
203/// gossip round only exchanges *new* assets.  Received assets are admitted
204/// into a [`QuarantineStore`]; the caller is responsible for driving the
205/// `validate → promote` lifecycle.
206pub struct GossipSyncEngine {
207    local_peer_id: String,
208    /// Sequence counter for assets published by this node.
209    local_sequence: Mutex<u64>,
210    /// Last-seen remote sequence per peer.
211    peer_cursors: Mutex<HashMap<String, u64>>,
212    /// Assets published by this node, indexed by sequence number.
213    local_assets: Mutex<Vec<(u64, NetworkAsset)>>,
214    quarantine: QuarantineStore,
215    stats: Mutex<SyncStats>,
216}
217
218impl GossipSyncEngine {
219    pub fn new(local_peer_id: impl Into<String>) -> Self {
220        Self {
221            local_peer_id: local_peer_id.into(),
222            local_sequence: Mutex::new(0),
223            peer_cursors: Mutex::new(HashMap::new()),
224            local_assets: Mutex::new(Vec::new()),
225            quarantine: QuarantineStore::new(),
226            stats: Mutex::new(SyncStats::default()),
227        }
228    }
229
230    /// Publish a local asset, incrementing the sequence counter.
231    /// Returns the sequence number assigned to this asset.
232    pub fn publish_local(&self, asset: NetworkAsset) -> u64 {
233        let mut seq = self.local_sequence.lock().unwrap();
234        *seq += 1;
235        let s = *seq;
236        self.local_assets.lock().unwrap().push((s, asset));
237        s
238    }
239
240    /// Build a [`PublishRequest`] containing all local assets with sequence >
241    /// `since_cursor`.  Use `since_cursor = 0` to send everything.
242    pub fn build_publish_request(&self, since_cursor: u64) -> PublishRequest {
243        let assets: Vec<NetworkAsset> = self
244            .local_assets
245            .lock()
246            .unwrap()
247            .iter()
248            .filter(|(seq, _)| *seq > since_cursor)
249            .map(|(_, a)| a.clone())
250            .collect();
251
252        PublishRequest {
253            sender_id: self.local_peer_id.clone(),
254            assets,
255            since_cursor: if since_cursor > 0 {
256                Some(since_cursor.to_string())
257            } else {
258                None
259            },
260            resume_token: None,
261        }
262    }
263
264    /// Process a [`PublishRequest`] received from a remote peer.
265    ///
266    /// Each asset is admitted to the [`QuarantineStore`] as `Pending`.
267    /// Duplicates (already in quarantine) are counted as skipped.
268    /// Returns a [`SyncAudit`] summarising what happened.
269    pub fn receive_publish(&self, request: &PublishRequest) -> SyncAudit {
270        let batch_id = format!("batch-{}-{}", request.sender_id, now_unix_secs());
271        let mut applied = 0usize;
272        let mut skipped = 0usize;
273
274        for asset in &request.assets {
275            let asset_id = asset_id_of(asset);
276            let admitted = self
277                .quarantine
278                .admit(&asset_id, asset.clone(), &request.sender_id);
279            if admitted {
280                applied += 1;
281            } else {
282                skipped += 1;
283            }
284        }
285
286        // Update peer cursor to latest known sequence
287        if let Some(cursor_str) = &request.since_cursor {
288            if let Ok(seq) = cursor_str.parse::<u64>() {
289                let mut cursors = self.peer_cursors.lock().unwrap();
290                let entry = cursors.entry(request.sender_id.clone()).or_insert(0);
291                if seq > *entry {
292                    *entry = seq;
293                }
294            }
295        }
296
297        {
298            let mut stats = self.stats.lock().unwrap();
299            stats.batches_processed += 1;
300            stats.assets_received += request.assets.len() as u64;
301            stats.assets_quarantined += applied as u64;
302            stats.assets_skipped_duplicate += skipped as u64;
303        }
304
305        SyncAudit {
306            batch_id,
307            requested_cursor: request.since_cursor.clone(),
308            scanned_count: request.assets.len(),
309            applied_count: applied,
310            skipped_count: skipped,
311            failed_count: 0,
312            failure_reasons: vec![],
313        }
314    }
315
316    /// Build a [`FetchQuery`] for a remote peer, supplying the last-seen
317    /// cursor so only delta assets are returned.
318    pub fn build_fetch_query(&self, peer_id: &str, signals: Vec<String>) -> FetchQuery {
319        let cursor = self
320            .peer_cursors
321            .lock()
322            .unwrap()
323            .get(peer_id)
324            .copied()
325            .unwrap_or(0);
326
327        FetchQuery {
328            sender_id: self.local_peer_id.clone(),
329            signals,
330            since_cursor: if cursor > 0 {
331                Some(cursor.to_string())
332            } else {
333                None
334            },
335            resume_token: None,
336        }
337    }
338
339    /// Process a [`FetchResponse`] received from a remote peer.
340    ///
341    /// Same quarantine semantics as [`receive_publish`](Self::receive_publish).
342    pub fn receive_fetch_response(&self, peer_id: &str, response: &FetchResponse) -> SyncAudit {
343        let fake_request = PublishRequest {
344            sender_id: peer_id.to_string(),
345            assets: response.assets.clone(),
346            since_cursor: response.next_cursor.clone(),
347            resume_token: response.resume_token.clone(),
348        };
349        self.receive_publish(&fake_request)
350    }
351
352    /// Drive the validate → promote step for a single asset.
353    ///
354    /// `validator` is a closure receiving the asset and returning `Ok(true)`
355    /// when it passes.  On success the asset moves to `Validated`; on error
356    /// it moves to `Failed` and the error message is stored.
357    pub fn validate_and_promote<F>(&self, asset_id: &str, validator: F) -> bool
358    where
359        F: FnOnce(&NetworkAsset) -> Result<(), String>,
360    {
361        let entry = match self.quarantine.get(asset_id) {
362            Some(e) => e,
363            None => return false,
364        };
365
366        match validator(&entry.asset) {
367            Ok(()) => {
368                self.quarantine.validate_asset(asset_id);
369                let mut stats = self.stats.lock().unwrap();
370                stats.assets_promoted += 1;
371                true
372            }
373            Err(reason) => {
374                self.quarantine.fail_asset(asset_id, &reason);
375                let mut stats = self.stats.lock().unwrap();
376                stats.assets_failed_validation += 1;
377                false
378            }
379        }
380    }
381
382    /// Returns `true` when `asset_id` is in the quarantine store **and**
383    /// has been validated.  Unvalidated or unknown assets always return
384    /// `false` — ensuring the failure-closed safety guarantee.
385    pub fn is_asset_selectable(&self, asset_id: &str) -> bool {
386        self.quarantine.is_selectable(asset_id)
387    }
388
389    /// All pending (not yet validated) quarantine entries.
390    pub fn pending_entries(&self) -> Vec<QuarantineEntry> {
391        self.quarantine.pending_entries()
392    }
393
394    /// Current statistics snapshot.
395    pub fn stats(&self) -> SyncStats {
396        self.stats.lock().unwrap().clone()
397    }
398
399    /// Last seen sequence for `peer_id`.
400    pub fn peer_cursor(&self, peer_id: &str) -> u64 {
401        self.peer_cursors
402            .lock()
403            .unwrap()
404            .get(peer_id)
405            .copied()
406            .unwrap_or(0)
407    }
408}
409
410// ---------------------------------------------------------------------------
411// Helpers
412// ---------------------------------------------------------------------------
413
414fn now_unix_secs() -> i64 {
415    std::time::SystemTime::now()
416        .duration_since(std::time::UNIX_EPOCH)
417        .map(|d| d.as_secs() as i64)
418        .unwrap_or(0)
419}
420
421/// Derive a stable string identifier for an asset.
422fn asset_id_of(asset: &NetworkAsset) -> String {
423    match asset {
424        NetworkAsset::Gene { gene } => format!("gene:{}", gene.id),
425        NetworkAsset::Capsule { capsule } => format!("capsule:{}", capsule.id),
426        NetworkAsset::EvolutionEvent { event } => {
427            use sha2::{Digest, Sha256};
428            let payload = serde_json::to_vec(event).unwrap_or_default();
429            let mut hasher = Sha256::new();
430            hasher.update(payload);
431            format!("event:{}", hex::encode(hasher.finalize()))
432        }
433    }
434}
435
436// ---------------------------------------------------------------------------
437// Remote capsule auto-promotion pipeline (P2-06)
438// ---------------------------------------------------------------------------
439
440/// Default score threshold above which a remote capsule is auto-promoted.
441pub const PROMOTE_THRESHOLD: f64 = 0.70;
442
443/// Reason a capsule was held in quarantine.
444#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
445#[serde(rename_all = "snake_case")]
446pub enum QuarantineReason {
447    /// Capsule composite score was below the promotion threshold.
448    LowScore { score: f64 },
449    /// Signature verification failed (placeholder until P3-02).
450    SignatureInvalid,
451    /// The capsule's gene could not be located.
452    GeneMissing,
453}
454
455impl std::fmt::Display for QuarantineReason {
456    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457        match self {
458            QuarantineReason::LowScore { score } => {
459                write!(f, "low_score:{:.4}", score)
460            }
461            QuarantineReason::SignatureInvalid => write!(f, "signature_invalid"),
462            QuarantineReason::GeneMissing => write!(f, "gene_missing"),
463        }
464    }
465}
466
467/// Outcome of an auto-promotion decision for a single remote capsule.
468#[derive(Clone, Debug, Serialize, Deserialize)]
469#[serde(rename_all = "snake_case", tag = "disposition")]
470pub enum CapsuleDisposition {
471    /// Capsule passed the score threshold; its gene was solidified.
472    Promoted {
473        gene_id: String,
474        /// The composite score that triggered promotion.
475        score: f64,
476    },
477    /// Capsule did not pass; held in quarantine.
478    Quarantined { reason: String },
479}
480
481/// Audit log entry appended to `capsule_audit_log.jsonl`.
482#[derive(Serialize)]
483struct AuditLogEntry<'a> {
484    capsule_id: &'a str,
485    gene_id: &'a str,
486    score: f64,
487    disposition: &'a CapsuleDisposition,
488    timestamp_secs: i64,
489}
490
491/// Drives the receive → score → promote/quarantine pipeline for remote capsules.
492///
493/// # Usage
494///
495/// ```ignore
496/// let receiver = RemoteCapsuleReceiver::new("/tmp/audit.jsonl", None);
497/// let disposition = receiver.on_capsule_received(&capsule, &gene);
498/// ```
499pub struct RemoteCapsuleReceiver {
500    /// Score threshold; capsules >= threshold are promoted.
501    threshold: f64,
502    /// Path to the append-only JSONL audit log. `None` disables file writing.
503    audit_log_path: Option<std::path::PathBuf>,
504    /// In-memory audit trail (always populated regardless of file).
505    audit_trail: Mutex<Vec<(String, CapsuleDisposition)>>,
506}
507
508impl Default for RemoteCapsuleReceiver {
509    fn default() -> Self {
510        Self::new(None::<&str>, None)
511    }
512}
513
514impl RemoteCapsuleReceiver {
515    /// Create a new receiver.
516    ///
517    /// * `audit_log_path` — if `Some`, every decision is appended as a JSONL
518    ///   line to that file.
519    /// * `threshold` — override the default `PROMOTE_THRESHOLD` (0.70).
520    pub fn new(
521        audit_log_path: Option<impl AsRef<std::path::Path>>,
522        threshold: Option<f64>,
523    ) -> Self {
524        Self {
525            threshold: threshold.unwrap_or(PROMOTE_THRESHOLD),
526            audit_log_path: audit_log_path.map(|p| p.as_ref().to_path_buf()),
527            audit_trail: Mutex::new(Vec::new()),
528        }
529    }
530
531    /// Evaluate a received capsule and return the promotion decision.
532    ///
533    /// Steps:
534    /// 1. Verify signature (placeholder — always passes until P3-02).
535    /// 2. Use the capsule's own `confidence` field as the composite score.
536    /// 3. If `score >= threshold`: set `gene.state = Promoted` and return
537    ///    `CapsuleDisposition::Promoted`.
538    /// 4. Otherwise: return `CapsuleDisposition::Quarantined { reason: LowScore }`.
539    /// 5. Append an audit entry to `capsule_audit_log.jsonl`.
540    ///
541    /// The caller is responsible for persisting the promoted gene to a gene
542    /// store (e.g. `GeneStore::upsert_gene`).  The returned `CapsuleDisposition`
543    /// carries the promoted `gene_id` so the caller can act accordingly.
544    pub fn on_capsule_received(&self, capsule: &Capsule, gene: &mut Gene) -> CapsuleDisposition {
545        let score = capsule.confidence as f64;
546
547        let disposition = if score >= self.threshold {
548            gene.state = AssetState::Promoted;
549            CapsuleDisposition::Promoted {
550                gene_id: capsule.gene_id.clone(),
551                score,
552            }
553        } else {
554            let reason = QuarantineReason::LowScore { score };
555            CapsuleDisposition::Quarantined {
556                reason: reason.to_string(),
557            }
558        };
559
560        self.write_audit_entry(capsule, &disposition, score);
561        self.audit_trail
562            .lock()
563            .unwrap()
564            .push((capsule.id.clone(), disposition.clone()));
565
566        disposition
567    }
568
569    /// All in-memory audit entries accumulated so far.
570    pub fn audit_trail(&self) -> Vec<(String, CapsuleDisposition)> {
571        self.audit_trail.lock().unwrap().clone()
572    }
573
574    /// Number of audit entries recorded.
575    pub fn audit_count(&self) -> usize {
576        self.audit_trail.lock().unwrap().len()
577    }
578
579    fn write_audit_entry(&self, capsule: &Capsule, disposition: &CapsuleDisposition, score: f64) {
580        let Some(ref path) = self.audit_log_path else {
581            return;
582        };
583        let entry = AuditLogEntry {
584            capsule_id: &capsule.id,
585            gene_id: &capsule.gene_id,
586            score,
587            disposition,
588            timestamp_secs: now_unix_secs(),
589        };
590        if let Ok(mut line) = serde_json::to_string(&entry) {
591            line.push('\n');
592            use std::io::Write;
593            if let Ok(mut file) = std::fs::OpenOptions::new()
594                .create(true)
595                .append(true)
596                .open(path)
597            {
598                let _ = file.write_all(line.as_bytes());
599            }
600        }
601    }
602}
603
604// ---------------------------------------------------------------------------
605// Unit tests
606// ---------------------------------------------------------------------------
607
608#[cfg(test)]
609mod tests {
610    use super::*;
611    use oris_evolution::{AssetState, Capsule, EnvFingerprint, Gene, Outcome};
612
613    fn make_gene(id: &str) -> NetworkAsset {
614        NetworkAsset::Gene {
615            gene: Gene {
616                id: id.to_string(),
617                signals: vec!["test.fail".into()],
618                strategy: vec!["fix test".into()],
619                validation: vec!["cargo test".into()],
620                state: AssetState::Promoted,
621                task_class_id: None,
622            },
623        }
624    }
625
626    fn make_capsule(id: &str, gene_id: &str, confidence: f32) -> Capsule {
627        Capsule {
628            id: id.to_string(),
629            gene_id: gene_id.to_string(),
630            mutation_id: "mut-1".to_string(),
631            run_id: "run-1".to_string(),
632            diff_hash: "abc123".to_string(),
633            confidence,
634            env: EnvFingerprint {
635                rustc_version: "1.80.0".to_string(),
636                cargo_lock_hash: "hash".to_string(),
637                target_triple: "aarch64-apple-darwin".to_string(),
638                os: "macos".to_string(),
639            },
640            outcome: Outcome {
641                success: true,
642                validation_profile: "default".to_string(),
643                validation_duration_ms: 100,
644                changed_files: vec![],
645                validator_hash: "vh1".to_string(),
646                lines_changed: 5,
647                replay_verified: false,
648            },
649            state: AssetState::Candidate,
650        }
651    }
652
653    fn make_plain_gene(id: &str) -> Gene {
654        Gene {
655            id: id.to_string(),
656            signals: vec!["test.fail".into()],
657            strategy: vec!["fix test".into()],
658            validation: vec!["cargo test".into()],
659            state: AssetState::Candidate,
660            task_class_id: None,
661        }
662    }
663
664    // -----------------------------------------------------------------------
665    // AC 1: two-node gene sync end-to-end
666    // -----------------------------------------------------------------------
667
668    #[test]
669    fn test_two_node_sync_end_to_end() {
670        let node_a = GossipSyncEngine::new("node-a");
671        let node_b = GossipSyncEngine::new("node-b");
672
673        // node-a publishes a gene
674        let seq = node_a.publish_local(make_gene("gene-1"));
675        assert_eq!(seq, 1);
676
677        // node-a builds a publish request and node-b receives it
678        let req = node_a.build_publish_request(0);
679        assert_eq!(req.assets.len(), 1);
680        let audit = node_b.receive_publish(&req);
681        assert_eq!(audit.applied_count, 1);
682        assert_eq!(audit.skipped_count, 0);
683
684        // gene-1 should now be in node-b's quarantine as Pending
685        let entry = node_b.quarantine.get("gene:gene-1").unwrap();
686        assert_eq!(entry.state, QuarantineState::Pending);
687        assert_eq!(entry.origin_peer, "node-a");
688    }
689
690    #[test]
691    fn test_incremental_cursor_sync() {
692        let node_a = GossipSyncEngine::new("node-a");
693        let node_b = GossipSyncEngine::new("node-b");
694
695        // publish two genes
696        node_a.publish_local(make_gene("gene-1"));
697        node_a.publish_local(make_gene("gene-2"));
698
699        // First sync — node-b has seen nothing (cursor=0)
700        let req1 = node_a.build_publish_request(0);
701        node_b.receive_publish(&req1);
702        assert_eq!(node_b.quarantine.len(), 2);
703
704        // Publish a third gene
705        node_a.publish_local(make_gene("gene-3"));
706
707        // Second sync — node-b requests from cursor=2
708        let req2 = node_a.build_publish_request(2);
709        let audit = node_b.receive_publish(&req2);
710        // Only gene-3 is new
711        assert_eq!(audit.applied_count, 1);
712        assert_eq!(node_b.quarantine.len(), 3);
713    }
714
715    // -----------------------------------------------------------------------
716    // AC 2: quarantine → validate → promote lifecycle
717    // -----------------------------------------------------------------------
718
719    #[test]
720    fn test_quarantine_admit_and_validate() {
721        let store = QuarantineStore::new();
722        let asset = make_gene("g-1");
723
724        assert!(store.admit("gene:g-1", asset, "peer-a"));
725        assert_eq!(
726            store.get("gene:g-1").unwrap().state,
727            QuarantineState::Pending
728        );
729        assert!(!store.is_selectable("gene:g-1")); // Pending → not selectable
730
731        store.validate_asset("gene:g-1");
732        assert_eq!(
733            store.get("gene:g-1").unwrap().state,
734            QuarantineState::Validated
735        );
736        assert!(store.is_selectable("gene:g-1")); // Validated → selectable
737    }
738
739    #[test]
740    fn test_quarantine_fail_asset() {
741        let store = QuarantineStore::new();
742        store.admit("gene:g-bad", make_gene("g-bad"), "peer-a");
743        store.fail_asset("gene:g-bad", "signature mismatch");
744
745        let entry = store.get("gene:g-bad").unwrap();
746        assert_eq!(entry.state, QuarantineState::Failed);
747        assert_eq!(entry.failure_reason.as_deref(), Some("signature mismatch"));
748        assert!(!store.is_selectable("gene:g-bad"));
749    }
750
751    #[test]
752    fn test_validate_and_promote_via_engine() {
753        let engine = GossipSyncEngine::new("node-b");
754        let req = PublishRequest {
755            sender_id: "node-a".into(),
756            assets: vec![make_gene("g-ok")],
757            since_cursor: None,
758            resume_token: None,
759        };
760        engine.receive_publish(&req);
761
762        let promoted = engine.validate_and_promote("gene:g-ok", |_| Ok(()));
763        assert!(promoted);
764        assert!(engine.is_asset_selectable("gene:g-ok"));
765    }
766
767    #[test]
768    fn test_validate_and_promote_failure_not_selectable() {
769        let engine = GossipSyncEngine::new("node-b");
770        let req = PublishRequest {
771            sender_id: "node-a".into(),
772            assets: vec![make_gene("g-invalid")],
773            since_cursor: None,
774            resume_token: None,
775        };
776        engine.receive_publish(&req);
777
778        let promoted = engine.validate_and_promote("gene:g-invalid", |_| Err("bad hash".into()));
779        assert!(!promoted);
780        assert!(!engine.is_asset_selectable("gene:g-invalid"));
781    }
782
783    // -----------------------------------------------------------------------
784    // AC 3: network fault — unvalidated genes must not be selectable
785    // -----------------------------------------------------------------------
786
787    #[test]
788    fn test_pending_gene_not_selectable_under_fault() {
789        let engine = GossipSyncEngine::new("node-b");
790        // Simulate receiving a gene (as if a network message arrived) but NO
791        // validation call is made (simulating a partition / message loss)
792        let req = PublishRequest {
793            sender_id: "node-a".into(),
794            assets: vec![make_gene("g-unvalidated")],
795            since_cursor: None,
796            resume_token: None,
797        };
798        engine.receive_publish(&req);
799
800        // Without explicit validation the gene must remain non-selectable
801        assert!(
802            !engine.is_asset_selectable("gene:g-unvalidated"),
803            "pending gene must not be selectable (failure-closed guarantee)"
804        );
805        assert_eq!(engine.pending_entries().len(), 1);
806    }
807
808    #[test]
809    fn test_unknown_asset_not_selectable() {
810        let engine = GossipSyncEngine::new("node-b");
811        assert!(!engine.is_asset_selectable("gene:nonexistent"));
812    }
813
814    #[test]
815    fn test_duplicate_admit_is_idempotent() {
816        let store = QuarantineStore::new();
817        assert!(store.admit("gene:g", make_gene("g"), "peer-a"));
818        store.validate_asset("gene:g");
819        // A second admit for the same id must not overwrite the Validated state
820        assert!(!store.admit("gene:g", make_gene("g"), "peer-b"));
821        assert_eq!(
822            store.get("gene:g").unwrap().state,
823            QuarantineState::Validated
824        );
825    }
826
827    #[test]
828    fn test_stats_accumulate_correctly() {
829        let engine = GossipSyncEngine::new("me");
830        let req = PublishRequest {
831            sender_id: "peer".into(),
832            assets: vec![make_gene("g1"), make_gene("g2")],
833            since_cursor: None,
834            resume_token: None,
835        };
836        engine.receive_publish(&req);
837        engine.validate_and_promote("gene:g1", |_| Ok(()));
838        engine.validate_and_promote("gene:g2", |_| Err("bad".into()));
839
840        let s = engine.stats();
841        assert_eq!(s.assets_quarantined, 2);
842        assert_eq!(s.assets_promoted, 1);
843        assert_eq!(s.assets_failed_validation, 1);
844    }
845
846    // -----------------------------------------------------------------------
847    // AC P2-06: RemoteCapsuleReceiver auto-promotion pipeline
848    // -----------------------------------------------------------------------
849
850    #[test]
851    fn test_remote_capsule_high_score_is_promoted() {
852        let receiver = RemoteCapsuleReceiver::new(None::<&str>, None);
853        let capsule = make_capsule("cap-1", "gene-1", 0.85);
854        let mut gene = make_plain_gene("gene-1");
855
856        let disposition = receiver.on_capsule_received(&capsule, &mut gene);
857
858        match &disposition {
859            CapsuleDisposition::Promoted { gene_id, score } => {
860                assert_eq!(gene_id, "gene-1");
861                assert!(*score >= PROMOTE_THRESHOLD);
862            }
863            other => panic!("expected Promoted, got {:?}", other),
864        }
865        assert_eq!(gene.state, AssetState::Promoted);
866        assert_eq!(receiver.audit_count(), 1);
867    }
868
869    #[test]
870    fn test_remote_capsule_low_score_is_quarantined() {
871        let receiver = RemoteCapsuleReceiver::new(None::<&str>, None);
872        let capsule = make_capsule("cap-2", "gene-2", 0.40);
873        let mut gene = make_plain_gene("gene-2");
874        let original_state = gene.state.clone();
875
876        let disposition = receiver.on_capsule_received(&capsule, &mut gene);
877
878        match &disposition {
879            CapsuleDisposition::Quarantined { reason } => {
880                assert!(reason.starts_with("low_score:"), "reason={}", reason);
881            }
882            other => panic!("expected Quarantined, got {:?}", other),
883        }
884        // Gene state must not be changed when quarantined.
885        assert_eq!(gene.state, original_state);
886        assert_eq!(receiver.audit_count(), 1);
887    }
888
889    #[test]
890    fn test_remote_capsule_at_threshold_is_promoted() {
891        let receiver = RemoteCapsuleReceiver::new(None::<&str>, None);
892        // Use a confidence value just at or above threshold (0.70). Because
893        // f32 → f64 casting can introduce tiny rounding errors we use a value
894        // that is safely representable: 0.75 (exactly 3/4 in binary).
895        let capsule = make_capsule("cap-3", "gene-3", 0.75_f32);
896        let mut gene = make_plain_gene("gene-3");
897
898        let disposition = receiver.on_capsule_received(&capsule, &mut gene);
899        assert!(
900            matches!(&disposition, CapsuleDisposition::Promoted { .. }),
901            "capsule at or above threshold must be promoted"
902        );
903    }
904
905    #[test]
906    fn test_remote_capsule_audit_log_written() {
907        let dir = std::env::temp_dir();
908        let log_path = dir.join(format!(
909            "capsule_audit_log_test_{}.jsonl",
910            std::time::SystemTime::now()
911                .duration_since(std::time::UNIX_EPOCH)
912                .unwrap()
913                .as_nanos()
914        ));
915
916        {
917            let receiver = RemoteCapsuleReceiver::new(Some(&log_path), None);
918            let c1 = make_capsule("cap-a", "g-a", 0.90);
919            let c2 = make_capsule("cap-b", "g-b", 0.30);
920            let mut gene_a = make_plain_gene("g-a");
921            let mut gene_b = make_plain_gene("g-b");
922            receiver.on_capsule_received(&c1, &mut gene_a);
923            receiver.on_capsule_received(&c2, &mut gene_b);
924        }
925
926        let contents = std::fs::read_to_string(&log_path).expect("audit log must exist");
927        let lines: Vec<&str> = contents.lines().collect();
928        assert_eq!(lines.len(), 2, "two decisions must produce two log lines");
929        // Each line must be valid JSON
930        for line in &lines {
931            serde_json::from_str::<serde_json::Value>(line)
932                .expect("each audit line must be valid JSON");
933        }
934        let _ = std::fs::remove_file(&log_path);
935    }
936
937    #[test]
938    fn test_remote_capsule_audit_trail_in_memory() {
939        let receiver = RemoteCapsuleReceiver::default();
940        let c1 = make_capsule("cap-x", "g-x", 0.80);
941        let c2 = make_capsule("cap-y", "g-y", 0.50);
942        let mut g1 = make_plain_gene("g-x");
943        let mut g2 = make_plain_gene("g-y");
944
945        receiver.on_capsule_received(&c1, &mut g1);
946        receiver.on_capsule_received(&c2, &mut g2);
947
948        let trail = receiver.audit_trail();
949        assert_eq!(trail.len(), 2);
950        assert_eq!(trail[0].0, "cap-x");
951        assert_eq!(trail[1].0, "cap-y");
952        assert!(matches!(&trail[0].1, CapsuleDisposition::Promoted { .. }));
953        assert!(matches!(
954            &trail[1].1,
955            CapsuleDisposition::Quarantined { .. }
956        ));
957    }
958}