use std::{collections::BTreeMap, sync::Arc};
use async_trait::async_trait;
use tokio::sync::RwLock;
use crate::discovery::{DiscoveryError, DiscoveryResult, ServiceInstance};
/// Read-only lookup of the instances known for a service name.
///
/// Implementors must be `Send + Sync`, as required by the supertrait bounds.
#[async_trait]
pub trait Discovery: Send + Sync {
/// Resolves `service` to a list of instances.
///
/// The `MemoryRegistry` implementation in this file returns only instances
/// whose `healthy` field is set and fails with `DiscoveryError::NoInstances`
/// when none qualify. NOTE(review): the trait itself does not enforce that
/// contract; confirm what other implementors are expected to do.
async fn discover(&self, service: &str) -> DiscoveryResult<Vec<ServiceInstance>>;
}
/// A [`Discovery`] source that can also be mutated by adding and removing
/// instances.
#[async_trait]
pub trait Registry: Discovery {
/// Adds `instance` to the registry, taking ownership of it.
///
/// The `MemoryRegistry` implementation in this file rejects an instance whose
/// `service`/`id` pair is already present with
/// `DiscoveryError::DuplicateInstance`.
async fn register(&self, instance: ServiceInstance) -> DiscoveryResult<()>;
/// Removes the instance identified by `service` and `id`.
///
/// The `MemoryRegistry` implementation in this file returns the removed
/// instance, or `DiscoveryError::MissingInstance` when no such instance is
/// registered.
async fn deregister(&self, service: &str, id: &str) -> DiscoveryResult<ServiceInstance>;
}
/// Registry that keeps all instances in process memory.
///
/// The derived `Clone` clones the inner `Arc`, so every clone of a
/// `MemoryRegistry` reads and writes the same underlying map.
#[derive(Debug, Clone, Default)]
pub struct MemoryRegistry {
// Outer map is keyed by service name, inner map by instance id; both are
// ordered `BTreeMap`s, guarded by a single async read/write lock.
services: Arc<RwLock<BTreeMap<String, BTreeMap<String, ServiceInstance>>>>,
}
impl MemoryRegistry {
pub fn new() -> Self {
Self::default()
}
}
#[async_trait]
impl Discovery for MemoryRegistry {
    /// Returns clones of every healthy instance registered under `service`.
    ///
    /// # Errors
    ///
    /// Returns `DiscoveryError::NoInstances` when the service is unknown or
    /// none of its instances is marked healthy.
    async fn discover(&self, service: &str) -> DiscoveryResult<Vec<ServiceInstance>> {
        let guard = self.services.read().await;

        // An unknown service yields an empty iterator, which collapses into
        // the same "no instances" outcome as a service with no healthy members.
        let healthy: Vec<ServiceInstance> = guard
            .get(service)
            .into_iter()
            .flat_map(|known| known.values())
            .filter(|candidate| candidate.healthy)
            .cloned()
            .collect();

        if healthy.is_empty() {
            return Err(DiscoveryError::NoInstances {
                service: service.to_string(),
            });
        }
        Ok(healthy)
    }
}
#[async_trait]
impl Registry for MemoryRegistry {
    /// Stores `instance` under its `service` name and `id`.
    ///
    /// # Errors
    ///
    /// Returns `DiscoveryError::DuplicateInstance` when an instance with the
    /// same `id` is already registered for that service; the existing entry
    /// is left untouched.
    async fn register(&self, instance: ServiceInstance) -> DiscoveryResult<()> {
        let mut services = self.services.write().await;
        let instances = services.entry(instance.service.clone()).or_default();
        if instances.contains_key(&instance.id) {
            return Err(DiscoveryError::DuplicateInstance {
                service: instance.service,
                id: instance.id,
            });
        }
        instances.insert(instance.id.clone(), instance);
        Ok(())
    }

    /// Removes the instance `id` from `service` and returns it.
    ///
    /// When the removed instance was the last one for `service`, the now-empty
    /// per-service map is dropped as well, so service names that come and go
    /// do not accumulate in the registry.
    ///
    /// # Errors
    ///
    /// Returns `DiscoveryError::MissingInstance` when either the service or
    /// the instance id is not registered.
    async fn deregister(&self, service: &str, id: &str) -> DiscoveryResult<ServiceInstance> {
        let mut services = self.services.write().await;

        // Both lookup failures report the same error; build it in one place.
        let missing = || DiscoveryError::MissingInstance {
            service: service.to_string(),
            id: id.to_string(),
        };

        let instances = services.get_mut(service).ok_or_else(missing)?;
        let removed = instances.remove(id).ok_or_else(missing)?;

        // Prune the service entry once its last instance is gone.
        if instances.is_empty() {
            services.remove(service);
        }
        Ok(removed)
    }
}
#[cfg(test)]
mod tests {
    use super::{Discovery, MemoryRegistry, Registry};
    use crate::discovery::{InstanceEndpoint, ServiceInstance};

    /// Registers one instance, checks it is discoverable, then removes it and
    /// checks that discovery fails.
    #[tokio::test]
    async fn memory_registry_registers_and_removes_instances() {
        let registry = MemoryRegistry::new();

        let address = InstanceEndpoint::new("127.0.0.1", 8080).expect("endpoint");
        let instance = ServiceInstance::new("api", "api-1", address);
        registry.register(instance).await.expect("register");

        let found = registry.discover("api").await.expect("discover");
        assert_eq!(found.len(), 1);

        registry
            .deregister("api", "api-1")
            .await
            .expect("deregister");

        let after_removal = registry.discover("api").await;
        assert!(after_removal.is_err());
    }
}