Skip to main content

oris_evolution_network/
gossip.rs

//! Peer discovery and gossip protocol for the Evolution Network.
//!
//! This module provides:
//! - Static peer list configuration
//! - Peer health monitoring
//! - Basic gossip protocol for event propagation
//! - An in-process push-pull gene synchronization engine (`GossipSyncEngine`)
7
8use std::collections::HashMap;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::Instant;
11
12use crate::{EvolutionEnvelope, FetchQuery, FetchResponse, NetworkAsset, SyncAudit};
13use chrono::Utc;
14use oris_evolution::Gene;
15use serde::{Deserialize, Serialize};
16
17/// Configuration for peer discovery
18#[derive(Clone, Debug, Deserialize, Serialize)]
19pub struct PeerConfig {
20    /// List of peer endpoints for discovery
21    pub peers: Vec<PeerEndpoint>,
22    /// Heartbeat interval for peer health checks
23    #[serde(default = "default_heartbeat_interval")]
24    pub heartbeat_interval_secs: u64,
25    /// Timeout for peer responses
26    #[serde(default = "default_peer_timeout_secs")]
27    pub peer_timeout_secs: u64,
28    /// Gossip fanout (number of peers to spread messages to)
29    #[serde(default = "default_fanout")]
30    pub gossip_fanout: usize,
31}
32
/// Default value of `PeerConfig::heartbeat_interval_secs`, in seconds.
const DEFAULT_HEARTBEAT_INTERVAL_SECS: u64 = 30;
/// Default value of `PeerConfig::peer_timeout_secs`, in seconds.
const DEFAULT_PEER_TIMEOUT_SECS: u64 = 10;
/// Default value of `PeerConfig::gossip_fanout`.
const DEFAULT_GOSSIP_FANOUT: usize = 3;

/// Serde default for `PeerConfig::heartbeat_interval_secs`.
fn default_heartbeat_interval() -> u64 {
    DEFAULT_HEARTBEAT_INTERVAL_SECS
}

/// Serde default for `PeerConfig::peer_timeout_secs`.
fn default_peer_timeout_secs() -> u64 {
    DEFAULT_PEER_TIMEOUT_SECS
}

/// Serde default for `PeerConfig::gossip_fanout`.
fn default_fanout() -> usize {
    DEFAULT_GOSSIP_FANOUT
}
42
43/// A peer endpoint in the network
44#[derive(Clone, Debug, Deserialize, Serialize)]
45pub struct PeerEndpoint {
46    /// Unique identifier for the peer
47    pub peer_id: String,
48    /// HTTP endpoint for the peer
49    pub endpoint: String,
50    /// Optional public key for authentication
51    pub public_key: Option<String>,
52}
53
/// Health status of a peer.
///
/// This is a plain fieldless enum, so it is `Copy` and can be used as a set/map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PeerStatus {
    /// Peer is active and responding.
    Active,
    /// Peer missed at least one health check but is not yet considered offline.
    Suspected,
    /// Peer failed enough consecutive health checks to be treated as offline.
    Offline,
}
64
65/// Information about a known peer
66#[derive(Clone, Debug)]
67pub struct PeerInfo {
68    pub endpoint: PeerEndpoint,
69    pub status: PeerStatus,
70    pub last_seen: Instant,
71    pub last_heartbeat: Option<Instant>,
72    pub failure_count: u32,
73}
74
75impl PeerInfo {
76    pub fn new(endpoint: PeerEndpoint) -> Self {
77        Self {
78            endpoint,
79            status: PeerStatus::Active,
80            last_seen: Instant::now(),
81            last_heartbeat: None,
82            failure_count: 0,
83        }
84    }
85
86    pub fn mark_failure(&mut self) {
87        self.failure_count += 1;
88        if self.failure_count >= 3 {
89            self.status = PeerStatus::Offline;
90        } else {
91            self.status = PeerStatus::Suspected;
92        }
93    }
94
95    pub fn mark_success(&mut self) {
96        self.failure_count = 0;
97        self.status = PeerStatus::Active;
98        self.last_seen = Instant::now();
99    }
100}
101
102/// Gossip message for peer-to-peer communication
103#[derive(Clone, Debug, Deserialize, Serialize)]
104pub struct GossipMessage {
105    /// Unique message identifier
106    pub message_id: String,
107    /// Origin peer ID
108    pub origin_peer: String,
109    /// Sequence number for ordering
110    pub sequence: u64,
111    /// Message type
112    pub kind: GossipKind,
113    /// Timestamp
114    pub timestamp: String,
115    /// Message payload (JSON)
116    pub payload: String,
117}
118
119/// Types of gossip messages
120#[derive(Clone, Debug, Deserialize, Serialize)]
121#[serde(tag = "type", rename_all = "snake_case")]
122pub enum GossipKind {
123    /// Peer advertisement
124    Advertisement { peer_id: String, endpoint: String },
125    /// Asset update (gene, capsule, event)
126    AssetUpdate {
127        asset_id: String,
128        asset_type: String,
129    },
130    /// State synchronization request
131    SyncRequest { since_sequence: u64 },
132    /// State synchronization response
133    SyncResponse { assets: Vec<String> },
134    /// Peer leave notification
135    Leave { peer_id: String },
136}
137
138/// Peer registry for managing known peers
139#[derive(Clone)]
140pub struct PeerRegistry {
141    peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
142    config: PeerConfig,
143    local_peer_id: String,
144}
145
146impl PeerRegistry {
147    /// Create a new peer registry from config
148    pub fn new(config: PeerConfig, local_peer_id: String) -> Self {
149        let peers: HashMap<String, PeerInfo> = config
150            .peers
151            .iter()
152            .map(|e| (e.peer_id.clone(), PeerInfo::new(e.clone())))
153            .collect();
154
155        Self {
156            peers: Arc::new(RwLock::new(peers)),
157            config,
158            local_peer_id,
159        }
160    }
161
162    /// Get all active peers
163    pub fn get_active_peers(&self) -> Vec<PeerEndpoint> {
164        self.peers
165            .read()
166            .unwrap()
167            .values()
168            .filter(|p| p.status == PeerStatus::Active)
169            .map(|p| p.endpoint.clone())
170            .collect()
171    }
172
173    /// Get a random sample of peers for gossip
174    pub fn get_gossip_peers(&self, count: usize) -> Vec<PeerEndpoint> {
175        let peers = self.peers.read().unwrap();
176        let active: Vec<_> = peers
177            .values()
178            .filter(|p| p.status == PeerStatus::Active)
179            .filter(|p| p.endpoint.peer_id != self.local_peer_id)
180            .map(|p| p.endpoint.clone())
181            .collect();
182
183        if active.is_empty() {
184            return vec![];
185        }
186
187        // Simple round-robin selection
188        let count = count.min(active.len());
189        active.into_iter().take(count).collect()
190    }
191
192    /// Update peer status based on heartbeat
193    pub fn update_peer_status(&self, peer_id: &str, is_alive: bool) {
194        let mut peers = self.peers.write().unwrap();
195        if let Some(peer) = peers.get_mut(peer_id) {
196            if is_alive {
197                peer.mark_success();
198                peer.last_heartbeat = Some(Instant::now());
199            } else {
200                peer.mark_failure();
201            }
202        }
203    }
204
205    /// Add a new peer discovered via gossip
206    pub fn add_peer(&self, endpoint: PeerEndpoint) {
207        let mut peers = self.peers.write().unwrap();
208        if !peers.contains_key(&endpoint.peer_id) {
209            peers.insert(endpoint.peer_id.clone(), PeerInfo::new(endpoint));
210        }
211    }
212
213    /// Remove a peer
214    pub fn remove_peer(&self, peer_id: &str) {
215        let mut peers = self.peers.write().unwrap();
216        peers.remove(peer_id);
217    }
218
219    /// Get local peer ID
220    pub fn local_peer_id(&self) -> &str {
221        &self.local_peer_id
222    }
223
224    /// Get config
225    pub fn config(&self) -> &PeerConfig {
226        &self.config
227    }
228}
229
230/// Builder for creating gossip messages
231pub struct GossipBuilder {
232    origin_peer: String,
233    sequence: u64,
234    kind: Option<GossipKind>,
235    payload: Option<String>,
236}
237
238impl GossipBuilder {
239    pub fn new(origin_peer: String, sequence: u64) -> Self {
240        Self {
241            origin_peer,
242            sequence,
243            kind: None,
244            payload: None,
245        }
246    }
247
248    pub fn advertisement(mut self, peer_id: String, endpoint: String) -> Self {
249        self.kind = Some(GossipKind::Advertisement { peer_id, endpoint });
250        self
251    }
252
253    pub fn asset_update(mut self, asset_id: String, asset_type: String) -> Self {
254        self.kind = Some(GossipKind::AssetUpdate {
255            asset_id,
256            asset_type,
257        });
258        self
259    }
260
261    pub fn sync_request(mut self, since_sequence: u64) -> Self {
262        self.kind = Some(GossipKind::SyncRequest { since_sequence });
263        self
264    }
265
266    pub fn sync_response(mut self, assets: Vec<String>) -> Self {
267        self.kind = Some(GossipKind::SyncResponse { assets });
268        self
269    }
270
271    pub fn leave(mut self, peer_id: String) -> Self {
272        self.kind = Some(GossipKind::Leave { peer_id });
273        self
274    }
275
276    pub fn payload(mut self, payload: String) -> Self {
277        self.payload = Some(payload);
278        self
279    }
280
281    pub fn build(self) -> Option<GossipMessage> {
282        let kind = self.kind?;
283        let payload = self
284            .payload
285            .unwrap_or_else(|| serde_json::to_string(&kind).unwrap_or_default());
286
287        Some(GossipMessage {
288            message_id: format!(
289                "gossip-{:x}",
290                Utc::now().timestamp_nanos_opt().unwrap_or_default()
291            ),
292            origin_peer: self.origin_peer,
293            sequence: self.sequence,
294            kind,
295            timestamp: Utc::now().to_rfc3339(),
296            payload,
297        })
298    }
299}
300
/// Address of a gossip peer. It is an opaque string; this module stores and returns it
/// without parsing it.
pub type PeerAddress = String;

/// Default value of `GossipConfig::sync_interval_secs`, in seconds.
const DEFAULT_SYNC_INTERVAL_SECS: u64 = 30;
/// Default value of `GossipConfig::broadcast_threshold`.
const DEFAULT_BROADCAST_THRESHOLD: f32 = 0.8;

/// Serde default for `GossipConfig::sync_interval_secs`.
fn default_sync_interval_secs() -> u64 {
    DEFAULT_SYNC_INTERVAL_SECS
}

/// Serde default for `GossipConfig::broadcast_threshold`.
fn default_broadcast_threshold() -> f32 {
    DEFAULT_BROADCAST_THRESHOLD
}
310
311#[derive(Clone, Debug, Serialize, Deserialize)]
312pub struct GossipConfig {
313    #[serde(default)]
314    pub peers: Vec<PeerAddress>,
315    #[serde(default = "default_sync_interval_secs")]
316    pub sync_interval_secs: u64,
317    #[serde(default = "default_broadcast_threshold")]
318    pub broadcast_threshold: f32,
319}
320
321impl Default for GossipConfig {
322    fn default() -> Self {
323        Self {
324            peers: Vec::new(),
325            sync_interval_secs: default_sync_interval_secs(),
326            broadcast_threshold: default_broadcast_threshold(),
327        }
328    }
329}
330
331#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
332pub struct GossipDigestEntry {
333    pub gene_id: String,
334    pub confidence: f32,
335    pub version: u64,
336}
337
338#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
339pub struct GossipDigest {
340    pub sender_id: String,
341    #[serde(default)]
342    pub genes: Vec<GossipDigestEntry>,
343}
344
345#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
346pub struct GossipSyncReport {
347    pub requested_gene_ids: Vec<String>,
348    pub imported_gene_ids: Vec<String>,
349}
350
351#[derive(Clone, Debug)]
352struct LocalGeneRecord {
353    envelope: EvolutionEnvelope,
354    confidence: f32,
355    version: u64,
356}
357
358/// Push-pull gossip engine for in-process synchronization tests and example flows.
359pub struct GossipSyncEngine {
360    local_peer_id: String,
361    peers: Vec<PeerAddress>,
362    config: GossipConfig,
363    records: Arc<RwLock<HashMap<String, LocalGeneRecord>>>,
364    next_version: Mutex<u64>,
365}
366
367impl GossipSyncEngine {
368    pub fn new(local_peer_id: impl Into<String>, config: GossipConfig) -> Self {
369        Self {
370            local_peer_id: local_peer_id.into(),
371            peers: config.peers.clone(),
372            config,
373            records: Arc::new(RwLock::new(HashMap::new())),
374            next_version: Mutex::new(0),
375        }
376    }
377
378    pub fn peers(&self) -> &[PeerAddress] {
379        &self.peers
380    }
381
382    pub fn config(&self) -> &GossipConfig {
383        &self.config
384    }
385
386    pub fn local_peer_id(&self) -> &str {
387        &self.local_peer_id
388    }
389
390    pub fn has_gene(&self, gene_id: &str) -> bool {
391        self.records.read().unwrap().contains_key(gene_id)
392    }
393
394    pub fn gene_version(&self, gene_id: &str) -> Option<u64> {
395        self.records.read().unwrap().get(gene_id).map(|r| r.version)
396    }
397
398    pub fn register_envelope(&self, envelope: EvolutionEnvelope) -> usize {
399        let genes: Vec<Gene> = envelope
400            .assets
401            .iter()
402            .filter_map(|asset| match asset {
403                NetworkAsset::Gene { gene } => Some(gene.clone()),
404                _ => None,
405            })
406            .collect();
407
408        let mut version_counter = self.next_version.lock().unwrap();
409        let mut records = self.records.write().unwrap();
410        let mut inserted = 0;
411        for gene in genes {
412            *version_counter += 1;
413            let confidence = confidence_for_gene(&envelope.assets, &gene.id);
414            records.insert(
415                gene.id.clone(),
416                LocalGeneRecord {
417                    envelope: envelope.clone(),
418                    confidence,
419                    version: *version_counter,
420                },
421            );
422            inserted += 1;
423        }
424        inserted
425    }
426
427    pub fn build_digest(&self) -> GossipDigest {
428        let genes = self
429            .records
430            .read()
431            .unwrap()
432            .iter()
433            .filter(|(_, record)| record.confidence >= self.config.broadcast_threshold)
434            .map(|(gene_id, record)| GossipDigestEntry {
435                gene_id: gene_id.clone(),
436                confidence: record.confidence,
437                version: record.version,
438            })
439            .collect();
440
441        GossipDigest {
442            sender_id: self.local_peer_id.clone(),
443            genes,
444        }
445    }
446
447    pub fn build_fetch_query_for_digest(&self, digest: &GossipDigest) -> FetchQuery {
448        let local = self.records.read().unwrap();
449        let requested_gene_ids = digest
450            .genes
451            .iter()
452            .filter(|entry| {
453                local
454                    .get(&entry.gene_id)
455                    .map(|record| record.version < entry.version)
456                    .unwrap_or(true)
457            })
458            .map(|entry| entry.gene_id.clone())
459            .collect::<Vec<_>>();
460
461        FetchQuery {
462            sender_id: self.local_peer_id.clone(),
463            signals: requested_gene_ids,
464            since_cursor: None,
465            resume_token: None,
466        }
467    }
468
469    pub fn respond_to_fetch(&self, query: &FetchQuery) -> FetchResponse {
470        let records = self.records.read().unwrap();
471        let mut assets = Vec::new();
472        let mut applied = 0usize;
473        for gene_id in &query.signals {
474            if let Some(record) = records.get(gene_id) {
475                assets.extend(record.envelope.assets.clone());
476                applied += 1;
477            }
478        }
479
480        FetchResponse {
481            sender_id: self.local_peer_id.clone(),
482            assets,
483            next_cursor: None,
484            resume_token: None,
485            sync_audit: SyncAudit {
486                batch_id: format!("gossip-fetch-{}-{}", self.local_peer_id, Utc::now().timestamp()),
487                requested_cursor: None,
488                scanned_count: query.signals.len(),
489                applied_count: applied,
490                skipped_count: query.signals.len().saturating_sub(applied),
491                failed_count: 0,
492                failure_reasons: vec![],
493            },
494        }
495    }
496
497    pub fn apply_fetch_response(&self, response: &FetchResponse) -> Vec<String> {
498        if response.assets.is_empty() {
499            return vec![];
500        }
501        let envelope = EvolutionEnvelope::publish(response.sender_id.clone(), response.assets.clone());
502        let imported = envelope
503            .assets
504            .iter()
505            .filter_map(|asset| match asset {
506                NetworkAsset::Gene { gene } => Some(gene.id.clone()),
507                _ => None,
508            })
509            .collect::<Vec<_>>();
510        let _ = self.register_envelope(envelope);
511        imported
512    }
513
514    pub async fn sync_once_with(&self, remote: &GossipSyncEngine) -> GossipSyncReport {
515        let digest = remote.build_digest();
516        let query = self.build_fetch_query_for_digest(&digest);
517        let requested_gene_ids = query.signals.clone();
518        let response = remote.respond_to_fetch(&query);
519        let imported_gene_ids = self.apply_fetch_response(&response);
520        GossipSyncReport {
521            requested_gene_ids,
522            imported_gene_ids,
523        }
524    }
525
526    pub async fn start_sync_loop(self) -> tokio::task::JoinHandle<()> {
527        tokio::spawn(async move {
528            let interval = std::time::Duration::from_secs(self.config.sync_interval_secs.max(1));
529            loop {
530                tokio::time::sleep(interval).await;
531            }
532        })
533    }
534
535    pub fn serialize_digest_json(digest: &GossipDigest) -> Result<Vec<u8>, serde_json::Error> {
536        serde_json::to_vec(digest)
537    }
538
539    #[cfg(feature = "gossip-msgpack")]
540    pub fn serialize_digest_msgpack(
541        digest: &GossipDigest,
542    ) -> Result<Vec<u8>, rmp_serde::encode::Error> {
543        rmp_serde::to_vec_named(digest)
544    }
545
546    #[cfg(feature = "gossip-msgpack")]
547    pub fn deserialize_digest_msgpack(
548        bytes: &[u8],
549    ) -> Result<GossipDigest, rmp_serde::decode::Error> {
550        rmp_serde::from_slice(bytes)
551    }
552}
553
554fn confidence_for_gene(assets: &[NetworkAsset], gene_id: &str) -> f32 {
555    assets
556        .iter()
557        .filter_map(|asset| match asset {
558            NetworkAsset::Capsule { capsule } if capsule.gene_id == gene_id => Some(capsule.confidence),
559            _ => None,
560        })
561        .fold(0.0_f32, f32::max)
562}
563
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EvolutionEnvelope;
    use oris_evolution::{AssetState, Capsule, EnvFingerprint, Outcome};

    fn sample_gene_asset(id: &str) -> NetworkAsset {
        NetworkAsset::Gene {
            gene: Gene {
                id: id.to_string(),
                signals: vec!["compiler:error[E0308]".to_string()],
                strategy: vec!["fix type mismatch".to_string()],
                validation: vec!["cargo test".to_string()],
                state: AssetState::Promoted,
                task_class_id: None,
            },
        }
    }

    fn sample_capsule_asset(id: &str, gene_id: &str, confidence: f32) -> NetworkAsset {
        NetworkAsset::Capsule {
            capsule: Capsule {
                id: id.to_string(),
                gene_id: gene_id.to_string(),
                mutation_id: format!("mut-{id}"),
                run_id: format!("run-{id}"),
                diff_hash: format!("diff-{id}"),
                confidence,
                env: EnvFingerprint {
                    rustc_version: "rustc 1.80.0".to_string(),
                    cargo_lock_hash: "cargo-lock".to_string(),
                    target_triple: "aarch64-apple-darwin".to_string(),
                    os: "macos".to_string(),
                },
                outcome: Outcome {
                    success: true,
                    validation_profile: "default".to_string(),
                    validation_duration_ms: 100,
                    changed_files: vec!["src/lib.rs".to_string()],
                    validator_hash: "validator".to_string(),
                    lines_changed: 3,
                    replay_verified: true,
                },
                state: AssetState::Promoted,
            },
        }
    }

    fn sample_envelope(gene_id: &str, confidence: f32) -> EvolutionEnvelope {
        EvolutionEnvelope::publish(
            "node-a",
            vec![
                sample_gene_asset(gene_id),
                sample_capsule_asset(&format!("capsule-{gene_id}"), gene_id, confidence),
            ],
        )
    }

    fn sample_endpoint(peer_id: &str) -> PeerEndpoint {
        PeerEndpoint {
            peer_id: peer_id.to_string(),
            endpoint: format!("http://{peer_id}:8080"),
            public_key: None,
        }
    }

    #[test]
    fn test_peer_registry_creation() {
        let config = PeerConfig {
            peers: vec![
                PeerEndpoint {
                    peer_id: "peer1".into(),
                    endpoint: "http://peer1:8080".into(),
                    public_key: None,
                },
                PeerEndpoint {
                    peer_id: "peer2".into(),
                    endpoint: "http://peer2:8080".into(),
                    public_key: None,
                },
            ],
            heartbeat_interval_secs: 30,
            peer_timeout_secs: 10,
            gossip_fanout: 3,
        };

        let registry = PeerRegistry::new(config, "local-peer".to_string());
        let active = registry.get_active_peers();
        assert_eq!(active.len(), 2);
    }

    #[test]
    fn test_peer_failure_tracking() {
        let config = PeerConfig {
            peers: vec![PeerEndpoint {
                peer_id: "peer1".into(),
                endpoint: "http://peer1:8080".into(),
                public_key: None,
            }],
            heartbeat_interval_secs: 30,
            peer_timeout_secs: 10,
            gossip_fanout: 3,
        };

        let registry = PeerRegistry::new(config, "local-peer".into());

        // Simulate failures
        registry.update_peer_status("peer1", false);
        registry.update_peer_status("peer1", false);

        let peers = registry.get_active_peers();
        assert!(peers.is_empty()); // Status should be Suspected

        // Recover
        registry.update_peer_status("peer1", true);
        let peers = registry.get_active_peers();
        assert_eq!(peers.len(), 1);
    }

    #[test]
    fn peer_info_goes_offline_after_three_consecutive_failures() {
        let mut info = PeerInfo::new(sample_endpoint("peer1"));
        info.mark_failure();
        assert_eq!(info.status, PeerStatus::Suspected);
        info.mark_failure();
        assert_eq!(info.status, PeerStatus::Suspected);
        info.mark_failure();
        assert_eq!(info.status, PeerStatus::Offline);

        info.mark_success();
        assert_eq!(info.status, PeerStatus::Active);
        assert_eq!(info.failure_count, 0);
    }

    #[test]
    fn gossip_peers_exclude_local_peer_and_respect_count() {
        let config = PeerConfig {
            peers: vec![
                sample_endpoint("local-peer"),
                sample_endpoint("peer1"),
                sample_endpoint("peer2"),
            ],
            heartbeat_interval_secs: 30,
            peer_timeout_secs: 10,
            gossip_fanout: 3,
        };
        let registry = PeerRegistry::new(config, "local-peer".into());

        let all = registry.get_gossip_peers(10);
        assert_eq!(all.len(), 2);
        assert!(all.iter().all(|p| p.peer_id != "local-peer"));
        assert_eq!(registry.get_gossip_peers(1).len(), 1);
        assert!(registry.get_gossip_peers(0).is_empty());
    }

    #[test]
    fn add_peer_does_not_overwrite_existing_peer() {
        let config = PeerConfig {
            peers: vec![sample_endpoint("peer1")],
            heartbeat_interval_secs: 30,
            peer_timeout_secs: 10,
            gossip_fanout: 3,
        };
        let registry = PeerRegistry::new(config, "local-peer".into());
        registry.update_peer_status("peer1", false);

        registry.add_peer(PeerEndpoint {
            peer_id: "peer1".into(),
            endpoint: "http://elsewhere:9090".into(),
            public_key: None,
        });

        // The existing Suspected entry must survive the re-advertisement.
        assert!(registry.get_active_peers().is_empty());
    }

    #[test]
    fn test_gossip_builder() {
        let msg = GossipBuilder::new("peer1".to_string(), 1)
            .asset_update("asset-123".to_string(), "gene".to_string())
            .build();

        assert!(msg.is_some());
        let msg = msg.unwrap();
        assert_eq!(msg.origin_peer, "peer1");
        assert_eq!(msg.sequence, 1);
    }

    #[test]
    fn gossip_builder_without_kind_builds_nothing() {
        let msg = GossipBuilder::new("peer1".to_string(), 1)
            .payload("{}".to_string())
            .build();
        assert!(msg.is_none());
    }

    #[test]
    fn gossip_config_default_values() {
        let config = GossipConfig::default();
        assert!(config.peers.is_empty());
        assert_eq!(config.sync_interval_secs, 30);
        assert_eq!(config.broadcast_threshold, 0.8);
    }

    #[test]
    fn gossip_digest_only_includes_genes_above_threshold() {
        let engine = GossipSyncEngine::new(
            "node-a",
            GossipConfig {
                peers: vec!["node-b".into()],
                sync_interval_secs: 30,
                broadcast_threshold: 0.8,
            },
        );
        engine.register_envelope(sample_envelope("gene-high", 0.92));
        engine.register_envelope(sample_envelope("gene-low", 0.42));

        let digest = engine.build_digest();
        assert_eq!(digest.genes.len(), 1);
        assert_eq!(digest.genes[0].gene_id, "gene-high");
        assert!(digest.genes[0].confidence >= 0.8);
    }

    #[test]
    fn fetch_query_response_round_trip_returns_requested_gene() {
        let producer = GossipSyncEngine::new("node-a", GossipConfig::default());
        producer.register_envelope(sample_envelope("gene-1", 0.95));

        let query = FetchQuery {
            sender_id: "node-b".into(),
            signals: vec!["gene-1".into()],
            since_cursor: None,
            resume_token: None,
        };
        let response = producer.respond_to_fetch(&query);

        assert_eq!(response.sender_id, "node-a");
        assert!(!response.assets.is_empty());
        assert_eq!(response.sync_audit.applied_count, 1);
        assert!(response.assets.iter().any(|asset| matches!(
            asset,
            NetworkAsset::Gene { gene } if gene.id == "gene-1"
        )));
    }

    #[tokio::test]
    async fn two_in_process_gossip_sync_engines_exchange_gene_within_one_cycle() {
        let producer = GossipSyncEngine::new(
            "node-a",
            GossipConfig {
                peers: vec!["node-b".into()],
                sync_interval_secs: 30,
                broadcast_threshold: 0.8,
            },
        );
        let consumer = GossipSyncEngine::new(
            "node-b",
            GossipConfig {
                peers: vec!["node-a".into()],
                sync_interval_secs: 30,
                broadcast_threshold: 0.8,
            },
        );

        producer.register_envelope(sample_envelope("gene-sync", 0.91));
        assert!(!consumer.has_gene("gene-sync"));

        let report = consumer.sync_once_with(&producer).await;
        assert_eq!(report.requested_gene_ids, vec!["gene-sync".to_string()]);
        assert_eq!(report.imported_gene_ids, vec!["gene-sync".to_string()]);
        assert!(consumer.has_gene("gene-sync"));
    }
}