1use crate::{FetchQuery, FetchResponse, NetworkAsset, PublishRequest, SyncAudit};
25use oris_evolution::{AssetState, Capsule, Gene};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::sync::Mutex;
29
/// Lifecycle state of an asset held in a [`QuarantineStore`].
///
/// Serialized with `snake_case` variant names.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QuarantineState {
    /// Admitted but not yet checked; the state assigned by `QuarantineStore::admit`.
    Pending,
    /// Marked as validated; the only state `QuarantineStore::is_selectable` accepts.
    Validated,
    /// Marked as failed; the entry then carries a `failure_reason`.
    Failed,
}
45
/// One asset held in quarantine, together with its provenance and state.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QuarantineEntry {
    /// Key under which the entry is stored in the quarantine map.
    pub asset_id: String,
    /// The asset payload exactly as it was admitted.
    pub asset: NetworkAsset,
    /// Identifier of the peer the asset was admitted from.
    pub origin_peer: String,
    /// Current lifecycle state; starts as `Pending`.
    pub state: QuarantineState,
    /// Admission time as produced by `now_unix_secs()` (seconds since the Unix epoch).
    pub received_at: i64,
    /// Reason recorded by `fail_asset`; reset to `None` by `validate_asset`.
    pub failure_reason: Option<String>,
}
59
/// In-memory holding area for assets received from peers.
///
/// All access goes through a single `Mutex`, so the store can be used through
/// a shared reference.
pub struct QuarantineStore {
    /// Quarantined entries keyed by asset id.
    entries: Mutex<HashMap<String, QuarantineEntry>>,
}
67
68impl Default for QuarantineStore {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl QuarantineStore {
75 pub fn new() -> Self {
76 Self {
77 entries: Mutex::new(HashMap::new()),
78 }
79 }
80
81 pub fn admit(
86 &self,
87 asset_id: impl Into<String>,
88 asset: NetworkAsset,
89 origin_peer: impl Into<String>,
90 ) -> bool {
91 let id = asset_id.into();
92 let mut entries = self.entries.lock().unwrap();
93 if entries.contains_key(&id) {
94 return false;
95 }
96 entries.insert(
97 id.clone(),
98 QuarantineEntry {
99 asset_id: id,
100 asset,
101 origin_peer: origin_peer.into(),
102 state: QuarantineState::Pending,
103 received_at: now_unix_secs(),
104 failure_reason: None,
105 },
106 );
107 true
108 }
109
110 pub fn validate_asset(&self, asset_id: &str) -> bool {
114 let mut entries = self.entries.lock().unwrap();
115 if let Some(entry) = entries.get_mut(asset_id) {
116 entry.state = QuarantineState::Validated;
117 entry.failure_reason = None;
118 true
119 } else {
120 false
121 }
122 }
123
124 pub fn fail_asset(&self, asset_id: &str, reason: impl Into<String>) -> bool {
128 let mut entries = self.entries.lock().unwrap();
129 if let Some(entry) = entries.get_mut(asset_id) {
130 entry.state = QuarantineState::Failed;
131 entry.failure_reason = Some(reason.into());
132 true
133 } else {
134 false
135 }
136 }
137
138 pub fn get(&self, asset_id: &str) -> Option<QuarantineEntry> {
140 self.entries.lock().unwrap().get(asset_id).cloned()
141 }
142
143 pub fn is_selectable(&self, asset_id: &str) -> bool {
145 self.entries
146 .lock()
147 .unwrap()
148 .get(asset_id)
149 .map(|e| e.state == QuarantineState::Validated)
150 .unwrap_or(false)
151 }
152
153 pub fn pending_entries(&self) -> Vec<QuarantineEntry> {
155 self.entries
156 .lock()
157 .unwrap()
158 .values()
159 .filter(|e| e.state == QuarantineState::Pending)
160 .cloned()
161 .collect()
162 }
163
164 pub fn validated_entries(&self) -> Vec<QuarantineEntry> {
166 self.entries
167 .lock()
168 .unwrap()
169 .values()
170 .filter(|e| e.state == QuarantineState::Validated)
171 .cloned()
172 .collect()
173 }
174
175 pub fn len(&self) -> usize {
177 self.entries.lock().unwrap().len()
178 }
179
180 pub fn is_empty(&self) -> bool {
181 self.entries.lock().unwrap().is_empty()
182 }
183}
184
/// Running counters maintained by [`GossipSyncEngine`].
///
/// All counters start at zero (`Default`) and only ever increase.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct SyncStats {
    /// Number of `receive_publish` calls processed.
    pub batches_processed: u64,
    /// Total assets seen across all processed batches.
    pub assets_received: u64,
    /// Assets newly admitted into quarantine.
    pub assets_quarantined: u64,
    /// Assets skipped because their id was already quarantined.
    pub assets_skipped_duplicate: u64,
    /// `validate_and_promote` calls whose validator returned `Err`.
    pub assets_failed_validation: u64,
    /// `validate_and_promote` calls whose validator returned `Ok`.
    pub assets_promoted: u64,
}
199
/// Per-node synchronisation state: locally published assets, per-peer
/// cursors, the quarantine for received assets, and running statistics.
///
/// Every mutable field sits behind its own `Mutex`, so all methods take `&self`.
pub struct GossipSyncEngine {
    /// Identifier of this node; used as `sender_id` in outgoing messages.
    local_peer_id: String,
    /// Last sequence number handed out by `publish_local` (0 = nothing published).
    local_sequence: Mutex<u64>,
    /// Highest cursor value recorded for each peer id.
    peer_cursors: Mutex<HashMap<String, u64>>,
    /// Locally published assets paired with their sequence numbers, in publish order.
    local_assets: Mutex<Vec<(u64, NetworkAsset)>>,
    /// Holding area for assets received from peers.
    quarantine: QuarantineStore,
    /// Running counters; see [`SyncStats`].
    stats: Mutex<SyncStats>,
}
217
218impl GossipSyncEngine {
219 pub fn new(local_peer_id: impl Into<String>) -> Self {
220 Self {
221 local_peer_id: local_peer_id.into(),
222 local_sequence: Mutex::new(0),
223 peer_cursors: Mutex::new(HashMap::new()),
224 local_assets: Mutex::new(Vec::new()),
225 quarantine: QuarantineStore::new(),
226 stats: Mutex::new(SyncStats::default()),
227 }
228 }
229
230 pub fn publish_local(&self, asset: NetworkAsset) -> u64 {
233 let mut seq = self.local_sequence.lock().unwrap();
234 *seq += 1;
235 let s = *seq;
236 self.local_assets.lock().unwrap().push((s, asset));
237 s
238 }
239
240 pub fn build_publish_request(&self, since_cursor: u64) -> PublishRequest {
243 let assets: Vec<NetworkAsset> = self
244 .local_assets
245 .lock()
246 .unwrap()
247 .iter()
248 .filter(|(seq, _)| *seq > since_cursor)
249 .map(|(_, a)| a.clone())
250 .collect();
251
252 PublishRequest {
253 sender_id: self.local_peer_id.clone(),
254 assets,
255 since_cursor: if since_cursor > 0 {
256 Some(since_cursor.to_string())
257 } else {
258 None
259 },
260 resume_token: None,
261 }
262 }
263
264 pub fn receive_publish(&self, request: &PublishRequest) -> SyncAudit {
270 let batch_id = format!("batch-{}-{}", request.sender_id, now_unix_secs());
271 let mut applied = 0usize;
272 let mut skipped = 0usize;
273
274 for asset in &request.assets {
275 let asset_id = asset_id_of(asset);
276 let admitted = self
277 .quarantine
278 .admit(&asset_id, asset.clone(), &request.sender_id);
279 if admitted {
280 applied += 1;
281 } else {
282 skipped += 1;
283 }
284 }
285
286 if let Some(cursor_str) = &request.since_cursor {
288 if let Ok(seq) = cursor_str.parse::<u64>() {
289 let mut cursors = self.peer_cursors.lock().unwrap();
290 let entry = cursors.entry(request.sender_id.clone()).or_insert(0);
291 if seq > *entry {
292 *entry = seq;
293 }
294 }
295 }
296
297 {
298 let mut stats = self.stats.lock().unwrap();
299 stats.batches_processed += 1;
300 stats.assets_received += request.assets.len() as u64;
301 stats.assets_quarantined += applied as u64;
302 stats.assets_skipped_duplicate += skipped as u64;
303 }
304
305 SyncAudit {
306 batch_id,
307 requested_cursor: request.since_cursor.clone(),
308 scanned_count: request.assets.len(),
309 applied_count: applied,
310 skipped_count: skipped,
311 failed_count: 0,
312 failure_reasons: vec![],
313 }
314 }
315
316 pub fn build_fetch_query(&self, peer_id: &str, signals: Vec<String>) -> FetchQuery {
319 let cursor = self
320 .peer_cursors
321 .lock()
322 .unwrap()
323 .get(peer_id)
324 .copied()
325 .unwrap_or(0);
326
327 FetchQuery {
328 sender_id: self.local_peer_id.clone(),
329 signals,
330 since_cursor: if cursor > 0 {
331 Some(cursor.to_string())
332 } else {
333 None
334 },
335 resume_token: None,
336 }
337 }
338
339 pub fn receive_fetch_response(&self, peer_id: &str, response: &FetchResponse) -> SyncAudit {
343 let fake_request = PublishRequest {
344 sender_id: peer_id.to_string(),
345 assets: response.assets.clone(),
346 since_cursor: response.next_cursor.clone(),
347 resume_token: response.resume_token.clone(),
348 };
349 self.receive_publish(&fake_request)
350 }
351
352 pub fn validate_and_promote<F>(&self, asset_id: &str, validator: F) -> bool
358 where
359 F: FnOnce(&NetworkAsset) -> Result<(), String>,
360 {
361 let entry = match self.quarantine.get(asset_id) {
362 Some(e) => e,
363 None => return false,
364 };
365
366 match validator(&entry.asset) {
367 Ok(()) => {
368 self.quarantine.validate_asset(asset_id);
369 let mut stats = self.stats.lock().unwrap();
370 stats.assets_promoted += 1;
371 true
372 }
373 Err(reason) => {
374 self.quarantine.fail_asset(asset_id, &reason);
375 let mut stats = self.stats.lock().unwrap();
376 stats.assets_failed_validation += 1;
377 false
378 }
379 }
380 }
381
382 pub fn is_asset_selectable(&self, asset_id: &str) -> bool {
386 self.quarantine.is_selectable(asset_id)
387 }
388
389 pub fn pending_entries(&self) -> Vec<QuarantineEntry> {
391 self.quarantine.pending_entries()
392 }
393
394 pub fn stats(&self) -> SyncStats {
396 self.stats.lock().unwrap().clone()
397 }
398
399 pub fn peer_cursor(&self, peer_id: &str) -> u64 {
401 self.peer_cursors
402 .lock()
403 .unwrap()
404 .get(peer_id)
405 .copied()
406 .unwrap_or(0)
407 }
408}
409
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
fn now_unix_secs() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        // Clock set before 1970: fall back to 0 instead of failing.
        Err(_) => 0,
    }
}
420
421fn asset_id_of(asset: &NetworkAsset) -> String {
423 match asset {
424 NetworkAsset::Gene { gene } => format!("gene:{}", gene.id),
425 NetworkAsset::Capsule { capsule } => format!("capsule:{}", capsule.id),
426 NetworkAsset::EvolutionEvent { event } => {
427 use sha2::{Digest, Sha256};
428 let payload = serde_json::to_vec(event).unwrap_or_default();
429 let mut hasher = Sha256::new();
430 hasher.update(payload);
431 format!("event:{}", hex::encode(hasher.finalize()))
432 }
433 }
434}
435
/// Default promotion threshold: `RemoteCapsuleReceiver::new` uses this value
/// when it is not given an explicit threshold.
pub const PROMOTE_THRESHOLD: f64 = 0.70;
442
/// Why a received capsule was quarantined instead of promoted.
///
/// The `Display` form (`low_score:<score>`, `signature_invalid`,
/// `gene_missing`) is what ends up in `CapsuleDisposition::Quarantined`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QuarantineReason {
    /// The capsule's score did not reach the promotion threshold.
    LowScore { score: f64 },
    /// NOTE(review): not constructed anywhere in the code visible here;
    /// presumably reserved for a signature check — confirm.
    SignatureInvalid,
    /// NOTE(review): not constructed anywhere in the code visible here;
    /// presumably reserved for a missing-gene check — confirm.
    GeneMissing,
}
454
455impl std::fmt::Display for QuarantineReason {
456 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457 match self {
458 QuarantineReason::LowScore { score } => {
459 write!(f, "low_score:{:.4}", score)
460 }
461 QuarantineReason::SignatureInvalid => write!(f, "signature_invalid"),
462 QuarantineReason::GeneMissing => write!(f, "gene_missing"),
463 }
464 }
465}
466
/// Outcome of evaluating a received capsule.
///
/// Serialized as an internally tagged enum: the variant name (in
/// `snake_case`) is stored in a `disposition` field next to the variant's
/// own fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "disposition")]
pub enum CapsuleDisposition {
    /// The capsule met the threshold and its gene was promoted.
    Promoted {
        /// The capsule's `gene_id`.
        gene_id: String,
        /// The capsule's confidence, widened to `f64`.
        score: f64,
    },
    /// The capsule was held back; `reason` is the `Display` form of a
    /// [`QuarantineReason`].
    Quarantined { reason: String },
}
480
/// Borrowed view of one promotion decision, serialized as a single JSON line
/// by `RemoteCapsuleReceiver::write_audit_entry`.
#[derive(Serialize)]
struct AuditLogEntry<'a> {
    /// Id of the evaluated capsule.
    capsule_id: &'a str,
    /// Gene id the capsule refers to.
    gene_id: &'a str,
    /// Score the decision was based on.
    score: f64,
    /// The decision that was taken.
    disposition: &'a CapsuleDisposition,
    /// Time of the decision as produced by `now_unix_secs()`.
    timestamp_secs: i64,
}
490
/// Evaluates capsules received from remote peers against a score threshold
/// and keeps an audit trail of every decision.
pub struct RemoteCapsuleReceiver {
    /// Minimum score a capsule needs in order to be promoted.
    threshold: f64,
    /// Optional file that receives one JSON line per decision.
    audit_log_path: Option<std::path::PathBuf>,
    /// In-memory `(capsule id, decision)` pairs in the order they were made.
    audit_trail: Mutex<Vec<(String, CapsuleDisposition)>>,
}
507
508impl Default for RemoteCapsuleReceiver {
509 fn default() -> Self {
510 Self::new(None::<&str>, None)
511 }
512}
513
514impl RemoteCapsuleReceiver {
515 pub fn new(
521 audit_log_path: Option<impl AsRef<std::path::Path>>,
522 threshold: Option<f64>,
523 ) -> Self {
524 Self {
525 threshold: threshold.unwrap_or(PROMOTE_THRESHOLD),
526 audit_log_path: audit_log_path.map(|p| p.as_ref().to_path_buf()),
527 audit_trail: Mutex::new(Vec::new()),
528 }
529 }
530
531 pub fn on_capsule_received(&self, capsule: &Capsule, gene: &mut Gene) -> CapsuleDisposition {
545 let score = capsule.confidence as f64;
546
547 let disposition = if score >= self.threshold {
548 gene.state = AssetState::Promoted;
549 CapsuleDisposition::Promoted {
550 gene_id: capsule.gene_id.clone(),
551 score,
552 }
553 } else {
554 let reason = QuarantineReason::LowScore { score };
555 CapsuleDisposition::Quarantined {
556 reason: reason.to_string(),
557 }
558 };
559
560 self.write_audit_entry(capsule, &disposition, score);
561 self.audit_trail
562 .lock()
563 .unwrap()
564 .push((capsule.id.clone(), disposition.clone()));
565
566 disposition
567 }
568
569 pub fn audit_trail(&self) -> Vec<(String, CapsuleDisposition)> {
571 self.audit_trail.lock().unwrap().clone()
572 }
573
574 pub fn audit_count(&self) -> usize {
576 self.audit_trail.lock().unwrap().len()
577 }
578
579 fn write_audit_entry(&self, capsule: &Capsule, disposition: &CapsuleDisposition, score: f64) {
580 let Some(ref path) = self.audit_log_path else {
581 return;
582 };
583 let entry = AuditLogEntry {
584 capsule_id: &capsule.id,
585 gene_id: &capsule.gene_id,
586 score,
587 disposition,
588 timestamp_secs: now_unix_secs(),
589 };
590 if let Ok(mut line) = serde_json::to_string(&entry) {
591 line.push('\n');
592 use std::io::Write;
593 if let Ok(mut file) = std::fs::OpenOptions::new()
594 .create(true)
595 .append(true)
596 .open(path)
597 {
598 let _ = file.write_all(line.as_bytes());
599 }
600 }
601 }
602}
603
/// Unit tests for the quarantine store, the gossip sync engine and the remote
/// capsule receiver.
#[cfg(test)]
mod tests {
    use super::*;
    use oris_evolution::{AssetState, Capsule, EnvFingerprint, Gene, Outcome};

    /// Builds a gene asset with the given id and fixed placeholder content.
    fn make_gene(id: &str) -> NetworkAsset {
        NetworkAsset::Gene {
            gene: Gene {
                id: id.to_string(),
                signals: vec!["test.fail".into()],
                strategy: vec!["fix test".into()],
                validation: vec!["cargo test".into()],
                state: AssetState::Promoted,
                task_class_id: None,
            },
        }
    }

    /// Builds a candidate capsule with the given ids and confidence; all other
    /// fields are fixed placeholders.
    fn make_capsule(id: &str, gene_id: &str, confidence: f32) -> Capsule {
        Capsule {
            id: id.to_string(),
            gene_id: gene_id.to_string(),
            mutation_id: "mut-1".to_string(),
            run_id: "run-1".to_string(),
            diff_hash: "abc123".to_string(),
            confidence,
            env: EnvFingerprint {
                rustc_version: "1.80.0".to_string(),
                cargo_lock_hash: "hash".to_string(),
                target_triple: "aarch64-apple-darwin".to_string(),
                os: "macos".to_string(),
            },
            outcome: Outcome {
                success: true,
                validation_profile: "default".to_string(),
                validation_duration_ms: 100,
                changed_files: vec![],
                validator_hash: "vh1".to_string(),
                lines_changed: 5,
                replay_verified: false,
            },
            state: AssetState::Candidate,
        }
    }

    /// Builds a bare `Gene` in the `Candidate` state, so that a promotion is
    /// observable as a state change.
    fn make_plain_gene(id: &str) -> Gene {
        Gene {
            id: id.to_string(),
            signals: vec!["test.fail".into()],
            strategy: vec!["fix test".into()],
            validation: vec!["cargo test".into()],
            state: AssetState::Candidate,
            task_class_id: None,
        }
    }

    /// An asset published on node A arrives on node B as a pending quarantine
    /// entry attributed to node A.
    #[test]
    fn test_two_node_sync_end_to_end() {
        let node_a = GossipSyncEngine::new("node-a");
        let node_b = GossipSyncEngine::new("node-b");

        let seq = node_a.publish_local(make_gene("gene-1"));
        assert_eq!(seq, 1);

        let req = node_a.build_publish_request(0);
        assert_eq!(req.assets.len(), 1);
        let audit = node_b.receive_publish(&req);
        assert_eq!(audit.applied_count, 1);
        assert_eq!(audit.skipped_count, 0);

        let entry = node_b.quarantine.get("gene:gene-1").unwrap();
        assert_eq!(entry.state, QuarantineState::Pending);
        assert_eq!(entry.origin_peer, "node-a");
    }

    /// A second request built from cursor 2 carries only the asset published
    /// after the first two.
    #[test]
    fn test_incremental_cursor_sync() {
        let node_a = GossipSyncEngine::new("node-a");
        let node_b = GossipSyncEngine::new("node-b");

        node_a.publish_local(make_gene("gene-1"));
        node_a.publish_local(make_gene("gene-2"));

        let req1 = node_a.build_publish_request(0);
        node_b.receive_publish(&req1);
        assert_eq!(node_b.quarantine.len(), 2);

        node_a.publish_local(make_gene("gene-3"));

        let req2 = node_a.build_publish_request(2);
        let audit = node_b.receive_publish(&req2);
        assert_eq!(audit.applied_count, 1);
        assert_eq!(node_b.quarantine.len(), 3);
    }

    /// An admitted asset is pending and not selectable until it is validated.
    #[test]
    fn test_quarantine_admit_and_validate() {
        let store = QuarantineStore::new();
        let asset = make_gene("g-1");

        assert!(store.admit("gene:g-1", asset, "peer-a"));
        assert_eq!(
            store.get("gene:g-1").unwrap().state,
            QuarantineState::Pending
        );
        assert!(!store.is_selectable("gene:g-1"));
        store.validate_asset("gene:g-1");
        assert_eq!(
            store.get("gene:g-1").unwrap().state,
            QuarantineState::Validated
        );
        assert!(store.is_selectable("gene:g-1"));
    }

    /// Failing an asset records the reason and keeps it unselectable.
    #[test]
    fn test_quarantine_fail_asset() {
        let store = QuarantineStore::new();
        store.admit("gene:g-bad", make_gene("g-bad"), "peer-a");
        store.fail_asset("gene:g-bad", "signature mismatch");

        let entry = store.get("gene:g-bad").unwrap();
        assert_eq!(entry.state, QuarantineState::Failed);
        assert_eq!(entry.failure_reason.as_deref(), Some("signature mismatch"));
        assert!(!store.is_selectable("gene:g-bad"));
    }

    /// A validator returning `Ok` makes the asset selectable through the engine.
    #[test]
    fn test_validate_and_promote_via_engine() {
        let engine = GossipSyncEngine::new("node-b");
        let req = PublishRequest {
            sender_id: "node-a".into(),
            assets: vec![make_gene("g-ok")],
            since_cursor: None,
            resume_token: None,
        };
        engine.receive_publish(&req);

        let promoted = engine.validate_and_promote("gene:g-ok", |_| Ok(()));
        assert!(promoted);
        assert!(engine.is_asset_selectable("gene:g-ok"));
    }

    /// A validator returning `Err` leaves the asset unselectable.
    #[test]
    fn test_validate_and_promote_failure_not_selectable() {
        let engine = GossipSyncEngine::new("node-b");
        let req = PublishRequest {
            sender_id: "node-a".into(),
            assets: vec![make_gene("g-invalid")],
            since_cursor: None,
            resume_token: None,
        };
        engine.receive_publish(&req);

        let promoted = engine.validate_and_promote("gene:g-invalid", |_| Err("bad hash".into()));
        assert!(!promoted);
        assert!(!engine.is_asset_selectable("gene:g-invalid"));
    }

    /// An asset that was received but never validated stays pending and must
    /// not be selectable.
    #[test]
    fn test_pending_gene_not_selectable_under_fault() {
        let engine = GossipSyncEngine::new("node-b");
        let req = PublishRequest {
            sender_id: "node-a".into(),
            assets: vec![make_gene("g-unvalidated")],
            since_cursor: None,
            resume_token: None,
        };
        engine.receive_publish(&req);

        assert!(
            !engine.is_asset_selectable("gene:g-unvalidated"),
            "pending gene must not be selectable (failure-closed guarantee)"
        );
        assert_eq!(engine.pending_entries().len(), 1);
    }

    /// An id that was never admitted is not selectable.
    #[test]
    fn test_unknown_asset_not_selectable() {
        let engine = GossipSyncEngine::new("node-b");
        assert!(!engine.is_asset_selectable("gene:nonexistent"));
    }

    /// Re-admitting an existing id is rejected and does not reset its state.
    #[test]
    fn test_duplicate_admit_is_idempotent() {
        let store = QuarantineStore::new();
        assert!(store.admit("gene:g", make_gene("g"), "peer-a"));
        store.validate_asset("gene:g");
        assert!(!store.admit("gene:g", make_gene("g"), "peer-b"));
        assert_eq!(
            store.get("gene:g").unwrap().state,
            QuarantineState::Validated
        );
    }

    /// Quarantine, promotion and failure counters reflect one batch of two
    /// assets with one success and one failure.
    #[test]
    fn test_stats_accumulate_correctly() {
        let engine = GossipSyncEngine::new("me");
        let req = PublishRequest {
            sender_id: "peer".into(),
            assets: vec![make_gene("g1"), make_gene("g2")],
            since_cursor: None,
            resume_token: None,
        };
        engine.receive_publish(&req);
        engine.validate_and_promote("gene:g1", |_| Ok(()));
        engine.validate_and_promote("gene:g2", |_| Err("bad".into()));

        let s = engine.stats();
        assert_eq!(s.assets_quarantined, 2);
        assert_eq!(s.assets_promoted, 1);
        assert_eq!(s.assets_failed_validation, 1);
    }

    /// A capsule scoring above the default threshold promotes its gene and is
    /// recorded in the audit trail.
    #[test]
    fn test_remote_capsule_high_score_is_promoted() {
        let receiver = RemoteCapsuleReceiver::new(None::<&str>, None);
        let capsule = make_capsule("cap-1", "gene-1", 0.85);
        let mut gene = make_plain_gene("gene-1");

        let disposition = receiver.on_capsule_received(&capsule, &mut gene);

        match &disposition {
            CapsuleDisposition::Promoted { gene_id, score } => {
                assert_eq!(gene_id, "gene-1");
                assert!(*score >= PROMOTE_THRESHOLD);
            }
            other => panic!("expected Promoted, got {:?}", other),
        }
        assert_eq!(gene.state, AssetState::Promoted);
        assert_eq!(receiver.audit_count(), 1);
    }

    /// A capsule scoring below the default threshold is quarantined with a
    /// `low_score:` reason, leaves the gene state unchanged, and is still
    /// audited.
    #[test]
    fn test_remote_capsule_low_score_is_quarantined() {
        let receiver = RemoteCapsuleReceiver::new(None::<&str>, None);
        let capsule = make_capsule("cap-2", "gene-2", 0.40);
        let mut gene = make_plain_gene("gene-2");
        let original_state = gene.state.clone();

        let disposition = receiver.on_capsule_received(&capsule, &mut gene);

        match &disposition {
            CapsuleDisposition::Quarantined { reason } => {
                assert!(reason.starts_with("low_score:"), "reason={}", reason);
            }
            other => panic!("expected Quarantined, got {:?}", other),
        }
        assert_eq!(gene.state, original_state);
        assert_eq!(receiver.audit_count(), 1);
    }

    /// A capsule with confidence 0.75 is promoted under the default threshold.
    ///
    /// NOTE(review): despite the name, 0.75 is above the 0.70 default, so the
    /// exact boundary value is not exercised here.
    #[test]
    fn test_remote_capsule_at_threshold_is_promoted() {
        let receiver = RemoteCapsuleReceiver::new(None::<&str>, None);
        let capsule = make_capsule("cap-3", "gene-3", 0.75_f32);
        let mut gene = make_plain_gene("gene-3");

        let disposition = receiver.on_capsule_received(&capsule, &mut gene);
        assert!(
            matches!(&disposition, CapsuleDisposition::Promoted { .. }),
            "capsule at or above threshold must be promoted"
        );
    }

    /// Two decisions produce two valid JSON lines in the audit log file.
    #[test]
    fn test_remote_capsule_audit_log_written() {
        // Nanosecond timestamp in the file name keeps concurrent runs apart.
        let dir = std::env::temp_dir();
        let log_path = dir.join(format!(
            "capsule_audit_log_test_{}.jsonl",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));

        {
            let receiver = RemoteCapsuleReceiver::new(Some(&log_path), None);
            let c1 = make_capsule("cap-a", "g-a", 0.90);
            let c2 = make_capsule("cap-b", "g-b", 0.30);
            let mut gene_a = make_plain_gene("g-a");
            let mut gene_b = make_plain_gene("g-b");
            receiver.on_capsule_received(&c1, &mut gene_a);
            receiver.on_capsule_received(&c2, &mut gene_b);
        }

        let contents = std::fs::read_to_string(&log_path).expect("audit log must exist");
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines.len(), 2, "two decisions must produce two log lines");
        for line in &lines {
            serde_json::from_str::<serde_json::Value>(line)
                .expect("each audit line must be valid JSON");
        }
        // Best-effort cleanup of the temporary log file.
        let _ = std::fs::remove_file(&log_path);
    }

    /// The in-memory audit trail keeps decisions in the order they were made.
    #[test]
    fn test_remote_capsule_audit_trail_in_memory() {
        let receiver = RemoteCapsuleReceiver::default();
        let c1 = make_capsule("cap-x", "g-x", 0.80);
        let c2 = make_capsule("cap-y", "g-y", 0.50);
        let mut g1 = make_plain_gene("g-x");
        let mut g2 = make_plain_gene("g-y");

        receiver.on_capsule_received(&c1, &mut g1);
        receiver.on_capsule_received(&c2, &mut g2);

        let trail = receiver.audit_trail();
        assert_eq!(trail.len(), 2);
        assert_eq!(trail[0].0, "cap-x");
        assert_eq!(trail[1].0, "cap-y");
        assert!(matches!(&trail[0].1, CapsuleDisposition::Promoted { .. }));
        assert!(matches!(
            &trail[1].1,
            CapsuleDisposition::Quarantined { .. }
        ));
    }
}