// peat_mesh/discovery/static_config.rs

1use super::{DiscoveryError, DiscoveryEvent, DiscoveryStrategy, PeerInfo, Result};
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::net::SocketAddr;
6use std::path::Path;
7use std::sync::Arc;
8use tokio::sync::{mpsc, RwLock};
9use tracing::{debug, info};
10
11/// Configuration for a static peer
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct StaticPeerConfig {
14    /// Unique node identifier
15    pub node_id: String,
16
17    /// Network addresses where peer can be reached
18    pub addresses: Vec<String>,
19
20    /// Optional relay URL for NAT traversal
21    #[serde(skip_serializing_if = "Option::is_none")]
22    pub relay_url: Option<String>,
23
24    /// Connection priority (0-255, higher = more important)
25    #[serde(default = "default_priority")]
26    pub priority: u8,
27
28    /// Additional metadata about the peer
29    #[serde(default)]
30    pub metadata: HashMap<String, String>,
31}
32
/// Serde fallback for `StaticPeerConfig::priority` when the field is omitted.
fn default_priority() -> u8 {
    // Sits in the middle of the documented 0-255 priority range.
    const FALLBACK_PRIORITY: u8 = 128;
    FALLBACK_PRIORITY
}
36
37/// Top-level discovery configuration
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct DiscoveryConfig {
40    /// List of static peers to discover
41    pub peers: Vec<StaticPeerConfig>,
42}
43
44/// Static configuration-based peer discovery
45///
46/// This strategy loads peer information from a configuration file
47/// and immediately reports all peers as discovered. Useful for:
48/// - EMCON (Emission Control) operations where broadcasting is not allowed
49/// - Pre-planned network topologies
50/// - Testing and development
51pub struct StaticDiscovery {
52    config: DiscoveryConfig,
53    peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
54    events_tx: mpsc::Sender<DiscoveryEvent>,
55    events_rx: Option<mpsc::Receiver<DiscoveryEvent>>,
56    started: Arc<RwLock<bool>>,
57}
58
59impl StaticDiscovery {
60    /// Create a new static discovery from a configuration file
61    pub fn from_file(path: &Path) -> Result<Self> {
62        let config_str = std::fs::read_to_string(path).map_err(|e| {
63            DiscoveryError::ConfigError(format!("Failed to read config file: {}", e))
64        })?;
65
66        let config: DiscoveryConfig = toml::from_str(&config_str).map_err(|e| {
67            DiscoveryError::ConfigError(format!("Failed to parse TOML config: {}", e))
68        })?;
69
70        Self::from_config(config)
71    }
72
73    /// Create a new static discovery from a configuration object
74    pub fn from_config(config: DiscoveryConfig) -> Result<Self> {
75        let (events_tx, events_rx) = mpsc::channel(100);
76
77        Ok(Self {
78            config,
79            peers: Arc::new(RwLock::new(HashMap::new())),
80            events_tx,
81            events_rx: Some(events_rx),
82            started: Arc::new(RwLock::new(false)),
83        })
84    }
85
86    /// Parse addresses from string format
87    fn parse_addresses(address_strs: &[String]) -> Vec<SocketAddr> {
88        address_strs
89            .iter()
90            .filter_map(|s| {
91                s.parse::<SocketAddr>()
92                    .map_err(|e| {
93                        debug!("Failed to parse address '{}': {}", s, e);
94                        e
95                    })
96                    .ok()
97            })
98            .collect()
99    }
100
101    /// Convert a StaticPeerConfig to PeerInfo
102    fn config_to_peer_info(peer_config: &StaticPeerConfig) -> Option<PeerInfo> {
103        let addresses = Self::parse_addresses(&peer_config.addresses);
104
105        if addresses.is_empty() {
106            debug!("Skipping peer {} - no valid addresses", peer_config.node_id);
107            return None;
108        }
109
110        Some(PeerInfo {
111            node_id: peer_config.node_id.clone(),
112            addresses,
113            relay_url: peer_config.relay_url.clone(),
114            last_seen: std::time::Instant::now(),
115            metadata: peer_config.metadata.clone(),
116        })
117    }
118}
119
120#[async_trait]
121impl DiscoveryStrategy for StaticDiscovery {
122    async fn start(&mut self) -> Result<()> {
123        let mut started = self.started.write().await;
124        if *started {
125            info!("Static discovery already started");
126            return Ok(());
127        }
128
129        info!(
130            "Starting static discovery with {} configured peers",
131            self.config.peers.len()
132        );
133
134        let mut peers = self.peers.write().await;
135
136        for peer_config in &self.config.peers {
137            if let Some(peer_info) = Self::config_to_peer_info(peer_config) {
138                let node_id = peer_info.node_id.clone();
139
140                info!(
141                    "Discovered static peer: {} at {:?} (priority: {})",
142                    node_id, peer_info.addresses, peer_config.priority
143                );
144
145                peers.insert(node_id.clone(), peer_info.clone());
146
147                // Notify about discovered peer
148                if let Err(e) = self
149                    .events_tx
150                    .send(DiscoveryEvent::PeerFound(peer_info))
151                    .await
152                {
153                    debug!("Failed to send discovery event: {}", e);
154                }
155            }
156        }
157
158        *started = true;
159
160        info!("Static discovery started with {} peers", peers.len());
161
162        Ok(())
163    }
164
165    async fn stop(&mut self) -> Result<()> {
166        let mut started = self.started.write().await;
167        if !*started {
168            return Ok(());
169        }
170
171        info!("Stopping static discovery");
172        *started = false;
173
174        Ok(())
175    }
176
177    async fn discovered_peers(&self) -> Vec<PeerInfo> {
178        self.peers.read().await.values().cloned().collect()
179    }
180
181    fn event_stream(&mut self) -> Result<mpsc::Receiver<DiscoveryEvent>> {
182        self.events_rx
183            .take()
184            .ok_or(DiscoveryError::EventStreamConsumed)
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture with two valid peers: `hq-alpha` (two addresses, relay URL,
    /// one metadata entry) and `platoon-1` (single address, nothing else).
    fn create_test_config() -> DiscoveryConfig {
        DiscoveryConfig {
            peers: vec![
                StaticPeerConfig {
                    node_id: "hq-alpha".to_string(),
                    addresses: vec![
                        "10.0.0.100:5000".to_string(),
                        "192.168.1.100:5000".to_string(),
                    ],
                    relay_url: Some("https://relay.example.com:3479".to_string()),
                    priority: 255,
                    metadata: {
                        let mut m = HashMap::new();
                        m.insert("role".to_string(), "company-hq".to_string());
                        m
                    },
                },
                StaticPeerConfig {
                    node_id: "platoon-1".to_string(),
                    addresses: vec!["10.0.1.50:5000".to_string()],
                    relay_url: None,
                    priority: 200,
                    metadata: HashMap::new(),
                },
            ],
        }
    }

    #[tokio::test]
    async fn test_static_discovery_basic() {
        let config = create_test_config();
        let mut discovery = StaticDiscovery::from_config(config).unwrap();

        // Get event stream before starting
        let mut events = discovery.event_stream().unwrap();

        // Start discovery
        discovery.start().await.unwrap();

        // Should discover 2 peers
        let peers = discovery.discovered_peers().await;
        assert_eq!(peers.len(), 2);

        // Check first peer
        let hq = peers.iter().find(|p| p.node_id == "hq-alpha").unwrap();
        assert_eq!(hq.addresses.len(), 2);
        assert_eq!(
            hq.relay_url,
            Some("https://relay.example.com:3479".to_string())
        );

        // Check events
        let event1 = events.try_recv().unwrap();
        let event2 = events.try_recv().unwrap();

        assert!(matches!(event1, DiscoveryEvent::PeerFound(_)));
        assert!(matches!(event2, DiscoveryEvent::PeerFound(_)));
    }

    // Parsing is synchronous, so no async runtime is needed here.
    #[test]
    fn test_parse_addresses() {
        let addresses = vec![
            "10.0.0.1:5000".to_string(),
            "invalid".to_string(),
            "192.168.1.1:8080".to_string(),
        ];

        let parsed = StaticDiscovery::parse_addresses(&addresses);

        // The invalid entry is dropped; valid ones keep their input order.
        let expected: Vec<SocketAddr> = vec![
            "10.0.0.1:5000".parse().unwrap(),
            "192.168.1.1:8080".parse().unwrap(),
        ];
        assert_eq!(parsed, expected);
    }

    #[test]
    fn test_toml_serialization() {
        let config = create_test_config();
        let toml_str = toml::to_string(&config).unwrap();

        let parsed: DiscoveryConfig = toml::from_str(&toml_str).unwrap();
        assert_eq!(parsed.peers.len(), 2);
        assert_eq!(parsed.peers[0].node_id, "hq-alpha");
    }

    // Exercises the serde defaults through real deserialization rather than
    // by calling `default_priority` directly.
    #[test]
    fn test_toml_defaults_for_omitted_fields() {
        let toml_str = "[[peers]]\nnode_id = \"minimal\"\naddresses = [\"10.0.0.1:5000\"]\n";

        let parsed: DiscoveryConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(parsed.peers.len(), 1);

        let peer = &parsed.peers[0];
        assert_eq!(peer.node_id, "minimal");
        assert_eq!(peer.priority, 128);
        assert!(peer.relay_url.is_none());
        assert!(peer.metadata.is_empty());
    }

    #[tokio::test]
    async fn test_static_discovery_stop_when_not_started() {
        let config = create_test_config();
        let mut discovery = StaticDiscovery::from_config(config).unwrap();
        // Stopping when not started is a no-op
        discovery.stop().await.unwrap();
        assert!(!*discovery.started.read().await);
    }

    #[tokio::test]
    async fn test_static_discovery_start_twice_idempotent() {
        let config = create_test_config();
        let mut discovery = StaticDiscovery::from_config(config).unwrap();
        let mut events = discovery.event_stream().unwrap();

        discovery.start().await.unwrap();
        let peers_after_first = discovery.discovered_peers().await;

        // Drain the two announcements produced by the first start.
        assert!(events.try_recv().is_ok());
        assert!(events.try_recv().is_ok());
        assert!(events.try_recv().is_err());

        // Starting again should be idempotent
        discovery.start().await.unwrap();
        let peers_after_second = discovery.discovered_peers().await;

        assert_eq!(peers_after_first.len(), peers_after_second.len());
        // ...and must not announce the same peers a second time.
        assert!(events.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_static_discovery_event_stream_consumed() {
        let config = create_test_config();
        let mut discovery = StaticDiscovery::from_config(config).unwrap();

        let _stream = discovery.event_stream().unwrap();
        // Second call should fail
        let result = discovery.event_stream();
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            DiscoveryError::EventStreamConsumed
        ));
    }

    #[test]
    fn test_config_to_peer_info_no_valid_addresses() {
        let peer_config = StaticPeerConfig {
            node_id: "bad-peer".to_string(),
            addresses: vec!["not-a-socket-addr".to_string()],
            relay_url: None,
            priority: 128,
            metadata: HashMap::new(),
        };

        let result = StaticDiscovery::config_to_peer_info(&peer_config);
        assert!(result.is_none());
    }

    #[test]
    fn test_config_to_peer_info_with_relay() {
        let peer_config = StaticPeerConfig {
            node_id: "relay-peer".to_string(),
            addresses: vec!["10.0.0.1:5000".to_string()],
            relay_url: Some("https://relay.example.com".to_string()),
            priority: 200,
            metadata: {
                let mut m = HashMap::new();
                m.insert("role".to_string(), "hq".to_string());
                m
            },
        };

        let peer_info = StaticDiscovery::config_to_peer_info(&peer_config).unwrap();
        assert_eq!(peer_info.node_id, "relay-peer");
        assert_eq!(
            peer_info.relay_url,
            Some("https://relay.example.com".to_string())
        );
        assert_eq!(peer_info.metadata.get("role"), Some(&"hq".to_string()));
    }

    #[test]
    fn test_from_file_nonexistent() {
        let result = StaticDiscovery::from_file(std::path::Path::new("/nonexistent/path.toml"));
        // A read failure is reported as a configuration error.
        assert!(matches!(result, Err(DiscoveryError::ConfigError(_))));
    }

    #[test]
    fn test_default_priority() {
        assert_eq!(default_priority(), 128);
    }

    #[test]
    fn test_static_peer_config_serde() {
        let config = StaticPeerConfig {
            node_id: "node-1".to_string(),
            addresses: vec!["10.0.0.1:5000".to_string()],
            relay_url: None,
            priority: 128,
            metadata: HashMap::new(),
        };
        let json = serde_json::to_string(&config).unwrap();
        // relay_url should be skipped when None
        assert!(!json.contains("relay_url"));

        let parsed: StaticPeerConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.node_id, "node-1");
        assert_eq!(parsed.priority, 128);
    }
}