//! rs-zero 0.2.6
//!
//! Rust-first microservice framework inspired by go-zero engineering practices.
//! Documentation
use std::sync::Arc;

use async_trait::async_trait;
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{Api, Client, Config, ResourceExt, config::KubeConfigOptions, runtime::watcher};
use tokio::{
    sync::RwLock,
    task::JoinHandle,
    time::{sleep, timeout},
};
use tokio_stream::StreamExt;

use crate::discovery::{Discovery, DiscoveryError, DiscoveryResult, ServiceInstance};
use crate::discovery_kube::{
    KubeDiscoveryCache, KubeDiscoveryConfig, KubeDiscoveryError, KubeDiscoveryResult,
    KubeDiscoveryStatus, map_endpoint_slice, map_endpoints,
};

/// Where discovery answers come from: a static fixture or a live watcher.
enum KubeDiscoverySource {
    /// Offline mode: answers for exactly one `service` from a serialized fixture.
    Snapshot { service: String, snapshot: String },
    /// Live mode: handle to the background EndpointSlice watcher task.
    Watch { task: Arc<JoinHandle<()>> },
}

/// Kubernetes discovery backed by either fixture snapshots or a real EndpointSlice watcher.
///
/// Cloning is cheap: clones share the endpoint cache and the discovery source.
#[derive(Clone)]
pub struct KubeDiscovery {
    // Discovery settings (namespace, selectors, timeouts); validated on construction.
    config: KubeDiscoveryConfig,
    // Endpoint cache kept up to date by the watcher task; unused in snapshot mode.
    cache: Arc<RwLock<KubeDiscoveryCache>>,
    // Data source shared across clones (fixture snapshot or watch task handle).
    source: Arc<KubeDiscoverySource>,
}

impl std::fmt::Debug for KubeDiscovery {
    // Manual impl: the cache and the watch task are runtime state with no
    // useful textual form, so only the static configuration is rendered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut output = f.debug_struct("KubeDiscovery");
        output.field("config", &self.config);
        output.finish_non_exhaustive()
    }
}

impl KubeDiscovery {
    /// Creates a discovery facade from a fixture snapshot for offline tests.
    ///
    /// No Kubernetes client is created; `discover` answers only for `service`,
    /// parsing `snapshot` via `map_endpoints` on each call.
    pub fn from_snapshot(
        config: KubeDiscoveryConfig,
        service: impl Into<String>,
        snapshot: impl Into<String>,
    ) -> Self {
        Self {
            config,
            // Cache stays empty in snapshot mode; it exists to keep the struct uniform.
            cache: Arc::new(RwLock::new(KubeDiscoveryCache::default())),
            source: Arc::new(KubeDiscoverySource::Snapshot {
                service: service.into(),
                snapshot: snapshot.into(),
            }),
        }
    }

    /// Connects with the default Kubernetes client and starts an EndpointSlice watcher.
    ///
    /// When `config.context` is set, the kubeconfig is loaded with that context
    /// selected; otherwise the configuration is inferred (in-cluster or local
    /// kubeconfig, per kube's `Config::infer`).
    ///
    /// # Errors
    /// `InvalidConfig` for a rejected configuration, `Client` when the kube
    /// config or client cannot be built, `Timeout` when the initial sync does
    /// not complete within `config.sync_timeout`.
    pub async fn connect(config: KubeDiscoveryConfig) -> KubeDiscoveryResult<Self> {
        validate_config(&config)?;
        let mut kube_config = if config.context.is_some() {
            Config::from_kubeconfig(&KubeConfigOptions {
                context: config.context.clone(),
                cluster: None,
                user: None,
            })
            .await
            .map_err(|error| KubeDiscoveryError::Client(error.to_string()))?
        } else {
            Config::infer()
                .await
                .map_err(|error| KubeDiscoveryError::Client(error.to_string()))?
        };
        // Apply kube's debug-related environment overrides before building the client.
        kube_config.apply_debug_overrides();
        let client = Client::try_from(kube_config)
            .map_err(|error| KubeDiscoveryError::Client(error.to_string()))?;
        Self::from_client(config, client).await
    }

    /// Starts an EndpointSlice watcher from an existing Kubernetes client.
    ///
    /// Spawns the watcher task first, then blocks (up to `config.sync_timeout`)
    /// until the cache reports its first complete sync, so a returned instance
    /// can serve `discover` immediately.
    pub async fn from_client(
        config: KubeDiscoveryConfig,
        client: Client,
    ) -> KubeDiscoveryResult<Self> {
        validate_config(&config)?;
        let cache = Arc::new(RwLock::new(KubeDiscoveryCache::default()));
        let task = tokio::spawn(run_endpoint_slice_watcher(
            config.clone(),
            client,
            Arc::clone(&cache),
        ));
        // NOTE: if this times out, `task` is dropped without an abort here;
        // JoinHandle drop detaches the task — it keeps running in background.
        wait_for_initial_sync(&config, &cache).await?;
        Ok(Self {
            config,
            cache,
            source: Arc::new(KubeDiscoverySource::Watch {
                task: Arc::new(task),
            }),
        })
    }

    /// Returns the current watcher status (sync state / last error) from the cache.
    pub async fn status(&self) -> KubeDiscoveryStatus {
        self.cache.read().await.status()
    }
}

impl Drop for KubeDiscovery {
    /// Aborts the background watcher task when the last clone is dropped.
    fn drop(&mut self) {
        // Clones of `KubeDiscovery` share `self.source` (the inner task Arc is
        // never cloned independently, so its strong count is always 1). The
        // liveness check therefore must inspect the OUTER Arc: checking the
        // inner `Arc<JoinHandle>` count would abort the watcher as soon as the
        // first clone dropped, starving every surviving clone of updates.
        if Arc::strong_count(&self.source) == 1
            && let KubeDiscoverySource::Watch { task } = self.source.as_ref()
        {
            task.abort();
        }
    }
}

#[async_trait]
impl Discovery for KubeDiscovery {
    /// Resolves `service` to its currently known instances.
    ///
    /// Snapshot mode answers from the fixture (only for the configured
    /// service); watch mode reads the cache maintained by the watcher task.
    async fn discover(&self, service: &str) -> DiscoveryResult<Vec<ServiceInstance>> {
        match self.source.as_ref() {
            KubeDiscoverySource::Snapshot {
                service: expected,
                snapshot,
            } => discover_snapshot(&self.config, expected, snapshot, service),
            KubeDiscoverySource::Watch { .. } => self.cache.read().await.discover(service),
        }
    }
}

/// Polls the cache until the watcher reports its first complete sync, failing
/// with `Timeout { operation: "initial_sync" }` after `config.sync_timeout`.
async fn wait_for_initial_sync(
    config: &KubeDiscoveryConfig,
    cache: &Arc<RwLock<KubeDiscoveryCache>>,
) -> KubeDiscoveryResult<()> {
    // Poll every 50ms; the deadline below bounds the total wait.
    let synced = async {
        while !cache.read().await.status().synced {
            sleep(std::time::Duration::from_millis(50)).await;
        }
    };
    timeout(config.sync_timeout, synced)
        .await
        .map_err(|_| KubeDiscoveryError::Timeout {
            operation: "initial_sync",
        })
}

/// Long-running task: watches EndpointSlices in `config.namespace` and mirrors
/// every watcher event into the shared cache.
async fn run_endpoint_slice_watcher(
    config: KubeDiscoveryConfig,
    client: Client,
    cache: Arc<RwLock<KubeDiscoveryCache>>,
) {
    let api: Api<EndpointSlice> = Api::namespaced(client, &config.namespace);
    // kube's watcher re-establishes the underlying watch internally; errors it
    // still yields are recorded in the cache rather than ending this task.
    let mut stream = Box::pin(watcher(api, watcher_config(&config)));
    while let Some(event) = stream.next().await {
        match event {
            Ok(event) => apply_watcher_event(&config, &cache, event).await,
            Err(error) => {
                // Surface the failure through `status()`; keep consuming the stream.
                cache.write().await.mark_error(error.to_string());
            }
        }
    }
    // The watcher stream is expected to be endless; reaching this point means
    // it terminated, so mark the cache with an error callers can observe.
    cache
        .write()
        .await
        .mark_error("kubernetes EndpointSlice watch stream ended");
}

/// Translates a single kube watcher event into a cache mutation.
async fn apply_watcher_event(
    config: &KubeDiscoveryConfig,
    cache: &Arc<RwLock<KubeDiscoveryCache>>,
    event: watcher::Event<EndpointSlice>,
) {
    match event {
        // Init / InitApply / InitDone bracket a full re-list: buffer incoming
        // slices and commit them when the re-list completes (per the cache's
        // begin_init / buffer_init / finish_init protocol).
        watcher::Event::Init => cache.write().await.begin_init(),
        watcher::Event::InitApply(slice) => match map_endpoint_slice(config, &slice) {
            Ok(update) => cache.write().await.buffer_init(update),
            Err(error) => cache.write().await.mark_error(error.to_string()),
        },
        watcher::Event::InitDone => cache.write().await.finish_init(),
        // Incremental create/update of a single slice.
        watcher::Event::Apply(slice) => match map_endpoint_slice(config, &slice) {
            Ok(update) => cache.write().await.apply_slice(update),
            Err(error) => cache.write().await.mark_error(error.to_string()),
        },
        watcher::Event::Delete(slice) => {
            // Deletion is keyed by owning service plus slice name only.
            let service = service_name(config, &slice);
            cache
                .write()
                .await
                .delete_slice(&service, &slice.name_any());
        }
    }
}

/// Builds the kube watcher configuration, layering only the user-set options
/// (label/field selectors, watch timeout, page size) onto kube's defaults.
fn watcher_config(config: &KubeDiscoveryConfig) -> watcher::Config {
    let base = watcher::Config::default();
    let base = match config.label_selector.as_deref() {
        Some(selector) => base.labels(selector),
        None => base,
    };
    let base = match config.field_selector.as_deref() {
        Some(selector) => base.fields(selector),
        None => base,
    };
    let mut built = match config.watch_timeout {
        Some(watch_timeout) => base.timeout(watch_timeout),
        None => base,
    };
    built.page_size = config.page_size;
    built
}

/// Determines which service a slice belongs to: the slice's service-name label
/// (`SERVICE_NAME_LABEL`) wins, then the configured service name, then "".
fn service_name(config: &KubeDiscoveryConfig, slice: &EndpointSlice) -> String {
    let labelled = slice
        .metadata
        .labels
        .as_ref()
        .and_then(|labels| labels.get(crate::discovery_kube::SERVICE_NAME_LABEL).cloned());
    match labelled.or_else(|| config.service_name.clone()) {
        Some(name) => name,
        None => String::new(),
    }
}

fn discover_snapshot(
    config: &KubeDiscoveryConfig,
    expected: &str,
    snapshot: &str,
    service: &str,
) -> DiscoveryResult<Vec<ServiceInstance>> {
    if service != expected {
        return Err(DiscoveryError::NoInstances {
            service: service.to_string(),
        });
    }
    let instances = map_endpoints(config, snapshot).map_err(|error| DiscoveryError::Resolve {
        host: "kubernetes".to_string(),
        message: error.to_string(),
    })?;
    if instances.is_empty() {
        Err(DiscoveryError::NoInstances {
            service: service.to_string(),
        })
    } else {
        Ok(instances)
    }
}

fn validate_config(config: &KubeDiscoveryConfig) -> KubeDiscoveryResult<()> {
    if config.namespace.trim().is_empty() {
        return Err(KubeDiscoveryError::InvalidConfig(
            "namespace must not be empty".to_string(),
        ));
    }
    if config
        .service_name
        .as_ref()
        .is_some_and(|service| service.trim().is_empty())
    {
        return Err(KubeDiscoveryError::InvalidConfig(
            "service_name must not be empty".to_string(),
        ));
    }
    if config.sync_timeout.is_zero() {
        return Err(KubeDiscoveryError::InvalidConfig(
            "sync_timeout must be greater than zero".to_string(),
        ));
    }
    Ok(())
}