Skip to main content

peat_mesh/discovery/
mod.rs

1//! Peer discovery strategies for mesh networks
2//!
3//! This module provides pluggable discovery mechanisms for finding peers:
4//! - **StaticDiscovery**: Pre-configured peer list from TOML files
5//! - **MdnsDiscovery**: Local network discovery via mDNS/DNS-SD
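//! - **HybridDiscovery**: Combines multiple strategies
//! - **KubernetesDiscovery**: Kubernetes-based discovery (only available with the `kubernetes` feature)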
7
8use async_trait::async_trait;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::time::Instant;
13use thiserror::Error;
14use tokio::sync::mpsc;
15
16pub mod hybrid;
17#[cfg(feature = "kubernetes")]
18pub mod kubernetes;
19pub mod mdns;
20pub mod static_config;
21
22// Re-export main types for convenience
23pub use hybrid::HybridDiscovery;
24#[cfg(feature = "kubernetes")]
25pub use kubernetes::{KubernetesDiscovery, KubernetesDiscoveryConfig};
26pub use mdns::MdnsDiscovery;
27pub use static_config::{DiscoveryConfig, StaticDiscovery, StaticPeerConfig};
28
/// Errors reported by the discovery strategies in this module.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute.
#[derive(Debug, Error)]
pub enum DiscoveryError {
    /// An mDNS failure, described by a free-form message.
    #[error("mDNS error: {0}")]
    MdnsError(String),

    /// A configuration problem, described by a free-form message.
    #[error("Configuration error: {0}")]
    ConfigError(String),

    /// An underlying I/O failure. `#[from]` generates the
    /// `From<std::io::Error>` conversion, so `?` can be used on I/O results.
    #[error("I/O error: {0}")]
    IoError(#[from] std::io::Error),

    /// The event stream has already been handed out.
    // NOTE(review): presumably returned by a repeated
    // `DiscoveryStrategy::event_stream` call — confirm against implementors.
    #[error("Event stream already consumed")]
    EventStreamConsumed,

    /// A Kubernetes API failure, described by a free-form message.
    ///
    /// Unlike the `kubernetes` module, this variant carries no `cfg` gate and
    /// exists regardless of the `kubernetes` feature.
    #[error("Kubernetes API error: {0}")]
    KubernetesError(String),
}
46
/// Convenience alias for `std::result::Result` with the error type fixed to
/// [`DiscoveryError`].
pub type Result<T> = std::result::Result<T, DiscoveryError>;
48
// Helper function for serde default.
//
// `PeerInfo::last_seen` is marked `#[serde(skip, default = "instant_now")]`,
// so serde calls this when deserializing a `PeerInfo`; the resulting
// timestamp is the moment of deserialization, not a value from the payload.
fn instant_now() -> Instant {
    Instant::now()
}
53
/// Information about a discovered peer
///
/// The derived `PartialEq` compares every field, `last_seen` included, so two
/// values that describe the same peer but carry different timestamps are not
/// equal. In particular, a serialize/deserialize round trip does not preserve
/// `last_seen`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct PeerInfo {
    /// Unique identifier for the peer node
    pub node_id: String,

    /// Network addresses where the peer can be reached
    pub addresses: Vec<SocketAddr>,

    /// Optional relay server URL for NAT traversal
    pub relay_url: Option<String>,

    /// When this peer was last seen (not serialized)
    ///
    /// `skip` leaves the field out of both serialization and deserialization;
    /// when deserializing, the value is produced by `instant_now`.
    #[serde(skip, default = "instant_now")]
    pub last_seen: Instant,

    /// Additional metadata about the peer
    pub metadata: HashMap<String, String>,
}
73
74impl PeerInfo {
75    pub fn new(node_id: String, addresses: Vec<SocketAddr>) -> Self {
76        Self {
77            node_id,
78            addresses,
79            relay_url: None,
80            last_seen: Instant::now(),
81            metadata: HashMap::new(),
82        }
83    }
84
85    pub fn with_relay(mut self, relay_url: String) -> Self {
86        self.relay_url = Some(relay_url);
87        self
88    }
89
90    pub fn with_metadata(mut self, key: String, value: String) -> Self {
91        self.metadata.insert(key, value);
92        self
93    }
94
95    pub fn update_last_seen(&mut self) {
96        self.last_seen = Instant::now();
97    }
98}
99
/// Events emitted by discovery strategies
///
/// `PeerFound` and `PeerUpdated` carry the full [`PeerInfo`]; `PeerLost`
/// carries only the peer's `node_id`.
#[derive(Clone, Debug)]
// Every variant shares the `Peer` prefix, which is what
// `clippy::enum_variant_names` flags; the lint is silenced rather than
// renaming the variants (presumably to keep the names descriptive).
#[allow(clippy::enum_variant_names)]
pub enum DiscoveryEvent {
    /// A new peer has been discovered
    PeerFound(PeerInfo),

    /// A previously discovered peer is no longer available
    PeerLost(String), // node_id

    /// A peer's information has been updated
    PeerUpdated(PeerInfo),
}
113
/// Trait for peer discovery strategies
///
/// Implementors must be `Send + Sync`. The `#[async_trait]` attribute is what
/// allows the `async fn` methods below to appear in the trait.
#[async_trait]
pub trait DiscoveryStrategy: Send + Sync {
    /// Start the discovery process
    ///
    /// # Errors
    /// Returns a [`DiscoveryError`] on failure; which variants can occur
    /// depends on the implementation.
    async fn start(&mut self) -> Result<()>;

    /// Stop discovery
    ///
    /// # Errors
    /// Returns a [`DiscoveryError`] on failure; which variants can occur
    /// depends on the implementation.
    async fn stop(&mut self) -> Result<()>;

    /// Get currently discovered peers
    ///
    /// Returns owned [`PeerInfo`] values.
    async fn discovered_peers(&self) -> Vec<PeerInfo>;

    /// Subscribe to discovery events
    /// Note: This can only be called once per strategy instance
    ///
    /// On success, returns the receiving half of a `tokio::sync::mpsc`
    /// channel of [`DiscoveryEvent`]s.
    ///
    /// # Errors
    // NOTE(review): a repeated call presumably yields
    // `DiscoveryError::EventStreamConsumed` — confirm against implementors.
    fn event_stream(&mut self) -> Result<mpsc::Receiver<DiscoveryEvent>>;
}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder chain sets the id, addresses, relay URL and metadata.
    #[test]
    fn test_peer_info_creation() {
        let addr: SocketAddr = "127.0.0.1:5000".parse().unwrap();
        let peer = PeerInfo::new("test-node".to_string(), vec![addr])
            .with_relay("https://relay.example.com".to_string())
            .with_metadata("role".to_string(), "squad-leader".to_string());

        assert_eq!(peer.node_id, "test-node");
        assert_eq!(peer.addresses.len(), 1);
        assert_eq!(
            peer.relay_url,
            Some("https://relay.example.com".to_string())
        );
        assert_eq!(peer.metadata.get("role"), Some(&"squad-leader".to_string()));
    }

    /// A freshly constructed peer has no relay and no metadata.
    #[test]
    fn test_peer_info_no_relay() {
        let addr: SocketAddr = "10.0.0.1:5000".parse().unwrap();
        let peer = PeerInfo::new("node-2".to_string(), vec![addr]);
        assert!(peer.relay_url.is_none());
        assert!(peer.metadata.is_empty());
    }

    /// `update_last_seen` must move the timestamp forward.
    #[test]
    fn test_peer_info_update_last_seen() {
        let addr: SocketAddr = "127.0.0.1:5000".parse().unwrap();
        let mut peer = PeerInfo::new("node-1".to_string(), vec![addr]);
        let before = peer.last_seen;
        std::thread::sleep(std::time::Duration::from_millis(2));
        peer.update_last_seen();
        // Strict comparison on purpose: `>=` would also pass if
        // `update_last_seen` never touched the field.
        assert!(peer.last_seen > before);
    }

    /// Repeated `with_metadata` calls accumulate entries.
    #[test]
    fn test_peer_info_multiple_metadata() {
        let peer = PeerInfo::new("node-1".to_string(), vec![])
            .with_metadata("role".to_string(), "leader".to_string())
            .with_metadata("unit".to_string(), "alpha".to_string());

        assert_eq!(peer.metadata.len(), 2);
        assert_eq!(peer.metadata.get("unit"), Some(&"alpha".to_string()));
    }

    /// `Display` output matches the `#[error(...)]` templates.
    #[test]
    fn test_discovery_error_display() {
        let err = DiscoveryError::MdnsError("timeout".into());
        assert_eq!(err.to_string(), "mDNS error: timeout");

        let err = DiscoveryError::ConfigError("bad toml".into());
        assert_eq!(err.to_string(), "Configuration error: bad toml");

        let err = DiscoveryError::EventStreamConsumed;
        assert_eq!(err.to_string(), "Event stream already consumed");

        let err = DiscoveryError::KubernetesError("forbidden".into());
        assert_eq!(err.to_string(), "Kubernetes API error: forbidden");
    }

    /// `std::io::Error` converts into `DiscoveryError` via `From`.
    #[test]
    fn test_discovery_error_from_io() {
        let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "file gone");
        let err: DiscoveryError = io_err.into();
        assert!(err.to_string().contains("file gone"));
    }

    /// Every event variant can be constructed and debug-formatted.
    #[test]
    fn test_discovery_event_variants() {
        let addr: SocketAddr = "10.0.0.1:5000".parse().unwrap();
        let peer = PeerInfo::new("p1".to_string(), vec![addr]);

        let found = DiscoveryEvent::PeerFound(peer.clone());
        let updated = DiscoveryEvent::PeerUpdated(peer);
        let lost = DiscoveryEvent::PeerLost("p1".to_string());

        // Just verify Debug works (no panics)
        let _ = format!("{:?}", found);
        let _ = format!("{:?}", updated);
        let _ = format!("{:?}", lost);
    }

    /// JSON round trip keeps the serialized fields and omits `last_seen`.
    #[test]
    fn test_peer_info_serialization() {
        let addr: SocketAddr = "10.0.0.1:5000".parse().unwrap();
        let peer = PeerInfo::new("node-1".to_string(), vec![addr])
            .with_relay("https://relay.example.com".to_string())
            .with_metadata("role".to_string(), "leader".to_string());
        let json = serde_json::to_string(&peer).unwrap();
        // `last_seen` is `#[serde(skip)]`, so it must not appear in the output.
        assert!(!json.contains("last_seen"));

        let deserialized: PeerInfo = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.node_id, "node-1");
        assert_eq!(deserialized.addresses, vec![addr]);
        assert_eq!(
            deserialized.relay_url,
            Some("https://relay.example.com".to_string())
        );
        assert_eq!(deserialized.metadata.get("role"), Some(&"leader".to_string()));
    }
}
223            deserialized.relay_url,
224            Some("https://relay.example.com".to_string())
225        );
226    }
227}