1use super::{DiscoveryError, DiscoveryEvent, DiscoveryStrategy, PeerInfo, Result};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::net::SocketAddr;
6use std::path::Path;
7use std::sync::Arc;
8use tokio::sync::{mpsc, RwLock};
9use tracing::{debug, info};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct StaticPeerConfig {
14 pub node_id: String,
16
17 pub addresses: Vec<String>,
19
20 #[serde(skip_serializing_if = "Option::is_none")]
22 pub relay_url: Option<String>,
23
24 #[serde(default = "default_priority")]
26 pub priority: u8,
27
28 #[serde(default)]
30 pub metadata: HashMap<String, String>,
31}
32
/// Priority assigned to a peer whose configuration entry omits `priority`.
fn default_priority() -> u8 {
    const DEFAULT_PEER_PRIORITY: u8 = 128;
    DEFAULT_PEER_PRIORITY
}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct DiscoveryConfig {
40 pub peers: Vec<StaticPeerConfig>,
42}
43
44pub struct StaticDiscovery {
52 config: DiscoveryConfig,
53 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
54 events_tx: mpsc::Sender<DiscoveryEvent>,
55 events_rx: Option<mpsc::Receiver<DiscoveryEvent>>,
56 started: Arc<RwLock<bool>>,
57}
58
59impl StaticDiscovery {
60 pub fn from_file(path: &Path) -> Result<Self> {
62 let config_str = std::fs::read_to_string(path).map_err(|e| {
63 DiscoveryError::ConfigError(format!("Failed to read config file: {}", e))
64 })?;
65
66 let config: DiscoveryConfig = toml::from_str(&config_str).map_err(|e| {
67 DiscoveryError::ConfigError(format!("Failed to parse TOML config: {}", e))
68 })?;
69
70 Self::from_config(config)
71 }
72
73 pub fn from_config(config: DiscoveryConfig) -> Result<Self> {
75 let (events_tx, events_rx) = mpsc::channel(100);
76
77 Ok(Self {
78 config,
79 peers: Arc::new(RwLock::new(HashMap::new())),
80 events_tx,
81 events_rx: Some(events_rx),
82 started: Arc::new(RwLock::new(false)),
83 })
84 }
85
86 fn parse_addresses(address_strs: &[String]) -> Vec<SocketAddr> {
88 address_strs
89 .iter()
90 .filter_map(|s| {
91 s.parse::<SocketAddr>()
92 .map_err(|e| {
93 debug!("Failed to parse address '{}': {}", s, e);
94 e
95 })
96 .ok()
97 })
98 .collect()
99 }
100
101 fn config_to_peer_info(peer_config: &StaticPeerConfig) -> Option<PeerInfo> {
103 let addresses = Self::parse_addresses(&peer_config.addresses);
104
105 if addresses.is_empty() {
106 debug!("Skipping peer {} - no valid addresses", peer_config.node_id);
107 return None;
108 }
109
110 Some(PeerInfo {
111 node_id: peer_config.node_id.clone(),
112 addresses,
113 relay_url: peer_config.relay_url.clone(),
114 last_seen: std::time::Instant::now(),
115 metadata: peer_config.metadata.clone(),
116 })
117 }
118}
119
120#[async_trait]
121impl DiscoveryStrategy for StaticDiscovery {
122 async fn start(&mut self) -> Result<()> {
123 let mut started = self.started.write().await;
124 if *started {
125 info!("Static discovery already started");
126 return Ok(());
127 }
128
129 info!(
130 "Starting static discovery with {} configured peers",
131 self.config.peers.len()
132 );
133
134 let mut peers = self.peers.write().await;
135
136 for peer_config in &self.config.peers {
137 if let Some(peer_info) = Self::config_to_peer_info(peer_config) {
138 let node_id = peer_info.node_id.clone();
139
140 info!(
141 "Discovered static peer: {} at {:?} (priority: {})",
142 node_id, peer_info.addresses, peer_config.priority
143 );
144
145 peers.insert(node_id.clone(), peer_info.clone());
146
147 if let Err(e) = self
149 .events_tx
150 .send(DiscoveryEvent::PeerFound(peer_info))
151 .await
152 {
153 debug!("Failed to send discovery event: {}", e);
154 }
155 }
156 }
157
158 *started = true;
159
160 info!("Static discovery started with {} peers", peers.len());
161
162 Ok(())
163 }
164
165 async fn stop(&mut self) -> Result<()> {
166 let mut started = self.started.write().await;
167 if !*started {
168 return Ok(());
169 }
170
171 info!("Stopping static discovery");
172 *started = false;
173
174 Ok(())
175 }
176
177 async fn discovered_peers(&self) -> Vec<PeerInfo> {
178 self.peers.read().await.values().cloned().collect()
179 }
180
181 fn event_stream(&mut self) -> Result<mpsc::Receiver<DiscoveryEvent>> {
182 self.events_rx
183 .take()
184 .ok_or(DiscoveryError::EventStreamConsumed)
185 }
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `StaticPeerConfig` from borrowed pieces so fixtures stay short.
    fn peer(
        node_id: &str,
        addresses: &[&str],
        relay_url: Option<&str>,
        priority: u8,
        metadata: &[(&str, &str)],
    ) -> StaticPeerConfig {
        StaticPeerConfig {
            node_id: node_id.to_string(),
            addresses: addresses.iter().map(|&a| a.to_string()).collect(),
            relay_url: relay_url.map(|u| u.to_string()),
            priority,
            metadata: metadata
                .iter()
                .map(|&(k, v)| (k.to_string(), v.to_string()))
                .collect(),
        }
    }

    /// Two valid peers: one with a relay and metadata, one bare.
    fn create_test_config() -> DiscoveryConfig {
        DiscoveryConfig {
            peers: vec![
                peer(
                    "hq-alpha",
                    &["10.0.0.100:5000", "192.168.1.100:5000"],
                    Some("https://relay.example.com:3479"),
                    255,
                    &[("role", "company-hq")],
                ),
                peer("platoon-1", &["10.0.1.50:5000"], None, 200, &[]),
            ],
        }
    }

    #[tokio::test]
    async fn test_static_discovery_basic() {
        let mut discovery = StaticDiscovery::from_config(create_test_config()).unwrap();
        let mut events = discovery.event_stream().unwrap();

        discovery.start().await.unwrap();

        let peers = discovery.discovered_peers().await;
        assert_eq!(peers.len(), 2);

        let hq = peers.iter().find(|p| p.node_id == "hq-alpha").unwrap();
        assert_eq!(hq.addresses.len(), 2);
        assert_eq!(
            hq.relay_url.as_deref(),
            Some("https://relay.example.com:3479")
        );

        // Exactly one PeerFound per configured peer, in configuration order.
        for expected_id in ["hq-alpha", "platoon-1"] {
            match events.try_recv().unwrap() {
                DiscoveryEvent::PeerFound(info) => assert_eq!(info.node_id, expected_id),
                _ => panic!("expected a PeerFound event for {}", expected_id),
            }
        }
        assert!(events.try_recv().is_err());
    }

    // Address parsing is synchronous, so no async runtime is needed here.
    #[test]
    fn test_parse_addresses() {
        let addresses = vec![
            "10.0.0.1:5000".to_string(),
            "invalid".to_string(),
            "192.168.1.1:8080".to_string(),
        ];

        let parsed = StaticDiscovery::parse_addresses(&addresses);

        // The invalid entry is dropped; the valid ones keep their input order.
        let expected: Vec<SocketAddr> = vec![
            "10.0.0.1:5000".parse().unwrap(),
            "192.168.1.1:8080".parse().unwrap(),
        ];
        assert_eq!(parsed, expected);
    }

    #[test]
    fn test_toml_serialization() {
        let toml_str = toml::to_string(&create_test_config()).unwrap();

        let parsed: DiscoveryConfig = toml::from_str(&toml_str).unwrap();
        assert_eq!(parsed.peers.len(), 2);
        assert_eq!(parsed.peers[0].node_id, "hq-alpha");
    }

    #[tokio::test]
    async fn test_static_discovery_stop_when_not_started() {
        let mut discovery = StaticDiscovery::from_config(create_test_config()).unwrap();

        discovery.stop().await.unwrap();

        assert!(!*discovery.started.read().await);
    }

    #[tokio::test]
    async fn test_static_discovery_start_twice_idempotent() {
        let mut discovery = StaticDiscovery::from_config(create_test_config()).unwrap();
        let mut events = discovery.event_stream().unwrap();

        discovery.start().await.unwrap();
        let peers_after_first = discovery.discovered_peers().await;

        discovery.start().await.unwrap();
        let peers_after_second = discovery.discovered_peers().await;

        assert_eq!(peers_after_first.len(), peers_after_second.len());

        // The second start must not announce the peers again.
        let mut announced = 0;
        while events.try_recv().is_ok() {
            announced += 1;
        }
        assert_eq!(announced, 2);
    }

    #[tokio::test]
    async fn test_static_discovery_event_stream_consumed() {
        let mut discovery = StaticDiscovery::from_config(create_test_config()).unwrap();

        let _stream = discovery.event_stream().unwrap();
        let result = discovery.event_stream();

        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            DiscoveryError::EventStreamConsumed
        ));
    }

    #[test]
    fn test_config_to_peer_info_no_valid_addresses() {
        let peer_config = peer("bad-peer", &["not-a-socket-addr"], None, 128, &[]);

        assert!(StaticDiscovery::config_to_peer_info(&peer_config).is_none());
    }

    #[test]
    fn test_config_to_peer_info_with_relay() {
        let peer_config = peer(
            "relay-peer",
            &["10.0.0.1:5000"],
            Some("https://relay.example.com"),
            200,
            &[("role", "hq")],
        );

        let peer_info = StaticDiscovery::config_to_peer_info(&peer_config).unwrap();
        assert_eq!(peer_info.node_id, "relay-peer");
        assert_eq!(
            peer_info.relay_url,
            Some("https://relay.example.com".to_string())
        );
        assert_eq!(peer_info.metadata.get("role"), Some(&"hq".to_string()));
    }

    #[test]
    fn test_from_file_nonexistent() {
        let result = StaticDiscovery::from_file(std::path::Path::new("/nonexistent/path.toml"));
        assert!(result.is_err());
    }

    #[test]
    fn test_default_priority() {
        assert_eq!(default_priority(), 128);
    }

    #[test]
    fn test_static_peer_config_serde() {
        let config = peer("node-1", &["10.0.0.1:5000"], None, 128, &[]);

        // A `None` relay URL must be omitted from the serialized form.
        let json = serde_json::to_string(&config).unwrap();
        assert!(!json.contains("relay_url"));

        let parsed: StaticPeerConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.node_id, "node-1");
        assert_eq!(parsed.priority, 128);
    }
}