peat_mesh/discovery/
mod.rs1use async_trait::async_trait;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::time::Instant;
13use thiserror::Error;
14use tokio::sync::mpsc;
15
16pub mod hybrid;
17#[cfg(feature = "kubernetes")]
18pub mod kubernetes;
19pub mod mdns;
20pub mod static_config;
21
22pub use hybrid::HybridDiscovery;
24#[cfg(feature = "kubernetes")]
25pub use kubernetes::{KubernetesDiscovery, KubernetesDiscoveryConfig};
26pub use mdns::MdnsDiscovery;
27pub use static_config::{DiscoveryConfig, StaticDiscovery, StaticPeerConfig};
28
29#[derive(Debug, Error)]
30pub enum DiscoveryError {
31 #[error("mDNS error: {0}")]
32 MdnsError(String),
33
34 #[error("Configuration error: {0}")]
35 ConfigError(String),
36
37 #[error("I/O error: {0}")]
38 IoError(#[from] std::io::Error),
39
40 #[error("Event stream already consumed")]
41 EventStreamConsumed,
42
43 #[error("Kubernetes API error: {0}")]
44 KubernetesError(String),
45}
46
47pub type Result<T> = std::result::Result<T, DiscoveryError>;
48
49fn instant_now() -> Instant {
51 Instant::now()
52}
53
54#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
56pub struct PeerInfo {
57 pub node_id: String,
59
60 pub addresses: Vec<SocketAddr>,
62
63 pub relay_url: Option<String>,
65
66 #[serde(skip, default = "instant_now")]
68 pub last_seen: Instant,
69
70 pub metadata: HashMap<String, String>,
72}
73
74impl PeerInfo {
75 pub fn new(node_id: String, addresses: Vec<SocketAddr>) -> Self {
76 Self {
77 node_id,
78 addresses,
79 relay_url: None,
80 last_seen: Instant::now(),
81 metadata: HashMap::new(),
82 }
83 }
84
85 pub fn with_relay(mut self, relay_url: String) -> Self {
86 self.relay_url = Some(relay_url);
87 self
88 }
89
90 pub fn with_metadata(mut self, key: String, value: String) -> Self {
91 self.metadata.insert(key, value);
92 self
93 }
94
95 pub fn update_last_seen(&mut self) {
96 self.last_seen = Instant::now();
97 }
98}
99
100#[derive(Clone, Debug)]
102#[allow(clippy::enum_variant_names)]
103pub enum DiscoveryEvent {
104 PeerFound(PeerInfo),
106
107 PeerLost(String), PeerUpdated(PeerInfo),
112}
113
114#[async_trait]
116pub trait DiscoveryStrategy: Send + Sync {
117 async fn start(&mut self) -> Result<()>;
119
120 async fn stop(&mut self) -> Result<()>;
122
123 async fn advertise(&self, _node_id: &str, _port: u16) -> Result<()> {
128 Ok(())
129 }
130
131 async fn discovered_peers(&self) -> Vec<PeerInfo>;
133
134 fn event_stream(&mut self) -> Result<mpsc::Receiver<DiscoveryEvent>>;
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142
143 #[test]
144 fn test_peer_info_creation() {
145 let addr: SocketAddr = "127.0.0.1:5000".parse().unwrap();
146 let peer = PeerInfo::new("test-node".to_string(), vec![addr])
147 .with_relay("https://relay.example.com".to_string())
148 .with_metadata("role".to_string(), "squad-leader".to_string());
149
150 assert_eq!(peer.node_id, "test-node");
151 assert_eq!(peer.addresses.len(), 1);
152 assert_eq!(
153 peer.relay_url,
154 Some("https://relay.example.com".to_string())
155 );
156 assert_eq!(peer.metadata.get("role"), Some(&"squad-leader".to_string()));
157 }
158
159 #[test]
160 fn test_peer_info_no_relay() {
161 let addr: SocketAddr = "10.0.0.1:5000".parse().unwrap();
162 let peer = PeerInfo::new("node-2".to_string(), vec![addr]);
163 assert!(peer.relay_url.is_none());
164 assert!(peer.metadata.is_empty());
165 }
166
167 #[test]
168 fn test_peer_info_update_last_seen() {
169 let addr: SocketAddr = "127.0.0.1:5000".parse().unwrap();
170 let mut peer = PeerInfo::new("node-1".to_string(), vec![addr]);
171 let before = peer.last_seen;
172 std::thread::sleep(std::time::Duration::from_millis(2));
173 peer.update_last_seen();
174 assert!(peer.last_seen >= before);
175 }
176
177 #[test]
178 fn test_peer_info_multiple_metadata() {
179 let peer = PeerInfo::new("node-1".to_string(), vec![])
180 .with_metadata("role".to_string(), "leader".to_string())
181 .with_metadata("unit".to_string(), "alpha".to_string());
182
183 assert_eq!(peer.metadata.len(), 2);
184 assert_eq!(peer.metadata.get("unit"), Some(&"alpha".to_string()));
185 }
186
187 #[test]
188 fn test_discovery_error_display() {
189 let err = DiscoveryError::MdnsError("timeout".into());
190 assert_eq!(err.to_string(), "mDNS error: timeout");
191
192 let err = DiscoveryError::ConfigError("bad toml".into());
193 assert_eq!(err.to_string(), "Configuration error: bad toml");
194
195 let err = DiscoveryError::EventStreamConsumed;
196 assert_eq!(err.to_string(), "Event stream already consumed");
197 }
198
199 #[test]
200 fn test_discovery_error_from_io() {
201 let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "file gone");
202 let err: DiscoveryError = io_err.into();
203 assert!(err.to_string().contains("file gone"));
204 }
205
206 #[test]
207 fn test_discovery_event_variants() {
208 let addr: SocketAddr = "10.0.0.1:5000".parse().unwrap();
209 let peer = PeerInfo::new("p1".to_string(), vec![addr]);
210
211 let found = DiscoveryEvent::PeerFound(peer.clone());
212 let updated = DiscoveryEvent::PeerUpdated(peer);
213 let lost = DiscoveryEvent::PeerLost("p1".to_string());
214
215 let _ = format!("{:?}", found);
217 let _ = format!("{:?}", updated);
218 let _ = format!("{:?}", lost);
219 }
220
221 #[test]
222 fn test_peer_info_serialization() {
223 let addr: SocketAddr = "10.0.0.1:5000".parse().unwrap();
224 let peer = PeerInfo::new("node-1".to_string(), vec![addr])
225 .with_relay("https://relay.example.com".to_string());
226 let json = serde_json::to_string(&peer).unwrap();
227 let deserialized: PeerInfo = serde_json::from_str(&json).unwrap();
228 assert_eq!(deserialized.node_id, "node-1");
229 assert_eq!(deserialized.addresses.len(), 1);
230 assert_eq!(
231 deserialized.relay_url,
232 Some("https://relay.example.com".to_string())
233 );
234 }
235}