1use crate::{
25 verify_envelope, EvolutionEnvelope, FetchQuery, FetchResponse, NetworkAsset,
26 PeerRateLimitConfig, PeerRateLimiter, PublishRequest, SyncAudit,
27};
28use chrono::Utc;
29use oris_evolution::{AssetState, Capsule, Gene};
30use serde::{Deserialize, Serialize};
31use std::collections::HashMap;
32use std::sync::Mutex;
33
/// Lifecycle state of an asset held in a [`QuarantineStore`].
///
/// Serialized with `snake_case` variant names.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QuarantineState {
    /// State assigned by [`QuarantineStore::admit`]; not selectable.
    Pending,
    /// Set by [`QuarantineStore::validate_asset`]; the only state for which
    /// [`QuarantineStore::is_selectable`] returns `true`.
    Validated,
    /// Set by [`QuarantineStore::fail_asset`]; not selectable.
    Failed,
}
49
/// One quarantined asset together with its provenance and validation state.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QuarantineEntry {
    /// Key under which the entry is stored in the [`QuarantineStore`].
    pub asset_id: String,
    /// The asset payload that was received.
    pub asset: NetworkAsset,
    /// Identifier of the peer the asset was admitted from.
    pub origin_peer: String,
    /// Current lifecycle state; starts as [`QuarantineState::Pending`].
    pub state: QuarantineState,
    /// Admission time in whole seconds since the Unix epoch.
    pub received_at: i64,
    /// Reason recorded by [`QuarantineStore::fail_asset`]; `None` otherwise.
    pub failure_reason: Option<String>,
}
63
/// Mutex-guarded, in-memory map of quarantined assets keyed by asset id.
pub struct QuarantineStore {
    // asset id -> entry; every accessor locks this mutex for the duration of
    // its own call only.
    entries: Mutex<HashMap<String, QuarantineEntry>>,
}
71
72impl Default for QuarantineStore {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78impl QuarantineStore {
79 pub fn new() -> Self {
80 Self {
81 entries: Mutex::new(HashMap::new()),
82 }
83 }
84
85 pub fn admit(
90 &self,
91 asset_id: impl Into<String>,
92 asset: NetworkAsset,
93 origin_peer: impl Into<String>,
94 ) -> bool {
95 let id = asset_id.into();
96 let mut entries = self.entries.lock().unwrap();
97 if entries.contains_key(&id) {
98 return false;
99 }
100 entries.insert(
101 id.clone(),
102 QuarantineEntry {
103 asset_id: id,
104 asset,
105 origin_peer: origin_peer.into(),
106 state: QuarantineState::Pending,
107 received_at: now_unix_secs(),
108 failure_reason: None,
109 },
110 );
111 true
112 }
113
114 pub fn validate_asset(&self, asset_id: &str) -> bool {
118 let mut entries = self.entries.lock().unwrap();
119 if let Some(entry) = entries.get_mut(asset_id) {
120 entry.state = QuarantineState::Validated;
121 entry.failure_reason = None;
122 true
123 } else {
124 false
125 }
126 }
127
128 pub fn fail_asset(&self, asset_id: &str, reason: impl Into<String>) -> bool {
132 let mut entries = self.entries.lock().unwrap();
133 if let Some(entry) = entries.get_mut(asset_id) {
134 entry.state = QuarantineState::Failed;
135 entry.failure_reason = Some(reason.into());
136 true
137 } else {
138 false
139 }
140 }
141
142 pub fn get(&self, asset_id: &str) -> Option<QuarantineEntry> {
144 self.entries.lock().unwrap().get(asset_id).cloned()
145 }
146
147 pub fn is_selectable(&self, asset_id: &str) -> bool {
149 self.entries
150 .lock()
151 .unwrap()
152 .get(asset_id)
153 .map(|e| e.state == QuarantineState::Validated)
154 .unwrap_or(false)
155 }
156
157 pub fn pending_entries(&self) -> Vec<QuarantineEntry> {
159 self.entries
160 .lock()
161 .unwrap()
162 .values()
163 .filter(|e| e.state == QuarantineState::Pending)
164 .cloned()
165 .collect()
166 }
167
168 pub fn validated_entries(&self) -> Vec<QuarantineEntry> {
170 self.entries
171 .lock()
172 .unwrap()
173 .values()
174 .filter(|e| e.state == QuarantineState::Validated)
175 .cloned()
176 .collect()
177 }
178
179 pub fn len(&self) -> usize {
181 self.entries.lock().unwrap().len()
182 }
183
184 pub fn is_empty(&self) -> bool {
185 self.entries.lock().unwrap().is_empty()
186 }
187}
188
/// Running counters maintained by [`GossipSyncEngine`].
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct SyncStats {
    /// Incremented once per call to [`GossipSyncEngine::receive_publish`].
    pub batches_processed: u64,
    /// Total number of assets contained in received batches.
    pub assets_received: u64,
    /// Assets newly admitted into quarantine.
    pub assets_quarantined: u64,
    /// Assets skipped because their id was already in quarantine.
    pub assets_skipped_duplicate: u64,
    /// Incremented when a validator passed to
    /// [`GossipSyncEngine::validate_and_promote`] returns an error.
    pub assets_failed_validation: u64,
    /// Incremented when a validator passed to
    /// [`GossipSyncEngine::validate_and_promote`] succeeds.
    pub assets_promoted: u64,
}
203
/// Per-node sync state: locally published assets, per-peer cursors, the
/// quarantine for received assets, and running statistics.
pub struct GossipSyncEngine {
    // Used as `sender_id` in the requests and queries this engine builds.
    local_peer_id: String,
    // Last sequence number handed out by `publish_local`; starts at 0.
    local_sequence: Mutex<u64>,
    // Highest cursor value parsed from batches received from each peer.
    peer_cursors: Mutex<HashMap<String, u64>>,
    // `(sequence, asset)` pairs appended by `publish_local`.
    local_assets: Mutex<Vec<(u64, NetworkAsset)>>,
    // Holding area for every asset received from a peer.
    quarantine: QuarantineStore,
    // Counters returned (as a clone) by `stats`.
    stats: Mutex<SyncStats>,
}
221
222impl GossipSyncEngine {
223 pub fn new(local_peer_id: impl Into<String>) -> Self {
224 Self {
225 local_peer_id: local_peer_id.into(),
226 local_sequence: Mutex::new(0),
227 peer_cursors: Mutex::new(HashMap::new()),
228 local_assets: Mutex::new(Vec::new()),
229 quarantine: QuarantineStore::new(),
230 stats: Mutex::new(SyncStats::default()),
231 }
232 }
233
234 pub fn publish_local(&self, asset: NetworkAsset) -> u64 {
237 let mut seq = self.local_sequence.lock().unwrap();
238 *seq += 1;
239 let s = *seq;
240 self.local_assets.lock().unwrap().push((s, asset));
241 s
242 }
243
244 pub fn build_publish_request(&self, since_cursor: u64) -> PublishRequest {
247 let assets: Vec<NetworkAsset> = self
248 .local_assets
249 .lock()
250 .unwrap()
251 .iter()
252 .filter(|(seq, _)| *seq > since_cursor)
253 .map(|(_, a)| a.clone())
254 .collect();
255
256 PublishRequest {
257 sender_id: self.local_peer_id.clone(),
258 assets,
259 since_cursor: if since_cursor > 0 {
260 Some(since_cursor.to_string())
261 } else {
262 None
263 },
264 resume_token: None,
265 }
266 }
267
268 pub fn receive_publish(&self, request: &PublishRequest) -> SyncAudit {
274 let batch_id = format!("batch-{}-{}", request.sender_id, now_unix_secs());
275 let mut applied = 0usize;
276 let mut skipped = 0usize;
277
278 for asset in &request.assets {
279 let asset_id = asset_id_of(asset);
280 let admitted = self
281 .quarantine
282 .admit(&asset_id, asset.clone(), &request.sender_id);
283 if admitted {
284 applied += 1;
285 } else {
286 skipped += 1;
287 }
288 }
289
290 if let Some(cursor_str) = &request.since_cursor {
292 if let Ok(seq) = cursor_str.parse::<u64>() {
293 let mut cursors = self.peer_cursors.lock().unwrap();
294 let entry = cursors.entry(request.sender_id.clone()).or_insert(0);
295 if seq > *entry {
296 *entry = seq;
297 }
298 }
299 }
300
301 {
302 let mut stats = self.stats.lock().unwrap();
303 stats.batches_processed += 1;
304 stats.assets_received += request.assets.len() as u64;
305 stats.assets_quarantined += applied as u64;
306 stats.assets_skipped_duplicate += skipped as u64;
307 }
308
309 SyncAudit {
310 batch_id,
311 requested_cursor: request.since_cursor.clone(),
312 scanned_count: request.assets.len(),
313 applied_count: applied,
314 skipped_count: skipped,
315 failed_count: 0,
316 failure_reasons: vec![],
317 }
318 }
319
320 pub fn build_fetch_query(&self, peer_id: &str, signals: Vec<String>) -> FetchQuery {
323 let cursor = self
324 .peer_cursors
325 .lock()
326 .unwrap()
327 .get(peer_id)
328 .copied()
329 .unwrap_or(0);
330
331 FetchQuery {
332 sender_id: self.local_peer_id.clone(),
333 signals,
334 since_cursor: if cursor > 0 {
335 Some(cursor.to_string())
336 } else {
337 None
338 },
339 resume_token: None,
340 }
341 }
342
343 pub fn receive_fetch_response(&self, peer_id: &str, response: &FetchResponse) -> SyncAudit {
347 let fake_request = PublishRequest {
348 sender_id: peer_id.to_string(),
349 assets: response.assets.clone(),
350 since_cursor: response.next_cursor.clone(),
351 resume_token: response.resume_token.clone(),
352 };
353 self.receive_publish(&fake_request)
354 }
355
356 pub fn validate_and_promote<F>(&self, asset_id: &str, validator: F) -> bool
362 where
363 F: FnOnce(&NetworkAsset) -> Result<(), String>,
364 {
365 let entry = match self.quarantine.get(asset_id) {
366 Some(e) => e,
367 None => return false,
368 };
369
370 match validator(&entry.asset) {
371 Ok(()) => {
372 self.quarantine.validate_asset(asset_id);
373 let mut stats = self.stats.lock().unwrap();
374 stats.assets_promoted += 1;
375 true
376 }
377 Err(reason) => {
378 self.quarantine.fail_asset(asset_id, &reason);
379 let mut stats = self.stats.lock().unwrap();
380 stats.assets_failed_validation += 1;
381 false
382 }
383 }
384 }
385
386 pub fn is_asset_selectable(&self, asset_id: &str) -> bool {
390 self.quarantine.is_selectable(asset_id)
391 }
392
393 pub fn pending_entries(&self) -> Vec<QuarantineEntry> {
395 self.quarantine.pending_entries()
396 }
397
398 pub fn stats(&self) -> SyncStats {
400 self.stats.lock().unwrap().clone()
401 }
402
403 pub fn peer_cursor(&self, peer_id: &str) -> u64 {
405 self.peer_cursors
406 .lock()
407 .unwrap()
408 .get(peer_id)
409 .copied()
410 .unwrap_or(0)
411 }
412}
413
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` instead of wrapping if the second count does not
/// fit in an `i64`.
fn now_unix_secs() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX)
        })
}
424
425fn asset_id_of(asset: &NetworkAsset) -> String {
427 match asset {
428 NetworkAsset::Gene { gene } => format!("gene:{}", gene.id),
429 NetworkAsset::Capsule { capsule } => format!("capsule:{}", capsule.id),
430 NetworkAsset::EvolutionEvent { event } => {
431 use sha2::{Digest, Sha256};
432 let payload = serde_json::to_vec(event).unwrap_or_default();
433 let mut hasher = Sha256::new();
434 hasher.update(payload);
435 format!("event:{}", hex::encode(hasher.finalize()))
436 }
437 }
438}
439
/// Promotion threshold used by [`RemoteCapsuleReceiver`] when the caller
/// does not supply one; capsules scoring at or above it are promoted.
pub const PROMOTE_THRESHOLD: f64 = 0.70;
446
/// Why a received capsule was quarantined instead of promoted.
///
/// Its `Display` output is what ends up in
/// [`CapsuleDisposition::Quarantined`]'s `reason` string.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QuarantineReason {
    /// The capsule's score was below the receiver's threshold.
    LowScore { score: f64 },
    /// Displayed as `signature_invalid`.
    SignatureInvalid,
    /// Displayed as `gene_missing`.
    GeneMissing,
}
458
/// Why [`RemoteCapsuleReceiver::on_signed_capsule_received`] refused a
/// capsule before scoring it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RejectionReason {
    /// Envelope verification against the supplied public key failed.
    InvalidSignature,
    /// The envelope carried no signature at all.
    MissingSignature,
    /// The peer's rate-limit check failed.
    RateLimited,
    /// The envelope does not contain the matching capsule and gene, or the
    /// gene id differs from the capsule's `gene_id`.
    GeneMissing,
}
467
468impl std::fmt::Display for RejectionReason {
469 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
470 match self {
471 RejectionReason::InvalidSignature => write!(f, "invalid_signature"),
472 RejectionReason::MissingSignature => write!(f, "missing_signature"),
473 RejectionReason::RateLimited => write!(f, "rate_limited"),
474 RejectionReason::GeneMissing => write!(f, "gene_missing"),
475 }
476 }
477}
478
479impl std::fmt::Display for QuarantineReason {
480 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
481 match self {
482 QuarantineReason::LowScore { score } => {
483 write!(f, "low_score:{:.4}", score)
484 }
485 QuarantineReason::SignatureInvalid => write!(f, "signature_invalid"),
486 QuarantineReason::GeneMissing => write!(f, "gene_missing"),
487 }
488 }
489}
490
/// Outcome of scoring a received capsule.
///
/// Serialized with the variant name stored in a `disposition` field.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", tag = "disposition")]
pub enum CapsuleDisposition {
    /// The score met the threshold and the gene was marked promoted.
    Promoted {
        /// The capsule's `gene_id`.
        gene_id: String,
        /// The score that was compared against the threshold.
        score: f64,
    },
    /// The capsule was not promoted; `reason` is a rendered
    /// [`QuarantineReason`].
    Quarantined { reason: String },
}
504
/// Borrowed record that `write_accept_audit_entry` serializes as one JSON
/// line of the audit log file.
#[derive(Serialize)]
struct AuditLogEntry<'a> {
    // RFC 3339 timestamp taken when the entry is built.
    timestamp: String,
    peer_id: &'a str,
    capsule_id: &'a str,
    gene_id: &'a str,
    disposition: NetworkAuditDisposition,
    // Omitted from the JSON when `None` (i.e. for promoted capsules).
    #[serde(skip_serializing_if = "Option::is_none")]
    reason: Option<String>,
    score: f64,
}
517
/// Whether an audit record describes an accepted or a rejected capsule.
///
/// Serialized as `accept` / `reject`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NetworkAuditDisposition {
    /// The capsule passed all checks and was scored (promoted or quarantined).
    Accept,
    /// The capsule was refused with a [`RejectionReason`].
    Reject,
}
524
/// Owned audit record kept in the receiver's in-memory network audit trail
/// and, for rejections, also serialized to the audit log file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkAuditEntry {
    /// RFC 3339 timestamp taken when the record was created.
    pub timestamp: String,
    /// Peer the capsule was received from.
    pub peer_id: String,
    /// Id of the capsule the record is about.
    pub capsule_id: String,
    /// Id of the gene associated with the capsule.
    pub gene_id: String,
    /// Accept or reject.
    pub disposition: NetworkAuditDisposition,
    /// Quarantine or rejection reason; absent for promoted capsules.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Capsule score; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub score: Option<f64>,
}
537
/// Scores capsules received from remote peers, promoting or quarantining
/// them and keeping an audit trail of every decision.
pub struct RemoteCapsuleReceiver {
    // Minimum score (inclusive) for promotion; defaults to PROMOTE_THRESHOLD.
    threshold: f64,
    // Optional file that receives one JSON line per audited decision.
    audit_log_path: Option<std::path::PathBuf>,
    // `(capsule id, disposition)` for every scored capsule, in call order.
    audit_trail: Mutex<Vec<(String, CapsuleDisposition)>>,
    // Accept and reject records, in call order.
    network_audit_trail: Mutex<Vec<NetworkAuditEntry>>,
    // Consulted first by `on_signed_capsule_received`.
    rate_limiter: PeerRateLimiter,
}
556
557impl Default for RemoteCapsuleReceiver {
558 fn default() -> Self {
559 Self::new(None::<&str>, None)
560 }
561}
562
563impl RemoteCapsuleReceiver {
564 pub fn new(
570 audit_log_path: Option<impl AsRef<std::path::Path>>,
571 threshold: Option<f64>,
572 ) -> Self {
573 Self::with_rate_limit_config(audit_log_path, threshold, PeerRateLimitConfig::default())
574 }
575
576 pub fn with_rate_limit_config(
577 audit_log_path: Option<impl AsRef<std::path::Path>>,
578 threshold: Option<f64>,
579 rate_limit_config: PeerRateLimitConfig,
580 ) -> Self {
581 Self {
582 threshold: threshold.unwrap_or(PROMOTE_THRESHOLD),
583 audit_log_path: audit_log_path.map(|p| p.as_ref().to_path_buf()),
584 audit_trail: Mutex::new(Vec::new()),
585 network_audit_trail: Mutex::new(Vec::new()),
586 rate_limiter: PeerRateLimiter::new(rate_limit_config),
587 }
588 }
589
590 pub fn on_capsule_received(&self, capsule: &Capsule, gene: &mut Gene) -> CapsuleDisposition {
604 self.evaluate_capsule("unknown", capsule, gene)
605 }
606
607 pub fn on_signed_capsule_received(
608 &self,
609 peer_id: &str,
610 public_key_hex: &str,
611 envelope: &EvolutionEnvelope,
612 capsule: &Capsule,
613 gene: &mut Gene,
614 ) -> Result<CapsuleDisposition, RejectionReason> {
615 if !self.rate_limiter.check(peer_id) {
616 self.write_rejection_audit_entry(
617 peer_id,
618 capsule,
619 Some(gene.id.as_str()),
620 RejectionReason::RateLimited,
621 );
622 return Err(RejectionReason::RateLimited);
623 }
624
625 if envelope.signature.is_none() {
626 self.write_rejection_audit_entry(
627 peer_id,
628 capsule,
629 Some(gene.id.as_str()),
630 RejectionReason::MissingSignature,
631 );
632 return Err(RejectionReason::MissingSignature);
633 }
634
635 if verify_envelope(public_key_hex, envelope).is_err() {
636 self.write_rejection_audit_entry(
637 peer_id,
638 capsule,
639 Some(gene.id.as_str()),
640 RejectionReason::InvalidSignature,
641 );
642 return Err(RejectionReason::InvalidSignature);
643 }
644
645 let has_capsule = envelope.assets.iter().any(|asset| {
646 matches!(asset, NetworkAsset::Capsule { capsule: remote } if remote.id == capsule.id && remote.gene_id == capsule.gene_id)
647 });
648 let has_gene = envelope.assets.iter().any(
649 |asset| matches!(asset, NetworkAsset::Gene { gene: remote } if remote.id == gene.id),
650 );
651 if !has_capsule || !has_gene || gene.id != capsule.gene_id {
652 self.write_rejection_audit_entry(
653 peer_id,
654 capsule,
655 Some(gene.id.as_str()),
656 RejectionReason::GeneMissing,
657 );
658 return Err(RejectionReason::GeneMissing);
659 }
660
661 Ok(self.evaluate_capsule(peer_id, capsule, gene))
662 }
663
664 pub fn network_audit_trail(&self) -> Vec<NetworkAuditEntry> {
665 self.network_audit_trail.lock().unwrap().clone()
666 }
667
668 fn evaluate_capsule(
669 &self,
670 peer_id: &str,
671 capsule: &Capsule,
672 gene: &mut Gene,
673 ) -> CapsuleDisposition {
674 let score = capsule.confidence as f64;
675
676 let disposition = if score >= self.threshold {
677 gene.state = AssetState::Promoted;
678 CapsuleDisposition::Promoted {
679 gene_id: capsule.gene_id.clone(),
680 score,
681 }
682 } else {
683 let reason = QuarantineReason::LowScore { score };
684 CapsuleDisposition::Quarantined {
685 reason: reason.to_string(),
686 }
687 };
688
689 self.write_accept_audit_entry(peer_id, capsule, &disposition, score);
690 self.audit_trail
691 .lock()
692 .unwrap()
693 .push((capsule.id.clone(), disposition.clone()));
694
695 disposition
696 }
697
698 pub fn audit_trail(&self) -> Vec<(String, CapsuleDisposition)> {
700 self.audit_trail.lock().unwrap().clone()
701 }
702
703 pub fn audit_count(&self) -> usize {
705 self.audit_trail.lock().unwrap().len()
706 }
707
708 fn write_accept_audit_entry(
709 &self,
710 peer_id: &str,
711 capsule: &Capsule,
712 disposition: &CapsuleDisposition,
713 score: f64,
714 ) {
715 let Some(ref path) = self.audit_log_path else {
716 self.network_audit_trail
717 .lock()
718 .unwrap()
719 .push(NetworkAuditEntry {
720 timestamp: Utc::now().to_rfc3339(),
721 peer_id: peer_id.to_string(),
722 capsule_id: capsule.id.clone(),
723 gene_id: capsule.gene_id.clone(),
724 disposition: NetworkAuditDisposition::Accept,
725 reason: match disposition {
726 CapsuleDisposition::Promoted { .. } => None,
727 CapsuleDisposition::Quarantined { reason } => Some(reason.clone()),
728 },
729 score: Some(score),
730 });
731 return;
732 };
733 let entry = AuditLogEntry {
734 timestamp: Utc::now().to_rfc3339(),
735 peer_id,
736 capsule_id: &capsule.id,
737 gene_id: &capsule.gene_id,
738 disposition: NetworkAuditDisposition::Accept,
739 reason: match disposition {
740 CapsuleDisposition::Promoted { .. } => None,
741 CapsuleDisposition::Quarantined { reason } => Some(reason.clone()),
742 },
743 score,
744 };
745 self.network_audit_trail
746 .lock()
747 .unwrap()
748 .push(NetworkAuditEntry {
749 timestamp: entry.timestamp.clone(),
750 peer_id: entry.peer_id.to_string(),
751 capsule_id: entry.capsule_id.to_string(),
752 gene_id: entry.gene_id.to_string(),
753 disposition: entry.disposition.clone(),
754 reason: entry.reason.clone(),
755 score: Some(entry.score),
756 });
757 if let Ok(mut line) = serde_json::to_string(&entry) {
758 line.push('\n');
759 use std::io::Write;
760 if let Ok(mut file) = std::fs::OpenOptions::new()
761 .create(true)
762 .append(true)
763 .open(path)
764 {
765 let _ = file.write_all(line.as_bytes());
766 }
767 }
768 }
769
770 fn write_rejection_audit_entry(
771 &self,
772 peer_id: &str,
773 capsule: &Capsule,
774 gene_id: Option<&str>,
775 reason: RejectionReason,
776 ) {
777 let entry = NetworkAuditEntry {
778 timestamp: Utc::now().to_rfc3339(),
779 peer_id: peer_id.to_string(),
780 capsule_id: capsule.id.clone(),
781 gene_id: gene_id.unwrap_or(&capsule.gene_id).to_string(),
782 disposition: NetworkAuditDisposition::Reject,
783 reason: Some(reason.to_string()),
784 score: Some(capsule.confidence as f64),
785 };
786 self.network_audit_trail.lock().unwrap().push(entry.clone());
787
788 let Some(ref path) = self.audit_log_path else {
789 return;
790 };
791
792 if let Ok(mut line) = serde_json::to_string(&entry) {
793 line.push('\n');
794 use std::io::Write;
795 if let Ok(mut file) = std::fs::OpenOptions::new()
796 .create(true)
797 .append(true)
798 .open(path)
799 {
800 let _ = file.write_all(line.as_bytes());
801 }
802 }
803 }
804}
805
806#[cfg(test)]
811mod tests {
812 use super::*;
813 use crate::{sign_envelope, NodeKeypair};
814 use oris_evolution::{AssetState, Capsule, EnvFingerprint, Gene, Outcome};
815
816 fn make_gene(id: &str) -> NetworkAsset {
817 NetworkAsset::Gene {
818 gene: Gene {
819 id: id.to_string(),
820 signals: vec!["test.fail".into()],
821 strategy: vec!["fix test".into()],
822 validation: vec!["cargo test".into()],
823 state: AssetState::Promoted,
824 task_class_id: None,
825 },
826 }
827 }
828
829 fn make_capsule(id: &str, gene_id: &str, confidence: f32) -> Capsule {
830 Capsule {
831 id: id.to_string(),
832 gene_id: gene_id.to_string(),
833 mutation_id: "mut-1".to_string(),
834 run_id: "run-1".to_string(),
835 diff_hash: "abc123".to_string(),
836 confidence,
837 env: EnvFingerprint {
838 rustc_version: "1.80.0".to_string(),
839 cargo_lock_hash: "hash".to_string(),
840 target_triple: "aarch64-apple-darwin".to_string(),
841 os: "macos".to_string(),
842 },
843 outcome: Outcome {
844 success: true,
845 validation_profile: "default".to_string(),
846 validation_duration_ms: 100,
847 changed_files: vec![],
848 validator_hash: "vh1".to_string(),
849 lines_changed: 5,
850 replay_verified: false,
851 },
852 state: AssetState::Candidate,
853 }
854 }
855
856 fn make_plain_gene(id: &str) -> Gene {
857 Gene {
858 id: id.to_string(),
859 signals: vec!["test.fail".into()],
860 strategy: vec!["fix test".into()],
861 validation: vec!["cargo test".into()],
862 state: AssetState::Candidate,
863 task_class_id: None,
864 }
865 }
866
867 fn make_signed_envelope(
868 keypair: &NodeKeypair,
869 sender_id: &str,
870 capsule: &Capsule,
871 gene: &Gene,
872 ) -> EvolutionEnvelope {
873 let envelope = EvolutionEnvelope::publish(
874 sender_id,
875 vec![
876 NetworkAsset::Gene { gene: gene.clone() },
877 NetworkAsset::Capsule {
878 capsule: capsule.clone(),
879 },
880 ],
881 );
882 sign_envelope(keypair, &envelope)
883 }
884
885 #[test]
890 fn test_two_node_sync_end_to_end() {
891 let node_a = GossipSyncEngine::new("node-a");
892 let node_b = GossipSyncEngine::new("node-b");
893
894 let seq = node_a.publish_local(make_gene("gene-1"));
896 assert_eq!(seq, 1);
897
898 let req = node_a.build_publish_request(0);
900 assert_eq!(req.assets.len(), 1);
901 let audit = node_b.receive_publish(&req);
902 assert_eq!(audit.applied_count, 1);
903 assert_eq!(audit.skipped_count, 0);
904
905 let entry = node_b.quarantine.get("gene:gene-1").unwrap();
907 assert_eq!(entry.state, QuarantineState::Pending);
908 assert_eq!(entry.origin_peer, "node-a");
909 }
910
911 #[test]
912 fn test_incremental_cursor_sync() {
913 let node_a = GossipSyncEngine::new("node-a");
914 let node_b = GossipSyncEngine::new("node-b");
915
916 node_a.publish_local(make_gene("gene-1"));
918 node_a.publish_local(make_gene("gene-2"));
919
920 let req1 = node_a.build_publish_request(0);
922 node_b.receive_publish(&req1);
923 assert_eq!(node_b.quarantine.len(), 2);
924
925 node_a.publish_local(make_gene("gene-3"));
927
928 let req2 = node_a.build_publish_request(2);
930 let audit = node_b.receive_publish(&req2);
931 assert_eq!(audit.applied_count, 1);
933 assert_eq!(node_b.quarantine.len(), 3);
934 }
935
936 #[test]
941 fn test_quarantine_admit_and_validate() {
942 let store = QuarantineStore::new();
943 let asset = make_gene("g-1");
944
945 assert!(store.admit("gene:g-1", asset, "peer-a"));
946 assert_eq!(
947 store.get("gene:g-1").unwrap().state,
948 QuarantineState::Pending
949 );
950 assert!(!store.is_selectable("gene:g-1")); store.validate_asset("gene:g-1");
953 assert_eq!(
954 store.get("gene:g-1").unwrap().state,
955 QuarantineState::Validated
956 );
957 assert!(store.is_selectable("gene:g-1")); }
959
960 #[test]
961 fn test_quarantine_fail_asset() {
962 let store = QuarantineStore::new();
963 store.admit("gene:g-bad", make_gene("g-bad"), "peer-a");
964 store.fail_asset("gene:g-bad", "signature mismatch");
965
966 let entry = store.get("gene:g-bad").unwrap();
967 assert_eq!(entry.state, QuarantineState::Failed);
968 assert_eq!(entry.failure_reason.as_deref(), Some("signature mismatch"));
969 assert!(!store.is_selectable("gene:g-bad"));
970 }
971
972 #[test]
973 fn test_validate_and_promote_via_engine() {
974 let engine = GossipSyncEngine::new("node-b");
975 let req = PublishRequest {
976 sender_id: "node-a".into(),
977 assets: vec![make_gene("g-ok")],
978 since_cursor: None,
979 resume_token: None,
980 };
981 engine.receive_publish(&req);
982
983 let promoted = engine.validate_and_promote("gene:g-ok", |_| Ok(()));
984 assert!(promoted);
985 assert!(engine.is_asset_selectable("gene:g-ok"));
986 }
987
988 #[test]
989 fn test_validate_and_promote_failure_not_selectable() {
990 let engine = GossipSyncEngine::new("node-b");
991 let req = PublishRequest {
992 sender_id: "node-a".into(),
993 assets: vec![make_gene("g-invalid")],
994 since_cursor: None,
995 resume_token: None,
996 };
997 engine.receive_publish(&req);
998
999 let promoted = engine.validate_and_promote("gene:g-invalid", |_| Err("bad hash".into()));
1000 assert!(!promoted);
1001 assert!(!engine.is_asset_selectable("gene:g-invalid"));
1002 }
1003
1004 #[test]
1009 fn test_pending_gene_not_selectable_under_fault() {
1010 let engine = GossipSyncEngine::new("node-b");
1011 let req = PublishRequest {
1014 sender_id: "node-a".into(),
1015 assets: vec![make_gene("g-unvalidated")],
1016 since_cursor: None,
1017 resume_token: None,
1018 };
1019 engine.receive_publish(&req);
1020
1021 assert!(
1023 !engine.is_asset_selectable("gene:g-unvalidated"),
1024 "pending gene must not be selectable (failure-closed guarantee)"
1025 );
1026 assert_eq!(engine.pending_entries().len(), 1);
1027 }
1028
1029 #[test]
1030 fn test_unknown_asset_not_selectable() {
1031 let engine = GossipSyncEngine::new("node-b");
1032 assert!(!engine.is_asset_selectable("gene:nonexistent"));
1033 }
1034
1035 #[test]
1036 fn test_duplicate_admit_is_idempotent() {
1037 let store = QuarantineStore::new();
1038 assert!(store.admit("gene:g", make_gene("g"), "peer-a"));
1039 store.validate_asset("gene:g");
1040 assert!(!store.admit("gene:g", make_gene("g"), "peer-b"));
1042 assert_eq!(
1043 store.get("gene:g").unwrap().state,
1044 QuarantineState::Validated
1045 );
1046 }
1047
1048 #[test]
1049 fn test_stats_accumulate_correctly() {
1050 let engine = GossipSyncEngine::new("me");
1051 let req = PublishRequest {
1052 sender_id: "peer".into(),
1053 assets: vec![make_gene("g1"), make_gene("g2")],
1054 since_cursor: None,
1055 resume_token: None,
1056 };
1057 engine.receive_publish(&req);
1058 engine.validate_and_promote("gene:g1", |_| Ok(()));
1059 engine.validate_and_promote("gene:g2", |_| Err("bad".into()));
1060
1061 let s = engine.stats();
1062 assert_eq!(s.assets_quarantined, 2);
1063 assert_eq!(s.assets_promoted, 1);
1064 assert_eq!(s.assets_failed_validation, 1);
1065 }
1066
1067 #[test]
1072 fn test_remote_capsule_high_score_is_promoted() {
1073 let receiver = RemoteCapsuleReceiver::new(None::<&str>, None);
1074 let capsule = make_capsule("cap-1", "gene-1", 0.85);
1075 let mut gene = make_plain_gene("gene-1");
1076
1077 let disposition = receiver.on_capsule_received(&capsule, &mut gene);
1078
1079 match &disposition {
1080 CapsuleDisposition::Promoted { gene_id, score } => {
1081 assert_eq!(gene_id, "gene-1");
1082 assert!(*score >= PROMOTE_THRESHOLD);
1083 }
1084 other => panic!("expected Promoted, got {:?}", other),
1085 }
1086 assert_eq!(gene.state, AssetState::Promoted);
1087 assert_eq!(receiver.audit_count(), 1);
1088 }
1089
1090 #[test]
1091 fn test_remote_capsule_low_score_is_quarantined() {
1092 let receiver = RemoteCapsuleReceiver::new(None::<&str>, None);
1093 let capsule = make_capsule("cap-2", "gene-2", 0.40);
1094 let mut gene = make_plain_gene("gene-2");
1095 let original_state = gene.state.clone();
1096
1097 let disposition = receiver.on_capsule_received(&capsule, &mut gene);
1098
1099 match &disposition {
1100 CapsuleDisposition::Quarantined { reason } => {
1101 assert!(reason.starts_with("low_score:"), "reason={}", reason);
1102 }
1103 other => panic!("expected Quarantined, got {:?}", other),
1104 }
1105 assert_eq!(gene.state, original_state);
1107 assert_eq!(receiver.audit_count(), 1);
1108 }
1109
1110 #[test]
1111 fn test_remote_capsule_at_threshold_is_promoted() {
1112 let receiver = RemoteCapsuleReceiver::new(None::<&str>, None);
1113 let capsule = make_capsule("cap-3", "gene-3", 0.75_f32);
1117 let mut gene = make_plain_gene("gene-3");
1118
1119 let disposition = receiver.on_capsule_received(&capsule, &mut gene);
1120 assert!(
1121 matches!(&disposition, CapsuleDisposition::Promoted { .. }),
1122 "capsule at or above threshold must be promoted"
1123 );
1124 }
1125
1126 #[test]
1127 fn test_remote_capsule_audit_log_written() {
1128 let dir = std::env::temp_dir();
1129 let log_path = dir.join(format!(
1130 "capsule_audit_log_test_{}.jsonl",
1131 std::time::SystemTime::now()
1132 .duration_since(std::time::UNIX_EPOCH)
1133 .unwrap()
1134 .as_nanos()
1135 ));
1136
1137 {
1138 let receiver = RemoteCapsuleReceiver::new(Some(&log_path), None);
1139 let c1 = make_capsule("cap-a", "g-a", 0.90);
1140 let c2 = make_capsule("cap-b", "g-b", 0.30);
1141 let mut gene_a = make_plain_gene("g-a");
1142 let mut gene_b = make_plain_gene("g-b");
1143 receiver.on_capsule_received(&c1, &mut gene_a);
1144 receiver.on_capsule_received(&c2, &mut gene_b);
1145 }
1146
1147 let contents = std::fs::read_to_string(&log_path).expect("audit log must exist");
1148 let lines: Vec<&str> = contents.lines().collect();
1149 assert_eq!(lines.len(), 2, "two decisions must produce two log lines");
1150 for line in &lines {
1152 serde_json::from_str::<serde_json::Value>(line)
1153 .expect("each audit line must be valid JSON");
1154 }
1155 let _ = std::fs::remove_file(&log_path);
1156 }
1157
1158 #[test]
1159 fn test_remote_capsule_audit_trail_in_memory() {
1160 let receiver = RemoteCapsuleReceiver::default();
1161 let c1 = make_capsule("cap-x", "g-x", 0.80);
1162 let c2 = make_capsule("cap-y", "g-y", 0.50);
1163 let mut g1 = make_plain_gene("g-x");
1164 let mut g2 = make_plain_gene("g-y");
1165
1166 receiver.on_capsule_received(&c1, &mut g1);
1167 receiver.on_capsule_received(&c2, &mut g2);
1168
1169 let trail = receiver.audit_trail();
1170 assert_eq!(trail.len(), 2);
1171 assert_eq!(trail[0].0, "cap-x");
1172 assert_eq!(trail[1].0, "cap-y");
1173 assert!(matches!(&trail[0].1, CapsuleDisposition::Promoted { .. }));
1174 assert!(matches!(
1175 &trail[1].1,
1176 CapsuleDisposition::Quarantined { .. }
1177 ));
1178 }
1179
1180 #[test]
1181 fn test_remote_capsule_missing_signature_is_rejected() {
1182 let receiver = RemoteCapsuleReceiver::default();
1183 let capsule = make_capsule("cap-sec-1", "gene-sec-1", 0.82);
1184 let mut gene = make_plain_gene("gene-sec-1");
1185 let envelope = EvolutionEnvelope::publish(
1186 "node-a",
1187 vec![
1188 NetworkAsset::Gene { gene: gene.clone() },
1189 NetworkAsset::Capsule {
1190 capsule: capsule.clone(),
1191 },
1192 ],
1193 );
1194
1195 let result = receiver
1196 .on_signed_capsule_received("peer-a", "deadbeef", &envelope, &capsule, &mut gene);
1197
1198 assert_eq!(result.unwrap_err(), RejectionReason::MissingSignature);
1199 assert_eq!(receiver.network_audit_trail().len(), 1);
1200 assert_eq!(gene.state, AssetState::Candidate);
1201 }
1202
1203 #[test]
1204 fn test_remote_capsule_tampered_signature_is_rejected() {
1205 let temp_path = std::env::temp_dir().join(format!(
1206 "oris-node-key-{}.key",
1207 std::time::SystemTime::now()
1208 .duration_since(std::time::UNIX_EPOCH)
1209 .unwrap()
1210 .as_nanos()
1211 ));
1212 let keypair =
1213 NodeKeypair::generate_at(&temp_path).expect("keypair generation should succeed");
1214 let receiver = RemoteCapsuleReceiver::default();
1215 let capsule = make_capsule("cap-sec-2", "gene-sec-2", 0.90);
1216 let mut gene = make_plain_gene("gene-sec-2");
1217 let mut envelope = make_signed_envelope(&keypair, "node-a", &capsule, &gene);
1218 if let Some(NetworkAsset::Gene { gene: remote_gene }) = envelope.assets.first_mut() {
1219 remote_gene.strategy.push("tampered".to_string());
1220 }
1221
1222 let result = receiver.on_signed_capsule_received(
1223 "peer-a",
1224 &keypair.public_key_hex(),
1225 &envelope,
1226 &capsule,
1227 &mut gene,
1228 );
1229
1230 assert_eq!(result.unwrap_err(), RejectionReason::InvalidSignature);
1231 let _ = std::fs::remove_file(temp_path);
1232 }
1233
1234 #[test]
1235 fn test_remote_capsule_rate_limited_is_rejected() {
1236 let temp_path = std::env::temp_dir().join(format!(
1237 "oris-node-key-{}.key",
1238 std::time::SystemTime::now()
1239 .duration_since(std::time::UNIX_EPOCH)
1240 .unwrap()
1241 .as_nanos()
1242 ));
1243 let keypair =
1244 NodeKeypair::generate_at(&temp_path).expect("keypair generation should succeed");
1245 let receiver = RemoteCapsuleReceiver::with_rate_limit_config(
1246 None::<&str>,
1247 None,
1248 PeerRateLimitConfig {
1249 max_capsules_per_hour: 1,
1250 window_secs: 3600,
1251 },
1252 );
1253 let capsule = make_capsule("cap-sec-3", "gene-sec-3", 0.91);
1254 let mut gene = make_plain_gene("gene-sec-3");
1255 let envelope = make_signed_envelope(&keypair, "node-a", &capsule, &gene);
1256
1257 let first = receiver.on_signed_capsule_received(
1258 "peer-a",
1259 &keypair.public_key_hex(),
1260 &envelope,
1261 &capsule,
1262 &mut gene,
1263 );
1264 assert!(first.is_ok());
1265
1266 let mut gene_again = make_plain_gene("gene-sec-3");
1267 let second = receiver.on_signed_capsule_received(
1268 "peer-a",
1269 &keypair.public_key_hex(),
1270 &envelope,
1271 &capsule,
1272 &mut gene_again,
1273 );
1274 assert_eq!(second.unwrap_err(), RejectionReason::RateLimited);
1275 let _ = std::fs::remove_file(temp_path);
1276 }
1277
1278 #[test]
1279 fn test_network_audit_log_records_accept_and_reject_events() {
1280 let temp_key = std::env::temp_dir().join(format!(
1281 "oris-node-key-{}.key",
1282 std::time::SystemTime::now()
1283 .duration_since(std::time::UNIX_EPOCH)
1284 .unwrap()
1285 .as_nanos()
1286 ));
1287 let keypair =
1288 NodeKeypair::generate_at(&temp_key).expect("keypair generation should succeed");
1289 let log_path = std::env::temp_dir().join(format!(
1290 "network_audit_log_test_{}.jsonl",
1291 std::time::SystemTime::now()
1292 .duration_since(std::time::UNIX_EPOCH)
1293 .unwrap()
1294 .as_nanos()
1295 ));
1296
1297 let receiver = RemoteCapsuleReceiver::with_rate_limit_config(
1298 Some(&log_path),
1299 None,
1300 PeerRateLimitConfig {
1301 max_capsules_per_hour: 10,
1302 window_secs: 3600,
1303 },
1304 );
1305 let capsule = make_capsule("cap-sec-4", "gene-sec-4", 0.88);
1306 let mut gene = make_plain_gene("gene-sec-4");
1307 let envelope = make_signed_envelope(&keypair, "node-a", &capsule, &gene);
1308 let accepted = receiver.on_signed_capsule_received(
1309 "peer-a",
1310 &keypair.public_key_hex(),
1311 &envelope,
1312 &capsule,
1313 &mut gene,
1314 );
1315 assert!(accepted.is_ok());
1316
1317 let unsigned_envelope = EvolutionEnvelope::publish(
1318 "node-a",
1319 vec![
1320 NetworkAsset::Gene { gene: gene.clone() },
1321 NetworkAsset::Capsule {
1322 capsule: capsule.clone(),
1323 },
1324 ],
1325 );
1326 let mut rejected_gene = make_plain_gene("gene-sec-4");
1327 let rejected = receiver.on_signed_capsule_received(
1328 "peer-b",
1329 &keypair.public_key_hex(),
1330 &unsigned_envelope,
1331 &capsule,
1332 &mut rejected_gene,
1333 );
1334 assert_eq!(rejected.unwrap_err(), RejectionReason::MissingSignature);
1335
1336 let contents = std::fs::read_to_string(&log_path).expect("audit log must exist");
1337 let lines: Vec<serde_json::Value> = contents
1338 .lines()
1339 .map(|line| serde_json::from_str(line).expect("audit line must be valid JSON"))
1340 .collect();
1341 assert_eq!(lines.len(), 2);
1342 assert_eq!(lines[0]["disposition"], "accept");
1343 assert_eq!(lines[1]["disposition"], "reject");
1344 let _ = std::fs::remove_file(temp_key);
1345 let _ = std::fs::remove_file(log_path);
1346 }
1347}