1use std::collections::HashMap;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::Instant;
11
12use crate::{EvolutionEnvelope, FetchQuery, FetchResponse, NetworkAsset, SyncAudit};
13use chrono::Utc;
14use oris_evolution::Gene;
15use serde::{Deserialize, Serialize};
16
/// Static configuration for the peer-to-peer gossip layer.
///
/// Deserializable from config files; every field except `peers` may be
/// omitted and then falls back to the matching `default_*` helper.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PeerConfig {
    /// Peers known at startup. `PeerRegistry::new` seeds each of them as an
    /// active peer, keyed by `peer_id`.
    pub peers: Vec<PeerEndpoint>,
    /// Heartbeat interval (seconds, per the field name). Defaults to 30.
    /// NOTE(review): not read by the registry logic in this file.
    #[serde(default = "default_heartbeat_interval")]
    pub heartbeat_interval_secs: u64,
    /// Peer timeout (seconds, per the field name). Defaults to 10.
    /// NOTE(review): not read by the registry logic in this file.
    #[serde(default = "default_peer_timeout_secs")]
    pub peer_timeout_secs: u64,
    /// Number of peers to gossip to per round. Defaults to 3.
    /// NOTE(review): presumably passed as `count` to
    /// `PeerRegistry::get_gossip_peers` by callers; not used directly here.
    #[serde(default = "default_fanout")]
    pub gossip_fanout: usize,
}
32
/// Serde fallback for `PeerConfig::heartbeat_interval_secs`.
const DEFAULT_HEARTBEAT_INTERVAL_SECS: u64 = 30;
/// Serde fallback for `PeerConfig::peer_timeout_secs`.
const DEFAULT_PEER_TIMEOUT_SECS: u64 = 10;
/// Serde fallback for `PeerConfig::gossip_fanout`.
const DEFAULT_GOSSIP_FANOUT: usize = 3;

/// Default heartbeat interval used when the field is absent from config.
fn default_heartbeat_interval() -> u64 {
    DEFAULT_HEARTBEAT_INTERVAL_SECS
}

/// Default peer timeout used when the field is absent from config.
fn default_peer_timeout_secs() -> u64 {
    DEFAULT_PEER_TIMEOUT_SECS
}

/// Default gossip fanout used when the field is absent from config.
fn default_fanout() -> usize {
    DEFAULT_GOSSIP_FANOUT
}
42
/// Addressing information for a single remote peer.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PeerEndpoint {
    /// Unique peer identifier; used as the key in `PeerRegistry`.
    pub peer_id: String,
    /// Network address of the peer (the tests use URLs such as
    /// `"http://peer1:8080"`). Not parsed or validated here.
    pub endpoint: String,
    /// Optional public key for the peer.
    /// NOTE(review): not used anywhere in this file; presumably intended for
    /// message signing/verification — confirm with callers.
    pub public_key: Option<String>,
}
53
/// Liveness state of a peer as tracked by `PeerInfo`.
///
/// The enum is fieldless, so it is cheap to copy and has total equality.
/// `Copy` and `Eq` are derived so callers can compare and pass it by value
/// without cloning.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PeerStatus {
    /// Last contact succeeded (also the initial state).
    Active,
    /// One or more recent failures, but below the offline threshold.
    Suspected,
    /// Enough consecutive failures that the peer is treated as unreachable.
    Offline,
}
64
/// Runtime bookkeeping for one peer held in `PeerRegistry`.
#[derive(Clone, Debug)]
pub struct PeerInfo {
    /// The configured endpoint this record describes.
    pub endpoint: PeerEndpoint,
    /// Current liveness state, driven by `mark_success` / `mark_failure`.
    pub status: PeerStatus,
    /// Set at creation and refreshed on every successful contact.
    pub last_seen: Instant,
    /// Time of the last successful `update_peer_status` call; `None` until then.
    pub last_heartbeat: Option<Instant>,
    /// Consecutive failures since the last success (reset to 0 on success).
    pub failure_count: u32,
}
74
75impl PeerInfo {
76 pub fn new(endpoint: PeerEndpoint) -> Self {
77 Self {
78 endpoint,
79 status: PeerStatus::Active,
80 last_seen: Instant::now(),
81 last_heartbeat: None,
82 failure_count: 0,
83 }
84 }
85
86 pub fn mark_failure(&mut self) {
87 self.failure_count += 1;
88 if self.failure_count >= 3 {
89 self.status = PeerStatus::Offline;
90 } else {
91 self.status = PeerStatus::Suspected;
92 }
93 }
94
95 pub fn mark_success(&mut self) {
96 self.failure_count = 0;
97 self.status = PeerStatus::Active;
98 self.last_seen = Instant::now();
99 }
100}
101
/// A single gossip message exchanged between peers.
///
/// Usually constructed through `GossipBuilder`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct GossipMessage {
    /// Identifier for the message. `GossipBuilder` formats it as
    /// `gossip-<hex nanoseconds>` from the build time.
    pub message_id: String,
    /// Peer id of the node that created the message.
    pub origin_peer: String,
    /// Caller-supplied sequence number.
    /// NOTE(review): presumably monotonic per origin peer — not enforced here.
    pub sequence: u64,
    /// What the message is about; see `GossipKind`.
    pub kind: GossipKind,
    /// Creation time. `GossipBuilder` writes it in RFC 3339 format.
    pub timestamp: String,
    /// Opaque payload. `GossipBuilder` defaults it to the JSON encoding of `kind`.
    pub payload: String,
}
118
/// Discriminated body of a `GossipMessage`.
///
/// Serialized as an internally tagged enum: the variant name (snake_case, e.g.
/// `"asset_update"`) is stored under a `"type"` key next to the variant's fields.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum GossipKind {
    /// Announces a peer id together with its endpoint address.
    Advertisement { peer_id: String, endpoint: String },
    /// Announces that an asset changed.
    /// `asset_type` is free-form text (the tests use `"gene"`).
    AssetUpdate {
        asset_id: String,
        asset_type: String,
    },
    /// Requests data newer than `since_sequence`.
    /// NOTE(review): no handler for this exists in this file.
    SyncRequest { since_sequence: u64 },
    /// Reply to a sync request carrying asset strings.
    /// NOTE(review): the encoding of each entry (ids vs. serialized assets) is
    /// not established here.
    SyncResponse { assets: Vec<String> },
    /// Announces that `peer_id` is leaving the network.
    Leave { peer_id: String },
}
137
/// Thread-safe table of known peers and their liveness.
///
/// Cloning is cheap for the peer table: all clones share the same
/// `Arc<RwLock<..>>`, so status updates through one handle are visible
/// through every other. `config` and `local_peer_id` are copied per clone.
#[derive(Clone)]
pub struct PeerRegistry {
    /// Peer records keyed by `PeerEndpoint::peer_id`.
    peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
    /// The configuration the registry was created from.
    config: PeerConfig,
    /// This node's own id; excluded when picking gossip targets.
    local_peer_id: String,
}
145
146impl PeerRegistry {
147 pub fn new(config: PeerConfig, local_peer_id: String) -> Self {
149 let peers: HashMap<String, PeerInfo> = config
150 .peers
151 .iter()
152 .map(|e| (e.peer_id.clone(), PeerInfo::new(e.clone())))
153 .collect();
154
155 Self {
156 peers: Arc::new(RwLock::new(peers)),
157 config,
158 local_peer_id,
159 }
160 }
161
162 pub fn get_active_peers(&self) -> Vec<PeerEndpoint> {
164 self.peers
165 .read()
166 .unwrap()
167 .values()
168 .filter(|p| p.status == PeerStatus::Active)
169 .map(|p| p.endpoint.clone())
170 .collect()
171 }
172
173 pub fn get_gossip_peers(&self, count: usize) -> Vec<PeerEndpoint> {
175 let peers = self.peers.read().unwrap();
176 let active: Vec<_> = peers
177 .values()
178 .filter(|p| p.status == PeerStatus::Active)
179 .filter(|p| p.endpoint.peer_id != self.local_peer_id)
180 .map(|p| p.endpoint.clone())
181 .collect();
182
183 if active.is_empty() {
184 return vec![];
185 }
186
187 let count = count.min(active.len());
189 active.into_iter().take(count).collect()
190 }
191
192 pub fn update_peer_status(&self, peer_id: &str, is_alive: bool) {
194 let mut peers = self.peers.write().unwrap();
195 if let Some(peer) = peers.get_mut(peer_id) {
196 if is_alive {
197 peer.mark_success();
198 peer.last_heartbeat = Some(Instant::now());
199 } else {
200 peer.mark_failure();
201 }
202 }
203 }
204
205 pub fn add_peer(&self, endpoint: PeerEndpoint) {
207 let mut peers = self.peers.write().unwrap();
208 if !peers.contains_key(&endpoint.peer_id) {
209 peers.insert(endpoint.peer_id.clone(), PeerInfo::new(endpoint));
210 }
211 }
212
213 pub fn remove_peer(&self, peer_id: &str) {
215 let mut peers = self.peers.write().unwrap();
216 peers.remove(peer_id);
217 }
218
219 pub fn local_peer_id(&self) -> &str {
221 &self.local_peer_id
222 }
223
224 pub fn config(&self) -> &PeerConfig {
226 &self.config
227 }
228}
229
/// Fluent builder for `GossipMessage`.
///
/// A kind must be chosen (via one of the kind-setting methods) before
/// `build` will produce a message.
pub struct GossipBuilder {
    /// Becomes `GossipMessage::origin_peer`.
    origin_peer: String,
    /// Becomes `GossipMessage::sequence`.
    sequence: u64,
    /// Message kind; `build` returns `None` while this is unset.
    kind: Option<GossipKind>,
    /// Explicit payload; when unset, `build` uses the JSON encoding of `kind`.
    payload: Option<String>,
}
237
238impl GossipBuilder {
239 pub fn new(origin_peer: String, sequence: u64) -> Self {
240 Self {
241 origin_peer,
242 sequence,
243 kind: None,
244 payload: None,
245 }
246 }
247
248 pub fn advertisement(mut self, peer_id: String, endpoint: String) -> Self {
249 self.kind = Some(GossipKind::Advertisement { peer_id, endpoint });
250 self
251 }
252
253 pub fn asset_update(mut self, asset_id: String, asset_type: String) -> Self {
254 self.kind = Some(GossipKind::AssetUpdate {
255 asset_id,
256 asset_type,
257 });
258 self
259 }
260
261 pub fn sync_request(mut self, since_sequence: u64) -> Self {
262 self.kind = Some(GossipKind::SyncRequest { since_sequence });
263 self
264 }
265
266 pub fn sync_response(mut self, assets: Vec<String>) -> Self {
267 self.kind = Some(GossipKind::SyncResponse { assets });
268 self
269 }
270
271 pub fn leave(mut self, peer_id: String) -> Self {
272 self.kind = Some(GossipKind::Leave { peer_id });
273 self
274 }
275
276 pub fn payload(mut self, payload: String) -> Self {
277 self.payload = Some(payload);
278 self
279 }
280
281 pub fn build(self) -> Option<GossipMessage> {
282 let kind = self.kind?;
283 let payload = self
284 .payload
285 .unwrap_or_else(|| serde_json::to_string(&kind).unwrap_or_default());
286
287 Some(GossipMessage {
288 message_id: format!(
289 "gossip-{:x}",
290 Utc::now().timestamp_nanos_opt().unwrap_or_default()
291 ),
292 origin_peer: self.origin_peer,
293 sequence: self.sequence,
294 kind,
295 timestamp: Utc::now().to_rfc3339(),
296 payload,
297 })
298 }
299}
300
/// Address of a gossip peer as listed in `GossipConfig::peers`.
/// NOTE(review): an opaque string here; its format (URL, host:port, node id)
/// is decided by callers — the tests use bare ids such as `"node-b"`.
pub type PeerAddress = String;
302
/// Serde fallback for `GossipConfig::sync_interval_secs`.
const DEFAULT_SYNC_INTERVAL_SECS: u64 = 30;
/// Serde fallback for `GossipConfig::broadcast_threshold`.
const DEFAULT_BROADCAST_THRESHOLD: f32 = 0.8;

/// Default interval between sync rounds when absent from config.
fn default_sync_interval_secs() -> u64 {
    DEFAULT_SYNC_INTERVAL_SECS
}

/// Default minimum confidence for a gene to be advertised in a digest.
fn default_broadcast_threshold() -> f32 {
    DEFAULT_BROADCAST_THRESHOLD
}
310
/// Configuration for `GossipSyncEngine`.
///
/// Every field may be omitted when deserializing; see the `Default` impl,
/// which uses the same fallbacks.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GossipConfig {
    /// Peers to sync with; empty by default.
    #[serde(default)]
    pub peers: Vec<PeerAddress>,
    /// Seconds between sync rounds (default 30). `start_sync_loop` clamps 0 to 1.
    #[serde(default = "default_sync_interval_secs")]
    pub sync_interval_secs: u64,
    /// Minimum gene confidence (inclusive) for inclusion in a digest (default 0.8).
    #[serde(default = "default_broadcast_threshold")]
    pub broadcast_threshold: f32,
}
320
321impl Default for GossipConfig {
322 fn default() -> Self {
323 Self {
324 peers: Vec::new(),
325 sync_interval_secs: default_sync_interval_secs(),
326 broadcast_threshold: default_broadcast_threshold(),
327 }
328 }
329}
330
/// One advertised gene inside a `GossipDigest`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GossipDigestEntry {
    /// Id of the advertised gene.
    pub gene_id: String,
    /// The sender's stored confidence for the gene.
    pub confidence: f32,
    /// The sender's stored version of the gene. A receiver requests the gene
    /// when it has no copy or its own version is lower.
    pub version: u64,
}
337
/// Summary of the genes a node is willing to share, sent as the first step of a sync.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GossipDigest {
    /// Peer id of the node that built the digest.
    pub sender_id: String,
    /// Advertised genes (those at or above the sender's broadcast threshold).
    /// May be omitted when deserializing, meaning "nothing to offer".
    #[serde(default)]
    pub genes: Vec<GossipDigestEntry>,
}
344
/// Outcome of one `GossipSyncEngine::sync_once_with` round.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct GossipSyncReport {
    /// Gene ids this node asked the remote for.
    pub requested_gene_ids: Vec<String>,
    /// Gene ids found in the remote's response and registered locally.
    /// This may include genes that shared an envelope with a requested one.
    pub imported_gene_ids: Vec<String>,
}
350
/// What a `GossipSyncEngine` stores per gene id.
#[derive(Clone, Debug)]
struct LocalGeneRecord {
    /// The envelope the gene arrived in; its assets are served back in full on fetch.
    envelope: EvolutionEnvelope,
    /// Highest capsule confidence for the gene within `envelope` (0.0 if none).
    confidence: f32,
    /// Version used for digest comparisons.
    version: u64,
}
357
/// In-process gene store that syncs with other engines using a
/// digest → fetch-query → fetch-response exchange.
pub struct GossipSyncEngine {
    /// This node's id; used as `sender_id` in everything it emits.
    local_peer_id: String,
    /// Copy of `config.peers` taken at construction.
    peers: Vec<PeerAddress>,
    /// Engine configuration (broadcast threshold, sync interval, peers).
    config: GossipConfig,
    /// Stored genes keyed by gene id.
    records: Arc<RwLock<HashMap<String, LocalGeneRecord>>>,
    /// Last version handed out; incremented once per registered gene.
    next_version: Mutex<u64>,
}
366
367impl GossipSyncEngine {
368 pub fn new(local_peer_id: impl Into<String>, config: GossipConfig) -> Self {
369 Self {
370 local_peer_id: local_peer_id.into(),
371 peers: config.peers.clone(),
372 config,
373 records: Arc::new(RwLock::new(HashMap::new())),
374 next_version: Mutex::new(0),
375 }
376 }
377
378 pub fn peers(&self) -> &[PeerAddress] {
379 &self.peers
380 }
381
382 pub fn config(&self) -> &GossipConfig {
383 &self.config
384 }
385
386 pub fn local_peer_id(&self) -> &str {
387 &self.local_peer_id
388 }
389
390 pub fn has_gene(&self, gene_id: &str) -> bool {
391 self.records.read().unwrap().contains_key(gene_id)
392 }
393
394 pub fn gene_version(&self, gene_id: &str) -> Option<u64> {
395 self.records.read().unwrap().get(gene_id).map(|r| r.version)
396 }
397
398 pub fn register_envelope(&self, envelope: EvolutionEnvelope) -> usize {
399 let genes: Vec<Gene> = envelope
400 .assets
401 .iter()
402 .filter_map(|asset| match asset {
403 NetworkAsset::Gene { gene } => Some(gene.clone()),
404 _ => None,
405 })
406 .collect();
407
408 let mut version_counter = self.next_version.lock().unwrap();
409 let mut records = self.records.write().unwrap();
410 let mut inserted = 0;
411 for gene in genes {
412 *version_counter += 1;
413 let confidence = confidence_for_gene(&envelope.assets, &gene.id);
414 records.insert(
415 gene.id.clone(),
416 LocalGeneRecord {
417 envelope: envelope.clone(),
418 confidence,
419 version: *version_counter,
420 },
421 );
422 inserted += 1;
423 }
424 inserted
425 }
426
427 pub fn build_digest(&self) -> GossipDigest {
428 let genes = self
429 .records
430 .read()
431 .unwrap()
432 .iter()
433 .filter(|(_, record)| record.confidence >= self.config.broadcast_threshold)
434 .map(|(gene_id, record)| GossipDigestEntry {
435 gene_id: gene_id.clone(),
436 confidence: record.confidence,
437 version: record.version,
438 })
439 .collect();
440
441 GossipDigest {
442 sender_id: self.local_peer_id.clone(),
443 genes,
444 }
445 }
446
447 pub fn build_fetch_query_for_digest(&self, digest: &GossipDigest) -> FetchQuery {
448 let local = self.records.read().unwrap();
449 let requested_gene_ids = digest
450 .genes
451 .iter()
452 .filter(|entry| {
453 local
454 .get(&entry.gene_id)
455 .map(|record| record.version < entry.version)
456 .unwrap_or(true)
457 })
458 .map(|entry| entry.gene_id.clone())
459 .collect::<Vec<_>>();
460
461 FetchQuery {
462 sender_id: self.local_peer_id.clone(),
463 signals: requested_gene_ids,
464 since_cursor: None,
465 resume_token: None,
466 }
467 }
468
469 pub fn respond_to_fetch(&self, query: &FetchQuery) -> FetchResponse {
470 let records = self.records.read().unwrap();
471 let mut assets = Vec::new();
472 let mut applied = 0usize;
473 for gene_id in &query.signals {
474 if let Some(record) = records.get(gene_id) {
475 assets.extend(record.envelope.assets.clone());
476 applied += 1;
477 }
478 }
479
480 FetchResponse {
481 sender_id: self.local_peer_id.clone(),
482 assets,
483 next_cursor: None,
484 resume_token: None,
485 sync_audit: SyncAudit {
486 batch_id: format!("gossip-fetch-{}-{}", self.local_peer_id, Utc::now().timestamp()),
487 requested_cursor: None,
488 scanned_count: query.signals.len(),
489 applied_count: applied,
490 skipped_count: query.signals.len().saturating_sub(applied),
491 failed_count: 0,
492 failure_reasons: vec![],
493 },
494 }
495 }
496
497 pub fn apply_fetch_response(&self, response: &FetchResponse) -> Vec<String> {
498 if response.assets.is_empty() {
499 return vec![];
500 }
501 let envelope = EvolutionEnvelope::publish(response.sender_id.clone(), response.assets.clone());
502 let imported = envelope
503 .assets
504 .iter()
505 .filter_map(|asset| match asset {
506 NetworkAsset::Gene { gene } => Some(gene.id.clone()),
507 _ => None,
508 })
509 .collect::<Vec<_>>();
510 let _ = self.register_envelope(envelope);
511 imported
512 }
513
514 pub async fn sync_once_with(&self, remote: &GossipSyncEngine) -> GossipSyncReport {
515 let digest = remote.build_digest();
516 let query = self.build_fetch_query_for_digest(&digest);
517 let requested_gene_ids = query.signals.clone();
518 let response = remote.respond_to_fetch(&query);
519 let imported_gene_ids = self.apply_fetch_response(&response);
520 GossipSyncReport {
521 requested_gene_ids,
522 imported_gene_ids,
523 }
524 }
525
526 pub async fn start_sync_loop(self) -> tokio::task::JoinHandle<()> {
527 tokio::spawn(async move {
528 let interval = std::time::Duration::from_secs(self.config.sync_interval_secs.max(1));
529 loop {
530 tokio::time::sleep(interval).await;
531 }
532 })
533 }
534
535 pub fn serialize_digest_json(digest: &GossipDigest) -> Result<Vec<u8>, serde_json::Error> {
536 serde_json::to_vec(digest)
537 }
538
539 #[cfg(feature = "gossip-msgpack")]
540 pub fn serialize_digest_msgpack(
541 digest: &GossipDigest,
542 ) -> Result<Vec<u8>, rmp_serde::encode::Error> {
543 rmp_serde::to_vec_named(digest)
544 }
545
546 #[cfg(feature = "gossip-msgpack")]
547 pub fn deserialize_digest_msgpack(
548 bytes: &[u8],
549 ) -> Result<GossipDigest, rmp_serde::decode::Error> {
550 rmp_serde::from_slice(bytes)
551 }
552}
553
554fn confidence_for_gene(assets: &[NetworkAsset], gene_id: &str) -> f32 {
555 assets
556 .iter()
557 .filter_map(|asset| match asset {
558 NetworkAsset::Capsule { capsule } if capsule.gene_id == gene_id => Some(capsule.confidence),
559 _ => None,
560 })
561 .fold(0.0_f32, f32::max)
562}
563
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EvolutionEnvelope;
    use oris_evolution::{AssetState, Capsule, EnvFingerprint, Outcome};

    /// A promoted gene asset with fixed signals, strategy and validation steps.
    fn sample_gene_asset(id: &str) -> NetworkAsset {
        let gene = Gene {
            id: id.to_string(),
            signals: vec!["compiler:error[E0308]".to_string()],
            strategy: vec!["fix type mismatch".to_string()],
            validation: vec!["cargo test".to_string()],
            state: AssetState::Promoted,
            task_class_id: None,
        };
        NetworkAsset::Gene { gene }
    }

    /// A promoted, successful capsule for `gene_id` with the given confidence.
    fn sample_capsule_asset(id: &str, gene_id: &str, confidence: f32) -> NetworkAsset {
        let env = EnvFingerprint {
            rustc_version: "rustc 1.80.0".to_string(),
            cargo_lock_hash: "cargo-lock".to_string(),
            target_triple: "aarch64-apple-darwin".to_string(),
            os: "macos".to_string(),
        };
        let outcome = Outcome {
            success: true,
            validation_profile: "default".to_string(),
            validation_duration_ms: 100,
            changed_files: vec!["src/lib.rs".to_string()],
            validator_hash: "validator".to_string(),
            lines_changed: 3,
            replay_verified: true,
        };
        let capsule = Capsule {
            id: id.to_string(),
            gene_id: gene_id.to_string(),
            mutation_id: format!("mut-{id}"),
            run_id: format!("run-{id}"),
            diff_hash: format!("diff-{id}"),
            confidence,
            env,
            outcome,
            state: AssetState::Promoted,
        };
        NetworkAsset::Capsule { capsule }
    }

    /// An envelope from "node-a" holding one gene and one capsule for it.
    fn sample_envelope(gene_id: &str, confidence: f32) -> EvolutionEnvelope {
        let capsule_id = format!("capsule-{gene_id}");
        let assets = vec![
            sample_gene_asset(gene_id),
            sample_capsule_asset(&capsule_id, gene_id, confidence),
        ];
        EvolutionEnvelope::publish("node-a", assets)
    }

    /// Peer config listing `http://<id>:8080` for each id, with the usual timings.
    fn peer_config(peer_ids: &[&str]) -> PeerConfig {
        let peers = peer_ids
            .iter()
            .map(|id| PeerEndpoint {
                peer_id: (*id).to_string(),
                endpoint: format!("http://{id}:8080"),
                public_key: None,
            })
            .collect();
        PeerConfig {
            peers,
            heartbeat_interval_secs: 30,
            peer_timeout_secs: 10,
            gossip_fanout: 3,
        }
    }

    /// Gossip config pointing at a single peer, with a 0.8 broadcast threshold.
    fn gossip_config(peer: &str) -> GossipConfig {
        GossipConfig {
            peers: vec![peer.into()],
            sync_interval_secs: 30,
            broadcast_threshold: 0.8,
        }
    }

    #[test]
    fn test_peer_registry_creation() {
        let registry = PeerRegistry::new(peer_config(&["peer1", "peer2"]), "local-peer".to_string());
        assert_eq!(registry.get_active_peers().len(), 2);
    }

    #[test]
    fn test_peer_failure_tracking() {
        let registry = PeerRegistry::new(peer_config(&["peer1"]), "local-peer".into());

        // Two consecutive failures leave the peer `Suspected`, which already
        // removes it from the active set.
        for _ in 0..2 {
            registry.update_peer_status("peer1", false);
        }
        assert!(registry.get_active_peers().is_empty());

        // A single success brings it back.
        registry.update_peer_status("peer1", true);
        assert_eq!(registry.get_active_peers().len(), 1);
    }

    #[test]
    fn test_gossip_builder() {
        let msg = GossipBuilder::new("peer1".to_string(), 1)
            .asset_update("asset-123".to_string(), "gene".to_string())
            .build()
            .expect("a builder with a kind must produce a message");

        assert_eq!(msg.origin_peer, "peer1");
        assert_eq!(msg.sequence, 1);
    }

    #[test]
    fn gossip_digest_only_includes_genes_above_threshold() {
        let engine = GossipSyncEngine::new("node-a", gossip_config("node-b"));
        engine.register_envelope(sample_envelope("gene-high", 0.92));
        engine.register_envelope(sample_envelope("gene-low", 0.42));

        let digest = engine.build_digest();
        assert_eq!(digest.genes.len(), 1);
        let entry = &digest.genes[0];
        assert_eq!(entry.gene_id, "gene-high");
        assert!(entry.confidence >= 0.8);
    }

    #[test]
    fn fetch_query_response_round_trip_returns_requested_gene() {
        let producer = GossipSyncEngine::new("node-a", GossipConfig::default());
        producer.register_envelope(sample_envelope("gene-1", 0.95));

        let query = FetchQuery {
            sender_id: "node-b".into(),
            signals: vec!["gene-1".into()],
            since_cursor: None,
            resume_token: None,
        };
        let response = producer.respond_to_fetch(&query);

        assert_eq!(response.sender_id, "node-a");
        assert!(!response.assets.is_empty());
        assert_eq!(response.sync_audit.applied_count, 1);
        let has_gene_1 = response.assets.iter().any(|asset| {
            matches!(asset, NetworkAsset::Gene { gene } if gene.id == "gene-1")
        });
        assert!(has_gene_1);
    }

    #[tokio::test]
    async fn two_in_process_gossip_sync_engines_exchange_gene_within_one_cycle() {
        let producer = GossipSyncEngine::new("node-a", gossip_config("node-b"));
        let consumer = GossipSyncEngine::new("node-b", gossip_config("node-a"));

        producer.register_envelope(sample_envelope("gene-sync", 0.91));
        assert!(!consumer.has_gene("gene-sync"));

        let report = consumer.sync_once_with(&producer).await;
        let expected = vec!["gene-sync".to_string()];
        assert_eq!(report.requested_gene_ids, expected);
        assert_eq!(report.imported_gene_ids, expected);
        assert!(consumer.has_gene("gene-sync"));
    }
}