1use crate::{FetchQuery, FetchResponse, NetworkAsset, PublishRequest, SyncAudit};
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::sync::Mutex;
28
29#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(rename_all = "snake_case")]
36pub enum QuarantineState {
37 Pending,
39 Validated,
41 Failed,
43}
44
45#[derive(Clone, Debug, Serialize, Deserialize)]
48pub struct QuarantineEntry {
49 pub asset_id: String,
50 pub asset: NetworkAsset,
51 pub origin_peer: String,
52 pub state: QuarantineState,
53 pub received_at: i64,
55 pub failure_reason: Option<String>,
57}
58
59pub struct QuarantineStore {
64 entries: Mutex<HashMap<String, QuarantineEntry>>,
65}
66
67impl Default for QuarantineStore {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl QuarantineStore {
74 pub fn new() -> Self {
75 Self {
76 entries: Mutex::new(HashMap::new()),
77 }
78 }
79
80 pub fn admit(
85 &self,
86 asset_id: impl Into<String>,
87 asset: NetworkAsset,
88 origin_peer: impl Into<String>,
89 ) -> bool {
90 let id = asset_id.into();
91 let mut entries = self.entries.lock().unwrap();
92 if entries.contains_key(&id) {
93 return false;
94 }
95 entries.insert(
96 id.clone(),
97 QuarantineEntry {
98 asset_id: id,
99 asset,
100 origin_peer: origin_peer.into(),
101 state: QuarantineState::Pending,
102 received_at: now_unix_secs(),
103 failure_reason: None,
104 },
105 );
106 true
107 }
108
109 pub fn validate_asset(&self, asset_id: &str) -> bool {
113 let mut entries = self.entries.lock().unwrap();
114 if let Some(entry) = entries.get_mut(asset_id) {
115 entry.state = QuarantineState::Validated;
116 entry.failure_reason = None;
117 true
118 } else {
119 false
120 }
121 }
122
123 pub fn fail_asset(&self, asset_id: &str, reason: impl Into<String>) -> bool {
127 let mut entries = self.entries.lock().unwrap();
128 if let Some(entry) = entries.get_mut(asset_id) {
129 entry.state = QuarantineState::Failed;
130 entry.failure_reason = Some(reason.into());
131 true
132 } else {
133 false
134 }
135 }
136
137 pub fn get(&self, asset_id: &str) -> Option<QuarantineEntry> {
139 self.entries.lock().unwrap().get(asset_id).cloned()
140 }
141
142 pub fn is_selectable(&self, asset_id: &str) -> bool {
144 self.entries
145 .lock()
146 .unwrap()
147 .get(asset_id)
148 .map(|e| e.state == QuarantineState::Validated)
149 .unwrap_or(false)
150 }
151
152 pub fn pending_entries(&self) -> Vec<QuarantineEntry> {
154 self.entries
155 .lock()
156 .unwrap()
157 .values()
158 .filter(|e| e.state == QuarantineState::Pending)
159 .cloned()
160 .collect()
161 }
162
163 pub fn validated_entries(&self) -> Vec<QuarantineEntry> {
165 self.entries
166 .lock()
167 .unwrap()
168 .values()
169 .filter(|e| e.state == QuarantineState::Validated)
170 .cloned()
171 .collect()
172 }
173
174 pub fn len(&self) -> usize {
176 self.entries.lock().unwrap().len()
177 }
178
179 pub fn is_empty(&self) -> bool {
180 self.entries.lock().unwrap().is_empty()
181 }
182}
183
184#[derive(Clone, Debug, Default, Serialize, Deserialize)]
190pub struct SyncStats {
191 pub batches_processed: u64,
192 pub assets_received: u64,
193 pub assets_quarantined: u64,
194 pub assets_skipped_duplicate: u64,
195 pub assets_failed_validation: u64,
196 pub assets_promoted: u64,
197}
198
199pub struct GossipSyncEngine {
206 local_peer_id: String,
207 local_sequence: Mutex<u64>,
209 peer_cursors: Mutex<HashMap<String, u64>>,
211 local_assets: Mutex<Vec<(u64, NetworkAsset)>>,
213 quarantine: QuarantineStore,
214 stats: Mutex<SyncStats>,
215}
216
217impl GossipSyncEngine {
218 pub fn new(local_peer_id: impl Into<String>) -> Self {
219 Self {
220 local_peer_id: local_peer_id.into(),
221 local_sequence: Mutex::new(0),
222 peer_cursors: Mutex::new(HashMap::new()),
223 local_assets: Mutex::new(Vec::new()),
224 quarantine: QuarantineStore::new(),
225 stats: Mutex::new(SyncStats::default()),
226 }
227 }
228
229 pub fn publish_local(&self, asset: NetworkAsset) -> u64 {
232 let mut seq = self.local_sequence.lock().unwrap();
233 *seq += 1;
234 let s = *seq;
235 self.local_assets.lock().unwrap().push((s, asset));
236 s
237 }
238
239 pub fn build_publish_request(&self, since_cursor: u64) -> PublishRequest {
242 let assets: Vec<NetworkAsset> = self
243 .local_assets
244 .lock()
245 .unwrap()
246 .iter()
247 .filter(|(seq, _)| *seq > since_cursor)
248 .map(|(_, a)| a.clone())
249 .collect();
250
251 PublishRequest {
252 sender_id: self.local_peer_id.clone(),
253 assets,
254 since_cursor: if since_cursor > 0 {
255 Some(since_cursor.to_string())
256 } else {
257 None
258 },
259 resume_token: None,
260 }
261 }
262
263 pub fn receive_publish(&self, request: &PublishRequest) -> SyncAudit {
269 let batch_id = format!("batch-{}-{}", request.sender_id, now_unix_secs());
270 let mut applied = 0usize;
271 let mut skipped = 0usize;
272
273 for asset in &request.assets {
274 let asset_id = asset_id_of(asset);
275 let admitted = self
276 .quarantine
277 .admit(&asset_id, asset.clone(), &request.sender_id);
278 if admitted {
279 applied += 1;
280 } else {
281 skipped += 1;
282 }
283 }
284
285 if let Some(cursor_str) = &request.since_cursor {
287 if let Ok(seq) = cursor_str.parse::<u64>() {
288 let mut cursors = self.peer_cursors.lock().unwrap();
289 let entry = cursors.entry(request.sender_id.clone()).or_insert(0);
290 if seq > *entry {
291 *entry = seq;
292 }
293 }
294 }
295
296 {
297 let mut stats = self.stats.lock().unwrap();
298 stats.batches_processed += 1;
299 stats.assets_received += request.assets.len() as u64;
300 stats.assets_quarantined += applied as u64;
301 stats.assets_skipped_duplicate += skipped as u64;
302 }
303
304 SyncAudit {
305 batch_id,
306 requested_cursor: request.since_cursor.clone(),
307 scanned_count: request.assets.len(),
308 applied_count: applied,
309 skipped_count: skipped,
310 failed_count: 0,
311 failure_reasons: vec![],
312 }
313 }
314
315 pub fn build_fetch_query(&self, peer_id: &str, signals: Vec<String>) -> FetchQuery {
318 let cursor = self
319 .peer_cursors
320 .lock()
321 .unwrap()
322 .get(peer_id)
323 .copied()
324 .unwrap_or(0);
325
326 FetchQuery {
327 sender_id: self.local_peer_id.clone(),
328 signals,
329 since_cursor: if cursor > 0 {
330 Some(cursor.to_string())
331 } else {
332 None
333 },
334 resume_token: None,
335 }
336 }
337
338 pub fn receive_fetch_response(&self, peer_id: &str, response: &FetchResponse) -> SyncAudit {
342 let fake_request = PublishRequest {
343 sender_id: peer_id.to_string(),
344 assets: response.assets.clone(),
345 since_cursor: response.next_cursor.clone(),
346 resume_token: response.resume_token.clone(),
347 };
348 self.receive_publish(&fake_request)
349 }
350
351 pub fn validate_and_promote<F>(&self, asset_id: &str, validator: F) -> bool
357 where
358 F: FnOnce(&NetworkAsset) -> Result<(), String>,
359 {
360 let entry = match self.quarantine.get(asset_id) {
361 Some(e) => e,
362 None => return false,
363 };
364
365 match validator(&entry.asset) {
366 Ok(()) => {
367 self.quarantine.validate_asset(asset_id);
368 let mut stats = self.stats.lock().unwrap();
369 stats.assets_promoted += 1;
370 true
371 }
372 Err(reason) => {
373 self.quarantine.fail_asset(asset_id, &reason);
374 let mut stats = self.stats.lock().unwrap();
375 stats.assets_failed_validation += 1;
376 false
377 }
378 }
379 }
380
381 pub fn is_asset_selectable(&self, asset_id: &str) -> bool {
385 self.quarantine.is_selectable(asset_id)
386 }
387
388 pub fn pending_entries(&self) -> Vec<QuarantineEntry> {
390 self.quarantine.pending_entries()
391 }
392
393 pub fn stats(&self) -> SyncStats {
395 self.stats.lock().unwrap().clone()
396 }
397
398 pub fn peer_cursor(&self, peer_id: &str) -> u64 {
400 self.peer_cursors
401 .lock()
402 .unwrap()
403 .get(peer_id)
404 .copied()
405 .unwrap_or(0)
406 }
407}
408
409fn now_unix_secs() -> i64 {
414 std::time::SystemTime::now()
415 .duration_since(std::time::UNIX_EPOCH)
416 .map(|d| d.as_secs() as i64)
417 .unwrap_or(0)
418}
419
420fn asset_id_of(asset: &NetworkAsset) -> String {
422 match asset {
423 NetworkAsset::Gene { gene } => format!("gene:{}", gene.id),
424 NetworkAsset::Capsule { capsule } => format!("capsule:{}", capsule.id),
425 NetworkAsset::EvolutionEvent { event } => {
426 use sha2::{Digest, Sha256};
427 let payload = serde_json::to_vec(event).unwrap_or_default();
428 let mut hasher = Sha256::new();
429 hasher.update(payload);
430 format!("event:{}", hex::encode(hasher.finalize()))
431 }
432 }
433}
434
435#[cfg(test)]
440mod tests {
441 use super::*;
442 use oris_evolution::{AssetState, Gene};
443
444 fn make_gene(id: &str) -> NetworkAsset {
445 NetworkAsset::Gene {
446 gene: Gene {
447 id: id.to_string(),
448 signals: vec!["test.fail".into()],
449 strategy: vec!["fix test".into()],
450 validation: vec!["cargo test".into()],
451 state: AssetState::Promoted,
452 task_class_id: None,
453 },
454 }
455 }
456
457 #[test]
462 fn test_two_node_sync_end_to_end() {
463 let node_a = GossipSyncEngine::new("node-a");
464 let node_b = GossipSyncEngine::new("node-b");
465
466 let seq = node_a.publish_local(make_gene("gene-1"));
468 assert_eq!(seq, 1);
469
470 let req = node_a.build_publish_request(0);
472 assert_eq!(req.assets.len(), 1);
473 let audit = node_b.receive_publish(&req);
474 assert_eq!(audit.applied_count, 1);
475 assert_eq!(audit.skipped_count, 0);
476
477 let entry = node_b.quarantine.get("gene:gene-1").unwrap();
479 assert_eq!(entry.state, QuarantineState::Pending);
480 assert_eq!(entry.origin_peer, "node-a");
481 }
482
483 #[test]
484 fn test_incremental_cursor_sync() {
485 let node_a = GossipSyncEngine::new("node-a");
486 let node_b = GossipSyncEngine::new("node-b");
487
488 node_a.publish_local(make_gene("gene-1"));
490 node_a.publish_local(make_gene("gene-2"));
491
492 let req1 = node_a.build_publish_request(0);
494 node_b.receive_publish(&req1);
495 assert_eq!(node_b.quarantine.len(), 2);
496
497 node_a.publish_local(make_gene("gene-3"));
499
500 let req2 = node_a.build_publish_request(2);
502 let audit = node_b.receive_publish(&req2);
503 assert_eq!(audit.applied_count, 1);
505 assert_eq!(node_b.quarantine.len(), 3);
506 }
507
508 #[test]
513 fn test_quarantine_admit_and_validate() {
514 let store = QuarantineStore::new();
515 let asset = make_gene("g-1");
516
517 assert!(store.admit("gene:g-1", asset, "peer-a"));
518 assert_eq!(
519 store.get("gene:g-1").unwrap().state,
520 QuarantineState::Pending
521 );
522 assert!(!store.is_selectable("gene:g-1")); store.validate_asset("gene:g-1");
525 assert_eq!(
526 store.get("gene:g-1").unwrap().state,
527 QuarantineState::Validated
528 );
529 assert!(store.is_selectable("gene:g-1")); }
531
532 #[test]
533 fn test_quarantine_fail_asset() {
534 let store = QuarantineStore::new();
535 store.admit("gene:g-bad", make_gene("g-bad"), "peer-a");
536 store.fail_asset("gene:g-bad", "signature mismatch");
537
538 let entry = store.get("gene:g-bad").unwrap();
539 assert_eq!(entry.state, QuarantineState::Failed);
540 assert_eq!(entry.failure_reason.as_deref(), Some("signature mismatch"));
541 assert!(!store.is_selectable("gene:g-bad"));
542 }
543
544 #[test]
545 fn test_validate_and_promote_via_engine() {
546 let engine = GossipSyncEngine::new("node-b");
547 let req = PublishRequest {
548 sender_id: "node-a".into(),
549 assets: vec![make_gene("g-ok")],
550 since_cursor: None,
551 resume_token: None,
552 };
553 engine.receive_publish(&req);
554
555 let promoted = engine.validate_and_promote("gene:g-ok", |_| Ok(()));
556 assert!(promoted);
557 assert!(engine.is_asset_selectable("gene:g-ok"));
558 }
559
560 #[test]
561 fn test_validate_and_promote_failure_not_selectable() {
562 let engine = GossipSyncEngine::new("node-b");
563 let req = PublishRequest {
564 sender_id: "node-a".into(),
565 assets: vec![make_gene("g-invalid")],
566 since_cursor: None,
567 resume_token: None,
568 };
569 engine.receive_publish(&req);
570
571 let promoted = engine.validate_and_promote("gene:g-invalid", |_| Err("bad hash".into()));
572 assert!(!promoted);
573 assert!(!engine.is_asset_selectable("gene:g-invalid"));
574 }
575
576 #[test]
581 fn test_pending_gene_not_selectable_under_fault() {
582 let engine = GossipSyncEngine::new("node-b");
583 let req = PublishRequest {
586 sender_id: "node-a".into(),
587 assets: vec![make_gene("g-unvalidated")],
588 since_cursor: None,
589 resume_token: None,
590 };
591 engine.receive_publish(&req);
592
593 assert!(
595 !engine.is_asset_selectable("gene:g-unvalidated"),
596 "pending gene must not be selectable (failure-closed guarantee)"
597 );
598 assert_eq!(engine.pending_entries().len(), 1);
599 }
600
601 #[test]
602 fn test_unknown_asset_not_selectable() {
603 let engine = GossipSyncEngine::new("node-b");
604 assert!(!engine.is_asset_selectable("gene:nonexistent"));
605 }
606
607 #[test]
608 fn test_duplicate_admit_is_idempotent() {
609 let store = QuarantineStore::new();
610 assert!(store.admit("gene:g", make_gene("g"), "peer-a"));
611 store.validate_asset("gene:g");
612 assert!(!store.admit("gene:g", make_gene("g"), "peer-b"));
614 assert_eq!(
615 store.get("gene:g").unwrap().state,
616 QuarantineState::Validated
617 );
618 }
619
620 #[test]
621 fn test_stats_accumulate_correctly() {
622 let engine = GossipSyncEngine::new("me");
623 let req = PublishRequest {
624 sender_id: "peer".into(),
625 assets: vec![make_gene("g1"), make_gene("g2")],
626 since_cursor: None,
627 resume_token: None,
628 };
629 engine.receive_publish(&req);
630 engine.validate_and_promote("gene:g1", |_| Ok(()));
631 engine.validate_and_promote("gene:g2", |_| Err("bad".into()));
632
633 let s = engine.stats();
634 assert_eq!(s.assets_quarantined, 2);
635 assert_eq!(s.assets_promoted, 1);
636 assert_eq!(s.assets_failed_validation, 1);
637 }
638}