Skip to main content

rs_zero/discovery/
registry.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use async_trait::async_trait;
4use tokio::sync::RwLock;
5
6use crate::discovery::{DiscoveryError, DiscoveryResult, ServiceInstance};
7
8/// Resolves service names to available instances.
9#[async_trait]
10pub trait Discovery: Send + Sync {
11    /// Returns healthy instances for a service.
12    async fn discover(&self, service: &str) -> DiscoveryResult<Vec<ServiceInstance>>;
13}
14
15/// Mutable service registry.
16#[async_trait]
17pub trait Registry: Discovery {
18    /// Registers a service instance.
19    async fn register(&self, instance: ServiceInstance) -> DiscoveryResult<()>;
20
21    /// Removes a service instance.
22    async fn deregister(&self, service: &str, id: &str) -> DiscoveryResult<ServiceInstance>;
23}
24
25/// In-memory registry useful for tests, single-process services, and examples.
26#[derive(Debug, Clone, Default)]
27pub struct MemoryRegistry {
28    services: Arc<RwLock<BTreeMap<String, BTreeMap<String, ServiceInstance>>>>,
29}
30
31impl MemoryRegistry {
32    /// Creates an empty registry.
33    pub fn new() -> Self {
34        Self::default()
35    }
36}
37
38#[async_trait]
39impl Discovery for MemoryRegistry {
40    async fn discover(&self, service: &str) -> DiscoveryResult<Vec<ServiceInstance>> {
41        let services = self.services.read().await;
42        let instances = services
43            .get(service)
44            .map(|items| {
45                items
46                    .values()
47                    .filter(|instance| instance.healthy)
48                    .cloned()
49                    .collect::<Vec<_>>()
50            })
51            .unwrap_or_default();
52        if instances.is_empty() {
53            Err(DiscoveryError::NoInstances {
54                service: service.to_string(),
55            })
56        } else {
57            Ok(instances)
58        }
59    }
60}
61
62#[async_trait]
63impl Registry for MemoryRegistry {
64    async fn register(&self, instance: ServiceInstance) -> DiscoveryResult<()> {
65        let mut services = self.services.write().await;
66        let service = services.entry(instance.service.clone()).or_default();
67        if service.contains_key(&instance.id) {
68            return Err(DiscoveryError::DuplicateInstance {
69                service: instance.service,
70                id: instance.id,
71            });
72        }
73        service.insert(instance.id.clone(), instance);
74        Ok(())
75    }
76
77    async fn deregister(&self, service: &str, id: &str) -> DiscoveryResult<ServiceInstance> {
78        let mut services = self.services.write().await;
79        let instances =
80            services
81                .get_mut(service)
82                .ok_or_else(|| DiscoveryError::MissingInstance {
83                    service: service.to_string(),
84                    id: id.to_string(),
85                })?;
86        instances
87            .remove(id)
88            .ok_or_else(|| DiscoveryError::MissingInstance {
89                service: service.to_string(),
90                id: id.to_string(),
91            })
92    }
93}
94
#[cfg(test)]
mod tests {
    use super::{Discovery, MemoryRegistry, Registry};
    use crate::discovery::{InstanceEndpoint, ServiceInstance};

    /// Round trip: a registered instance is discoverable, and once it is
    /// deregistered the service resolves to an error again.
    #[tokio::test]
    async fn memory_registry_registers_and_removes_instances() {
        let registry = MemoryRegistry::new();

        let endpoint = InstanceEndpoint::new("127.0.0.1", 8080).expect("endpoint");
        let instance = ServiceInstance::new("api", "api-1", endpoint);
        registry.register(instance).await.expect("register");

        let found = registry.discover("api").await.expect("discover");
        assert_eq!(found.len(), 1);

        registry
            .deregister("api", "api-1")
            .await
            .expect("deregister");

        let after_removal = registry.discover("api").await;
        assert!(after_removal.is_err());
    }
}