rs-zero 0.2.6

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::collections::BTreeMap;

use k8s_openapi::api::discovery::v1::{Endpoint, EndpointPort, EndpointSlice};
use kube::ResourceExt;

use crate::discovery::{InstanceEndpoint, ServiceInstance};
use crate::discovery_kube::{KubeDiscoveryConfig, KubeDiscoveryError, KubeDiscoveryResult};

/// Kubernetes label used by EndpointSlice objects to identify their Service.
///
/// The mapping code in this module reads this key from the slice's
/// `metadata.labels` to determine which service a slice belongs to; a slice
/// without the label is rejected with a mapping error.
pub const SERVICE_NAME_LABEL: &str = "kubernetes.io/service-name";

/// Cache update produced by one Kubernetes EndpointSlice.
///
/// Built by [`map_endpoint_slice`]; it pairs the owning service and slice
/// names with the instances extracted from that slice.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KubeSliceCacheUpdate {
    /// Kubernetes service name, taken from the slice's
    /// [`SERVICE_NAME_LABEL`] label.
    pub service: String,
    /// EndpointSlice name, as reported by `ResourceExt::name_any`.
    pub slice: String,
    /// Ready service instances contributed by this slice, sorted by instance
    /// id. Empty when the configuration names a different service than the
    /// one this slice is labelled with.
    pub instances: Vec<ServiceInstance>,
}

/// Parses `payload` as an EndpointSlice JSON document and maps it into a
/// cache update.
///
/// # Errors
///
/// Returns the converted `serde_json` error when `payload` does not
/// deserialize into an `EndpointSlice`, or whatever error
/// [`map_endpoint_slice`] reports for the parsed slice.
pub fn map_endpoint_slice_json(
    config: &KubeDiscoveryConfig,
    payload: &str,
) -> KubeDiscoveryResult<KubeSliceCacheUpdate> {
    match serde_json::from_str::<EndpointSlice>(payload) {
        Ok(parsed) => map_endpoint_slice(config, &parsed),
        Err(error) => Err(error.into()),
    }
}

/// Converts a Kubernetes EndpointSlice into the cache update for its service.
///
/// The service name is read from the [`SERVICE_NAME_LABEL`] label. When
/// `config.service_name` is set and differs from that label, the update is
/// returned with no instances. Otherwise one port is selected for the slice
/// and every address of every ready endpoint becomes a `ServiceInstance`;
/// the resulting instances are sorted by id.
///
/// # Errors
///
/// Returns `KubeDiscoveryError::Mapping` when the service label is missing,
/// and propagates any error from port selection or instance construction.
pub fn map_endpoint_slice(
    config: &KubeDiscoveryConfig,
    slice: &EndpointSlice,
) -> KubeDiscoveryResult<KubeSliceCacheUpdate> {
    let Some(service) = service_name(slice) else {
        return Err(KubeDiscoveryError::Mapping(format!(
            "EndpointSlice `{}` is missing `{SERVICE_NAME_LABEL}` label",
            slice.name_any()
        )));
    };

    // A slice labelled for a different service contributes nothing, and its
    // ports and endpoints are deliberately not inspected.
    let belongs_elsewhere =
        matches!(&config.service_name, Some(expected) if expected != &service);
    if belongs_elsewhere {
        return Ok(KubeSliceCacheUpdate {
            service,
            slice: slice.name_any(),
            instances: Vec::new(),
        });
    }

    let port = select_port(config, slice)?;

    // Keep the original endpoint position: it feeds both the fallback
    // instance id and the per-endpoint metadata lookup.
    let ready_endpoints = slice
        .endpoints
        .iter()
        .enumerate()
        .filter(|(_, endpoint)| endpoint_is_ready(endpoint));

    let mut instances = Vec::new();
    for (position, endpoint) in ready_endpoints {
        let mut batch = endpoint_instances(config, slice, endpoint, &service, &port, position)?;
        instances.append(&mut batch);
    }

    // Stable sort, so instances sharing an id keep their slice order.
    instances.sort_by(|first, second| first.id.cmp(&second.id));

    Ok(KubeSliceCacheUpdate {
        service,
        slice: slice.name_any(),
        instances,
    })
}

/// Returns the value of the [`SERVICE_NAME_LABEL`] label on `slice`, or
/// `None` when the slice has no labels or lacks that label.
fn service_name(slice: &EndpointSlice) -> Option<String> {
    let labels = slice.metadata.labels.as_ref()?;
    labels.get(SERVICE_NAME_LABEL).cloned()
}

/// Picks the port of `slice` that instances should be reached on.
///
/// When `config.port_name` is set, the first port with that exact name is
/// chosen; otherwise the first port in the slice is used. A clone of the
/// chosen port is returned.
///
/// # Errors
///
/// Returns `KubeDiscoveryError::MissingPort` when the slice has no port
/// list, no port qualifies, or the chosen port carries no number. Returns
/// `KubeDiscoveryError::Mapping` when the number lies outside `1..=u16::MAX`.
fn select_port(
    config: &KubeDiscoveryConfig,
    slice: &EndpointSlice,
) -> KubeDiscoveryResult<EndpointPort> {
    let Some(ports) = slice.ports.as_deref() else {
        return Err(KubeDiscoveryError::MissingPort);
    };

    let candidate = match &config.port_name {
        Some(wanted) => ports
            .iter()
            .find(|entry| entry.name.as_ref() == Some(wanted)),
        None => ports.first(),
    };
    let chosen = candidate.ok_or(KubeDiscoveryError::MissingPort)?;

    let number = chosen.port.ok_or(KubeDiscoveryError::MissingPort)?;
    let in_range = (1..=i32::from(u16::MAX)).contains(&number);
    if !in_range {
        return Err(KubeDiscoveryError::Mapping(format!(
            "EndpointSlice `{}` has invalid port `{number}`",
            slice.name_any()
        )));
    }

    Ok(chosen.clone())
}

/// Decides whether `endpoint` should contribute instances.
///
/// An endpoint without conditions counts as ready. Otherwise it must be
/// ready and serving (each defaulting to `true` when unset) and must not be
/// terminating (defaulting to `false` when unset).
fn endpoint_is_ready(endpoint: &Endpoint) -> bool {
    match &endpoint.conditions {
        None => true,
        Some(state) => {
            let ready = state.ready.unwrap_or(true);
            let serving = state.serving.unwrap_or(true);
            let terminating = state.terminating.unwrap_or(false);
            ready && serving && !terminating
        }
    }
}

/// Builds one `ServiceInstance` per address of `endpoint`.
///
/// `index` is the position of `endpoint` inside `slice.endpoints`; it is
/// forwarded to the id and metadata helpers. Each instance receives every
/// key/value pair produced by `metadata` for its address.
///
/// # Errors
///
/// Returns `KubeDiscoveryError::MissingPort` when `port` has no number or the
/// number does not convert to the port type `InstanceEndpoint::new` expects,
/// and `KubeDiscoveryError::Backend` when `InstanceEndpoint::new` rejects an
/// address.
fn endpoint_instances(
    config: &KubeDiscoveryConfig,
    slice: &EndpointSlice,
    endpoint: &Endpoint,
    service: &str,
    port: &EndpointPort,
    index: usize,
) -> KubeDiscoveryResult<Vec<ServiceInstance>> {
    let raw_port = port.port.ok_or(KubeDiscoveryError::MissingPort)?;
    let port_number = raw_port
        .try_into()
        .map_err(|_| KubeDiscoveryError::MissingPort)?;

    let mut instances = Vec::with_capacity(endpoint.addresses.len());
    for (address_index, address) in endpoint.addresses.iter().enumerate() {
        let target = InstanceEndpoint::new(address.clone(), port_number)
            .map_err(|error| KubeDiscoveryError::Backend(error.to_string()))?;
        let id = instance_id(slice, address, index, address_index);
        let bare = ServiceInstance::new(service.to_string(), id, target);
        // Attach the metadata entries in the map's key order.
        let decorated = metadata(config, slice, index, address, port)
            .into_iter()
            .fold(bare, |instance, (key, value)| {
                instance.with_metadata(key, value)
            });
        instances.push(decorated);
    }
    Ok(instances)
}

/// Derives the instance id for one address of one endpoint.
///
/// Preference order: the uid of the endpoint's target reference, then the
/// target reference's name, and finally a synthetic id composed of the slice
/// name, both indices and the address.
fn instance_id(
    slice: &EndpointSlice,
    address: &str,
    endpoint_index: usize,
    address_index: usize,
) -> String {
    slice
        .endpoints
        .get(endpoint_index)
        .and_then(|endpoint| endpoint.target_ref.as_ref())
        .and_then(|target| target.uid.as_ref().or(target.name.as_ref()))
        .cloned()
        .unwrap_or_else(|| {
            format!(
                "{}-{endpoint_index}-{address_index}-{address}",
                slice.name_any()
            )
        })
}

/// Assembles the metadata map attached to one instance.
///
/// Starts from a copy of `config.metadata` and then sets the `kubernetes.*`
/// keys, overwriting any same-named entries from the configuration. The
/// namespace, slice name and address are always present; port name,
/// protocol, node and zone are added only when the port or the endpoint at
/// `endpoint_index` provides them.
fn metadata(
    config: &KubeDiscoveryConfig,
    slice: &EndpointSlice,
    endpoint_index: usize,
    address: &str,
    port: &EndpointPort,
) -> BTreeMap<String, String> {
    let mut labels = config.metadata.clone();

    let always = [
        ("kubernetes.namespace", config.namespace.clone()),
        ("kubernetes.endpoint_slice", slice.name_any()),
        ("kubernetes.address", address.to_string()),
    ];
    for (key, value) in always {
        labels.insert(key.to_string(), value);
    }

    // An out-of-range index simply yields no node/zone entries.
    let endpoint = slice.endpoints.get(endpoint_index);
    let when_present = [
        ("kubernetes.port_name", port.name.as_ref()),
        ("kubernetes.protocol", port.protocol.as_ref()),
        (
            "kubernetes.node",
            endpoint.and_then(|entry| entry.node_name.as_ref()),
        ),
        (
            "kubernetes.zone",
            endpoint.and_then(|entry| entry.zone.as_ref()),
        ),
    ];
    for (key, value) in when_present {
        if let Some(value) = value {
            labels.insert(key.to_string(), value.clone());
        }
    }

    labels
}