rs_zero/discovery/
registry.rs1use std::{collections::BTreeMap, sync::Arc};
2
3use async_trait::async_trait;
4use tokio::sync::RwLock;
5
6use crate::discovery::{DiscoveryError, DiscoveryResult, ServiceInstance};
7
8#[async_trait]
10pub trait Discovery: Send + Sync {
11 async fn discover(&self, service: &str) -> DiscoveryResult<Vec<ServiceInstance>>;
13}
14
15#[async_trait]
17pub trait Registry: Discovery {
18 async fn register(&self, instance: ServiceInstance) -> DiscoveryResult<()>;
20
21 async fn deregister(&self, service: &str, id: &str) -> DiscoveryResult<ServiceInstance>;
23}
24
25#[derive(Debug, Clone, Default)]
27pub struct MemoryRegistry {
28 services: Arc<RwLock<BTreeMap<String, BTreeMap<String, ServiceInstance>>>>,
29}
30
31impl MemoryRegistry {
32 pub fn new() -> Self {
34 Self::default()
35 }
36}
37
38#[async_trait]
39impl Discovery for MemoryRegistry {
40 async fn discover(&self, service: &str) -> DiscoveryResult<Vec<ServiceInstance>> {
41 let services = self.services.read().await;
42 let instances = services
43 .get(service)
44 .map(|items| {
45 items
46 .values()
47 .filter(|instance| instance.healthy)
48 .cloned()
49 .collect::<Vec<_>>()
50 })
51 .unwrap_or_default();
52 if instances.is_empty() {
53 Err(DiscoveryError::NoInstances {
54 service: service.to_string(),
55 })
56 } else {
57 Ok(instances)
58 }
59 }
60}
61
62#[async_trait]
63impl Registry for MemoryRegistry {
64 async fn register(&self, instance: ServiceInstance) -> DiscoveryResult<()> {
65 let mut services = self.services.write().await;
66 let service = services.entry(instance.service.clone()).or_default();
67 if service.contains_key(&instance.id) {
68 return Err(DiscoveryError::DuplicateInstance {
69 service: instance.service,
70 id: instance.id,
71 });
72 }
73 service.insert(instance.id.clone(), instance);
74 Ok(())
75 }
76
77 async fn deregister(&self, service: &str, id: &str) -> DiscoveryResult<ServiceInstance> {
78 let mut services = self.services.write().await;
79 let instances =
80 services
81 .get_mut(service)
82 .ok_or_else(|| DiscoveryError::MissingInstance {
83 service: service.to_string(),
84 id: id.to_string(),
85 })?;
86 instances
87 .remove(id)
88 .ok_or_else(|| DiscoveryError::MissingInstance {
89 service: service.to_string(),
90 id: id.to_string(),
91 })
92 }
93}
94
95#[cfg(test)]
96mod tests {
97 use super::{Discovery, MemoryRegistry, Registry};
98 use crate::discovery::{InstanceEndpoint, ServiceInstance};
99
100 #[tokio::test]
101 async fn memory_registry_registers_and_removes_instances() {
102 let registry = MemoryRegistry::new();
103 let endpoint = InstanceEndpoint::new("127.0.0.1", 8080).expect("endpoint");
104 registry
105 .register(ServiceInstance::new("api", "api-1", endpoint))
106 .await
107 .expect("register");
108
109 assert_eq!(registry.discover("api").await.expect("discover").len(), 1);
110 registry
111 .deregister("api", "api-1")
112 .await
113 .expect("deregister");
114 assert!(registry.discover("api").await.is_err());
115 }
116}