rs-zero 0.2.6

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::{collections::BTreeMap, sync::Arc};

use async_trait::async_trait;
use tokio::sync::RwLock;

use crate::discovery::{DiscoveryError, DiscoveryResult, ServiceInstance};

/// Resolves service names to available instances.
///
/// Implementors must be `Send + Sync` (required by the trait bound), so a
/// single value can be shared between tasks. The method is made `async` via
/// the `async_trait` attribute.
#[async_trait]
pub trait Discovery: Send + Sync {
    /// Returns healthy instances for a service.
    ///
    /// `service` is the name the instances were registered under.
    ///
    /// # Errors
    ///
    /// The error conditions are defined by the implementor. `MemoryRegistry`
    /// in this module, for example, returns `DiscoveryError::NoInstances`
    /// when it finds no healthy instance for `service`.
    async fn discover(&self, service: &str) -> DiscoveryResult<Vec<ServiceInstance>>;
}

/// Mutable service registry.
///
/// Extends [`Discovery`] with the ability to add and remove instances, so
/// every registry can also be queried.
#[async_trait]
pub trait Registry: Discovery {
    /// Registers a service instance.
    ///
    /// Takes ownership of `instance`.
    ///
    /// # Errors
    ///
    /// The error conditions are defined by the implementor. `MemoryRegistry`
    /// in this module, for example, returns
    /// `DiscoveryError::DuplicateInstance` when the same service already has
    /// an instance with that id.
    async fn register(&self, instance: ServiceInstance) -> DiscoveryResult<()>;

    /// Removes a service instance.
    ///
    /// On success the removed instance, identified by `service` and `id`, is
    /// handed back to the caller.
    ///
    /// # Errors
    ///
    /// The error conditions are defined by the implementor. `MemoryRegistry`
    /// in this module, for example, returns
    /// `DiscoveryError::MissingInstance` when no matching instance exists.
    async fn deregister(&self, service: &str, id: &str) -> DiscoveryResult<ServiceInstance>;
}

/// In-memory registry useful for tests, single-process services, and examples.
///
/// The state lives behind an `Arc`, so cloning a `MemoryRegistry` yields
/// another handle to the same underlying map rather than an independent copy.
#[derive(Debug, Clone, Default)]
pub struct MemoryRegistry {
    // Outer map: service name -> that service's instances.
    // Inner map: instance id -> instance.
    // Guarded by an async `RwLock` so lookups can take a read lock while
    // registration and removal take the write lock.
    services: Arc<RwLock<BTreeMap<String, BTreeMap<String, ServiceInstance>>>>,
}

impl MemoryRegistry {
    /// Builds a registry that holds no services yet.
    ///
    /// Equivalent to `MemoryRegistry::default()`.
    pub fn new() -> Self {
        Self {
            services: Arc::default(),
        }
    }
}

#[async_trait]
impl Discovery for MemoryRegistry {
    /// Looks up `service` and returns clones of its instances that are
    /// flagged healthy.
    ///
    /// # Errors
    ///
    /// Returns [`DiscoveryError::NoInstances`] when the service is unknown or
    /// none of its registered instances is healthy.
    async fn discover(&self, service: &str) -> DiscoveryResult<Vec<ServiceInstance>> {
        let guard = self.services.read().await;

        // An unknown service is treated exactly like one with no healthy
        // instances: both end up as an empty list and hit the error below.
        let healthy: Vec<ServiceInstance> = match guard.get(service) {
            Some(bucket) => bucket
                .values()
                .filter(|candidate| candidate.healthy)
                .cloned()
                .collect(),
            None => Vec::new(),
        };

        if healthy.is_empty() {
            return Err(DiscoveryError::NoInstances {
                service: service.to_string(),
            });
        }
        Ok(healthy)
    }
}

#[async_trait]
impl Registry for MemoryRegistry {
    /// Stores `instance` under its service name, keyed by its id.
    ///
    /// # Errors
    ///
    /// Returns [`DiscoveryError::DuplicateInstance`] when the service already
    /// holds an instance with the same id; the existing entry is left as is.
    async fn register(&self, instance: ServiceInstance) -> DiscoveryResult<()> {
        let mut services = self.services.write().await;
        let bucket = services.entry(instance.service.clone()).or_default();
        if bucket.contains_key(&instance.id) {
            return Err(DiscoveryError::DuplicateInstance {
                service: instance.service,
                id: instance.id,
            });
        }
        bucket.insert(instance.id.clone(), instance);
        Ok(())
    }

    /// Removes the instance `id` from `service` and returns it.
    ///
    /// When the removed instance was the last one of its service, the now
    /// empty service entry is dropped as well, so the registry does not
    /// accumulate empty buckets as service names come and go.
    ///
    /// # Errors
    ///
    /// Returns [`DiscoveryError::MissingInstance`] when either the service or
    /// the instance id is not registered.
    async fn deregister(&self, service: &str, id: &str) -> DiscoveryResult<ServiceInstance> {
        let mut services = self.services.write().await;

        // Both failure modes (unknown service, unknown id) report the same error.
        let missing = || DiscoveryError::MissingInstance {
            service: service.to_string(),
            id: id.to_string(),
        };

        let bucket = services.get_mut(service).ok_or_else(missing)?;
        let removed = bucket.remove(id).ok_or_else(missing)?;

        // Prune the service entry once its last instance is gone.
        if bucket.is_empty() {
            services.remove(service);
        }
        Ok(removed)
    }
}

#[cfg(test)]
mod tests {
    use super::{Discovery, MemoryRegistry, Registry};
    use crate::discovery::{InstanceEndpoint, ServiceInstance};

    /// Round-trips one instance: register it, find it through discovery,
    /// deregister it, then confirm discovery reports an error.
    #[tokio::test]
    async fn memory_registry_registers_and_removes_instances() {
        let registry = MemoryRegistry::new();

        let instance = ServiceInstance::new(
            "api",
            "api-1",
            InstanceEndpoint::new("127.0.0.1", 8080).expect("endpoint"),
        );
        registry.register(instance).await.expect("register");

        let found = registry.discover("api").await.expect("discover");
        assert_eq!(found.len(), 1);

        registry
            .deregister("api", "api-1")
            .await
            .expect("deregister");

        let after_removal = registry.discover("api").await;
        assert!(after_removal.is_err());
    }
}