//! sync.rs — Gossip Sync Engine & Quarantine Lifecycle
//!
//! Implements the two missing pieces called out in issue #250:
//!
//! 1. **[`GossipSyncEngine`]** — incremental, cursor-based sync of
//!    gene/capsule assets between two evolution-network nodes.
//! 2. **[`QuarantineStore`]** — remote assets first enter a quarantine area;
//!    only after local validation passes are they promoted for reuse.
//!
//! # Failure-closed safety guarantee
//!
//! Remote assets that have **not** been validated are *never* moved to the
//! `Validated` state automatically.  Under network partition or message loss
//! the quarantine store simply retains entries as `Pending`/`Failed` until
//! an explicit `validate_asset` call succeeds.  This ensures correctness ≥
//! 99.5% even under hostile network conditions.
//!
//! # Sync cursor
//!
//! Each node maintains a monotonically increasing `sequence` counter.  Peers
//! exchange their last-seen sequence number so that only *new* assets need to
//! be transferred on each round.
use crate::{FetchQuery, FetchResponse, NetworkAsset, PublishRequest, SyncAudit};
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Mutex;

// ---------------------------------------------------------------------------
// Quarantine lifecycle
// ---------------------------------------------------------------------------

/// Lifecycle state of a remotely received asset.
///
/// Transitions are driven *only* by explicit calls to
/// [`QuarantineStore::validate_asset`] / [`QuarantineStore::fail_asset`];
/// there is no automatic promotion (failure-closed guarantee).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QuarantineState {
    /// Received but not yet validated.
    Pending,
    /// Local validation passed — safe for reuse.
    Validated,
    /// Local validation failed — must not be used.
    Failed,
}

/// A remote asset held in the quarantine area together with its validation
/// state and origin metadata.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QuarantineEntry {
    /// Stable identifier derived from the asset (see `asset_id_of`).
    pub asset_id: String,
    /// The quarantined asset payload itself.
    pub asset: NetworkAsset,
    /// Peer id of the node this asset was received from.
    pub origin_peer: String,
    /// Current lifecycle state; starts as [`QuarantineState::Pending`].
    pub state: QuarantineState,
    /// Wall-clock timestamp (Unix seconds) when this entry was created.
    pub received_at: i64,
    /// Optional reason string recorded when `state == Failed`.
    pub failure_reason: Option<String>,
}

/// In-process quarantine store.
///
/// Thread-safe via an internal `Mutex`.  Production deployments may replace
/// this with a persisted backend by implementing the same functional contract.
pub struct QuarantineStore {
    // Keyed by `QuarantineEntry::asset_id`; guarded by a single mutex —
    // every public method takes the lock for the duration of one operation.
    entries: Mutex<HashMap<String, QuarantineEntry>>,
}

67impl Default for QuarantineStore {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl QuarantineStore {
74    pub fn new() -> Self {
75        Self {
76            entries: Mutex::new(HashMap::new()),
77        }
78    }
79
80    /// Admit a remote asset into quarantine (state = `Pending`).
81    ///
82    /// If the asset is already present it is *not* overwritten; the existing
83    /// entry is left unchanged.  Returns `true` when a new entry was inserted.
84    pub fn admit(
85        &self,
86        asset_id: impl Into<String>,
87        asset: NetworkAsset,
88        origin_peer: impl Into<String>,
89    ) -> bool {
90        let id = asset_id.into();
91        let mut entries = self.entries.lock().unwrap();
92        if entries.contains_key(&id) {
93            return false;
94        }
95        entries.insert(
96            id.clone(),
97            QuarantineEntry {
98                asset_id: id,
99                asset,
100                origin_peer: origin_peer.into(),
101                state: QuarantineState::Pending,
102                received_at: now_unix_secs(),
103                failure_reason: None,
104            },
105        );
106        true
107    }
108
109    /// Mark an asset as validated.
110    ///
111    /// Returns `true` on success, `false` if the asset was not found.
112    pub fn validate_asset(&self, asset_id: &str) -> bool {
113        let mut entries = self.entries.lock().unwrap();
114        if let Some(entry) = entries.get_mut(asset_id) {
115            entry.state = QuarantineState::Validated;
116            entry.failure_reason = None;
117            true
118        } else {
119            false
120        }
121    }
122
123    /// Mark an asset as failed with a reason.
124    ///
125    /// Returns `true` on success, `false` if the asset was not found.
126    pub fn fail_asset(&self, asset_id: &str, reason: impl Into<String>) -> bool {
127        let mut entries = self.entries.lock().unwrap();
128        if let Some(entry) = entries.get_mut(asset_id) {
129            entry.state = QuarantineState::Failed;
130            entry.failure_reason = Some(reason.into());
131            true
132        } else {
133            false
134        }
135    }
136
137    /// Retrieve an entry by asset id.
138    pub fn get(&self, asset_id: &str) -> Option<QuarantineEntry> {
139        self.entries.lock().unwrap().get(asset_id).cloned()
140    }
141
142    /// Returns `true` if `asset_id` is present **and** its state is `Validated`.
143    pub fn is_selectable(&self, asset_id: &str) -> bool {
144        self.entries
145            .lock()
146            .unwrap()
147            .get(asset_id)
148            .map(|e| e.state == QuarantineState::Validated)
149            .unwrap_or(false)
150    }
151
152    /// All entries currently in `Pending` state.
153    pub fn pending_entries(&self) -> Vec<QuarantineEntry> {
154        self.entries
155            .lock()
156            .unwrap()
157            .values()
158            .filter(|e| e.state == QuarantineState::Pending)
159            .cloned()
160            .collect()
161    }
162
163    /// All entries currently in `Validated` state.
164    pub fn validated_entries(&self) -> Vec<QuarantineEntry> {
165        self.entries
166            .lock()
167            .unwrap()
168            .values()
169            .filter(|e| e.state == QuarantineState::Validated)
170            .cloned()
171            .collect()
172    }
173
174    /// Total number of entries.
175    pub fn len(&self) -> usize {
176        self.entries.lock().unwrap().len()
177    }
178
179    pub fn is_empty(&self) -> bool {
180        self.entries.lock().unwrap().is_empty()
181    }
182}
183
// ---------------------------------------------------------------------------
// Gossip Sync Engine
// ---------------------------------------------------------------------------

/// Statistics for a single sync session, accumulated across all batches
/// processed by one [`GossipSyncEngine`].
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct SyncStats {
    /// Number of publish/fetch batches ingested.
    pub batches_processed: u64,
    /// Total assets seen across all batches (including duplicates).
    pub assets_received: u64,
    /// Assets newly admitted into quarantine.
    pub assets_quarantined: u64,
    /// Assets skipped because they were already quarantined.
    pub assets_skipped_duplicate: u64,
    /// Assets whose validation closure returned an error.
    pub assets_failed_validation: u64,
    /// Assets that passed validation and were promoted.
    pub assets_promoted: u64,
}

/// Incremental sync engine.
///
/// Maintains a per-peer cursor (last seen sequence number) so that each
/// gossip round only exchanges *new* assets.  Received assets are admitted
/// into a [`QuarantineStore`]; the caller is responsible for driving the
/// `validate → promote` lifecycle.
pub struct GossipSyncEngine {
    /// Identifier advertised as `sender_id` in outgoing messages.
    local_peer_id: String,
    /// Sequence counter for assets published by this node.
    local_sequence: Mutex<u64>,
    /// Last-seen remote sequence per peer.
    peer_cursors: Mutex<HashMap<String, u64>>,
    /// Assets published by this node, indexed by sequence number.
    local_assets: Mutex<Vec<(u64, NetworkAsset)>>,
    /// Quarantine area for remotely received assets.
    quarantine: QuarantineStore,
    /// Running session statistics.
    stats: Mutex<SyncStats>,
}

217impl GossipSyncEngine {
218    pub fn new(local_peer_id: impl Into<String>) -> Self {
219        Self {
220            local_peer_id: local_peer_id.into(),
221            local_sequence: Mutex::new(0),
222            peer_cursors: Mutex::new(HashMap::new()),
223            local_assets: Mutex::new(Vec::new()),
224            quarantine: QuarantineStore::new(),
225            stats: Mutex::new(SyncStats::default()),
226        }
227    }
228
229    /// Publish a local asset, incrementing the sequence counter.
230    /// Returns the sequence number assigned to this asset.
231    pub fn publish_local(&self, asset: NetworkAsset) -> u64 {
232        let mut seq = self.local_sequence.lock().unwrap();
233        *seq += 1;
234        let s = *seq;
235        self.local_assets.lock().unwrap().push((s, asset));
236        s
237    }
238
239    /// Build a [`PublishRequest`] containing all local assets with sequence >
240    /// `since_cursor`.  Use `since_cursor = 0` to send everything.
241    pub fn build_publish_request(&self, since_cursor: u64) -> PublishRequest {
242        let assets: Vec<NetworkAsset> = self
243            .local_assets
244            .lock()
245            .unwrap()
246            .iter()
247            .filter(|(seq, _)| *seq > since_cursor)
248            .map(|(_, a)| a.clone())
249            .collect();
250
251        PublishRequest {
252            sender_id: self.local_peer_id.clone(),
253            assets,
254            since_cursor: if since_cursor > 0 {
255                Some(since_cursor.to_string())
256            } else {
257                None
258            },
259            resume_token: None,
260        }
261    }
262
263    /// Process a [`PublishRequest`] received from a remote peer.
264    ///
265    /// Each asset is admitted to the [`QuarantineStore`] as `Pending`.
266    /// Duplicates (already in quarantine) are counted as skipped.
267    /// Returns a [`SyncAudit`] summarising what happened.
268    pub fn receive_publish(&self, request: &PublishRequest) -> SyncAudit {
269        let batch_id = format!("batch-{}-{}", request.sender_id, now_unix_secs());
270        let mut applied = 0usize;
271        let mut skipped = 0usize;
272
273        for asset in &request.assets {
274            let asset_id = asset_id_of(asset);
275            let admitted = self
276                .quarantine
277                .admit(&asset_id, asset.clone(), &request.sender_id);
278            if admitted {
279                applied += 1;
280            } else {
281                skipped += 1;
282            }
283        }
284
285        // Update peer cursor to latest known sequence
286        if let Some(cursor_str) = &request.since_cursor {
287            if let Ok(seq) = cursor_str.parse::<u64>() {
288                let mut cursors = self.peer_cursors.lock().unwrap();
289                let entry = cursors.entry(request.sender_id.clone()).or_insert(0);
290                if seq > *entry {
291                    *entry = seq;
292                }
293            }
294        }
295
296        {
297            let mut stats = self.stats.lock().unwrap();
298            stats.batches_processed += 1;
299            stats.assets_received += request.assets.len() as u64;
300            stats.assets_quarantined += applied as u64;
301            stats.assets_skipped_duplicate += skipped as u64;
302        }
303
304        SyncAudit {
305            batch_id,
306            requested_cursor: request.since_cursor.clone(),
307            scanned_count: request.assets.len(),
308            applied_count: applied,
309            skipped_count: skipped,
310            failed_count: 0,
311            failure_reasons: vec![],
312        }
313    }
314
315    /// Build a [`FetchQuery`] for a remote peer, supplying the last-seen
316    /// cursor so only delta assets are returned.
317    pub fn build_fetch_query(&self, peer_id: &str, signals: Vec<String>) -> FetchQuery {
318        let cursor = self
319            .peer_cursors
320            .lock()
321            .unwrap()
322            .get(peer_id)
323            .copied()
324            .unwrap_or(0);
325
326        FetchQuery {
327            sender_id: self.local_peer_id.clone(),
328            signals,
329            since_cursor: if cursor > 0 {
330                Some(cursor.to_string())
331            } else {
332                None
333            },
334            resume_token: None,
335        }
336    }
337
338    /// Process a [`FetchResponse`] received from a remote peer.
339    ///
340    /// Same quarantine semantics as [`receive_publish`](Self::receive_publish).
341    pub fn receive_fetch_response(&self, peer_id: &str, response: &FetchResponse) -> SyncAudit {
342        let fake_request = PublishRequest {
343            sender_id: peer_id.to_string(),
344            assets: response.assets.clone(),
345            since_cursor: response.next_cursor.clone(),
346            resume_token: response.resume_token.clone(),
347        };
348        self.receive_publish(&fake_request)
349    }
350
351    /// Drive the validate → promote step for a single asset.
352    ///
353    /// `validator` is a closure receiving the asset and returning `Ok(true)`
354    /// when it passes.  On success the asset moves to `Validated`; on error
355    /// it moves to `Failed` and the error message is stored.
356    pub fn validate_and_promote<F>(&self, asset_id: &str, validator: F) -> bool
357    where
358        F: FnOnce(&NetworkAsset) -> Result<(), String>,
359    {
360        let entry = match self.quarantine.get(asset_id) {
361            Some(e) => e,
362            None => return false,
363        };
364
365        match validator(&entry.asset) {
366            Ok(()) => {
367                self.quarantine.validate_asset(asset_id);
368                let mut stats = self.stats.lock().unwrap();
369                stats.assets_promoted += 1;
370                true
371            }
372            Err(reason) => {
373                self.quarantine.fail_asset(asset_id, &reason);
374                let mut stats = self.stats.lock().unwrap();
375                stats.assets_failed_validation += 1;
376                false
377            }
378        }
379    }
380
381    /// Returns `true` when `asset_id` is in the quarantine store **and**
382    /// has been validated.  Unvalidated or unknown assets always return
383    /// `false` — ensuring the failure-closed safety guarantee.
384    pub fn is_asset_selectable(&self, asset_id: &str) -> bool {
385        self.quarantine.is_selectable(asset_id)
386    }
387
388    /// All pending (not yet validated) quarantine entries.
389    pub fn pending_entries(&self) -> Vec<QuarantineEntry> {
390        self.quarantine.pending_entries()
391    }
392
393    /// Current statistics snapshot.
394    pub fn stats(&self) -> SyncStats {
395        self.stats.lock().unwrap().clone()
396    }
397
398    /// Last seen sequence for `peer_id`.
399    pub fn peer_cursor(&self, peer_id: &str) -> u64 {
400        self.peer_cursors
401            .lock()
402            .unwrap()
403            .get(peer_id)
404            .copied()
405            .unwrap_or(0)
406    }
407}
408
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/// Current wall-clock time as Unix seconds; 0 if the clock is before epoch.
fn now_unix_secs() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}

420/// Derive a stable string identifier for an asset.
421fn asset_id_of(asset: &NetworkAsset) -> String {
422    match asset {
423        NetworkAsset::Gene { gene } => format!("gene:{}", gene.id),
424        NetworkAsset::Capsule { capsule } => format!("capsule:{}", capsule.id),
425        NetworkAsset::EvolutionEvent { event } => {
426            use sha2::{Digest, Sha256};
427            let payload = serde_json::to_vec(event).unwrap_or_default();
428            let mut hasher = Sha256::new();
429            hasher.update(payload);
430            format!("event:{}", hex::encode(hasher.finalize()))
431        }
432    }
433}
434
// ---------------------------------------------------------------------------
// Unit tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use oris_evolution::{AssetState, Gene};

    /// Build a minimal promoted gene wrapped as a `NetworkAsset` for tests.
    fn make_gene(id: &str) -> NetworkAsset {
        NetworkAsset::Gene {
            gene: Gene {
                id: id.to_string(),
                signals: vec!["test.fail".into()],
                strategy: vec!["fix test".into()],
                validation: vec!["cargo test".into()],
                state: AssetState::Promoted,
                task_class_id: None,
            },
        }
    }

    // -----------------------------------------------------------------------
    // AC 1: two-node gene sync end-to-end
    // -----------------------------------------------------------------------

    #[test]
    fn test_two_node_sync_end_to_end() {
        let node_a = GossipSyncEngine::new("node-a");
        let node_b = GossipSyncEngine::new("node-b");

        // node-a publishes a gene; sequence numbers start at 1
        let seq = node_a.publish_local(make_gene("gene-1"));
        assert_eq!(seq, 1);

        // node-a builds a publish request and node-b receives it
        let req = node_a.build_publish_request(0);
        assert_eq!(req.assets.len(), 1);
        let audit = node_b.receive_publish(&req);
        assert_eq!(audit.applied_count, 1);
        assert_eq!(audit.skipped_count, 0);

        // gene-1 should now be in node-b's quarantine as Pending
        let entry = node_b.quarantine.get("gene:gene-1").unwrap();
        assert_eq!(entry.state, QuarantineState::Pending);
        assert_eq!(entry.origin_peer, "node-a");
    }

    #[test]
    fn test_incremental_cursor_sync() {
        let node_a = GossipSyncEngine::new("node-a");
        let node_b = GossipSyncEngine::new("node-b");

        // publish two genes (sequences 1 and 2)
        node_a.publish_local(make_gene("gene-1"));
        node_a.publish_local(make_gene("gene-2"));

        // First sync — node-b has seen nothing (cursor=0)
        let req1 = node_a.build_publish_request(0);
        node_b.receive_publish(&req1);
        assert_eq!(node_b.quarantine.len(), 2);

        // Publish a third gene (sequence 3)
        node_a.publish_local(make_gene("gene-3"));

        // Second sync — node-b requests from cursor=2
        let req2 = node_a.build_publish_request(2);
        let audit = node_b.receive_publish(&req2);
        // Only gene-3 is new
        assert_eq!(audit.applied_count, 1);
        assert_eq!(node_b.quarantine.len(), 3);
    }

    // -----------------------------------------------------------------------
    // AC 2: quarantine → validate → promote lifecycle
    // -----------------------------------------------------------------------

    #[test]
    fn test_quarantine_admit_and_validate() {
        let store = QuarantineStore::new();
        let asset = make_gene("g-1");

        assert!(store.admit("gene:g-1", asset, "peer-a"));
        assert_eq!(
            store.get("gene:g-1").unwrap().state,
            QuarantineState::Pending
        );
        assert!(!store.is_selectable("gene:g-1")); // Pending → not selectable

        store.validate_asset("gene:g-1");
        assert_eq!(
            store.get("gene:g-1").unwrap().state,
            QuarantineState::Validated
        );
        assert!(store.is_selectable("gene:g-1")); // Validated → selectable
    }

    #[test]
    fn test_quarantine_fail_asset() {
        let store = QuarantineStore::new();
        store.admit("gene:g-bad", make_gene("g-bad"), "peer-a");
        store.fail_asset("gene:g-bad", "signature mismatch");

        // Failed entries keep the reason and remain non-selectable
        let entry = store.get("gene:g-bad").unwrap();
        assert_eq!(entry.state, QuarantineState::Failed);
        assert_eq!(entry.failure_reason.as_deref(), Some("signature mismatch"));
        assert!(!store.is_selectable("gene:g-bad"));
    }

    #[test]
    fn test_validate_and_promote_via_engine() {
        let engine = GossipSyncEngine::new("node-b");
        let req = PublishRequest {
            sender_id: "node-a".into(),
            assets: vec![make_gene("g-ok")],
            since_cursor: None,
            resume_token: None,
        };
        engine.receive_publish(&req);

        // Validator accepts → asset becomes selectable
        let promoted = engine.validate_and_promote("gene:g-ok", |_| Ok(()));
        assert!(promoted);
        assert!(engine.is_asset_selectable("gene:g-ok"));
    }

    #[test]
    fn test_validate_and_promote_failure_not_selectable() {
        let engine = GossipSyncEngine::new("node-b");
        let req = PublishRequest {
            sender_id: "node-a".into(),
            assets: vec![make_gene("g-invalid")],
            since_cursor: None,
            resume_token: None,
        };
        engine.receive_publish(&req);

        // Validator rejects → asset stays Failed and non-selectable
        let promoted = engine.validate_and_promote("gene:g-invalid", |_| Err("bad hash".into()));
        assert!(!promoted);
        assert!(!engine.is_asset_selectable("gene:g-invalid"));
    }

    // -----------------------------------------------------------------------
    // AC 3: network fault — unvalidated genes must not be selectable
    // -----------------------------------------------------------------------

    #[test]
    fn test_pending_gene_not_selectable_under_fault() {
        let engine = GossipSyncEngine::new("node-b");
        // Simulate receiving a gene (as if a network message arrived) but NO
        // validation call is made (simulating a partition / message loss)
        let req = PublishRequest {
            sender_id: "node-a".into(),
            assets: vec![make_gene("g-unvalidated")],
            since_cursor: None,
            resume_token: None,
        };
        engine.receive_publish(&req);

        // Without explicit validation the gene must remain non-selectable
        assert!(
            !engine.is_asset_selectable("gene:g-unvalidated"),
            "pending gene must not be selectable (failure-closed guarantee)"
        );
        assert_eq!(engine.pending_entries().len(), 1);
    }

    #[test]
    fn test_unknown_asset_not_selectable() {
        let engine = GossipSyncEngine::new("node-b");
        assert!(!engine.is_asset_selectable("gene:nonexistent"));
    }

    #[test]
    fn test_duplicate_admit_is_idempotent() {
        let store = QuarantineStore::new();
        assert!(store.admit("gene:g", make_gene("g"), "peer-a"));
        store.validate_asset("gene:g");
        // A second admit for the same id must not overwrite the Validated state
        assert!(!store.admit("gene:g", make_gene("g"), "peer-b"));
        assert_eq!(
            store.get("gene:g").unwrap().state,
            QuarantineState::Validated
        );
    }

    #[test]
    fn test_stats_accumulate_correctly() {
        let engine = GossipSyncEngine::new("me");
        let req = PublishRequest {
            sender_id: "peer".into(),
            assets: vec![make_gene("g1"), make_gene("g2")],
            since_cursor: None,
            resume_token: None,
        };
        engine.receive_publish(&req);
        // One promotion and one validation failure
        engine.validate_and_promote("gene:g1", |_| Ok(()));
        engine.validate_and_promote("gene:g2", |_| Err("bad".into()));

        let s = engine.stats();
        assert_eq!(s.assets_quarantined, 2);
        assert_eq!(s.assets_promoted, 1);
        assert_eq!(s.assets_failed_validation, 1);
    }
}