Skip to main content

oris_evolution_network/
gossip.rs

1//! Peer discovery and gossip protocol for the Evolution Network.
2//!
3//! This module provides:
4//! - Static peer list configuration
5//! - Peer health monitoring
6//! - Basic gossip protocol for event propagation
7
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10use std::time::Instant;
11
12use chrono::Utc;
13use serde::{Deserialize, Serialize};
14
15/// Configuration for peer discovery
16#[derive(Clone, Debug, Deserialize, Serialize)]
17pub struct PeerConfig {
18    /// List of peer endpoints for discovery
19    pub peers: Vec<PeerEndpoint>,
20    /// Heartbeat interval for peer health checks
21    #[serde(default = "default_heartbeat_interval")]
22    pub heartbeat_interval_secs: u64,
23    /// Timeout for peer responses
24    #[serde(default = "default_peer_timeout_secs")]
25    pub peer_timeout_secs: u64,
26    /// Gossip fanout (number of peers to spread messages to)
27    #[serde(default = "default_fanout")]
28    pub gossip_fanout: usize,
29}
30
31fn default_heartbeat_interval() -> u64 {
32    30
33}
34fn default_peer_timeout_secs() -> u64 {
35    10
36}
37fn default_fanout() -> usize {
38    3
39}
40
41/// A peer endpoint in the network
42#[derive(Clone, Debug, Deserialize, Serialize)]
43pub struct PeerEndpoint {
44    /// Unique identifier for the peer
45    pub peer_id: String,
46    /// HTTP endpoint for the peer
47    pub endpoint: String,
48    /// Optional public key for authentication
49    pub public_key: Option<String>,
50}
51
52/// Status of a peer
53#[derive(Clone, Debug, PartialEq)]
54pub enum PeerStatus {
55    /// Peer is active and responding
56    Active,
57    /// Peer is suspected to be offline
58    Suspected,
59    /// Peer is confirmed offline
60    Offline,
61}
62
63/// Information about a known peer
64#[derive(Clone, Debug)]
65pub struct PeerInfo {
66    pub endpoint: PeerEndpoint,
67    pub status: PeerStatus,
68    pub last_seen: Instant,
69    pub last_heartbeat: Option<Instant>,
70    pub failure_count: u32,
71}
72
73impl PeerInfo {
74    pub fn new(endpoint: PeerEndpoint) -> Self {
75        Self {
76            endpoint,
77            status: PeerStatus::Active,
78            last_seen: Instant::now(),
79            last_heartbeat: None,
80            failure_count: 0,
81        }
82    }
83
84    pub fn mark_failure(&mut self) {
85        self.failure_count += 1;
86        if self.failure_count >= 3 {
87            self.status = PeerStatus::Offline;
88        } else {
89            self.status = PeerStatus::Suspected;
90        }
91    }
92
93    pub fn mark_success(&mut self) {
94        self.failure_count = 0;
95        self.status = PeerStatus::Active;
96        self.last_seen = Instant::now();
97    }
98}
99
100/// Gossip message for peer-to-peer communication
101#[derive(Clone, Debug, Deserialize, Serialize)]
102pub struct GossipMessage {
103    /// Unique message identifier
104    pub message_id: String,
105    /// Origin peer ID
106    pub origin_peer: String,
107    /// Sequence number for ordering
108    pub sequence: u64,
109    /// Message type
110    pub kind: GossipKind,
111    /// Timestamp
112    pub timestamp: String,
113    /// Message payload (JSON)
114    pub payload: String,
115}
116
117/// Types of gossip messages
118#[derive(Clone, Debug, Deserialize, Serialize)]
119#[serde(tag = "type", rename_all = "snake_case")]
120pub enum GossipKind {
121    /// Peer advertisement
122    Advertisement { peer_id: String, endpoint: String },
123    /// Asset update (gene, capsule, event)
124    AssetUpdate {
125        asset_id: String,
126        asset_type: String,
127    },
128    /// State synchronization request
129    SyncRequest { since_sequence: u64 },
130    /// State synchronization response
131    SyncResponse { assets: Vec<String> },
132    /// Peer leave notification
133    Leave { peer_id: String },
134}
135
136/// Peer registry for managing known peers
137#[derive(Clone)]
138pub struct PeerRegistry {
139    peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
140    config: PeerConfig,
141    local_peer_id: String,
142}
143
144impl PeerRegistry {
145    /// Create a new peer registry from config
146    pub fn new(config: PeerConfig, local_peer_id: String) -> Self {
147        let peers: HashMap<String, PeerInfo> = config
148            .peers
149            .iter()
150            .map(|e| (e.peer_id.clone(), PeerInfo::new(e.clone())))
151            .collect();
152
153        Self {
154            peers: Arc::new(RwLock::new(peers)),
155            config,
156            local_peer_id,
157        }
158    }
159
160    /// Get all active peers
161    pub fn get_active_peers(&self) -> Vec<PeerEndpoint> {
162        self.peers
163            .read()
164            .unwrap()
165            .values()
166            .filter(|p| p.status == PeerStatus::Active)
167            .map(|p| p.endpoint.clone())
168            .collect()
169    }
170
171    /// Get a random sample of peers for gossip
172    pub fn get_gossip_peers(&self, count: usize) -> Vec<PeerEndpoint> {
173        let peers = self.peers.read().unwrap();
174        let active: Vec<_> = peers
175            .values()
176            .filter(|p| p.status == PeerStatus::Active)
177            .filter(|p| p.endpoint.peer_id != self.local_peer_id)
178            .map(|p| p.endpoint.clone())
179            .collect();
180
181        if active.is_empty() {
182            return vec![];
183        }
184
185        // Simple round-robin selection
186        let count = count.min(active.len());
187        active.into_iter().take(count).collect()
188    }
189
190    /// Update peer status based on heartbeat
191    pub fn update_peer_status(&self, peer_id: &str, is_alive: bool) {
192        let mut peers = self.peers.write().unwrap();
193        if let Some(peer) = peers.get_mut(peer_id) {
194            if is_alive {
195                peer.mark_success();
196                peer.last_heartbeat = Some(Instant::now());
197            } else {
198                peer.mark_failure();
199            }
200        }
201    }
202
203    /// Add a new peer discovered via gossip
204    pub fn add_peer(&self, endpoint: PeerEndpoint) {
205        let mut peers = self.peers.write().unwrap();
206        if !peers.contains_key(&endpoint.peer_id) {
207            peers.insert(endpoint.peer_id.clone(), PeerInfo::new(endpoint));
208        }
209    }
210
211    /// Remove a peer
212    pub fn remove_peer(&self, peer_id: &str) {
213        let mut peers = self.peers.write().unwrap();
214        peers.remove(peer_id);
215    }
216
217    /// Get local peer ID
218    pub fn local_peer_id(&self) -> &str {
219        &self.local_peer_id
220    }
221
222    /// Get config
223    pub fn config(&self) -> &PeerConfig {
224        &self.config
225    }
226}
227
228/// Builder for creating gossip messages
229pub struct GossipBuilder {
230    origin_peer: String,
231    sequence: u64,
232    kind: Option<GossipKind>,
233    payload: Option<String>,
234}
235
236impl GossipBuilder {
237    pub fn new(origin_peer: String, sequence: u64) -> Self {
238        Self {
239            origin_peer,
240            sequence,
241            kind: None,
242            payload: None,
243        }
244    }
245
246    pub fn advertisement(mut self, peer_id: String, endpoint: String) -> Self {
247        self.kind = Some(GossipKind::Advertisement { peer_id, endpoint });
248        self
249    }
250
251    pub fn asset_update(mut self, asset_id: String, asset_type: String) -> Self {
252        self.kind = Some(GossipKind::AssetUpdate {
253            asset_id,
254            asset_type,
255        });
256        self
257    }
258
259    pub fn sync_request(mut self, since_sequence: u64) -> Self {
260        self.kind = Some(GossipKind::SyncRequest { since_sequence });
261        self
262    }
263
264    pub fn sync_response(mut self, assets: Vec<String>) -> Self {
265        self.kind = Some(GossipKind::SyncResponse { assets });
266        self
267    }
268
269    pub fn leave(mut self, peer_id: String) -> Self {
270        self.kind = Some(GossipKind::Leave { peer_id });
271        self
272    }
273
274    pub fn payload(mut self, payload: String) -> Self {
275        self.payload = Some(payload);
276        self
277    }
278
279    pub fn build(self) -> Option<GossipMessage> {
280        let kind = self.kind?;
281        let payload = self
282            .payload
283            .unwrap_or_else(|| serde_json::to_string(&kind).unwrap_or_default());
284
285        Some(GossipMessage {
286            message_id: format!(
287                "gossip-{:x}",
288                Utc::now().timestamp_nanos_opt().unwrap_or_default()
289            ),
290            origin_peer: self.origin_peer,
291            sequence: self.sequence,
292            kind,
293            timestamp: Utc::now().to_rfc3339(),
294            payload,
295        })
296    }
297}
298
299#[cfg(test)]
300mod tests {
301    use super::*;
302
303    #[test]
304    fn test_peer_registry_creation() {
305        let config = PeerConfig {
306            peers: vec![
307                PeerEndpoint {
308                    peer_id: "peer1".into(),
309                    endpoint: "http://peer1:8080".into(),
310                    public_key: None,
311                },
312                PeerEndpoint {
313                    peer_id: "peer2".into(),
314                    endpoint: "http://peer2:8080".into(),
315                    public_key: None,
316                },
317            ],
318            heartbeat_interval_secs: 30,
319            peer_timeout_secs: 10,
320            gossip_fanout: 3,
321        };
322
323        let registry = PeerRegistry::new(config, "local-peer".to_string());
324        let active = registry.get_active_peers();
325        assert_eq!(active.len(), 2);
326    }
327
328    #[test]
329    fn test_peer_failure_tracking() {
330        let config = PeerConfig {
331            peers: vec![PeerEndpoint {
332                peer_id: "peer1".into(),
333                endpoint: "http://peer1:8080".into(),
334                public_key: None,
335            }],
336            heartbeat_interval_secs: 30,
337            peer_timeout_secs: 10,
338            gossip_fanout: 3,
339        };
340
341        let registry = PeerRegistry::new(config, "local-peer".into());
342
343        // Simulate failures
344        registry.update_peer_status("peer1", false);
345        registry.update_peer_status("peer1", false);
346
347        let peers = registry.get_active_peers();
348        assert!(peers.is_empty()); // Status should be Suspected
349
350        // Recover
351        registry.update_peer_status("peer1", true);
352        let peers = registry.get_active_peers();
353        assert_eq!(peers.len(), 1);
354    }
355
356    #[test]
357    fn test_gossip_builder() {
358        let msg = GossipBuilder::new("peer1".to_string(), 1)
359            .asset_update("asset-123".to_string(), "gene".to_string())
360            .build();
361
362        assert!(msg.is_some());
363        let msg = msg.unwrap();
364        assert_eq!(msg.origin_peer, "peer1");
365        assert_eq!(msg.sequence, 1);
366    }
367}