Skip to main content

oris_evolution_network/
gossip.rs

1//! Peer discovery and gossip protocol for the Evolution Network.
2//!
3//! This module provides:
4//! - Static peer list configuration
5//! - Peer health monitoring
6//! - Basic gossip protocol for event propagation
7
8use std::collections::HashMap;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::Instant;
11
12use crate::{EvolutionEnvelope, FetchQuery, FetchResponse, NetworkAsset, SyncAudit};
13use chrono::Utc;
14use oris_evolution::Gene;
15use serde::{Deserialize, Serialize};
16
17/// Configuration for peer discovery
18#[derive(Clone, Debug, Deserialize, Serialize)]
19pub struct PeerConfig {
20    /// List of peer endpoints for discovery
21    pub peers: Vec<PeerEndpoint>,
22    /// Heartbeat interval for peer health checks
23    #[serde(default = "default_heartbeat_interval")]
24    pub heartbeat_interval_secs: u64,
25    /// Timeout for peer responses
26    #[serde(default = "default_peer_timeout_secs")]
27    pub peer_timeout_secs: u64,
28    /// Gossip fanout (number of peers to spread messages to)
29    #[serde(default = "default_fanout")]
30    pub gossip_fanout: usize,
31}
32
/// Serde fallback for `PeerConfig::heartbeat_interval_secs`: 30 seconds.
fn default_heartbeat_interval() -> u64 {
    const HEARTBEAT_INTERVAL_SECS: u64 = 30;
    HEARTBEAT_INTERVAL_SECS
}
/// Serde fallback for `PeerConfig::peer_timeout_secs`: 10 seconds.
fn default_peer_timeout_secs() -> u64 {
    const PEER_TIMEOUT_SECS: u64 = 10;
    PEER_TIMEOUT_SECS
}
/// Serde fallback for `PeerConfig::gossip_fanout`: 3 peers.
fn default_fanout() -> usize {
    const GOSSIP_FANOUT: usize = 3;
    GOSSIP_FANOUT
}
42
43/// A peer endpoint in the network
44#[derive(Clone, Debug, Deserialize, Serialize)]
45pub struct PeerEndpoint {
46    /// Unique identifier for the peer
47    pub peer_id: String,
48    /// HTTP endpoint for the peer
49    pub endpoint: String,
50    /// Optional public key for authentication
51    pub public_key: Option<String>,
52}
53
/// Liveness classification of a known peer.
#[derive(Debug, Clone, PartialEq)]
pub enum PeerStatus {
    /// The peer is responding; this is also the state of a freshly created record.
    Active,
    /// The peer has failed recently but is not yet considered offline.
    Suspected,
    /// The peer has failed often enough to be treated as offline.
    Offline,
}
64
65/// Information about a known peer
66#[derive(Clone, Debug)]
67pub struct PeerInfo {
68    pub endpoint: PeerEndpoint,
69    pub status: PeerStatus,
70    pub last_seen: Instant,
71    pub last_heartbeat: Option<Instant>,
72    pub failure_count: u32,
73}
74
75impl PeerInfo {
76    pub fn new(endpoint: PeerEndpoint) -> Self {
77        Self {
78            endpoint,
79            status: PeerStatus::Active,
80            last_seen: Instant::now(),
81            last_heartbeat: None,
82            failure_count: 0,
83        }
84    }
85
86    pub fn mark_failure(&mut self) {
87        self.failure_count += 1;
88        if self.failure_count >= 3 {
89            self.status = PeerStatus::Offline;
90        } else {
91            self.status = PeerStatus::Suspected;
92        }
93    }
94
95    pub fn mark_success(&mut self) {
96        self.failure_count = 0;
97        self.status = PeerStatus::Active;
98        self.last_seen = Instant::now();
99    }
100}
101
102/// Gossip message for peer-to-peer communication
103#[derive(Clone, Debug, Deserialize, Serialize)]
104pub struct GossipMessage {
105    /// Unique message identifier
106    pub message_id: String,
107    /// Origin peer ID
108    pub origin_peer: String,
109    /// Sequence number for ordering
110    pub sequence: u64,
111    /// Message type
112    pub kind: GossipKind,
113    /// Timestamp
114    pub timestamp: String,
115    /// Message payload (JSON)
116    pub payload: String,
117}
118
119/// Types of gossip messages
120#[derive(Clone, Debug, Deserialize, Serialize)]
121#[serde(tag = "type", rename_all = "snake_case")]
122pub enum GossipKind {
123    /// Peer advertisement
124    Advertisement { peer_id: String, endpoint: String },
125    /// Asset update (gene, capsule, event)
126    AssetUpdate {
127        asset_id: String,
128        asset_type: String,
129    },
130    /// State synchronization request
131    SyncRequest { since_sequence: u64 },
132    /// State synchronization response
133    SyncResponse { assets: Vec<String> },
134    /// Peer leave notification
135    Leave { peer_id: String },
136}
137
138/// Peer registry for managing known peers
139#[derive(Clone)]
140pub struct PeerRegistry {
141    peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
142    config: PeerConfig,
143    local_peer_id: String,
144}
145
146impl PeerRegistry {
147    /// Create a new peer registry from config
148    pub fn new(config: PeerConfig, local_peer_id: String) -> Self {
149        let peers: HashMap<String, PeerInfo> = config
150            .peers
151            .iter()
152            .map(|e| (e.peer_id.clone(), PeerInfo::new(e.clone())))
153            .collect();
154
155        Self {
156            peers: Arc::new(RwLock::new(peers)),
157            config,
158            local_peer_id,
159        }
160    }
161
162    /// Get all active peers
163    pub fn get_active_peers(&self) -> Vec<PeerEndpoint> {
164        self.peers
165            .read()
166            .unwrap()
167            .values()
168            .filter(|p| p.status == PeerStatus::Active)
169            .map(|p| p.endpoint.clone())
170            .collect()
171    }
172
173    /// Get a random sample of peers for gossip
174    pub fn get_gossip_peers(&self, count: usize) -> Vec<PeerEndpoint> {
175        let peers = self.peers.read().unwrap();
176        let active: Vec<_> = peers
177            .values()
178            .filter(|p| p.status == PeerStatus::Active)
179            .filter(|p| p.endpoint.peer_id != self.local_peer_id)
180            .map(|p| p.endpoint.clone())
181            .collect();
182
183        if active.is_empty() {
184            return vec![];
185        }
186
187        // Simple round-robin selection
188        let count = count.min(active.len());
189        active.into_iter().take(count).collect()
190    }
191
192    /// Update peer status based on heartbeat
193    pub fn update_peer_status(&self, peer_id: &str, is_alive: bool) {
194        let mut peers = self.peers.write().unwrap();
195        if let Some(peer) = peers.get_mut(peer_id) {
196            if is_alive {
197                peer.mark_success();
198                peer.last_heartbeat = Some(Instant::now());
199            } else {
200                peer.mark_failure();
201            }
202        }
203    }
204
205    /// Add a new peer discovered via gossip
206    pub fn add_peer(&self, endpoint: PeerEndpoint) {
207        let mut peers = self.peers.write().unwrap();
208        if !peers.contains_key(&endpoint.peer_id) {
209            peers.insert(endpoint.peer_id.clone(), PeerInfo::new(endpoint));
210        }
211    }
212
213    /// Remove a peer
214    pub fn remove_peer(&self, peer_id: &str) {
215        let mut peers = self.peers.write().unwrap();
216        peers.remove(peer_id);
217    }
218
219    /// Get local peer ID
220    pub fn local_peer_id(&self) -> &str {
221        &self.local_peer_id
222    }
223
224    /// Get config
225    pub fn config(&self) -> &PeerConfig {
226        &self.config
227    }
228}
229
230/// Builder for creating gossip messages
231pub struct GossipBuilder {
232    origin_peer: String,
233    sequence: u64,
234    kind: Option<GossipKind>,
235    payload: Option<String>,
236}
237
238impl GossipBuilder {
239    pub fn new(origin_peer: String, sequence: u64) -> Self {
240        Self {
241            origin_peer,
242            sequence,
243            kind: None,
244            payload: None,
245        }
246    }
247
248    pub fn advertisement(mut self, peer_id: String, endpoint: String) -> Self {
249        self.kind = Some(GossipKind::Advertisement { peer_id, endpoint });
250        self
251    }
252
253    pub fn asset_update(mut self, asset_id: String, asset_type: String) -> Self {
254        self.kind = Some(GossipKind::AssetUpdate {
255            asset_id,
256            asset_type,
257        });
258        self
259    }
260
261    pub fn sync_request(mut self, since_sequence: u64) -> Self {
262        self.kind = Some(GossipKind::SyncRequest { since_sequence });
263        self
264    }
265
266    pub fn sync_response(mut self, assets: Vec<String>) -> Self {
267        self.kind = Some(GossipKind::SyncResponse { assets });
268        self
269    }
270
271    pub fn leave(mut self, peer_id: String) -> Self {
272        self.kind = Some(GossipKind::Leave { peer_id });
273        self
274    }
275
276    pub fn payload(mut self, payload: String) -> Self {
277        self.payload = Some(payload);
278        self
279    }
280
281    pub fn build(self) -> Option<GossipMessage> {
282        let kind = self.kind?;
283        let payload = self
284            .payload
285            .unwrap_or_else(|| serde_json::to_string(&kind).unwrap_or_default());
286
287        Some(GossipMessage {
288            message_id: format!(
289                "gossip-{:x}",
290                Utc::now().timestamp_nanos_opt().unwrap_or_default()
291            ),
292            origin_peer: self.origin_peer,
293            sequence: self.sequence,
294            kind,
295            timestamp: Utc::now().to_rfc3339(),
296            payload,
297        })
298    }
299}
300
/// Address of a gossip peer, stored as a plain string.
pub type PeerAddress = String;
302
/// Fallback for `GossipConfig::sync_interval_secs`: 30 seconds.
fn default_sync_interval_secs() -> u64 {
    const SYNC_INTERVAL_SECS: u64 = 30;
    SYNC_INTERVAL_SECS
}
306
/// Fallback for `GossipConfig::broadcast_threshold`: 0.8.
fn default_broadcast_threshold() -> f32 {
    const BROADCAST_THRESHOLD: f32 = 0.8;
    BROADCAST_THRESHOLD
}
310
311#[derive(Clone, Debug, Serialize, Deserialize)]
312pub struct GossipConfig {
313    #[serde(default)]
314    pub peers: Vec<PeerAddress>,
315    #[serde(default = "default_sync_interval_secs")]
316    pub sync_interval_secs: u64,
317    #[serde(default = "default_broadcast_threshold")]
318    pub broadcast_threshold: f32,
319}
320
321impl Default for GossipConfig {
322    fn default() -> Self {
323        Self {
324            peers: Vec::new(),
325            sync_interval_secs: default_sync_interval_secs(),
326            broadcast_threshold: default_broadcast_threshold(),
327        }
328    }
329}
330
331#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
332pub struct GossipDigestEntry {
333    pub gene_id: String,
334    pub confidence: f32,
335    pub version: u64,
336}
337
338#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
339pub struct GossipDigest {
340    pub sender_id: String,
341    #[serde(default)]
342    pub genes: Vec<GossipDigestEntry>,
343}
344
345#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
346pub struct GossipSyncReport {
347    pub requested_gene_ids: Vec<String>,
348    pub imported_gene_ids: Vec<String>,
349}
350
351#[derive(Clone, Debug)]
352struct LocalGeneRecord {
353    envelope: EvolutionEnvelope,
354    confidence: f32,
355    version: u64,
356}
357
358/// Push-pull gossip engine for in-process synchronization tests and example flows.
359pub struct GossipSyncEngine {
360    local_peer_id: String,
361    peers: Vec<PeerAddress>,
362    config: GossipConfig,
363    records: Arc<RwLock<HashMap<String, LocalGeneRecord>>>,
364    next_version: Mutex<u64>,
365}
366
367impl GossipSyncEngine {
368    pub fn new(local_peer_id: impl Into<String>, config: GossipConfig) -> Self {
369        Self {
370            local_peer_id: local_peer_id.into(),
371            peers: config.peers.clone(),
372            config,
373            records: Arc::new(RwLock::new(HashMap::new())),
374            next_version: Mutex::new(0),
375        }
376    }
377
378    pub fn peers(&self) -> &[PeerAddress] {
379        &self.peers
380    }
381
382    pub fn config(&self) -> &GossipConfig {
383        &self.config
384    }
385
386    pub fn local_peer_id(&self) -> &str {
387        &self.local_peer_id
388    }
389
390    pub fn has_gene(&self, gene_id: &str) -> bool {
391        self.records.read().unwrap().contains_key(gene_id)
392    }
393
394    pub fn gene_version(&self, gene_id: &str) -> Option<u64> {
395        self.records.read().unwrap().get(gene_id).map(|r| r.version)
396    }
397
398    pub fn register_envelope(&self, envelope: EvolutionEnvelope) -> usize {
399        let genes: Vec<Gene> = envelope
400            .assets
401            .iter()
402            .filter_map(|asset| match asset {
403                NetworkAsset::Gene { gene } => Some(gene.clone()),
404                _ => None,
405            })
406            .collect();
407
408        let mut version_counter = self.next_version.lock().unwrap();
409        let mut records = self.records.write().unwrap();
410        let mut inserted = 0;
411        for gene in genes {
412            *version_counter += 1;
413            let confidence = confidence_for_gene(&envelope.assets, &gene.id);
414            records.insert(
415                gene.id.clone(),
416                LocalGeneRecord {
417                    envelope: envelope.clone(),
418                    confidence,
419                    version: *version_counter,
420                },
421            );
422            inserted += 1;
423        }
424        inserted
425    }
426
427    pub fn build_digest(&self) -> GossipDigest {
428        let genes = self
429            .records
430            .read()
431            .unwrap()
432            .iter()
433            .filter(|(_, record)| record.confidence >= self.config.broadcast_threshold)
434            .map(|(gene_id, record)| GossipDigestEntry {
435                gene_id: gene_id.clone(),
436                confidence: record.confidence,
437                version: record.version,
438            })
439            .collect();
440
441        GossipDigest {
442            sender_id: self.local_peer_id.clone(),
443            genes,
444        }
445    }
446
447    pub fn build_fetch_query_for_digest(&self, digest: &GossipDigest) -> FetchQuery {
448        let local = self.records.read().unwrap();
449        let requested_gene_ids = digest
450            .genes
451            .iter()
452            .filter(|entry| {
453                local
454                    .get(&entry.gene_id)
455                    .map(|record| record.version < entry.version)
456                    .unwrap_or(true)
457            })
458            .map(|entry| entry.gene_id.clone())
459            .collect::<Vec<_>>();
460
461        FetchQuery {
462            sender_id: self.local_peer_id.clone(),
463            signals: requested_gene_ids,
464            since_cursor: None,
465            resume_token: None,
466        }
467    }
468
469    pub fn respond_to_fetch(&self, query: &FetchQuery) -> FetchResponse {
470        let records = self.records.read().unwrap();
471        let mut assets = Vec::new();
472        let mut applied = 0usize;
473        for gene_id in &query.signals {
474            if let Some(record) = records.get(gene_id) {
475                assets.extend(record.envelope.assets.clone());
476                applied += 1;
477            }
478        }
479
480        FetchResponse {
481            sender_id: self.local_peer_id.clone(),
482            assets,
483            next_cursor: None,
484            resume_token: None,
485            sync_audit: SyncAudit {
486                batch_id: format!(
487                    "gossip-fetch-{}-{}",
488                    self.local_peer_id,
489                    Utc::now().timestamp()
490                ),
491                requested_cursor: None,
492                scanned_count: query.signals.len(),
493                applied_count: applied,
494                skipped_count: query.signals.len().saturating_sub(applied),
495                failed_count: 0,
496                failure_reasons: vec![],
497            },
498        }
499    }
500
501    pub fn apply_fetch_response(&self, response: &FetchResponse) -> Vec<String> {
502        if response.assets.is_empty() {
503            return vec![];
504        }
505        let envelope =
506            EvolutionEnvelope::publish(response.sender_id.clone(), response.assets.clone());
507        let imported = envelope
508            .assets
509            .iter()
510            .filter_map(|asset| match asset {
511                NetworkAsset::Gene { gene } => Some(gene.id.clone()),
512                _ => None,
513            })
514            .collect::<Vec<_>>();
515        let _ = self.register_envelope(envelope);
516        imported
517    }
518
519    pub async fn sync_once_with(&self, remote: &GossipSyncEngine) -> GossipSyncReport {
520        let digest = remote.build_digest();
521        let query = self.build_fetch_query_for_digest(&digest);
522        let requested_gene_ids = query.signals.clone();
523        let response = remote.respond_to_fetch(&query);
524        let imported_gene_ids = self.apply_fetch_response(&response);
525        GossipSyncReport {
526            requested_gene_ids,
527            imported_gene_ids,
528        }
529    }
530
531    pub async fn start_sync_loop(self) -> tokio::task::JoinHandle<()> {
532        tokio::spawn(async move {
533            let interval = std::time::Duration::from_secs(self.config.sync_interval_secs.max(1));
534            loop {
535                tokio::time::sleep(interval).await;
536            }
537        })
538    }
539
540    pub fn serialize_digest_json(digest: &GossipDigest) -> Result<Vec<u8>, serde_json::Error> {
541        serde_json::to_vec(digest)
542    }
543
544    #[cfg(feature = "gossip-msgpack")]
545    pub fn serialize_digest_msgpack(
546        digest: &GossipDigest,
547    ) -> Result<Vec<u8>, rmp_serde::encode::Error> {
548        rmp_serde::to_vec_named(digest)
549    }
550
551    #[cfg(feature = "gossip-msgpack")]
552    pub fn deserialize_digest_msgpack(
553        bytes: &[u8],
554    ) -> Result<GossipDigest, rmp_serde::decode::Error> {
555        rmp_serde::from_slice(bytes)
556    }
557}
558
559fn confidence_for_gene(assets: &[NetworkAsset], gene_id: &str) -> f32 {
560    assets
561        .iter()
562        .filter_map(|asset| match asset {
563            NetworkAsset::Capsule { capsule } if capsule.gene_id == gene_id => {
564                Some(capsule.confidence)
565            }
566            _ => None,
567        })
568        .fold(0.0_f32, f32::max)
569}
570
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EvolutionEnvelope;
    use oris_evolution::{AssetState, Capsule, EnvFingerprint, Outcome};

    /// Promoted gene fixture with the given id.
    fn sample_gene_asset(id: &str) -> NetworkAsset {
        let gene = Gene {
            id: id.to_string(),
            signals: vec!["compiler:error[E0308]".to_string()],
            strategy: vec!["fix type mismatch".to_string()],
            validation: vec!["cargo test".to_string()],
            state: AssetState::Promoted,
            task_class_id: None,
        };
        NetworkAsset::Gene { gene }
    }

    /// Promoted capsule fixture attached to `gene_id` with the given confidence.
    fn sample_capsule_asset(id: &str, gene_id: &str, confidence: f32) -> NetworkAsset {
        let env = EnvFingerprint {
            rustc_version: "rustc 1.80.0".to_string(),
            cargo_lock_hash: "cargo-lock".to_string(),
            target_triple: "aarch64-apple-darwin".to_string(),
            os: "macos".to_string(),
        };
        let outcome = Outcome {
            success: true,
            validation_profile: "default".to_string(),
            validation_duration_ms: 100,
            changed_files: vec!["src/lib.rs".to_string()],
            validator_hash: "validator".to_string(),
            lines_changed: 3,
            replay_verified: true,
        };
        NetworkAsset::Capsule {
            capsule: Capsule {
                id: id.to_string(),
                gene_id: gene_id.to_string(),
                mutation_id: format!("mut-{id}"),
                run_id: format!("run-{id}"),
                diff_hash: format!("diff-{id}"),
                confidence,
                env,
                outcome,
                state: AssetState::Promoted,
            },
        }
    }

    /// Envelope from "node-a" holding one gene and one capsule for it.
    fn sample_envelope(gene_id: &str, confidence: f32) -> EvolutionEnvelope {
        let assets = vec![
            sample_gene_asset(gene_id),
            sample_capsule_asset(&format!("capsule-{gene_id}"), gene_id, confidence),
        ];
        EvolutionEnvelope::publish("node-a", assets)
    }

    /// Endpoint fixture of the form `http://<peer_id>:8080` without a public key.
    fn endpoint(peer_id: &str) -> PeerEndpoint {
        PeerEndpoint {
            peer_id: peer_id.into(),
            endpoint: format!("http://{peer_id}:8080"),
            public_key: None,
        }
    }

    /// Peer configuration with the standard timing values used by these tests.
    fn peer_config(peers: Vec<PeerEndpoint>) -> PeerConfig {
        PeerConfig {
            peers,
            heartbeat_interval_secs: 30,
            peer_timeout_secs: 10,
            gossip_fanout: 3,
        }
    }

    /// Gossip configuration with a single peer and a 0.8 broadcast threshold.
    fn gossip_config(peer: &str) -> GossipConfig {
        GossipConfig {
            peers: vec![peer.into()],
            sync_interval_secs: 30,
            broadcast_threshold: 0.8,
        }
    }

    #[test]
    fn test_peer_registry_creation() {
        let config = peer_config(vec![endpoint("peer1"), endpoint("peer2")]);

        let registry = PeerRegistry::new(config, "local-peer".to_string());

        // Both configured peers start out active.
        assert_eq!(registry.get_active_peers().len(), 2);
    }

    #[test]
    fn test_peer_failure_tracking() {
        let registry = PeerRegistry::new(peer_config(vec![endpoint("peer1")]), "local-peer".into());

        // Two failures leave the peer Suspected, so it is no longer listed as active.
        for _ in 0..2 {
            registry.update_peer_status("peer1", false);
        }
        assert!(registry.get_active_peers().is_empty());

        // One successful heartbeat brings it back.
        registry.update_peer_status("peer1", true);
        assert_eq!(registry.get_active_peers().len(), 1);
    }

    #[test]
    fn test_gossip_builder() {
        let built = GossipBuilder::new("peer1".to_string(), 1)
            .asset_update("asset-123".to_string(), "gene".to_string())
            .build();

        assert!(built.is_some());
        let message = built.unwrap();
        assert_eq!(message.origin_peer, "peer1");
        assert_eq!(message.sequence, 1);
    }

    #[test]
    fn gossip_digest_only_includes_genes_above_threshold() {
        let engine = GossipSyncEngine::new("node-a", gossip_config("node-b"));
        engine.register_envelope(sample_envelope("gene-high", 0.92));
        engine.register_envelope(sample_envelope("gene-low", 0.42));

        let digest = engine.build_digest();

        assert_eq!(digest.genes.len(), 1);
        let advertised = &digest.genes[0];
        assert_eq!(advertised.gene_id, "gene-high");
        assert!(advertised.confidence >= 0.8);
    }

    #[test]
    fn fetch_query_response_round_trip_returns_requested_gene() {
        let producer = GossipSyncEngine::new("node-a", GossipConfig::default());
        producer.register_envelope(sample_envelope("gene-1", 0.95));

        let query = FetchQuery {
            sender_id: "node-b".into(),
            signals: vec!["gene-1".into()],
            since_cursor: None,
            resume_token: None,
        };
        let response = producer.respond_to_fetch(&query);

        assert_eq!(response.sender_id, "node-a");
        assert!(!response.assets.is_empty());
        assert_eq!(response.sync_audit.applied_count, 1);
        let carries_gene = response.assets.iter().any(|asset| {
            matches!(
                asset,
                NetworkAsset::Gene { gene } if gene.id == "gene-1"
            )
        });
        assert!(carries_gene);
    }

    #[tokio::test]
    async fn two_in_process_gossip_sync_engines_exchange_gene_within_one_cycle() {
        let producer = GossipSyncEngine::new("node-a", gossip_config("node-b"));
        let consumer = GossipSyncEngine::new("node-b", gossip_config("node-a"));

        producer.register_envelope(sample_envelope("gene-sync", 0.91));
        assert!(!consumer.has_gene("gene-sync"));

        let report = consumer.sync_once_with(&producer).await;

        let expected = vec!["gene-sync".to_string()];
        assert_eq!(report.requested_gene_ids, expected);
        assert_eq!(report.imported_gene_ids, expected);
        assert!(consumer.has_gene("gene-sync"));
    }
}