1use crate::{Envelope, OperationType};
9#[cfg(feature = "websocket")]
10use crate::peer::PeerInfo;
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15
/// Description of a single service tracked by the discovery registry.
#[derive(Debug, Clone)]
pub struct ServiceInfo {
    /// Unique identifier of the service.
    pub service_id: String,
    /// Human-readable service name.
    pub name: String,
    /// Address at which the service can be reached.
    pub address: String,
    /// Capability names advertised by the service.
    ///
    /// [`ServiceInfo::add_capability`] never inserts a duplicate entry.
    pub capabilities: Vec<String>,
    /// Free-form key/value metadata attached to the service.
    pub metadata: HashMap<String, String>,
    /// Moment the service was last observed; drives [`ServiceInfo::is_stale`].
    pub last_seen: SystemTime,
    /// Version string of the service (`"1.0.0"` unless overridden).
    pub version: String,
}

impl ServiceInfo {
    /// Creates a service record with no capabilities, no metadata,
    /// `last_seen` set to the current time and version `"1.0.0"`.
    pub fn new(service_id: String, name: String, address: String) -> Self {
        Self {
            service_id,
            name,
            address,
            capabilities: Vec::new(),
            metadata: HashMap::new(),
            last_seen: SystemTime::now(),
            version: "1.0.0".to_string(),
        }
    }

    /// Adds `capability` unless it is already present.
    pub fn add_capability(&mut self, capability: String) {
        if !self.has_capability(&capability) {
            self.capabilities.push(capability);
        }
    }

    /// Inserts a metadata entry, replacing any previous value stored under `key`.
    pub fn add_metadata(&mut self, key: String, value: String) {
        self.metadata.insert(key, value);
    }

    /// Returns `true` when the service advertises `capability` (exact match).
    pub fn has_capability(&self, capability: &str) -> bool {
        // Compare by reference; no temporary `String` is allocated per lookup.
        self.capabilities.iter().any(|known| known == capability)
    }

    /// Returns the metadata value stored under `key`, if any.
    pub fn get_metadata(&self, key: &str) -> Option<&String> {
        self.metadata.get(key)
    }

    /// Marks the service as seen right now.
    pub fn update_last_seen(&mut self) {
        self.last_seen = SystemTime::now();
    }

    /// Returns `true` when more than `timeout` has elapsed since `last_seen`.
    ///
    /// A `last_seen` that lies in the future (e.g. after a system clock
    /// adjustment) is never considered stale.
    pub fn is_stale(&self, timeout: Duration) -> bool {
        matches!(self.last_seen.elapsed(), Ok(age) if age > timeout)
    }
}
84
85pub struct ServiceDiscovery {
87 services: Arc<RwLock<HashMap<String, ServiceInfo>>>,
89 timeout: Duration,
91 local_service: Option<ServiceInfo>,
93}
94
95impl ServiceDiscovery {
96 pub fn new() -> Self {
98 Self {
99 services: Arc::new(RwLock::new(HashMap::new())),
100 timeout: Duration::from_secs(60),
101 local_service: None,
102 }
103 }
104
105 pub fn with_timeout(timeout: Duration) -> Self {
107 Self {
108 services: Arc::new(RwLock::new(HashMap::new())),
109 timeout,
110 local_service: None,
111 }
112 }
113
114 pub fn register_local(&mut self, service: ServiceInfo) {
116 let service_id = service.service_id.clone();
117 self.local_service = Some(service.clone());
118 self.services.write().insert(service_id, service);
119 }
120
121 pub fn register_service(&self, service: ServiceInfo) {
123 self.services
124 .write()
125 .insert(service.service_id.clone(), service);
126 }
127
128 pub fn unregister_service(&self, service_id: &str) {
130 self.services.write().remove(service_id);
131 }
132
133 pub fn get_service(&self, service_id: &str) -> Option<ServiceInfo> {
135 self.services.read().get(service_id).cloned()
136 }
137
138 pub fn get_all_services(&self) -> Vec<ServiceInfo> {
140 self.services.read().values().cloned().collect()
141 }
142
143 pub fn find_by_capability(&self, capability: &str) -> Vec<ServiceInfo> {
145 self.services
146 .read()
147 .values()
148 .filter(|s| s.has_capability(capability))
149 .cloned()
150 .collect()
151 }
152
153 pub fn find_by_metadata(&self, key: &str, value: &str) -> Vec<ServiceInfo> {
155 self.services
156 .read()
157 .values()
158 .filter(|s| s.get_metadata(key).map(|v| v == value).unwrap_or(false))
159 .cloned()
160 .collect()
161 }
162
163 pub fn find_by_name(&self, name: &str) -> Vec<ServiceInfo> {
165 self.services
166 .read()
167 .values()
168 .filter(|s| s.name == name)
169 .cloned()
170 .collect()
171 }
172
173 pub fn update_service(&self, service_id: &str) {
175 if let Some(service) = self.services.write().get_mut(service_id) {
176 service.update_last_seen();
177 }
178 }
179
180 pub fn cleanup_stale(&self) -> usize {
182 let timeout = self.timeout;
183 let mut services = self.services.write();
184 let before_count = services.len();
185
186 services.retain(|_, service| !service.is_stale(timeout));
187
188 before_count - services.len()
189 }
190
191 pub fn service_count(&self) -> usize {
193 self.services.read().len()
194 }
195
196 pub fn get_local_service(&self) -> Option<ServiceInfo> {
198 self.local_service.clone()
199 }
200
201 pub fn create_discovery_envelope(&self) -> Option<Envelope> {
203 self.local_service.as_ref().map(|service| {
204 let mut envelope = Envelope::builder()
205 .from(&service.service_id)
206 .to("broadcast")
207 .operation(OperationType::Control)
208 .message_id(&format!("discovery-{}", uuid::Uuid::new_v4()))
209 .capability_str("type", "discovery")
210 .capability_str("service_name", &service.name)
211 .capability_str("service_address", &service.address)
212 .capability_str("service_version", &service.version);
213
214 for cap in &service.capabilities {
216 envelope = envelope.capability_str(&format!("cap:{}", cap), "true");
217 }
218
219 for (key, value) in &service.metadata {
221 envelope = envelope.capability_str(&format!("meta:{}", key), value);
222 }
223
224 envelope.build().ok()
225 })?
226 }
227
228 pub fn parse_discovery_envelope(&self, envelope: &Envelope) -> Option<ServiceInfo> {
230 let caps = envelope.capabilities()?;
231
232 if caps.get("type").map(|v| v == "discovery").unwrap_or(false) {
234 let service_id = envelope.from().to_string();
235 let name = caps.get("service_name")
236 .and_then(|v| v.as_str())
237 .unwrap_or("")
238 .to_string();
239 let address = caps.get("service_address")
240 .and_then(|v| v.as_str())
241 .unwrap_or("")
242 .to_string();
243
244 let mut service = ServiceInfo::new(service_id, name, address);
245
246 if let Some(version) = caps.get("service_version") {
248 service.version = version.as_str().unwrap_or("").to_string();
249 }
250
251 for (key, _value) in caps.iter() {
253 if let Some(cap) = key.strip_prefix("cap:") {
254 service.add_capability(cap.to_string());
255 }
256 }
257
258 for (key, value) in caps.iter() {
260 if let Some(meta_key) = key.strip_prefix("meta:") {
261 let value_str = match value {
262 serde_json::Value::String(s) => s.clone(),
263 other => other.to_string(),
264 };
265 service.add_metadata(meta_key.to_string(), value_str);
266 }
267 }
268
269 Some(service)
270 } else {
271 None
272 }
273 }
274
275 #[cfg(feature = "websocket")]
277 pub fn discover_from_peer_info(&self, peer_info: &PeerInfo) -> ServiceInfo {
278 let service = ServiceInfo {
279 service_id: peer_info.id.clone(),
280 name: peer_info
281 .metadata
282 .get("name")
283 .cloned()
284 .unwrap_or_else(|| peer_info.id.clone()),
285 address: peer_info.url.clone().unwrap_or_default(),
286 capabilities: peer_info.capabilities.clone(),
287 metadata: peer_info.metadata.clone(),
288 last_seen: SystemTime::now(),
289 version: peer_info
290 .metadata
291 .get("version")
292 .cloned()
293 .unwrap_or_else(|| "1.0.0".to_string()),
294 };
295
296 self.register_service(service.clone());
297 service
298 }
299}
300
301impl Default for ServiceDiscovery {
302 fn default() -> Self {
303 Self::new()
304 }
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    /// Address shared by tests that do not care about the endpoint.
    const LOCAL_ADDR: &str = "ws://localhost:8080";

    /// Builds a `ServiceInfo` from string slices to keep the tests terse.
    fn make_service(id: &str, name: &str, address: &str) -> ServiceInfo {
        ServiceInfo::new(id.to_owned(), name.to_owned(), address.to_owned())
    }

    /// Returns a timestamp `secs` seconds before the current time.
    fn seconds_ago(secs: u64) -> SystemTime {
        SystemTime::now()
            .checked_sub(Duration::from_secs(secs))
            .unwrap()
    }

    #[test]
    fn test_service_info_creation() {
        let info = make_service("service-1", "Test Service", LOCAL_ADDR);

        assert_eq!(info.service_id, "service-1");
        assert_eq!(info.name, "Test Service");
        assert_eq!(info.address, "ws://localhost:8080");
    }

    #[test]
    fn test_service_capabilities() {
        let mut info = make_service("service-1", "Test", LOCAL_ADDR);

        for cap in ["storage", "compute"] {
            info.add_capability(cap.to_owned());
        }

        assert!(info.has_capability("storage"));
        assert!(info.has_capability("compute"));
        assert!(!info.has_capability("network"));
    }

    #[test]
    fn test_service_metadata() {
        let mut info = make_service("service-1", "Test", LOCAL_ADDR);

        info.add_metadata("region".to_owned(), "us-east-1".to_owned());
        info.add_metadata("version".to_owned(), "1.0.0".to_owned());

        assert_eq!(info.get_metadata("region"), Some(&"us-east-1".to_string()));
        assert_eq!(info.get_metadata("version"), Some(&"1.0.0".to_string()));
        assert_eq!(info.get_metadata("unknown"), None);
    }

    #[test]
    fn test_service_discovery_creation() {
        let registry = ServiceDiscovery::new();
        assert_eq!(registry.service_count(), 0);
    }

    #[test]
    fn test_register_service() {
        let registry = ServiceDiscovery::new();
        let info = make_service("service-1", "Test", LOCAL_ADDR);

        registry.register_service(info.clone());
        assert_eq!(registry.service_count(), 1);

        let fetched = registry.get_service("service-1");
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().name, "Test");
    }

    #[test]
    fn test_find_by_capability() {
        let registry = ServiceDiscovery::new();

        let mut storage = make_service("service-1", "Storage", LOCAL_ADDR);
        storage.add_capability("storage".to_owned());

        let mut compute = make_service("service-2", "Compute", "ws://localhost:8081");
        compute.add_capability("compute".to_owned());

        registry.register_service(storage);
        registry.register_service(compute);

        let storage_hits = registry.find_by_capability("storage");
        assert_eq!(storage_hits.len(), 1);
        assert_eq!(storage_hits[0].name, "Storage");

        let compute_hits = registry.find_by_capability("compute");
        assert_eq!(compute_hits.len(), 1);
        assert_eq!(compute_hits[0].name, "Compute");
    }

    #[test]
    fn test_find_by_metadata() {
        let registry = ServiceDiscovery::new();

        let mut us = make_service("service-1", "US Service", LOCAL_ADDR);
        us.add_metadata("region".to_owned(), "us-east-1".to_owned());

        let mut eu = make_service("service-2", "EU Service", "ws://localhost:8081");
        eu.add_metadata("region".to_owned(), "eu-west-1".to_owned());

        registry.register_service(us);
        registry.register_service(eu);

        let us_hits = registry.find_by_metadata("region", "us-east-1");
        assert_eq!(us_hits.len(), 1);
        assert_eq!(us_hits[0].name, "US Service");
    }

    #[test]
    fn test_unregister_service() {
        let registry = ServiceDiscovery::new();

        registry.register_service(make_service("service-1", "Test", LOCAL_ADDR));
        assert_eq!(registry.service_count(), 1);

        registry.unregister_service("service-1");
        assert_eq!(registry.service_count(), 0);
    }

    #[test]
    fn test_service_stale_detection() {
        let mut info = make_service("service-1", "Test", LOCAL_ADDR);

        // A freshly created service is well within a 60 second window.
        assert!(!info.is_stale(Duration::from_secs(60)));

        // Backdate the record past the window.
        info.last_seen = seconds_ago(120);

        assert!(info.is_stale(Duration::from_secs(60)));
    }

    #[test]
    fn test_cleanup_stale() {
        let registry = ServiceDiscovery::with_timeout(Duration::from_millis(100));

        let mut info = make_service("service-1", "Test", LOCAL_ADDR);
        // Far older than the 100 ms timeout configured above.
        info.last_seen = seconds_ago(120);

        registry.register_service(info);
        assert_eq!(registry.service_count(), 1);

        let removed = registry.cleanup_stale();
        assert_eq!(removed, 1);
        assert_eq!(registry.service_count(), 0);
    }
}
495