umicp_core/
discovery.rs

1/*!
2# Service Discovery
3
4Basic service discovery for peer-to-peer networks.
5Allows peers to discover and connect to each other based on capabilities and metadata.
6*/
7
8use crate::{Envelope, OperationType};
9#[cfg(feature = "websocket")]
10use crate::peer::PeerInfo;
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15
16/// Discovered service information
17#[derive(Debug, Clone)]
18pub struct ServiceInfo {
19    /// Unique service ID
20    pub service_id: String,
21    /// Service name
22    pub name: String,
23    /// Service address (URL)
24    pub address: String,
25    /// Service capabilities
26    pub capabilities: Vec<String>,
27    /// Service metadata
28    pub metadata: HashMap<String, String>,
29    /// Last seen timestamp
30    pub last_seen: SystemTime,
31    /// Service version
32    pub version: String,
33}
34
35impl ServiceInfo {
36    /// Create new service info
37    pub fn new(service_id: String, name: String, address: String) -> Self {
38        Self {
39            service_id,
40            name,
41            address,
42            capabilities: Vec::new(),
43            metadata: HashMap::new(),
44            last_seen: SystemTime::now(),
45            version: "1.0.0".to_string(),
46        }
47    }
48
49    /// Add capability
50    pub fn add_capability(&mut self, capability: String) {
51        if !self.capabilities.contains(&capability) {
52            self.capabilities.push(capability);
53        }
54    }
55
56    /// Add metadata
57    pub fn add_metadata(&mut self, key: String, value: String) {
58        self.metadata.insert(key, value);
59    }
60
61    /// Check if service has capability
62    pub fn has_capability(&self, capability: &str) -> bool {
63        self.capabilities.contains(&capability.to_string())
64    }
65
66    /// Get metadata value
67    pub fn get_metadata(&self, key: &str) -> Option<&String> {
68        self.metadata.get(key)
69    }
70
71    /// Update last seen timestamp
72    pub fn update_last_seen(&mut self) {
73        self.last_seen = SystemTime::now();
74    }
75
76    /// Check if service is stale (older than timeout)
77    pub fn is_stale(&self, timeout: Duration) -> bool {
78        SystemTime::now()
79            .duration_since(self.last_seen)
80            .unwrap_or(Duration::from_secs(0))
81            > timeout
82    }
83}
84
85/// Service discovery manager
86pub struct ServiceDiscovery {
87    /// Services registry
88    services: Arc<RwLock<HashMap<String, ServiceInfo>>>,
89    /// Service timeout (default: 60 seconds)
90    timeout: Duration,
91    /// Local service info
92    local_service: Option<ServiceInfo>,
93}
94
95impl ServiceDiscovery {
96    /// Create new service discovery manager
97    pub fn new() -> Self {
98        Self {
99            services: Arc::new(RwLock::new(HashMap::new())),
100            timeout: Duration::from_secs(60),
101            local_service: None,
102        }
103    }
104
105    /// Create with custom timeout
106    pub fn with_timeout(timeout: Duration) -> Self {
107        Self {
108            services: Arc::new(RwLock::new(HashMap::new())),
109            timeout,
110            local_service: None,
111        }
112    }
113
114    /// Register local service
115    pub fn register_local(&mut self, service: ServiceInfo) {
116        let service_id = service.service_id.clone();
117        self.local_service = Some(service.clone());
118        self.services.write().insert(service_id, service);
119    }
120
121    /// Register discovered service
122    pub fn register_service(&self, service: ServiceInfo) {
123        self.services
124            .write()
125            .insert(service.service_id.clone(), service);
126    }
127
128    /// Unregister service
129    pub fn unregister_service(&self, service_id: &str) {
130        self.services.write().remove(service_id);
131    }
132
133    /// Get service by ID
134    pub fn get_service(&self, service_id: &str) -> Option<ServiceInfo> {
135        self.services.read().get(service_id).cloned()
136    }
137
138    /// Get all services
139    pub fn get_all_services(&self) -> Vec<ServiceInfo> {
140        self.services.read().values().cloned().collect()
141    }
142
143    /// Find services by capability
144    pub fn find_by_capability(&self, capability: &str) -> Vec<ServiceInfo> {
145        self.services
146            .read()
147            .values()
148            .filter(|s| s.has_capability(capability))
149            .cloned()
150            .collect()
151    }
152
153    /// Find services by metadata
154    pub fn find_by_metadata(&self, key: &str, value: &str) -> Vec<ServiceInfo> {
155        self.services
156            .read()
157            .values()
158            .filter(|s| s.get_metadata(key).map(|v| v == value).unwrap_or(false))
159            .cloned()
160            .collect()
161    }
162
163    /// Find services by name
164    pub fn find_by_name(&self, name: &str) -> Vec<ServiceInfo> {
165        self.services
166            .read()
167            .values()
168            .filter(|s| s.name == name)
169            .cloned()
170            .collect()
171    }
172
173    /// Update service last seen
174    pub fn update_service(&self, service_id: &str) {
175        if let Some(service) = self.services.write().get_mut(service_id) {
176            service.update_last_seen();
177        }
178    }
179
180    /// Remove stale services
181    pub fn cleanup_stale(&self) -> usize {
182        let timeout = self.timeout;
183        let mut services = self.services.write();
184        let before_count = services.len();
185
186        services.retain(|_, service| !service.is_stale(timeout));
187
188        before_count - services.len()
189    }
190
191    /// Get service count
192    pub fn service_count(&self) -> usize {
193        self.services.read().len()
194    }
195
196    /// Get local service info
197    pub fn get_local_service(&self) -> Option<ServiceInfo> {
198        self.local_service.clone()
199    }
200
201    /// Create discovery envelope (HELLO message)
202    pub fn create_discovery_envelope(&self) -> Option<Envelope> {
203        self.local_service.as_ref().map(|service| {
204            let mut envelope = Envelope::builder()
205                .from(&service.service_id)
206                .to("broadcast")
207                .operation(OperationType::Control)
208                .message_id(&format!("discovery-{}", uuid::Uuid::new_v4()))
209                .capability_str("type", "discovery")
210                .capability_str("service_name", &service.name)
211                .capability_str("service_address", &service.address)
212                .capability_str("service_version", &service.version);
213
214            // Add capabilities
215            for cap in &service.capabilities {
216                envelope = envelope.capability_str(&format!("cap:{}", cap), "true");
217            }
218
219            // Add metadata
220            for (key, value) in &service.metadata {
221                envelope = envelope.capability_str(&format!("meta:{}", key), value);
222            }
223
224            envelope.build().ok()
225        })?
226    }
227
228    /// Parse discovery envelope
229    pub fn parse_discovery_envelope(&self, envelope: &Envelope) -> Option<ServiceInfo> {
230        let caps = envelope.capabilities()?;
231
232        // Check if it's a discovery message
233        if caps.get("type").map(|v| v == "discovery").unwrap_or(false) {
234            let service_id = envelope.from().to_string();
235            let name = caps.get("service_name")
236                .and_then(|v| v.as_str())
237                .unwrap_or("")
238                .to_string();
239            let address = caps.get("service_address")
240                .and_then(|v| v.as_str())
241                .unwrap_or("")
242                .to_string();
243
244            let mut service = ServiceInfo::new(service_id, name, address);
245
246            // Parse version
247            if let Some(version) = caps.get("service_version") {
248                service.version = version.as_str().unwrap_or("").to_string();
249            }
250
251            // Parse capabilities (cap:*)
252            for (key, _value) in caps.iter() {
253                if let Some(cap) = key.strip_prefix("cap:") {
254                    service.add_capability(cap.to_string());
255                }
256            }
257
258            // Parse metadata (meta:*)
259            for (key, value) in caps.iter() {
260                if let Some(meta_key) = key.strip_prefix("meta:") {
261                    let value_str = match value {
262                        serde_json::Value::String(s) => s.clone(),
263                        other => other.to_string(),
264                    };
265                    service.add_metadata(meta_key.to_string(), value_str);
266                }
267            }
268
269            Some(service)
270        } else {
271            None
272        }
273    }
274
275    /// Auto-discover from peer (requires peer connection info)
276    #[cfg(feature = "websocket")]
277    pub fn discover_from_peer_info(&self, peer_info: &PeerInfo) -> ServiceInfo {
278        let service = ServiceInfo {
279            service_id: peer_info.id.clone(),
280            name: peer_info
281                .metadata
282                .get("name")
283                .cloned()
284                .unwrap_or_else(|| peer_info.id.clone()),
285            address: peer_info.url.clone().unwrap_or_default(),
286            capabilities: peer_info.capabilities.clone(),
287            metadata: peer_info.metadata.clone(),
288            last_seen: SystemTime::now(),
289            version: peer_info
290                .metadata
291                .get("version")
292                .cloned()
293                .unwrap_or_else(|| "1.0.0".to_string()),
294        };
295
296        self.register_service(service.clone());
297        service
298    }
299}
300
301impl Default for ServiceDiscovery {
302    fn default() -> Self {
303        Self::new()
304    }
305}
306
307#[cfg(test)]
308mod tests {
309    use super::*;
310
311    #[test]
312    fn test_service_info_creation() {
313        let service = ServiceInfo::new(
314            "service-1".to_string(),
315            "Test Service".to_string(),
316            "ws://localhost:8080".to_string(),
317        );
318
319        assert_eq!(service.service_id, "service-1");
320        assert_eq!(service.name, "Test Service");
321        assert_eq!(service.address, "ws://localhost:8080");
322    }
323
324    #[test]
325    fn test_service_capabilities() {
326        let mut service = ServiceInfo::new(
327            "service-1".to_string(),
328            "Test".to_string(),
329            "ws://localhost:8080".to_string(),
330        );
331
332        service.add_capability("storage".to_string());
333        service.add_capability("compute".to_string());
334
335        assert!(service.has_capability("storage"));
336        assert!(service.has_capability("compute"));
337        assert!(!service.has_capability("network"));
338    }
339
340    #[test]
341    fn test_service_metadata() {
342        let mut service = ServiceInfo::new(
343            "service-1".to_string(),
344            "Test".to_string(),
345            "ws://localhost:8080".to_string(),
346        );
347
348        service.add_metadata("region".to_string(), "us-east-1".to_string());
349        service.add_metadata("version".to_string(), "1.0.0".to_string());
350
351        assert_eq!(service.get_metadata("region"), Some(&"us-east-1".to_string()));
352        assert_eq!(service.get_metadata("version"), Some(&"1.0.0".to_string()));
353        assert_eq!(service.get_metadata("unknown"), None);
354    }
355
356    #[test]
357    fn test_service_discovery_creation() {
358        let discovery = ServiceDiscovery::new();
359        assert_eq!(discovery.service_count(), 0);
360    }
361
362    #[test]
363    fn test_register_service() {
364        let discovery = ServiceDiscovery::new();
365        let service = ServiceInfo::new(
366            "service-1".to_string(),
367            "Test".to_string(),
368            "ws://localhost:8080".to_string(),
369        );
370
371        discovery.register_service(service.clone());
372        assert_eq!(discovery.service_count(), 1);
373
374        let retrieved = discovery.get_service("service-1");
375        assert!(retrieved.is_some());
376        assert_eq!(retrieved.unwrap().name, "Test");
377    }
378
379    #[test]
380    fn test_find_by_capability() {
381        let discovery = ServiceDiscovery::new();
382
383        let mut service1 = ServiceInfo::new(
384            "service-1".to_string(),
385            "Storage".to_string(),
386            "ws://localhost:8080".to_string(),
387        );
388        service1.add_capability("storage".to_string());
389
390        let mut service2 = ServiceInfo::new(
391            "service-2".to_string(),
392            "Compute".to_string(),
393            "ws://localhost:8081".to_string(),
394        );
395        service2.add_capability("compute".to_string());
396
397        discovery.register_service(service1);
398        discovery.register_service(service2);
399
400        let storage_services = discovery.find_by_capability("storage");
401        assert_eq!(storage_services.len(), 1);
402        assert_eq!(storage_services[0].name, "Storage");
403
404        let compute_services = discovery.find_by_capability("compute");
405        assert_eq!(compute_services.len(), 1);
406        assert_eq!(compute_services[0].name, "Compute");
407    }
408
409    #[test]
410    fn test_find_by_metadata() {
411        let discovery = ServiceDiscovery::new();
412
413        let mut service1 = ServiceInfo::new(
414            "service-1".to_string(),
415            "US Service".to_string(),
416            "ws://localhost:8080".to_string(),
417        );
418        service1.add_metadata("region".to_string(), "us-east-1".to_string());
419
420        let mut service2 = ServiceInfo::new(
421            "service-2".to_string(),
422            "EU Service".to_string(),
423            "ws://localhost:8081".to_string(),
424        );
425        service2.add_metadata("region".to_string(), "eu-west-1".to_string());
426
427        discovery.register_service(service1);
428        discovery.register_service(service2);
429
430        let us_services = discovery.find_by_metadata("region", "us-east-1");
431        assert_eq!(us_services.len(), 1);
432        assert_eq!(us_services[0].name, "US Service");
433    }
434
435    #[test]
436    fn test_unregister_service() {
437        let discovery = ServiceDiscovery::new();
438        let service = ServiceInfo::new(
439            "service-1".to_string(),
440            "Test".to_string(),
441            "ws://localhost:8080".to_string(),
442        );
443
444        discovery.register_service(service);
445        assert_eq!(discovery.service_count(), 1);
446
447        discovery.unregister_service("service-1");
448        assert_eq!(discovery.service_count(), 0);
449    }
450
451    #[test]
452    fn test_service_stale_detection() {
453        let mut service = ServiceInfo::new(
454            "service-1".to_string(),
455            "Test".to_string(),
456            "ws://localhost:8080".to_string(),
457        );
458
459        // Service just created is not stale
460        assert!(!service.is_stale(Duration::from_secs(60)));
461
462        // Manually set old timestamp (in real scenario this would be old)
463        service.last_seen = SystemTime::now()
464            .checked_sub(Duration::from_secs(120))
465            .unwrap();
466
467        // Now it should be stale (older than 60 seconds)
468        assert!(service.is_stale(Duration::from_secs(60)));
469    }
470
471    #[test]
472    fn test_cleanup_stale() {
473        let discovery = ServiceDiscovery::with_timeout(Duration::from_millis(100));
474
475        let mut service = ServiceInfo::new(
476            "service-1".to_string(),
477            "Test".to_string(),
478            "ws://localhost:8080".to_string(),
479        );
480
481        // Make service stale
482        service.last_seen = SystemTime::now()
483            .checked_sub(Duration::from_secs(120))
484            .unwrap();
485
486        discovery.register_service(service);
487        assert_eq!(discovery.service_count(), 1);
488
489        // Cleanup should remove stale service
490        let removed = discovery.cleanup_stale();
491        assert_eq!(removed, 1);
492        assert_eq!(discovery.service_count(), 0);
493    }
494}
495