1use std::collections::HashMap;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::Instant;
11
12use crate::{EvolutionEnvelope, FetchQuery, FetchResponse, NetworkAsset, SyncAudit};
13use chrono::Utc;
14use oris_evolution::Gene;
15use serde::{Deserialize, Serialize};
16
/// Static peer settings used to seed a [`PeerRegistry`].
///
/// Only `peers` is required when deserializing; the remaining fields fall back to the
/// `default_*` functions named in their `serde` attributes.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PeerConfig {
    /// Peers known at startup (no serde default, so it must be present in the input).
    pub peers: Vec<PeerEndpoint>,
    /// Heartbeat interval in seconds; defaults to 30 when omitted.
    #[serde(default = "default_heartbeat_interval")]
    pub heartbeat_interval_secs: u64,
    /// Peer timeout in seconds; defaults to 10 when omitted.
    #[serde(default = "default_peer_timeout_secs")]
    pub peer_timeout_secs: u64,
    /// Gossip fan-out; defaults to 3 when omitted.
    /// NOTE(review): `PeerRegistry::get_gossip_peers` takes its count as an argument and does
    /// not read this field; presumably callers pass it in — confirm.
    #[serde(default = "default_fanout")]
    pub gossip_fanout: usize,
}
32
/// Serde default for `PeerConfig::heartbeat_interval_secs` (30).
fn default_heartbeat_interval() -> u64 {
    const HEARTBEAT_INTERVAL_SECS: u64 = 30;
    HEARTBEAT_INTERVAL_SECS
}
/// Serde default for `PeerConfig::peer_timeout_secs` (10).
fn default_peer_timeout_secs() -> u64 {
    const PEER_TIMEOUT_SECS: u64 = 10;
    PEER_TIMEOUT_SECS
}
/// Serde default for `PeerConfig::gossip_fanout` (3).
fn default_fanout() -> usize {
    const GOSSIP_FANOUT: usize = 3;
    GOSSIP_FANOUT
}
42
/// Address-book entry for one peer.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PeerEndpoint {
    /// Peer identifier; used as the key of the [`PeerRegistry`] table.
    pub peer_id: String,
    /// Network address of the peer (the tests use values such as `http://peer1:8080`).
    pub endpoint: String,
    /// Optional public key; the registry stores it with the endpoint but never inspects it.
    pub public_key: Option<String>,
}
53
/// Liveness state tracked for each registered peer.
#[derive(Clone, Debug, PartialEq)]
pub enum PeerStatus {
    /// Last probe succeeded (or the peer was just added); only these peers are returned by
    /// `PeerRegistry::get_active_peers` / `get_gossip_peers`.
    Active,
    /// Failed one or two times in a row since its last success.
    Suspected,
    /// Failed three or more times in a row.
    Offline,
}
64
/// Registry entry: a peer's endpoint plus its liveness bookkeeping.
#[derive(Clone, Debug)]
pub struct PeerInfo {
    /// Where the peer can be reached.
    pub endpoint: PeerEndpoint,
    /// Current liveness state, driven by `mark_success` / `mark_failure`.
    pub status: PeerStatus,
    /// When the entry was created or last marked successful.
    pub last_seen: Instant,
    /// When the peer last reported alive via `PeerRegistry::update_peer_status`
    /// (`None` until the first successful probe).
    pub last_heartbeat: Option<Instant>,
    /// Consecutive failures since the last success.
    pub failure_count: u32,
}
74
75impl PeerInfo {
76 pub fn new(endpoint: PeerEndpoint) -> Self {
77 Self {
78 endpoint,
79 status: PeerStatus::Active,
80 last_seen: Instant::now(),
81 last_heartbeat: None,
82 failure_count: 0,
83 }
84 }
85
86 pub fn mark_failure(&mut self) {
87 self.failure_count += 1;
88 if self.failure_count >= 3 {
89 self.status = PeerStatus::Offline;
90 } else {
91 self.status = PeerStatus::Suspected;
92 }
93 }
94
95 pub fn mark_success(&mut self) {
96 self.failure_count = 0;
97 self.status = PeerStatus::Active;
98 self.last_seen = Instant::now();
99 }
100}
101
/// A single gossip message exchanged between peers.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct GossipMessage {
    /// Message identifier. When built by [`GossipBuilder`] it is `gossip-` followed by the
    /// build time in nanoseconds, in hex.
    pub message_id: String,
    /// Peer id of the node that created the message.
    pub origin_peer: String,
    /// Caller-supplied sequence number.
    pub sequence: u64,
    /// What the message announces or requests.
    pub kind: GossipKind,
    /// Creation time; RFC 3339 when built by [`GossipBuilder`].
    pub timestamp: String,
    /// Free-form payload; [`GossipBuilder`] defaults it to the JSON encoding of `kind`.
    pub payload: String,
}
118
/// Gossip message variants.
///
/// Serialized as an internally tagged object: a `type` field carries the snake_case variant
/// name (e.g. `"asset_update"`) next to the variant's own fields.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum GossipKind {
    /// Announces a peer id together with its endpoint.
    Advertisement { peer_id: String, endpoint: String },
    /// Announces a change to an asset identified by id and type.
    AssetUpdate {
        asset_id: String,
        asset_type: String,
    },
    /// Asks for updates; presumably those after `since_sequence` — confirm with the handler.
    SyncRequest { since_sequence: u64 },
    /// Answers a sync request; entries are presumably asset ids — confirm with the handler.
    SyncResponse { assets: Vec<String> },
    /// Announces that `peer_id` is leaving.
    Leave { peer_id: String },
}
137
/// Thread-safe table of known peers and their liveness state.
///
/// Clones share the same peer table (it lives behind an `Arc<RwLock<..>>`), while each clone
/// holds its own copy of `config` and `local_peer_id`.
#[derive(Clone)]
pub struct PeerRegistry {
    /// Peer id -> liveness entry.
    peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
    /// Configuration the registry was seeded from; `add_peer` / `remove_peer` do not update it.
    config: PeerConfig,
    /// This node's own peer id; excluded from gossip targets.
    local_peer_id: String,
}
145
146impl PeerRegistry {
147 pub fn new(config: PeerConfig, local_peer_id: String) -> Self {
149 let peers: HashMap<String, PeerInfo> = config
150 .peers
151 .iter()
152 .map(|e| (e.peer_id.clone(), PeerInfo::new(e.clone())))
153 .collect();
154
155 Self {
156 peers: Arc::new(RwLock::new(peers)),
157 config,
158 local_peer_id,
159 }
160 }
161
162 pub fn get_active_peers(&self) -> Vec<PeerEndpoint> {
164 self.peers
165 .read()
166 .unwrap()
167 .values()
168 .filter(|p| p.status == PeerStatus::Active)
169 .map(|p| p.endpoint.clone())
170 .collect()
171 }
172
173 pub fn get_gossip_peers(&self, count: usize) -> Vec<PeerEndpoint> {
175 let peers = self.peers.read().unwrap();
176 let active: Vec<_> = peers
177 .values()
178 .filter(|p| p.status == PeerStatus::Active)
179 .filter(|p| p.endpoint.peer_id != self.local_peer_id)
180 .map(|p| p.endpoint.clone())
181 .collect();
182
183 if active.is_empty() {
184 return vec![];
185 }
186
187 let count = count.min(active.len());
189 active.into_iter().take(count).collect()
190 }
191
192 pub fn update_peer_status(&self, peer_id: &str, is_alive: bool) {
194 let mut peers = self.peers.write().unwrap();
195 if let Some(peer) = peers.get_mut(peer_id) {
196 if is_alive {
197 peer.mark_success();
198 peer.last_heartbeat = Some(Instant::now());
199 } else {
200 peer.mark_failure();
201 }
202 }
203 }
204
205 pub fn add_peer(&self, endpoint: PeerEndpoint) {
207 let mut peers = self.peers.write().unwrap();
208 if !peers.contains_key(&endpoint.peer_id) {
209 peers.insert(endpoint.peer_id.clone(), PeerInfo::new(endpoint));
210 }
211 }
212
213 pub fn remove_peer(&self, peer_id: &str) {
215 let mut peers = self.peers.write().unwrap();
216 peers.remove(peer_id);
217 }
218
219 pub fn local_peer_id(&self) -> &str {
221 &self.local_peer_id
222 }
223
224 pub fn config(&self) -> &PeerConfig {
226 &self.config
227 }
228}
229
/// Builder for [`GossipMessage`]. A message kind must be selected before `build` returns
/// `Some`.
pub struct GossipBuilder {
    /// Peer id stamped into `GossipMessage::origin_peer`.
    origin_peer: String,
    /// Sequence number stamped into `GossipMessage::sequence`.
    sequence: u64,
    /// Selected kind; the last kind-setting call wins.
    kind: Option<GossipKind>,
    /// Explicit payload; when `None`, `build` serializes the kind instead.
    payload: Option<String>,
}
237
238impl GossipBuilder {
239 pub fn new(origin_peer: String, sequence: u64) -> Self {
240 Self {
241 origin_peer,
242 sequence,
243 kind: None,
244 payload: None,
245 }
246 }
247
248 pub fn advertisement(mut self, peer_id: String, endpoint: String) -> Self {
249 self.kind = Some(GossipKind::Advertisement { peer_id, endpoint });
250 self
251 }
252
253 pub fn asset_update(mut self, asset_id: String, asset_type: String) -> Self {
254 self.kind = Some(GossipKind::AssetUpdate {
255 asset_id,
256 asset_type,
257 });
258 self
259 }
260
261 pub fn sync_request(mut self, since_sequence: u64) -> Self {
262 self.kind = Some(GossipKind::SyncRequest { since_sequence });
263 self
264 }
265
266 pub fn sync_response(mut self, assets: Vec<String>) -> Self {
267 self.kind = Some(GossipKind::SyncResponse { assets });
268 self
269 }
270
271 pub fn leave(mut self, peer_id: String) -> Self {
272 self.kind = Some(GossipKind::Leave { peer_id });
273 self
274 }
275
276 pub fn payload(mut self, payload: String) -> Self {
277 self.payload = Some(payload);
278 self
279 }
280
281 pub fn build(self) -> Option<GossipMessage> {
282 let kind = self.kind?;
283 let payload = self
284 .payload
285 .unwrap_or_else(|| serde_json::to_string(&kind).unwrap_or_default());
286
287 Some(GossipMessage {
288 message_id: format!(
289 "gossip-{:x}",
290 Utc::now().timestamp_nanos_opt().unwrap_or_default()
291 ),
292 origin_peer: self.origin_peer,
293 sequence: self.sequence,
294 kind,
295 timestamp: Utc::now().to_rfc3339(),
296 payload,
297 })
298 }
299}
300
/// Address of a gossip peer as listed in [`GossipConfig::peers`]; a plain string with no
/// enforced format.
pub type PeerAddress = String;
302
/// Serde default for `GossipConfig::sync_interval_secs` (30).
fn default_sync_interval_secs() -> u64 {
    const SYNC_INTERVAL_SECS: u64 = 30;
    SYNC_INTERVAL_SECS
}
306
/// Serde default for `GossipConfig::broadcast_threshold` (0.8).
fn default_broadcast_threshold() -> f32 {
    const BROADCAST_THRESHOLD: f32 = 0.8;
    BROADCAST_THRESHOLD
}
310
/// Configuration for [`GossipSyncEngine`]. Every field may be omitted when deserializing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GossipConfig {
    /// Peer addresses to sync with; empty when omitted.
    #[serde(default)]
    pub peers: Vec<PeerAddress>,
    /// Seconds between sync rounds; defaults to 30, and `start_sync_loop` treats 0 as 1.
    #[serde(default = "default_sync_interval_secs")]
    pub sync_interval_secs: u64,
    /// Minimum (inclusive) gene confidence for inclusion in a digest; defaults to 0.8.
    #[serde(default = "default_broadcast_threshold")]
    pub broadcast_threshold: f32,
}
320
321impl Default for GossipConfig {
322 fn default() -> Self {
323 Self {
324 peers: Vec::new(),
325 sync_interval_secs: default_sync_interval_secs(),
326 broadcast_threshold: default_broadcast_threshold(),
327 }
328 }
329}
330
/// One advertised gene inside a [`GossipDigest`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GossipDigestEntry {
    /// Gene identifier.
    pub gene_id: String,
    /// Highest capsule confidence the sender holds for this gene.
    pub confidence: f32,
    /// Version from the sender's own local counter (not a globally agreed number).
    pub version: u64,
}
337
/// Summary of the genes a node is willing to share, sent ahead of a fetch.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GossipDigest {
    /// Peer id of the node that built the digest.
    pub sender_id: String,
    /// Advertised genes; empty when the field is missing from the input.
    #[serde(default)]
    pub genes: Vec<GossipDigestEntry>,
}
344
/// Outcome of one `GossipSyncEngine::sync_once_with` round.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct GossipSyncReport {
    /// Gene ids the local engine asked the remote for.
    pub requested_gene_ids: Vec<String>,
    /// Gene ids found in the remote's response and stored locally.
    pub imported_gene_ids: Vec<String>,
}
350
/// Locally stored state for one gene.
#[derive(Clone, Debug)]
struct LocalGeneRecord {
    /// Clone of the full envelope the gene was registered from; every gene registered from
    /// the same envelope holds its own copy.
    envelope: EvolutionEnvelope,
    /// Highest capsule confidence for this gene within `envelope` (0.0 if it has none).
    confidence: f32,
    /// Value of the engine's local version counter when this record was written.
    version: u64,
}
357
/// In-process gene store that exchanges high-confidence genes with peers through a
/// digest -> fetch query -> fetch response cycle.
pub struct GossipSyncEngine {
    /// Peer id used as `sender_id` on outgoing digests, queries and responses.
    local_peer_id: String,
    /// Copy of `config.peers` taken at construction.
    peers: Vec<PeerAddress>,
    /// Configuration the engine was created with.
    config: GossipConfig,
    /// Gene id -> most recently registered record.
    records: Arc<RwLock<HashMap<String, LocalGeneRecord>>>,
    /// Local counter that assigns record versions; `register_envelope` locks it before
    /// `records`.
    next_version: Mutex<u64>,
}
366
367impl GossipSyncEngine {
368 pub fn new(local_peer_id: impl Into<String>, config: GossipConfig) -> Self {
369 Self {
370 local_peer_id: local_peer_id.into(),
371 peers: config.peers.clone(),
372 config,
373 records: Arc::new(RwLock::new(HashMap::new())),
374 next_version: Mutex::new(0),
375 }
376 }
377
378 pub fn peers(&self) -> &[PeerAddress] {
379 &self.peers
380 }
381
382 pub fn config(&self) -> &GossipConfig {
383 &self.config
384 }
385
386 pub fn local_peer_id(&self) -> &str {
387 &self.local_peer_id
388 }
389
390 pub fn has_gene(&self, gene_id: &str) -> bool {
391 self.records.read().unwrap().contains_key(gene_id)
392 }
393
394 pub fn gene_version(&self, gene_id: &str) -> Option<u64> {
395 self.records.read().unwrap().get(gene_id).map(|r| r.version)
396 }
397
398 pub fn register_envelope(&self, envelope: EvolutionEnvelope) -> usize {
399 let genes: Vec<Gene> = envelope
400 .assets
401 .iter()
402 .filter_map(|asset| match asset {
403 NetworkAsset::Gene { gene } => Some(gene.clone()),
404 _ => None,
405 })
406 .collect();
407
408 let mut version_counter = self.next_version.lock().unwrap();
409 let mut records = self.records.write().unwrap();
410 let mut inserted = 0;
411 for gene in genes {
412 *version_counter += 1;
413 let confidence = confidence_for_gene(&envelope.assets, &gene.id);
414 records.insert(
415 gene.id.clone(),
416 LocalGeneRecord {
417 envelope: envelope.clone(),
418 confidence,
419 version: *version_counter,
420 },
421 );
422 inserted += 1;
423 }
424 inserted
425 }
426
427 pub fn build_digest(&self) -> GossipDigest {
428 let genes = self
429 .records
430 .read()
431 .unwrap()
432 .iter()
433 .filter(|(_, record)| record.confidence >= self.config.broadcast_threshold)
434 .map(|(gene_id, record)| GossipDigestEntry {
435 gene_id: gene_id.clone(),
436 confidence: record.confidence,
437 version: record.version,
438 })
439 .collect();
440
441 GossipDigest {
442 sender_id: self.local_peer_id.clone(),
443 genes,
444 }
445 }
446
447 pub fn build_fetch_query_for_digest(&self, digest: &GossipDigest) -> FetchQuery {
448 let local = self.records.read().unwrap();
449 let requested_gene_ids = digest
450 .genes
451 .iter()
452 .filter(|entry| {
453 local
454 .get(&entry.gene_id)
455 .map(|record| record.version < entry.version)
456 .unwrap_or(true)
457 })
458 .map(|entry| entry.gene_id.clone())
459 .collect::<Vec<_>>();
460
461 FetchQuery {
462 sender_id: self.local_peer_id.clone(),
463 signals: requested_gene_ids,
464 since_cursor: None,
465 resume_token: None,
466 }
467 }
468
469 pub fn respond_to_fetch(&self, query: &FetchQuery) -> FetchResponse {
470 let records = self.records.read().unwrap();
471 let mut assets = Vec::new();
472 let mut applied = 0usize;
473 for gene_id in &query.signals {
474 if let Some(record) = records.get(gene_id) {
475 assets.extend(record.envelope.assets.clone());
476 applied += 1;
477 }
478 }
479
480 FetchResponse {
481 sender_id: self.local_peer_id.clone(),
482 assets,
483 next_cursor: None,
484 resume_token: None,
485 sync_audit: SyncAudit {
486 batch_id: format!(
487 "gossip-fetch-{}-{}",
488 self.local_peer_id,
489 Utc::now().timestamp()
490 ),
491 requested_cursor: None,
492 scanned_count: query.signals.len(),
493 applied_count: applied,
494 skipped_count: query.signals.len().saturating_sub(applied),
495 failed_count: 0,
496 failure_reasons: vec![],
497 },
498 }
499 }
500
501 pub fn apply_fetch_response(&self, response: &FetchResponse) -> Vec<String> {
502 if response.assets.is_empty() {
503 return vec![];
504 }
505 let envelope =
506 EvolutionEnvelope::publish(response.sender_id.clone(), response.assets.clone());
507 let imported = envelope
508 .assets
509 .iter()
510 .filter_map(|asset| match asset {
511 NetworkAsset::Gene { gene } => Some(gene.id.clone()),
512 _ => None,
513 })
514 .collect::<Vec<_>>();
515 let _ = self.register_envelope(envelope);
516 imported
517 }
518
519 pub async fn sync_once_with(&self, remote: &GossipSyncEngine) -> GossipSyncReport {
520 let digest = remote.build_digest();
521 let query = self.build_fetch_query_for_digest(&digest);
522 let requested_gene_ids = query.signals.clone();
523 let response = remote.respond_to_fetch(&query);
524 let imported_gene_ids = self.apply_fetch_response(&response);
525 GossipSyncReport {
526 requested_gene_ids,
527 imported_gene_ids,
528 }
529 }
530
531 pub async fn start_sync_loop(self) -> tokio::task::JoinHandle<()> {
532 tokio::spawn(async move {
533 let interval = std::time::Duration::from_secs(self.config.sync_interval_secs.max(1));
534 loop {
535 tokio::time::sleep(interval).await;
536 }
537 })
538 }
539
540 pub fn serialize_digest_json(digest: &GossipDigest) -> Result<Vec<u8>, serde_json::Error> {
541 serde_json::to_vec(digest)
542 }
543
544 #[cfg(feature = "gossip-msgpack")]
545 pub fn serialize_digest_msgpack(
546 digest: &GossipDigest,
547 ) -> Result<Vec<u8>, rmp_serde::encode::Error> {
548 rmp_serde::to_vec_named(digest)
549 }
550
551 #[cfg(feature = "gossip-msgpack")]
552 pub fn deserialize_digest_msgpack(
553 bytes: &[u8],
554 ) -> Result<GossipDigest, rmp_serde::decode::Error> {
555 rmp_serde::from_slice(bytes)
556 }
557}
558
559fn confidence_for_gene(assets: &[NetworkAsset], gene_id: &str) -> f32 {
560 assets
561 .iter()
562 .filter_map(|asset| match asset {
563 NetworkAsset::Capsule { capsule } if capsule.gene_id == gene_id => {
564 Some(capsule.confidence)
565 }
566 _ => None,
567 })
568 .fold(0.0_f32, f32::max)
569}
570
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EvolutionEnvelope;
    use oris_evolution::{AssetState, Capsule, EnvFingerprint, Outcome};

    /// Builds a promoted gene asset with the given id and fixed signal/strategy/validation
    /// fixtures.
    fn sample_gene_asset(id: &str) -> NetworkAsset {
        NetworkAsset::Gene {
            gene: Gene {
                id: id.to_string(),
                signals: vec!["compiler:error[E0308]".to_string()],
                strategy: vec!["fix type mismatch".to_string()],
                validation: vec!["cargo test".to_string()],
                state: AssetState::Promoted,
                task_class_id: None,
            },
        }
    }

    /// Builds a promoted capsule for `gene_id` carrying `confidence`; the remaining fields are
    /// fixed fixture values, some derived from `id`.
    fn sample_capsule_asset(id: &str, gene_id: &str, confidence: f32) -> NetworkAsset {
        NetworkAsset::Capsule {
            capsule: Capsule {
                id: id.to_string(),
                gene_id: gene_id.to_string(),
                mutation_id: format!("mut-{id}"),
                run_id: format!("run-{id}"),
                diff_hash: format!("diff-{id}"),
                confidence,
                env: EnvFingerprint {
                    rustc_version: "rustc 1.80.0".to_string(),
                    cargo_lock_hash: "cargo-lock".to_string(),
                    target_triple: "aarch64-apple-darwin".to_string(),
                    os: "macos".to_string(),
                },
                outcome: Outcome {
                    success: true,
                    validation_profile: "default".to_string(),
                    validation_duration_ms: 100,
                    changed_files: vec!["src/lib.rs".to_string()],
                    validator_hash: "validator".to_string(),
                    lines_changed: 3,
                    replay_verified: true,
                },
                state: AssetState::Promoted,
            },
        }
    }

    /// Envelope published by "node-a" with one gene and one capsule; the capsule's
    /// `confidence` becomes the gene's recorded confidence when registered.
    fn sample_envelope(gene_id: &str, confidence: f32) -> EvolutionEnvelope {
        EvolutionEnvelope::publish(
            "node-a",
            vec![
                sample_gene_asset(gene_id),
                sample_capsule_asset(&format!("capsule-{gene_id}"), gene_id, confidence),
            ],
        )
    }

    #[test]
    fn test_peer_registry_creation() {
        // Every configured peer starts out Active.
        let config = PeerConfig {
            peers: vec![
                PeerEndpoint {
                    peer_id: "peer1".into(),
                    endpoint: "http://peer1:8080".into(),
                    public_key: None,
                },
                PeerEndpoint {
                    peer_id: "peer2".into(),
                    endpoint: "http://peer2:8080".into(),
                    public_key: None,
                },
            ],
            heartbeat_interval_secs: 30,
            peer_timeout_secs: 10,
            gossip_fanout: 3,
        };

        let registry = PeerRegistry::new(config, "local-peer".to_string());
        let active = registry.get_active_peers();
        assert_eq!(active.len(), 2);
    }

    #[test]
    fn test_peer_failure_tracking() {
        let config = PeerConfig {
            peers: vec![PeerEndpoint {
                peer_id: "peer1".into(),
                endpoint: "http://peer1:8080".into(),
                public_key: None,
            }],
            heartbeat_interval_secs: 30,
            peer_timeout_secs: 10,
            gossip_fanout: 3,
        };

        let registry = PeerRegistry::new(config, "local-peer".into());

        // Two consecutive failures leave the peer Suspected (below the Offline threshold),
        // which already removes it from the active set.
        registry.update_peer_status("peer1", false);
        registry.update_peer_status("peer1", false);

        let peers = registry.get_active_peers();
        assert!(peers.is_empty());

        // One success resets the failure count and restores Active status.
        registry.update_peer_status("peer1", true);
        let peers = registry.get_active_peers();
        assert_eq!(peers.len(), 1);
    }

    #[test]
    fn test_gossip_builder() {
        // Selecting a kind is enough for `build` to succeed; origin and sequence carry through.
        let msg = GossipBuilder::new("peer1".to_string(), 1)
            .asset_update("asset-123".to_string(), "gene".to_string())
            .build();

        assert!(msg.is_some());
        let msg = msg.unwrap();
        assert_eq!(msg.origin_peer, "peer1");
        assert_eq!(msg.sequence, 1);
    }

    #[test]
    fn gossip_digest_only_includes_genes_above_threshold() {
        let engine = GossipSyncEngine::new(
            "node-a",
            GossipConfig {
                peers: vec!["node-b".into()],
                sync_interval_secs: 30,
                broadcast_threshold: 0.8,
            },
        );
        // 0.92 clears the 0.8 threshold; 0.42 does not.
        engine.register_envelope(sample_envelope("gene-high", 0.92));
        engine.register_envelope(sample_envelope("gene-low", 0.42));

        let digest = engine.build_digest();
        assert_eq!(digest.genes.len(), 1);
        assert_eq!(digest.genes[0].gene_id, "gene-high");
        assert!(digest.genes[0].confidence >= 0.8);
    }

    #[test]
    fn fetch_query_response_round_trip_returns_requested_gene() {
        let producer = GossipSyncEngine::new("node-a", GossipConfig::default());
        producer.register_envelope(sample_envelope("gene-1", 0.95));

        // Requested gene ids travel in `signals`.
        let query = FetchQuery {
            sender_id: "node-b".into(),
            signals: vec!["gene-1".into()],
            since_cursor: None,
            resume_token: None,
        };
        let response = producer.respond_to_fetch(&query);

        assert_eq!(response.sender_id, "node-a");
        assert!(!response.assets.is_empty());
        assert_eq!(response.sync_audit.applied_count, 1);
        assert!(response.assets.iter().any(|asset| matches!(
            asset,
            NetworkAsset::Gene { gene } if gene.id == "gene-1"
        )));
    }

    #[tokio::test]
    async fn two_in_process_gossip_sync_engines_exchange_gene_within_one_cycle() {
        let producer = GossipSyncEngine::new(
            "node-a",
            GossipConfig {
                peers: vec!["node-b".into()],
                sync_interval_secs: 30,
                broadcast_threshold: 0.8,
            },
        );
        let consumer = GossipSyncEngine::new(
            "node-b",
            GossipConfig {
                peers: vec!["node-a".into()],
                sync_interval_secs: 30,
                broadcast_threshold: 0.8,
            },
        );

        producer.register_envelope(sample_envelope("gene-sync", 0.91));
        assert!(!consumer.has_gene("gene-sync"));

        // A single pull round (digest -> query -> response -> import) transfers the gene.
        let report = consumer.sync_once_with(&producer).await;
        assert_eq!(report.requested_gene_ids, vec!["gene-sync".to_string()]);
        assert_eq!(report.imported_gene_ids, vec!["gene-sync".to_string()]);
        assert!(consumer.has_gene("gene-sync"));
    }
}