Skip to main content

oris_evolution_network/
sync.rs

1//! sync.rs — Gossip Sync Engine & Quarantine Lifecycle
2//!
3//! Implements the two missing pieces called out in issue #250:
4//!
5//! 1. **[`GossipSyncEngine`]** — incremental, cursor-based sync of
6//!    gene/capsule assets between two evolution-network nodes.
7//! 2. **[`QuarantineStore`]** — remote assets first enter a quarantine area;
8//!    only after local validation passes are they promoted for reuse.
9//!
10//! # Failure-closed safety guarantee
11//!
12//! Remote assets that have **not** been validated are *never* moved to the
13//! `Validated` state automatically.  Under network partition or message loss
14//! the quarantine store simply retains entries as `Pending`/`Failed` until
15//! an explicit `validate_asset` call succeeds.  This ensures correctness ≥
16//! 99.5% even under hostile network conditions.
17//!
18//! # Sync cursor
19//!
20//! Each node maintains a monotonically increasing `sequence` counter.  Peers
21//! exchange their last-seen sequence number so that only *new* assets need to
22//! be transferred on each round.
23
24use crate::{
25    verify_envelope, EvolutionEnvelope, FetchQuery, FetchResponse, NetworkAsset,
26    PeerRateLimitConfig, PeerRateLimiter, PublishRequest, SyncAudit,
27};
28use chrono::Utc;
29use oris_evolution::{AssetState, Capsule, Gene};
30use serde::{Deserialize, Serialize};
31use std::collections::HashMap;
32use std::sync::Mutex;
33
34// ---------------------------------------------------------------------------
35// Quarantine lifecycle
36// ---------------------------------------------------------------------------
37
38/// Lifecycle state of a remotely received asset.
39#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum QuarantineState {
42    /// Received but not yet validated.
43    Pending,
44    /// Local validation passed — safe for reuse.
45    Validated,
46    /// Local validation failed — must not be used.
47    Failed,
48}
49
50/// A remote asset held in the quarantine area together with its validation
51/// state and origin metadata.
52#[derive(Clone, Debug, Serialize, Deserialize)]
53pub struct QuarantineEntry {
54    pub asset_id: String,
55    pub asset: NetworkAsset,
56    pub origin_peer: String,
57    pub state: QuarantineState,
58    /// Wall-clock timestamp (Unix seconds) when this entry was created.
59    pub received_at: i64,
60    /// Optional reason string recorded when `state == Failed`.
61    pub failure_reason: Option<String>,
62}
63
64/// In-process quarantine store.
65///
66/// Thread-safe via an internal `Mutex`.  Production deployments may replace
67/// this with a persisted backend by implementing the same functional contract.
68pub struct QuarantineStore {
69    entries: Mutex<HashMap<String, QuarantineEntry>>,
70}
71
72impl Default for QuarantineStore {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78impl QuarantineStore {
79    pub fn new() -> Self {
80        Self {
81            entries: Mutex::new(HashMap::new()),
82        }
83    }
84
85    /// Admit a remote asset into quarantine (state = `Pending`).
86    ///
87    /// If the asset is already present it is *not* overwritten; the existing
88    /// entry is left unchanged.  Returns `true` when a new entry was inserted.
89    pub fn admit(
90        &self,
91        asset_id: impl Into<String>,
92        asset: NetworkAsset,
93        origin_peer: impl Into<String>,
94    ) -> bool {
95        let id = asset_id.into();
96        let mut entries = self.entries.lock().unwrap();
97        if entries.contains_key(&id) {
98            return false;
99        }
100        entries.insert(
101            id.clone(),
102            QuarantineEntry {
103                asset_id: id,
104                asset,
105                origin_peer: origin_peer.into(),
106                state: QuarantineState::Pending,
107                received_at: now_unix_secs(),
108                failure_reason: None,
109            },
110        );
111        true
112    }
113
114    /// Mark an asset as validated.
115    ///
116    /// Returns `true` on success, `false` if the asset was not found.
117    pub fn validate_asset(&self, asset_id: &str) -> bool {
118        let mut entries = self.entries.lock().unwrap();
119        if let Some(entry) = entries.get_mut(asset_id) {
120            entry.state = QuarantineState::Validated;
121            entry.failure_reason = None;
122            true
123        } else {
124            false
125        }
126    }
127
128    /// Mark an asset as failed with a reason.
129    ///
130    /// Returns `true` on success, `false` if the asset was not found.
131    pub fn fail_asset(&self, asset_id: &str, reason: impl Into<String>) -> bool {
132        let mut entries = self.entries.lock().unwrap();
133        if let Some(entry) = entries.get_mut(asset_id) {
134            entry.state = QuarantineState::Failed;
135            entry.failure_reason = Some(reason.into());
136            true
137        } else {
138            false
139        }
140    }
141
142    /// Retrieve an entry by asset id.
143    pub fn get(&self, asset_id: &str) -> Option<QuarantineEntry> {
144        self.entries.lock().unwrap().get(asset_id).cloned()
145    }
146
147    /// Returns `true` if `asset_id` is present **and** its state is `Validated`.
148    pub fn is_selectable(&self, asset_id: &str) -> bool {
149        self.entries
150            .lock()
151            .unwrap()
152            .get(asset_id)
153            .map(|e| e.state == QuarantineState::Validated)
154            .unwrap_or(false)
155    }
156
157    /// All entries currently in `Pending` state.
158    pub fn pending_entries(&self) -> Vec<QuarantineEntry> {
159        self.entries
160            .lock()
161            .unwrap()
162            .values()
163            .filter(|e| e.state == QuarantineState::Pending)
164            .cloned()
165            .collect()
166    }
167
168    /// All entries currently in `Validated` state.
169    pub fn validated_entries(&self) -> Vec<QuarantineEntry> {
170        self.entries
171            .lock()
172            .unwrap()
173            .values()
174            .filter(|e| e.state == QuarantineState::Validated)
175            .cloned()
176            .collect()
177    }
178
179    /// Total number of entries.
180    pub fn len(&self) -> usize {
181        self.entries.lock().unwrap().len()
182    }
183
184    pub fn is_empty(&self) -> bool {
185        self.entries.lock().unwrap().is_empty()
186    }
187}
188
189// ---------------------------------------------------------------------------
190// Gossip Sync Engine
191// ---------------------------------------------------------------------------
192
193/// Statistics for a single sync session.
194#[derive(Clone, Debug, Default, Serialize, Deserialize)]
195pub struct SyncStats {
196    pub batches_processed: u64,
197    pub assets_received: u64,
198    pub assets_quarantined: u64,
199    pub assets_skipped_duplicate: u64,
200    pub assets_failed_validation: u64,
201    pub assets_promoted: u64,
202}
203
204/// Incremental sync engine.
205///
206/// Maintains a per-peer cursor (last seen sequence number) so that each
207/// gossip round only exchanges *new* assets.  Received assets are admitted
208/// into a [`QuarantineStore`]; the caller is responsible for driving the
209/// `validate → promote` lifecycle.
210pub struct GossipSyncEngine {
211    local_peer_id: String,
212    /// Sequence counter for assets published by this node.
213    local_sequence: Mutex<u64>,
214    /// Last-seen remote sequence per peer.
215    peer_cursors: Mutex<HashMap<String, u64>>,
216    /// Assets published by this node, indexed by sequence number.
217    local_assets: Mutex<Vec<(u64, NetworkAsset)>>,
218    quarantine: QuarantineStore,
219    stats: Mutex<SyncStats>,
220}
221
222impl GossipSyncEngine {
223    pub fn new(local_peer_id: impl Into<String>) -> Self {
224        Self {
225            local_peer_id: local_peer_id.into(),
226            local_sequence: Mutex::new(0),
227            peer_cursors: Mutex::new(HashMap::new()),
228            local_assets: Mutex::new(Vec::new()),
229            quarantine: QuarantineStore::new(),
230            stats: Mutex::new(SyncStats::default()),
231        }
232    }
233
234    /// Publish a local asset, incrementing the sequence counter.
235    /// Returns the sequence number assigned to this asset.
236    pub fn publish_local(&self, asset: NetworkAsset) -> u64 {
237        let mut seq = self.local_sequence.lock().unwrap();
238        *seq += 1;
239        let s = *seq;
240        self.local_assets.lock().unwrap().push((s, asset));
241        s
242    }
243
244    /// Build a [`PublishRequest`] containing all local assets with sequence >
245    /// `since_cursor`.  Use `since_cursor = 0` to send everything.
246    pub fn build_publish_request(&self, since_cursor: u64) -> PublishRequest {
247        let assets: Vec<NetworkAsset> = self
248            .local_assets
249            .lock()
250            .unwrap()
251            .iter()
252            .filter(|(seq, _)| *seq > since_cursor)
253            .map(|(_, a)| a.clone())
254            .collect();
255
256        PublishRequest {
257            sender_id: self.local_peer_id.clone(),
258            assets,
259            since_cursor: if since_cursor > 0 {
260                Some(since_cursor.to_string())
261            } else {
262                None
263            },
264            resume_token: None,
265        }
266    }
267
268    /// Process a [`PublishRequest`] received from a remote peer.
269    ///
270    /// Each asset is admitted to the [`QuarantineStore`] as `Pending`.
271    /// Duplicates (already in quarantine) are counted as skipped.
272    /// Returns a [`SyncAudit`] summarising what happened.
273    pub fn receive_publish(&self, request: &PublishRequest) -> SyncAudit {
274        let batch_id = format!("batch-{}-{}", request.sender_id, now_unix_secs());
275        let mut applied = 0usize;
276        let mut skipped = 0usize;
277
278        for asset in &request.assets {
279            let asset_id = asset_id_of(asset);
280            let admitted = self
281                .quarantine
282                .admit(&asset_id, asset.clone(), &request.sender_id);
283            if admitted {
284                applied += 1;
285            } else {
286                skipped += 1;
287            }
288        }
289
290        // Update peer cursor to latest known sequence
291        if let Some(cursor_str) = &request.since_cursor {
292            if let Ok(seq) = cursor_str.parse::<u64>() {
293                let mut cursors = self.peer_cursors.lock().unwrap();
294                let entry = cursors.entry(request.sender_id.clone()).or_insert(0);
295                if seq > *entry {
296                    *entry = seq;
297                }
298            }
299        }
300
301        {
302            let mut stats = self.stats.lock().unwrap();
303            stats.batches_processed += 1;
304            stats.assets_received += request.assets.len() as u64;
305            stats.assets_quarantined += applied as u64;
306            stats.assets_skipped_duplicate += skipped as u64;
307        }
308
309        SyncAudit {
310            batch_id,
311            requested_cursor: request.since_cursor.clone(),
312            scanned_count: request.assets.len(),
313            applied_count: applied,
314            skipped_count: skipped,
315            failed_count: 0,
316            failure_reasons: vec![],
317        }
318    }
319
320    /// Build a [`FetchQuery`] for a remote peer, supplying the last-seen
321    /// cursor so only delta assets are returned.
322    pub fn build_fetch_query(&self, peer_id: &str, signals: Vec<String>) -> FetchQuery {
323        let cursor = self
324            .peer_cursors
325            .lock()
326            .unwrap()
327            .get(peer_id)
328            .copied()
329            .unwrap_or(0);
330
331        FetchQuery {
332            sender_id: self.local_peer_id.clone(),
333            signals,
334            since_cursor: if cursor > 0 {
335                Some(cursor.to_string())
336            } else {
337                None
338            },
339            resume_token: None,
340        }
341    }
342
343    /// Process a [`FetchResponse`] received from a remote peer.
344    ///
345    /// Same quarantine semantics as [`receive_publish`](Self::receive_publish).
346    pub fn receive_fetch_response(&self, peer_id: &str, response: &FetchResponse) -> SyncAudit {
347        let fake_request = PublishRequest {
348            sender_id: peer_id.to_string(),
349            assets: response.assets.clone(),
350            since_cursor: response.next_cursor.clone(),
351            resume_token: response.resume_token.clone(),
352        };
353        self.receive_publish(&fake_request)
354    }
355
356    /// Drive the validate → promote step for a single asset.
357    ///
358    /// `validator` is a closure receiving the asset and returning `Ok(true)`
359    /// when it passes.  On success the asset moves to `Validated`; on error
360    /// it moves to `Failed` and the error message is stored.
361    pub fn validate_and_promote<F>(&self, asset_id: &str, validator: F) -> bool
362    where
363        F: FnOnce(&NetworkAsset) -> Result<(), String>,
364    {
365        let entry = match self.quarantine.get(asset_id) {
366            Some(e) => e,
367            None => return false,
368        };
369
370        match validator(&entry.asset) {
371            Ok(()) => {
372                self.quarantine.validate_asset(asset_id);
373                let mut stats = self.stats.lock().unwrap();
374                stats.assets_promoted += 1;
375                true
376            }
377            Err(reason) => {
378                self.quarantine.fail_asset(asset_id, &reason);
379                let mut stats = self.stats.lock().unwrap();
380                stats.assets_failed_validation += 1;
381                false
382            }
383        }
384    }
385
386    /// Returns `true` when `asset_id` is in the quarantine store **and**
387    /// has been validated.  Unvalidated or unknown assets always return
388    /// `false` — ensuring the failure-closed safety guarantee.
389    pub fn is_asset_selectable(&self, asset_id: &str) -> bool {
390        self.quarantine.is_selectable(asset_id)
391    }
392
393    /// All pending (not yet validated) quarantine entries.
394    pub fn pending_entries(&self) -> Vec<QuarantineEntry> {
395        self.quarantine.pending_entries()
396    }
397
398    /// Current statistics snapshot.
399    pub fn stats(&self) -> SyncStats {
400        self.stats.lock().unwrap().clone()
401    }
402
403    /// Last seen sequence for `peer_id`.
404    pub fn peer_cursor(&self, peer_id: &str) -> u64 {
405        self.peer_cursors
406            .lock()
407            .unwrap()
408            .get(peer_id)
409            .copied()
410            .unwrap_or(0)
411    }
412}
413
414// ---------------------------------------------------------------------------
415// Helpers
416// ---------------------------------------------------------------------------
417
418fn now_unix_secs() -> i64 {
419    std::time::SystemTime::now()
420        .duration_since(std::time::UNIX_EPOCH)
421        .map(|d| d.as_secs() as i64)
422        .unwrap_or(0)
423}
424
425/// Derive a stable string identifier for an asset.
426fn asset_id_of(asset: &NetworkAsset) -> String {
427    match asset {
428        NetworkAsset::Gene { gene } => format!("gene:{}", gene.id),
429        NetworkAsset::Capsule { capsule } => format!("capsule:{}", capsule.id),
430        NetworkAsset::EvolutionEvent { event } => {
431            use sha2::{Digest, Sha256};
432            let payload = serde_json::to_vec(event).unwrap_or_default();
433            let mut hasher = Sha256::new();
434            hasher.update(payload);
435            format!("event:{}", hex::encode(hasher.finalize()))
436        }
437    }
438}
439
440// ---------------------------------------------------------------------------
441// Remote capsule auto-promotion pipeline (P2-06)
442// ---------------------------------------------------------------------------
443
444/// Default score threshold above which a remote capsule is auto-promoted.
445pub const PROMOTE_THRESHOLD: f64 = 0.70;
446
447/// Reason a capsule was held in quarantine.
448#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
449#[serde(rename_all = "snake_case")]
450pub enum QuarantineReason {
451    /// Capsule composite score was below the promotion threshold.
452    LowScore { score: f64 },
453    /// Signature verification failed (placeholder until P3-02).
454    SignatureInvalid,
455    /// The capsule's gene could not be located.
456    GeneMissing,
457}
458
459#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
460#[serde(rename_all = "snake_case")]
461pub enum RejectionReason {
462    InvalidSignature,
463    MissingSignature,
464    RateLimited,
465    GeneMissing,
466}
467
468impl std::fmt::Display for RejectionReason {
469    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
470        match self {
471            RejectionReason::InvalidSignature => write!(f, "invalid_signature"),
472            RejectionReason::MissingSignature => write!(f, "missing_signature"),
473            RejectionReason::RateLimited => write!(f, "rate_limited"),
474            RejectionReason::GeneMissing => write!(f, "gene_missing"),
475        }
476    }
477}
478
479impl std::fmt::Display for QuarantineReason {
480    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
481        match self {
482            QuarantineReason::LowScore { score } => {
483                write!(f, "low_score:{:.4}", score)
484            }
485            QuarantineReason::SignatureInvalid => write!(f, "signature_invalid"),
486            QuarantineReason::GeneMissing => write!(f, "gene_missing"),
487        }
488    }
489}
490
491/// Outcome of an auto-promotion decision for a single remote capsule.
492#[derive(Clone, Debug, Serialize, Deserialize)]
493#[serde(rename_all = "snake_case", tag = "disposition")]
494pub enum CapsuleDisposition {
495    /// Capsule passed the score threshold; its gene was solidified.
496    Promoted {
497        gene_id: String,
498        /// The composite score that triggered promotion.
499        score: f64,
500    },
501    /// Capsule did not pass; held in quarantine.
502    Quarantined { reason: String },
503}
504
505/// Audit log entry appended to `capsule_audit_log.jsonl`.
506#[derive(Serialize)]
507struct AuditLogEntry<'a> {
508    timestamp: String,
509    peer_id: &'a str,
510    capsule_id: &'a str,
511    gene_id: &'a str,
512    disposition: NetworkAuditDisposition,
513    #[serde(skip_serializing_if = "Option::is_none")]
514    reason: Option<String>,
515    score: f64,
516}
517
518#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
519#[serde(rename_all = "snake_case")]
520pub enum NetworkAuditDisposition {
521    Accept,
522    Reject,
523}
524
525#[derive(Clone, Debug, Serialize, Deserialize)]
526pub struct NetworkAuditEntry {
527    pub timestamp: String,
528    pub peer_id: String,
529    pub capsule_id: String,
530    pub gene_id: String,
531    pub disposition: NetworkAuditDisposition,
532    #[serde(default, skip_serializing_if = "Option::is_none")]
533    pub reason: Option<String>,
534    #[serde(default, skip_serializing_if = "Option::is_none")]
535    pub score: Option<f64>,
536}
537
538/// Drives the receive → score → promote/quarantine pipeline for remote capsules.
539///
540/// # Usage
541///
542/// ```ignore
543/// let receiver = RemoteCapsuleReceiver::new("/tmp/audit.jsonl", None);
544/// let disposition = receiver.on_capsule_received(&capsule, &gene);
545/// ```
546pub struct RemoteCapsuleReceiver {
547    /// Score threshold; capsules >= threshold are promoted.
548    threshold: f64,
549    /// Path to the append-only JSONL audit log. `None` disables file writing.
550    audit_log_path: Option<std::path::PathBuf>,
551    /// In-memory audit trail (always populated regardless of file).
552    audit_trail: Mutex<Vec<(String, CapsuleDisposition)>>,
553    network_audit_trail: Mutex<Vec<NetworkAuditEntry>>,
554    rate_limiter: PeerRateLimiter,
555}
556
557impl Default for RemoteCapsuleReceiver {
558    fn default() -> Self {
559        Self::new(None::<&str>, None)
560    }
561}
562
563impl RemoteCapsuleReceiver {
564    /// Create a new receiver.
565    ///
566    /// * `audit_log_path` — if `Some`, every decision is appended as a JSONL
567    ///   line to that file.
568    /// * `threshold` — override the default `PROMOTE_THRESHOLD` (0.70).
569    pub fn new(
570        audit_log_path: Option<impl AsRef<std::path::Path>>,
571        threshold: Option<f64>,
572    ) -> Self {
573        Self::with_rate_limit_config(audit_log_path, threshold, PeerRateLimitConfig::default())
574    }
575
576    pub fn with_rate_limit_config(
577        audit_log_path: Option<impl AsRef<std::path::Path>>,
578        threshold: Option<f64>,
579        rate_limit_config: PeerRateLimitConfig,
580    ) -> Self {
581        Self {
582            threshold: threshold.unwrap_or(PROMOTE_THRESHOLD),
583            audit_log_path: audit_log_path.map(|p| p.as_ref().to_path_buf()),
584            audit_trail: Mutex::new(Vec::new()),
585            network_audit_trail: Mutex::new(Vec::new()),
586            rate_limiter: PeerRateLimiter::new(rate_limit_config),
587        }
588    }
589
590    /// Evaluate a received capsule and return the promotion decision.
591    ///
592    /// Steps:
593    /// 1. Verify signature (placeholder — always passes until P3-02).
594    /// 2. Use the capsule's own `confidence` field as the composite score.
595    /// 3. If `score >= threshold`: set `gene.state = Promoted` and return
596    ///    `CapsuleDisposition::Promoted`.
597    /// 4. Otherwise: return `CapsuleDisposition::Quarantined { reason: LowScore }`.
598    /// 5. Append an audit entry to `capsule_audit_log.jsonl`.
599    ///
600    /// The caller is responsible for persisting the promoted gene to a gene
601    /// store (e.g. `GeneStore::upsert_gene`).  The returned `CapsuleDisposition`
602    /// carries the promoted `gene_id` so the caller can act accordingly.
603    pub fn on_capsule_received(&self, capsule: &Capsule, gene: &mut Gene) -> CapsuleDisposition {
604        self.evaluate_capsule("unknown", capsule, gene)
605    }
606
607    pub fn on_signed_capsule_received(
608        &self,
609        peer_id: &str,
610        public_key_hex: &str,
611        envelope: &EvolutionEnvelope,
612        capsule: &Capsule,
613        gene: &mut Gene,
614    ) -> Result<CapsuleDisposition, RejectionReason> {
615        if !self.rate_limiter.check(peer_id) {
616            self.write_rejection_audit_entry(
617                peer_id,
618                capsule,
619                Some(gene.id.as_str()),
620                RejectionReason::RateLimited,
621            );
622            return Err(RejectionReason::RateLimited);
623        }
624
625        if envelope.signature.is_none() {
626            self.write_rejection_audit_entry(
627                peer_id,
628                capsule,
629                Some(gene.id.as_str()),
630                RejectionReason::MissingSignature,
631            );
632            return Err(RejectionReason::MissingSignature);
633        }
634
635        if verify_envelope(public_key_hex, envelope).is_err() {
636            self.write_rejection_audit_entry(
637                peer_id,
638                capsule,
639                Some(gene.id.as_str()),
640                RejectionReason::InvalidSignature,
641            );
642            return Err(RejectionReason::InvalidSignature);
643        }
644
645        let has_capsule = envelope.assets.iter().any(|asset| {
646            matches!(asset, NetworkAsset::Capsule { capsule: remote } if remote.id == capsule.id && remote.gene_id == capsule.gene_id)
647        });
648        let has_gene = envelope.assets.iter().any(
649            |asset| matches!(asset, NetworkAsset::Gene { gene: remote } if remote.id == gene.id),
650        );
651        if !has_capsule || !has_gene || gene.id != capsule.gene_id {
652            self.write_rejection_audit_entry(
653                peer_id,
654                capsule,
655                Some(gene.id.as_str()),
656                RejectionReason::GeneMissing,
657            );
658            return Err(RejectionReason::GeneMissing);
659        }
660
661        Ok(self.evaluate_capsule(peer_id, capsule, gene))
662    }
663
664    pub fn network_audit_trail(&self) -> Vec<NetworkAuditEntry> {
665        self.network_audit_trail.lock().unwrap().clone()
666    }
667
668    fn evaluate_capsule(
669        &self,
670        peer_id: &str,
671        capsule: &Capsule,
672        gene: &mut Gene,
673    ) -> CapsuleDisposition {
674        let score = capsule.confidence as f64;
675
676        let disposition = if score >= self.threshold {
677            gene.state = AssetState::Promoted;
678            CapsuleDisposition::Promoted {
679                gene_id: capsule.gene_id.clone(),
680                score,
681            }
682        } else {
683            let reason = QuarantineReason::LowScore { score };
684            CapsuleDisposition::Quarantined {
685                reason: reason.to_string(),
686            }
687        };
688
689        self.write_accept_audit_entry(peer_id, capsule, &disposition, score);
690        self.audit_trail
691            .lock()
692            .unwrap()
693            .push((capsule.id.clone(), disposition.clone()));
694
695        disposition
696    }
697
698    /// All in-memory audit entries accumulated so far.
699    pub fn audit_trail(&self) -> Vec<(String, CapsuleDisposition)> {
700        self.audit_trail.lock().unwrap().clone()
701    }
702
703    /// Number of audit entries recorded.
704    pub fn audit_count(&self) -> usize {
705        self.audit_trail.lock().unwrap().len()
706    }
707
708    fn write_accept_audit_entry(
709        &self,
710        peer_id: &str,
711        capsule: &Capsule,
712        disposition: &CapsuleDisposition,
713        score: f64,
714    ) {
715        let Some(ref path) = self.audit_log_path else {
716            self.network_audit_trail
717                .lock()
718                .unwrap()
719                .push(NetworkAuditEntry {
720                    timestamp: Utc::now().to_rfc3339(),
721                    peer_id: peer_id.to_string(),
722                    capsule_id: capsule.id.clone(),
723                    gene_id: capsule.gene_id.clone(),
724                    disposition: NetworkAuditDisposition::Accept,
725                    reason: match disposition {
726                        CapsuleDisposition::Promoted { .. } => None,
727                        CapsuleDisposition::Quarantined { reason } => Some(reason.clone()),
728                    },
729                    score: Some(score),
730                });
731            return;
732        };
733        let entry = AuditLogEntry {
734            timestamp: Utc::now().to_rfc3339(),
735            peer_id,
736            capsule_id: &capsule.id,
737            gene_id: &capsule.gene_id,
738            disposition: NetworkAuditDisposition::Accept,
739            reason: match disposition {
740                CapsuleDisposition::Promoted { .. } => None,
741                CapsuleDisposition::Quarantined { reason } => Some(reason.clone()),
742            },
743            score,
744        };
745        self.network_audit_trail
746            .lock()
747            .unwrap()
748            .push(NetworkAuditEntry {
749                timestamp: entry.timestamp.clone(),
750                peer_id: entry.peer_id.to_string(),
751                capsule_id: entry.capsule_id.to_string(),
752                gene_id: entry.gene_id.to_string(),
753                disposition: entry.disposition.clone(),
754                reason: entry.reason.clone(),
755                score: Some(entry.score),
756            });
757        if let Ok(mut line) = serde_json::to_string(&entry) {
758            line.push('\n');
759            use std::io::Write;
760            if let Ok(mut file) = std::fs::OpenOptions::new()
761                .create(true)
762                .append(true)
763                .open(path)
764            {
765                let _ = file.write_all(line.as_bytes());
766            }
767        }
768    }
769
770    fn write_rejection_audit_entry(
771        &self,
772        peer_id: &str,
773        capsule: &Capsule,
774        gene_id: Option<&str>,
775        reason: RejectionReason,
776    ) {
777        let entry = NetworkAuditEntry {
778            timestamp: Utc::now().to_rfc3339(),
779            peer_id: peer_id.to_string(),
780            capsule_id: capsule.id.clone(),
781            gene_id: gene_id.unwrap_or(&capsule.gene_id).to_string(),
782            disposition: NetworkAuditDisposition::Reject,
783            reason: Some(reason.to_string()),
784            score: Some(capsule.confidence as f64),
785        };
786        self.network_audit_trail.lock().unwrap().push(entry.clone());
787
788        let Some(ref path) = self.audit_log_path else {
789            return;
790        };
791
792        if let Ok(mut line) = serde_json::to_string(&entry) {
793            line.push('\n');
794            use std::io::Write;
795            if let Ok(mut file) = std::fs::OpenOptions::new()
796                .create(true)
797                .append(true)
798                .open(path)
799            {
800                let _ = file.write_all(line.as_bytes());
801            }
802        }
803    }
804}
805
806// ---------------------------------------------------------------------------
807// Unit tests
808// ---------------------------------------------------------------------------
809
810#[cfg(test)]
811mod tests {
812    use super::*;
813    use crate::{sign_envelope, NodeKeypair};
814    use oris_evolution::{AssetState, Capsule, EnvFingerprint, Gene, Outcome};
815
816    fn make_gene(id: &str) -> NetworkAsset {
817        NetworkAsset::Gene {
818            gene: Gene {
819                id: id.to_string(),
820                signals: vec!["test.fail".into()],
821                strategy: vec!["fix test".into()],
822                validation: vec!["cargo test".into()],
823                state: AssetState::Promoted,
824                task_class_id: None,
825            },
826        }
827    }
828
829    fn make_capsule(id: &str, gene_id: &str, confidence: f32) -> Capsule {
830        Capsule {
831            id: id.to_string(),
832            gene_id: gene_id.to_string(),
833            mutation_id: "mut-1".to_string(),
834            run_id: "run-1".to_string(),
835            diff_hash: "abc123".to_string(),
836            confidence,
837            env: EnvFingerprint {
838                rustc_version: "1.80.0".to_string(),
839                cargo_lock_hash: "hash".to_string(),
840                target_triple: "aarch64-apple-darwin".to_string(),
841                os: "macos".to_string(),
842            },
843            outcome: Outcome {
844                success: true,
845                validation_profile: "default".to_string(),
846                validation_duration_ms: 100,
847                changed_files: vec![],
848                validator_hash: "vh1".to_string(),
849                lines_changed: 5,
850                replay_verified: false,
851            },
852            state: AssetState::Candidate,
853        }
854    }
855
856    fn make_plain_gene(id: &str) -> Gene {
857        Gene {
858            id: id.to_string(),
859            signals: vec!["test.fail".into()],
860            strategy: vec!["fix test".into()],
861            validation: vec!["cargo test".into()],
862            state: AssetState::Candidate,
863            task_class_id: None,
864        }
865    }
866
867    fn make_signed_envelope(
868        keypair: &NodeKeypair,
869        sender_id: &str,
870        capsule: &Capsule,
871        gene: &Gene,
872    ) -> EvolutionEnvelope {
873        let envelope = EvolutionEnvelope::publish(
874            sender_id,
875            vec![
876                NetworkAsset::Gene { gene: gene.clone() },
877                NetworkAsset::Capsule {
878                    capsule: capsule.clone(),
879                },
880            ],
881        );
882        sign_envelope(keypair, &envelope)
883    }
884
885    // -----------------------------------------------------------------------
886    // AC 1: two-node gene sync end-to-end
887    // -----------------------------------------------------------------------
888
889    #[test]
890    fn test_two_node_sync_end_to_end() {
891        let node_a = GossipSyncEngine::new("node-a");
892        let node_b = GossipSyncEngine::new("node-b");
893
894        // node-a publishes a gene
895        let seq = node_a.publish_local(make_gene("gene-1"));
896        assert_eq!(seq, 1);
897
898        // node-a builds a publish request and node-b receives it
899        let req = node_a.build_publish_request(0);
900        assert_eq!(req.assets.len(), 1);
901        let audit = node_b.receive_publish(&req);
902        assert_eq!(audit.applied_count, 1);
903        assert_eq!(audit.skipped_count, 0);
904
905        // gene-1 should now be in node-b's quarantine as Pending
906        let entry = node_b.quarantine.get("gene:gene-1").unwrap();
907        assert_eq!(entry.state, QuarantineState::Pending);
908        assert_eq!(entry.origin_peer, "node-a");
909    }
910
911    #[test]
912    fn test_incremental_cursor_sync() {
913        let node_a = GossipSyncEngine::new("node-a");
914        let node_b = GossipSyncEngine::new("node-b");
915
916        // publish two genes
917        node_a.publish_local(make_gene("gene-1"));
918        node_a.publish_local(make_gene("gene-2"));
919
920        // First sync — node-b has seen nothing (cursor=0)
921        let req1 = node_a.build_publish_request(0);
922        node_b.receive_publish(&req1);
923        assert_eq!(node_b.quarantine.len(), 2);
924
925        // Publish a third gene
926        node_a.publish_local(make_gene("gene-3"));
927
928        // Second sync — node-b requests from cursor=2
929        let req2 = node_a.build_publish_request(2);
930        let audit = node_b.receive_publish(&req2);
931        // Only gene-3 is new
932        assert_eq!(audit.applied_count, 1);
933        assert_eq!(node_b.quarantine.len(), 3);
934    }
935
936    // -----------------------------------------------------------------------
937    // AC 2: quarantine → validate → promote lifecycle
938    // -----------------------------------------------------------------------
939
940    #[test]
941    fn test_quarantine_admit_and_validate() {
942        let store = QuarantineStore::new();
943        let asset = make_gene("g-1");
944
945        assert!(store.admit("gene:g-1", asset, "peer-a"));
946        assert_eq!(
947            store.get("gene:g-1").unwrap().state,
948            QuarantineState::Pending
949        );
950        assert!(!store.is_selectable("gene:g-1")); // Pending → not selectable
951
952        store.validate_asset("gene:g-1");
953        assert_eq!(
954            store.get("gene:g-1").unwrap().state,
955            QuarantineState::Validated
956        );
957        assert!(store.is_selectable("gene:g-1")); // Validated → selectable
958    }
959
960    #[test]
961    fn test_quarantine_fail_asset() {
962        let store = QuarantineStore::new();
963        store.admit("gene:g-bad", make_gene("g-bad"), "peer-a");
964        store.fail_asset("gene:g-bad", "signature mismatch");
965
966        let entry = store.get("gene:g-bad").unwrap();
967        assert_eq!(entry.state, QuarantineState::Failed);
968        assert_eq!(entry.failure_reason.as_deref(), Some("signature mismatch"));
969        assert!(!store.is_selectable("gene:g-bad"));
970    }
971
972    #[test]
973    fn test_validate_and_promote_via_engine() {
974        let engine = GossipSyncEngine::new("node-b");
975        let req = PublishRequest {
976            sender_id: "node-a".into(),
977            assets: vec![make_gene("g-ok")],
978            since_cursor: None,
979            resume_token: None,
980        };
981        engine.receive_publish(&req);
982
983        let promoted = engine.validate_and_promote("gene:g-ok", |_| Ok(()));
984        assert!(promoted);
985        assert!(engine.is_asset_selectable("gene:g-ok"));
986    }
987
988    #[test]
989    fn test_validate_and_promote_failure_not_selectable() {
990        let engine = GossipSyncEngine::new("node-b");
991        let req = PublishRequest {
992            sender_id: "node-a".into(),
993            assets: vec![make_gene("g-invalid")],
994            since_cursor: None,
995            resume_token: None,
996        };
997        engine.receive_publish(&req);
998
999        let promoted = engine.validate_and_promote("gene:g-invalid", |_| Err("bad hash".into()));
1000        assert!(!promoted);
1001        assert!(!engine.is_asset_selectable("gene:g-invalid"));
1002    }
1003
1004    // -----------------------------------------------------------------------
1005    // AC 3: network fault — unvalidated genes must not be selectable
1006    // -----------------------------------------------------------------------
1007
1008    #[test]
1009    fn test_pending_gene_not_selectable_under_fault() {
1010        let engine = GossipSyncEngine::new("node-b");
1011        // Simulate receiving a gene (as if a network message arrived) but NO
1012        // validation call is made (simulating a partition / message loss)
1013        let req = PublishRequest {
1014            sender_id: "node-a".into(),
1015            assets: vec![make_gene("g-unvalidated")],
1016            since_cursor: None,
1017            resume_token: None,
1018        };
1019        engine.receive_publish(&req);
1020
1021        // Without explicit validation the gene must remain non-selectable
1022        assert!(
1023            !engine.is_asset_selectable("gene:g-unvalidated"),
1024            "pending gene must not be selectable (failure-closed guarantee)"
1025        );
1026        assert_eq!(engine.pending_entries().len(), 1);
1027    }
1028
1029    #[test]
1030    fn test_unknown_asset_not_selectable() {
1031        let engine = GossipSyncEngine::new("node-b");
1032        assert!(!engine.is_asset_selectable("gene:nonexistent"));
1033    }
1034
1035    #[test]
1036    fn test_duplicate_admit_is_idempotent() {
1037        let store = QuarantineStore::new();
1038        assert!(store.admit("gene:g", make_gene("g"), "peer-a"));
1039        store.validate_asset("gene:g");
1040        // A second admit for the same id must not overwrite the Validated state
1041        assert!(!store.admit("gene:g", make_gene("g"), "peer-b"));
1042        assert_eq!(
1043            store.get("gene:g").unwrap().state,
1044            QuarantineState::Validated
1045        );
1046    }
1047
1048    #[test]
1049    fn test_stats_accumulate_correctly() {
1050        let engine = GossipSyncEngine::new("me");
1051        let req = PublishRequest {
1052            sender_id: "peer".into(),
1053            assets: vec![make_gene("g1"), make_gene("g2")],
1054            since_cursor: None,
1055            resume_token: None,
1056        };
1057        engine.receive_publish(&req);
1058        engine.validate_and_promote("gene:g1", |_| Ok(()));
1059        engine.validate_and_promote("gene:g2", |_| Err("bad".into()));
1060
1061        let s = engine.stats();
1062        assert_eq!(s.assets_quarantined, 2);
1063        assert_eq!(s.assets_promoted, 1);
1064        assert_eq!(s.assets_failed_validation, 1);
1065    }
1066
1067    // -----------------------------------------------------------------------
1068    // AC P2-06: RemoteCapsuleReceiver auto-promotion pipeline
1069    // -----------------------------------------------------------------------
1070
1071    #[test]
1072    fn test_remote_capsule_high_score_is_promoted() {
1073        let receiver = RemoteCapsuleReceiver::new(None::<&str>, None);
1074        let capsule = make_capsule("cap-1", "gene-1", 0.85);
1075        let mut gene = make_plain_gene("gene-1");
1076
1077        let disposition = receiver.on_capsule_received(&capsule, &mut gene);
1078
1079        match &disposition {
1080            CapsuleDisposition::Promoted { gene_id, score } => {
1081                assert_eq!(gene_id, "gene-1");
1082                assert!(*score >= PROMOTE_THRESHOLD);
1083            }
1084            other => panic!("expected Promoted, got {:?}", other),
1085        }
1086        assert_eq!(gene.state, AssetState::Promoted);
1087        assert_eq!(receiver.audit_count(), 1);
1088    }
1089
1090    #[test]
1091    fn test_remote_capsule_low_score_is_quarantined() {
1092        let receiver = RemoteCapsuleReceiver::new(None::<&str>, None);
1093        let capsule = make_capsule("cap-2", "gene-2", 0.40);
1094        let mut gene = make_plain_gene("gene-2");
1095        let original_state = gene.state.clone();
1096
1097        let disposition = receiver.on_capsule_received(&capsule, &mut gene);
1098
1099        match &disposition {
1100            CapsuleDisposition::Quarantined { reason } => {
1101                assert!(reason.starts_with("low_score:"), "reason={}", reason);
1102            }
1103            other => panic!("expected Quarantined, got {:?}", other),
1104        }
1105        // Gene state must not be changed when quarantined.
1106        assert_eq!(gene.state, original_state);
1107        assert_eq!(receiver.audit_count(), 1);
1108    }
1109
1110    #[test]
1111    fn test_remote_capsule_at_threshold_is_promoted() {
1112        let receiver = RemoteCapsuleReceiver::new(None::<&str>, None);
1113        // Use a confidence value just at or above threshold (0.70). Because
1114        // f32 → f64 casting can introduce tiny rounding errors we use a value
1115        // that is safely representable: 0.75 (exactly 3/4 in binary).
1116        let capsule = make_capsule("cap-3", "gene-3", 0.75_f32);
1117        let mut gene = make_plain_gene("gene-3");
1118
1119        let disposition = receiver.on_capsule_received(&capsule, &mut gene);
1120        assert!(
1121            matches!(&disposition, CapsuleDisposition::Promoted { .. }),
1122            "capsule at or above threshold must be promoted"
1123        );
1124    }
1125
1126    #[test]
1127    fn test_remote_capsule_audit_log_written() {
1128        let dir = std::env::temp_dir();
1129        let log_path = dir.join(format!(
1130            "capsule_audit_log_test_{}.jsonl",
1131            std::time::SystemTime::now()
1132                .duration_since(std::time::UNIX_EPOCH)
1133                .unwrap()
1134                .as_nanos()
1135        ));
1136
1137        {
1138            let receiver = RemoteCapsuleReceiver::new(Some(&log_path), None);
1139            let c1 = make_capsule("cap-a", "g-a", 0.90);
1140            let c2 = make_capsule("cap-b", "g-b", 0.30);
1141            let mut gene_a = make_plain_gene("g-a");
1142            let mut gene_b = make_plain_gene("g-b");
1143            receiver.on_capsule_received(&c1, &mut gene_a);
1144            receiver.on_capsule_received(&c2, &mut gene_b);
1145        }
1146
1147        let contents = std::fs::read_to_string(&log_path).expect("audit log must exist");
1148        let lines: Vec<&str> = contents.lines().collect();
1149        assert_eq!(lines.len(), 2, "two decisions must produce two log lines");
1150        // Each line must be valid JSON
1151        for line in &lines {
1152            serde_json::from_str::<serde_json::Value>(line)
1153                .expect("each audit line must be valid JSON");
1154        }
1155        let _ = std::fs::remove_file(&log_path);
1156    }
1157
1158    #[test]
1159    fn test_remote_capsule_audit_trail_in_memory() {
1160        let receiver = RemoteCapsuleReceiver::default();
1161        let c1 = make_capsule("cap-x", "g-x", 0.80);
1162        let c2 = make_capsule("cap-y", "g-y", 0.50);
1163        let mut g1 = make_plain_gene("g-x");
1164        let mut g2 = make_plain_gene("g-y");
1165
1166        receiver.on_capsule_received(&c1, &mut g1);
1167        receiver.on_capsule_received(&c2, &mut g2);
1168
1169        let trail = receiver.audit_trail();
1170        assert_eq!(trail.len(), 2);
1171        assert_eq!(trail[0].0, "cap-x");
1172        assert_eq!(trail[1].0, "cap-y");
1173        assert!(matches!(&trail[0].1, CapsuleDisposition::Promoted { .. }));
1174        assert!(matches!(
1175            &trail[1].1,
1176            CapsuleDisposition::Quarantined { .. }
1177        ));
1178    }
1179
1180    #[test]
1181    fn test_remote_capsule_missing_signature_is_rejected() {
1182        let receiver = RemoteCapsuleReceiver::default();
1183        let capsule = make_capsule("cap-sec-1", "gene-sec-1", 0.82);
1184        let mut gene = make_plain_gene("gene-sec-1");
1185        let envelope = EvolutionEnvelope::publish(
1186            "node-a",
1187            vec![
1188                NetworkAsset::Gene { gene: gene.clone() },
1189                NetworkAsset::Capsule {
1190                    capsule: capsule.clone(),
1191                },
1192            ],
1193        );
1194
1195        let result = receiver
1196            .on_signed_capsule_received("peer-a", "deadbeef", &envelope, &capsule, &mut gene);
1197
1198        assert_eq!(result.unwrap_err(), RejectionReason::MissingSignature);
1199        assert_eq!(receiver.network_audit_trail().len(), 1);
1200        assert_eq!(gene.state, AssetState::Candidate);
1201    }
1202
1203    #[test]
1204    fn test_remote_capsule_tampered_signature_is_rejected() {
1205        let temp_path = std::env::temp_dir().join(format!(
1206            "oris-node-key-{}.key",
1207            std::time::SystemTime::now()
1208                .duration_since(std::time::UNIX_EPOCH)
1209                .unwrap()
1210                .as_nanos()
1211        ));
1212        let keypair =
1213            NodeKeypair::generate_at(&temp_path).expect("keypair generation should succeed");
1214        let receiver = RemoteCapsuleReceiver::default();
1215        let capsule = make_capsule("cap-sec-2", "gene-sec-2", 0.90);
1216        let mut gene = make_plain_gene("gene-sec-2");
1217        let mut envelope = make_signed_envelope(&keypair, "node-a", &capsule, &gene);
1218        if let Some(NetworkAsset::Gene { gene: remote_gene }) = envelope.assets.first_mut() {
1219            remote_gene.strategy.push("tampered".to_string());
1220        }
1221
1222        let result = receiver.on_signed_capsule_received(
1223            "peer-a",
1224            &keypair.public_key_hex(),
1225            &envelope,
1226            &capsule,
1227            &mut gene,
1228        );
1229
1230        assert_eq!(result.unwrap_err(), RejectionReason::InvalidSignature);
1231        let _ = std::fs::remove_file(temp_path);
1232    }
1233
1234    #[test]
1235    fn test_remote_capsule_rate_limited_is_rejected() {
1236        let temp_path = std::env::temp_dir().join(format!(
1237            "oris-node-key-{}.key",
1238            std::time::SystemTime::now()
1239                .duration_since(std::time::UNIX_EPOCH)
1240                .unwrap()
1241                .as_nanos()
1242        ));
1243        let keypair =
1244            NodeKeypair::generate_at(&temp_path).expect("keypair generation should succeed");
1245        let receiver = RemoteCapsuleReceiver::with_rate_limit_config(
1246            None::<&str>,
1247            None,
1248            PeerRateLimitConfig {
1249                max_capsules_per_hour: 1,
1250                window_secs: 3600,
1251            },
1252        );
1253        let capsule = make_capsule("cap-sec-3", "gene-sec-3", 0.91);
1254        let mut gene = make_plain_gene("gene-sec-3");
1255        let envelope = make_signed_envelope(&keypair, "node-a", &capsule, &gene);
1256
1257        let first = receiver.on_signed_capsule_received(
1258            "peer-a",
1259            &keypair.public_key_hex(),
1260            &envelope,
1261            &capsule,
1262            &mut gene,
1263        );
1264        assert!(first.is_ok());
1265
1266        let mut gene_again = make_plain_gene("gene-sec-3");
1267        let second = receiver.on_signed_capsule_received(
1268            "peer-a",
1269            &keypair.public_key_hex(),
1270            &envelope,
1271            &capsule,
1272            &mut gene_again,
1273        );
1274        assert_eq!(second.unwrap_err(), RejectionReason::RateLimited);
1275        let _ = std::fs::remove_file(temp_path);
1276    }
1277
1278    #[test]
1279    fn test_network_audit_log_records_accept_and_reject_events() {
1280        let temp_key = std::env::temp_dir().join(format!(
1281            "oris-node-key-{}.key",
1282            std::time::SystemTime::now()
1283                .duration_since(std::time::UNIX_EPOCH)
1284                .unwrap()
1285                .as_nanos()
1286        ));
1287        let keypair =
1288            NodeKeypair::generate_at(&temp_key).expect("keypair generation should succeed");
1289        let log_path = std::env::temp_dir().join(format!(
1290            "network_audit_log_test_{}.jsonl",
1291            std::time::SystemTime::now()
1292                .duration_since(std::time::UNIX_EPOCH)
1293                .unwrap()
1294                .as_nanos()
1295        ));
1296
1297        let receiver = RemoteCapsuleReceiver::with_rate_limit_config(
1298            Some(&log_path),
1299            None,
1300            PeerRateLimitConfig {
1301                max_capsules_per_hour: 10,
1302                window_secs: 3600,
1303            },
1304        );
1305        let capsule = make_capsule("cap-sec-4", "gene-sec-4", 0.88);
1306        let mut gene = make_plain_gene("gene-sec-4");
1307        let envelope = make_signed_envelope(&keypair, "node-a", &capsule, &gene);
1308        let accepted = receiver.on_signed_capsule_received(
1309            "peer-a",
1310            &keypair.public_key_hex(),
1311            &envelope,
1312            &capsule,
1313            &mut gene,
1314        );
1315        assert!(accepted.is_ok());
1316
1317        let unsigned_envelope = EvolutionEnvelope::publish(
1318            "node-a",
1319            vec![
1320                NetworkAsset::Gene { gene: gene.clone() },
1321                NetworkAsset::Capsule {
1322                    capsule: capsule.clone(),
1323                },
1324            ],
1325        );
1326        let mut rejected_gene = make_plain_gene("gene-sec-4");
1327        let rejected = receiver.on_signed_capsule_received(
1328            "peer-b",
1329            &keypair.public_key_hex(),
1330            &unsigned_envelope,
1331            &capsule,
1332            &mut rejected_gene,
1333        );
1334        assert_eq!(rejected.unwrap_err(), RejectionReason::MissingSignature);
1335
1336        let contents = std::fs::read_to_string(&log_path).expect("audit log must exist");
1337        let lines: Vec<serde_json::Value> = contents
1338            .lines()
1339            .map(|line| serde_json::from_str(line).expect("audit line must be valid JSON"))
1340            .collect();
1341        assert_eq!(lines.len(), 2);
1342        assert_eq!(lines[0]["disposition"], "accept");
1343        assert_eq!(lines[1]["disposition"], "reject");
1344        let _ = std::fs::remove_file(temp_key);
1345        let _ = std::fs::remove_file(log_path);
1346    }
1347}