// rs-zero 0.2.7
//
// Rust-first microservice framework inspired by go-zero engineering practices.
// Documentation
use std::{collections::HashMap, sync::Arc};

use tokio::sync::Mutex;

use crate::resil::{
    AdaptiveShedder, AdaptiveShedderConfig, BreakerConfig, BreakerPolicyConfig, BreakerSnapshot,
    SharedCircuitBreaker, ShedderSnapshot,
};

/// Registry that reuses circuit breakers by stable service or route key.
///
/// Cloning the registry clones the inner `Arc`, so every clone reads and
/// writes the same key-to-breaker map.
#[derive(Debug, Clone, Default)]
pub struct BreakerRegistry {
    /// Breakers indexed by key, guarded by an async mutex and shared across clones.
    breakers: Arc<Mutex<HashMap<String, SharedCircuitBreaker>>>,
}

impl BreakerRegistry {
    /// Builds a registry that holds no breakers yet.
    pub fn new() -> Self {
        Default::default()
    }

    /// Looks up the breaker stored under `key`, creating it from `config`
    /// and [`BreakerPolicyConfig::default`] when the key is not registered.
    ///
    /// `config` is only used when a new breaker has to be created.
    pub async fn get_or_insert(
        &self,
        key: impl Into<String>,
        config: BreakerConfig,
    ) -> SharedCircuitBreaker {
        let default_policy = BreakerPolicyConfig::default();
        self.get_or_insert_with_policy(key, config, default_policy)
            .await
    }

    /// Looks up the breaker stored under `key`, creating it from `config`
    /// and `policy` when the key is not registered.
    ///
    /// Both `config` and `policy` are only consumed when a new breaker has
    /// to be created; an already registered breaker is returned unchanged.
    pub async fn get_or_insert_with_policy(
        &self,
        key: impl Into<String>,
        config: BreakerConfig,
        policy: BreakerPolicyConfig,
    ) -> SharedCircuitBreaker {
        let name: String = key.into();
        let mut registered = self.breakers.lock().await;
        let breaker = registered
            .entry(name)
            .or_insert_with(|| SharedCircuitBreaker::with_policy(config, policy));
        breaker.clone()
    }

    /// Counts the breakers currently held by the registry.
    pub async fn len(&self) -> usize {
        let registered = self.breakers.lock().await;
        registered.len()
    }

    /// Reports whether no breaker has been registered.
    pub async fn is_empty(&self) -> bool {
        self.breakers.lock().await.is_empty()
    }

    /// Collects a `(key, snapshot)` pair for every registered breaker.
    ///
    /// The pairs follow the map's iteration order, which is unspecified.
    pub async fn snapshots(&self) -> Vec<(String, BreakerSnapshot)> {
        // Copy the entries out inside a scope so the registry lock is released
        // before any individual breaker snapshot is awaited.
        let entries: Vec<(String, SharedCircuitBreaker)> = {
            let registered = self.breakers.lock().await;
            registered
                .iter()
                .map(|(name, breaker)| (name.clone(), breaker.clone()))
                .collect()
        };

        let mut collected = Vec::with_capacity(entries.len());
        for (name, breaker) in entries {
            let snapshot = breaker.snapshot().await;
            collected.push((name, snapshot));
        }
        collected
    }
}

/// Registry that reuses adaptive shedders by stable service or route key.
///
/// Cloning the registry clones the inner `Arc`, so every clone reads and
/// writes the same key-to-shedder map.
#[derive(Debug, Clone, Default)]
pub struct ShedderRegistry {
    /// Shedders indexed by key, guarded by an async mutex and shared across clones.
    shedders: Arc<Mutex<HashMap<String, AdaptiveShedder>>>,
}

impl ShedderRegistry {
    /// Builds a registry that holds no shedders yet.
    pub fn new() -> Self {
        Default::default()
    }

    /// Looks up the shedder stored under `key`, creating it from `config`
    /// when the key is not registered.
    ///
    /// `config` is only consumed when a new shedder has to be created; an
    /// already registered shedder is returned unchanged.
    pub async fn get_or_insert(
        &self,
        key: impl Into<String>,
        config: AdaptiveShedderConfig,
    ) -> AdaptiveShedder {
        let name: String = key.into();
        let mut registered = self.shedders.lock().await;
        let shedder = registered
            .entry(name)
            .or_insert_with(|| AdaptiveShedder::new(config));
        shedder.clone()
    }

    /// Counts the shedders currently held by the registry.
    pub async fn len(&self) -> usize {
        let registered = self.shedders.lock().await;
        registered.len()
    }

    /// Reports whether no shedder has been registered.
    pub async fn is_empty(&self) -> bool {
        self.shedders.lock().await.is_empty()
    }

    /// Collects a `(key, snapshot)` pair for every registered shedder.
    ///
    /// The pairs follow the map's iteration order, which is unspecified.
    pub async fn snapshots(&self) -> Vec<(String, ShedderSnapshot)> {
        // Copy the entries out inside a scope so the registry lock is released
        // before any individual shedder snapshot is awaited.
        let entries: Vec<(String, AdaptiveShedder)> = {
            let registered = self.shedders.lock().await;
            registered
                .iter()
                .map(|(name, shedder)| (name.clone(), shedder.clone()))
                .collect()
        };

        let mut collected = Vec::with_capacity(entries.len());
        for (name, shedder) in entries {
            let snapshot = shedder.snapshot().await;
            collected.push((name, snapshot));
        }
        collected
    }
}

#[cfg(test)]
mod tests {
    use crate::resil::{AdaptiveShedderConfig, BreakerConfig, BreakerRegistry, ShedderRegistry};

    /// Asking for the same key twice must not register a second breaker, and
    /// both handles must report the same state.
    #[tokio::test]
    async fn registry_reuses_breakers_by_key() {
        let route = "GET /ready";
        let breakers = BreakerRegistry::new();

        let initial = breakers
            .get_or_insert(route, BreakerConfig::default())
            .await;
        // Record a failure through the first handle before the key is requested again.
        initial.allow().await.expect("allow").record_failure().await;
        let repeat = breakers
            .get_or_insert(route, BreakerConfig::default())
            .await;

        assert_eq!(breakers.len().await, 1);
        assert_eq!(initial.state().await, repeat.state().await);
        assert_eq!(breakers.snapshots().await.len(), 1);
    }

    /// Asking for the same key twice must not register a second shedder, and
    /// a success recorded through one handle must show up through the other.
    #[tokio::test]
    async fn shedder_registry_reuses_shedders_by_key() {
        let route = "GET /ready";
        let shedders = ShedderRegistry::new();

        let initial = shedders
            .get_or_insert(route, AdaptiveShedderConfig::default())
            .await;
        let repeat = shedders
            .get_or_insert(route, AdaptiveShedderConfig::default())
            .await;

        // Record a success through the first handle only.
        initial.allow().await.expect("allow").record_success().await;
        assert_eq!(shedders.len().await, 1);
        assert_eq!(
            repeat.snapshot().await.window.successes,
            initial.snapshot().await.window.successes
        );
    }
}