Skip to main content

peat_mesh/discovery/
mod.rs

1//! Peer discovery strategies for mesh networks
2//!
3//! This module provides pluggable discovery mechanisms for finding peers:
4//! - **StaticDiscovery**: Pre-configured peer list from TOML files
5//! - **MdnsDiscovery**: Local network discovery via mDNS/DNS-SD
6//! - **HybridDiscovery**: Combines multiple strategies
7
8use async_trait::async_trait;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::time::Instant;
13use thiserror::Error;
14use tokio::sync::mpsc;
15
16pub mod hybrid;
17#[cfg(feature = "kubernetes")]
18pub mod kubernetes;
19pub mod mdns;
20pub mod static_config;
21
22// Re-export main types for convenience
23pub use hybrid::HybridDiscovery;
24#[cfg(feature = "kubernetes")]
25pub use kubernetes::{KubernetesDiscovery, KubernetesDiscoveryConfig};
26pub use mdns::MdnsDiscovery;
27pub use static_config::{DiscoveryConfig, StaticDiscovery, StaticPeerConfig};
28
29#[derive(Debug, Error)]
30pub enum DiscoveryError {
31    #[error("mDNS error: {0}")]
32    MdnsError(String),
33
34    #[error("Configuration error: {0}")]
35    ConfigError(String),
36
37    #[error("I/O error: {0}")]
38    IoError(#[from] std::io::Error),
39
40    #[error("Event stream already consumed")]
41    EventStreamConsumed,
42
43    #[error("Kubernetes API error: {0}")]
44    KubernetesError(String),
45}
46
47pub type Result<T> = std::result::Result<T, DiscoveryError>;
48
49// Helper function for serde default
50fn instant_now() -> Instant {
51    Instant::now()
52}
53
54/// Information about a discovered peer
55#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
56pub struct PeerInfo {
57    /// Unique identifier for the peer node
58    pub node_id: String,
59
60    /// Network addresses where the peer can be reached
61    pub addresses: Vec<SocketAddr>,
62
63    /// Optional relay server URL for NAT traversal
64    pub relay_url: Option<String>,
65
66    /// When this peer was last seen (not serialized)
67    #[serde(skip, default = "instant_now")]
68    pub last_seen: Instant,
69
70    /// Additional metadata about the peer
71    pub metadata: HashMap<String, String>,
72}
73
74impl PeerInfo {
75    pub fn new(node_id: String, addresses: Vec<SocketAddr>) -> Self {
76        Self {
77            node_id,
78            addresses,
79            relay_url: None,
80            last_seen: Instant::now(),
81            metadata: HashMap::new(),
82        }
83    }
84
85    pub fn with_relay(mut self, relay_url: String) -> Self {
86        self.relay_url = Some(relay_url);
87        self
88    }
89
90    pub fn with_metadata(mut self, key: String, value: String) -> Self {
91        self.metadata.insert(key, value);
92        self
93    }
94
95    pub fn update_last_seen(&mut self) {
96        self.last_seen = Instant::now();
97    }
98}
99
100/// Events emitted by discovery strategies
101#[derive(Clone, Debug)]
102#[allow(clippy::enum_variant_names)]
103pub enum DiscoveryEvent {
104    /// A new peer has been discovered
105    PeerFound(PeerInfo),
106
107    /// A previously discovered peer is no longer available
108    PeerLost(String), // node_id
109
110    /// A peer's information has been updated
111    PeerUpdated(PeerInfo),
112}
113
114/// Trait for peer discovery strategies
115#[async_trait]
116pub trait DiscoveryStrategy: Send + Sync {
117    /// Start the discovery process
118    async fn start(&mut self) -> Result<()>;
119
120    /// Stop discovery
121    async fn stop(&mut self) -> Result<()>;
122
123    /// Advertise this node so other peers can discover it.
124    ///
125    /// Not all strategies need this (e.g., Kubernetes uses EndpointSlices),
126    /// so the default is a no-op.
127    async fn advertise(&self, _node_id: &str, _port: u16) -> Result<()> {
128        Ok(())
129    }
130
131    /// Get currently discovered peers
132    async fn discovered_peers(&self) -> Vec<PeerInfo>;
133
134    /// Subscribe to discovery events
135    /// Note: This can only be called once per strategy instance
136    fn event_stream(&mut self) -> Result<mpsc::Receiver<DiscoveryEvent>>;
137}
138
139#[cfg(test)]
140mod tests {
141    use super::*;
142
143    #[test]
144    fn test_peer_info_creation() {
145        let addr: SocketAddr = "127.0.0.1:5000".parse().unwrap();
146        let peer = PeerInfo::new("test-node".to_string(), vec![addr])
147            .with_relay("https://relay.example.com".to_string())
148            .with_metadata("role".to_string(), "squad-leader".to_string());
149
150        assert_eq!(peer.node_id, "test-node");
151        assert_eq!(peer.addresses.len(), 1);
152        assert_eq!(
153            peer.relay_url,
154            Some("https://relay.example.com".to_string())
155        );
156        assert_eq!(peer.metadata.get("role"), Some(&"squad-leader".to_string()));
157    }
158
159    #[test]
160    fn test_peer_info_no_relay() {
161        let addr: SocketAddr = "10.0.0.1:5000".parse().unwrap();
162        let peer = PeerInfo::new("node-2".to_string(), vec![addr]);
163        assert!(peer.relay_url.is_none());
164        assert!(peer.metadata.is_empty());
165    }
166
167    #[test]
168    fn test_peer_info_update_last_seen() {
169        let addr: SocketAddr = "127.0.0.1:5000".parse().unwrap();
170        let mut peer = PeerInfo::new("node-1".to_string(), vec![addr]);
171        let before = peer.last_seen;
172        std::thread::sleep(std::time::Duration::from_millis(2));
173        peer.update_last_seen();
174        assert!(peer.last_seen >= before);
175    }
176
177    #[test]
178    fn test_peer_info_multiple_metadata() {
179        let peer = PeerInfo::new("node-1".to_string(), vec![])
180            .with_metadata("role".to_string(), "leader".to_string())
181            .with_metadata("unit".to_string(), "alpha".to_string());
182
183        assert_eq!(peer.metadata.len(), 2);
184        assert_eq!(peer.metadata.get("unit"), Some(&"alpha".to_string()));
185    }
186
187    #[test]
188    fn test_discovery_error_display() {
189        let err = DiscoveryError::MdnsError("timeout".into());
190        assert_eq!(err.to_string(), "mDNS error: timeout");
191
192        let err = DiscoveryError::ConfigError("bad toml".into());
193        assert_eq!(err.to_string(), "Configuration error: bad toml");
194
195        let err = DiscoveryError::EventStreamConsumed;
196        assert_eq!(err.to_string(), "Event stream already consumed");
197    }
198
199    #[test]
200    fn test_discovery_error_from_io() {
201        let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "file gone");
202        let err: DiscoveryError = io_err.into();
203        assert!(err.to_string().contains("file gone"));
204    }
205
206    #[test]
207    fn test_discovery_event_variants() {
208        let addr: SocketAddr = "10.0.0.1:5000".parse().unwrap();
209        let peer = PeerInfo::new("p1".to_string(), vec![addr]);
210
211        let found = DiscoveryEvent::PeerFound(peer.clone());
212        let updated = DiscoveryEvent::PeerUpdated(peer);
213        let lost = DiscoveryEvent::PeerLost("p1".to_string());
214
215        // Just verify Debug works (no panics)
216        let _ = format!("{:?}", found);
217        let _ = format!("{:?}", updated);
218        let _ = format!("{:?}", lost);
219    }
220
221    #[test]
222    fn test_peer_info_serialization() {
223        let addr: SocketAddr = "10.0.0.1:5000".parse().unwrap();
224        let peer = PeerInfo::new("node-1".to_string(), vec![addr])
225            .with_relay("https://relay.example.com".to_string());
226        let json = serde_json::to_string(&peer).unwrap();
227        let deserialized: PeerInfo = serde_json::from_str(&json).unwrap();
228        assert_eq!(deserialized.node_id, "node-1");
229        assert_eq!(deserialized.addresses.len(), 1);
230        assert_eq!(
231            deserialized.relay_url,
232            Some("https://relay.example.com".to_string())
233        );
234    }
235}