//! rs-zero 0.2.6
//!
//! Rust-first microservice framework inspired by go-zero engineering practices.
use std::collections::{BTreeMap, BTreeSet};

use crate::discovery::{DiscoveryError, DiscoveryResult, ServiceInstance};
use crate::discovery_kube::KubeSliceCacheUpdate;

/// Current Kubernetes watcher status.
///
/// A point-in-time snapshot returned by [`KubeDiscoveryCache::status`];
/// cloning it detaches the snapshot from the live cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KubeDiscoveryStatus {
    /// Whether the initial Kubernetes watch list has completed
    /// (set by `finish_init` or by the first live `apply_slice`).
    pub synced: bool,
    /// Last watcher or mapper error, if any; cleared on successful updates.
    pub last_error: Option<String>,
    /// Kubernetes resource kind used by this watcher (defaults to "EndpointSlice").
    pub resource_kind: String,
}

impl Default for KubeDiscoveryStatus {
    fn default() -> Self {
        Self {
            synced: false,
            last_error: None,
            resource_kind: "EndpointSlice".to_string(),
        }
    }
}

/// EndpointSlice-backed discovery cache.
///
/// Holds two maps with the same shape, `service name -> slice name ->
/// instances`: the live view served by `discover`, and a staging buffer
/// filled between `begin_init` and `finish_init` so a watcher restart can
/// rebuild state without serving a half-populated view.
#[derive(Debug, Clone, Default)]
pub struct KubeDiscoveryCache {
    // Live cache: service -> EndpointSlice name -> instances.
    slices: BTreeMap<String, BTreeMap<String, Vec<ServiceInstance>>>,
    // Staging area used during watcher (re-)initialization; swapped into
    // `slices` wholesale by `finish_init`.
    init_buffer: BTreeMap<String, BTreeMap<String, Vec<ServiceInstance>>>,
    // Sync/error state reported via `status()`.
    status: KubeDiscoveryStatus,
}

impl KubeDiscoveryCache {
    /// Starts a fresh watcher initialization buffer.
    pub fn begin_init(&mut self) {
        self.init_buffer.clear();
        self.status.synced = false;
        self.status.last_error = None;
    }

    /// Buffers one slice update during watcher initialization.
    pub fn buffer_init(&mut self, update: KubeSliceCacheUpdate) {
        write_update(&mut self.init_buffer, update);
    }

    /// Replaces the live cache with the initialization buffer.
    pub fn finish_init(&mut self) {
        self.slices = std::mem::take(&mut self.init_buffer);
        self.status.synced = true;
        self.status.last_error = None;
    }

    /// Applies one live EndpointSlice update.
    pub fn apply_slice(&mut self, update: KubeSliceCacheUpdate) {
        write_update(&mut self.slices, update);
        self.status.synced = true;
        self.status.last_error = None;
    }

    /// Removes one EndpointSlice from a service cache.
    pub fn delete_slice(&mut self, service: &str, slice: &str) {
        if let Some(slices) = self.slices.get_mut(service) {
            slices.remove(slice);
            if slices.is_empty() {
                self.slices.remove(service);
            }
        }
    }

    /// Marks the cache with a watcher error while keeping the last known state.
    pub fn mark_error(&mut self, error: impl Into<String>) {
        self.status.last_error = Some(error.into());
    }

    /// Returns healthy instances for a service from the current cache.
    pub fn discover(&self, service: &str) -> DiscoveryResult<Vec<ServiceInstance>> {
        if !self.status.synced {
            return Err(DiscoveryError::Resolve {
                host: "kubernetes".to_string(),
                message: format!("kubernetes watcher for `{service}` has not synchronized"),
            });
        }
        let mut seen = BTreeSet::new();
        let mut instances = self
            .slices
            .get(service)
            .into_iter()
            .flat_map(|slices| slices.values())
            .flat_map(|instances| instances.iter())
            .filter(|instance| seen.insert(instance.id.clone()))
            .cloned()
            .collect::<Vec<_>>();
        instances.sort_by(|left, right| left.id.cmp(&right.id));
        if instances.is_empty() {
            Err(DiscoveryError::NoInstances {
                service: service.to_string(),
            })
        } else {
            Ok(instances)
        }
    }

    /// Returns a snapshot of watcher status.
    pub fn status(&self) -> KubeDiscoveryStatus {
        self.status.clone()
    }
}

/// Writes one EndpointSlice update into `target` (live cache or init buffer).
///
/// An empty `instances` list is a removal: the slice entry is deleted. A
/// non-empty list replaces the slice's instances wholesale.
///
/// Removals never materialize an entry for an unknown service, and a service
/// whose last slice is removed is pruned from `target` — mirroring
/// `KubeDiscoveryCache::delete_slice` and preventing empty per-service maps
/// from accumulating under slice churn. (Previously a removal went through
/// `entry(..).or_default()`, which both inserted empty maps for unknown
/// services and left empty maps behind; `discover` returned `NoInstances`
/// either way, so observable behavior is unchanged.)
fn write_update(
    target: &mut BTreeMap<String, BTreeMap<String, Vec<ServiceInstance>>>,
    update: KubeSliceCacheUpdate,
) {
    if update.instances.is_empty() {
        if let Some(per_slice) = target.get_mut(&update.service) {
            per_slice.remove(&update.slice);
            if per_slice.is_empty() {
                target.remove(&update.service);
            }
        }
    } else {
        target
            .entry(update.service)
            .or_default()
            .insert(update.slice, update.instances);
    }
}