use std::collections::BTreeMap;
use k8s_openapi::api::discovery::v1::{Endpoint, EndpointPort, EndpointSlice};
use kube::ResourceExt;
use crate::discovery::{InstanceEndpoint, ServiceInstance};
use crate::discovery_kube::{KubeDiscoveryConfig, KubeDiscoveryError, KubeDiscoveryResult};
pub const SERVICE_NAME_LABEL: &str = "kubernetes.io/service-name";
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KubeSliceCacheUpdate {
pub service: String,
pub slice: String,
pub instances: Vec<ServiceInstance>,
}
pub fn map_endpoint_slice_json(
config: &KubeDiscoveryConfig,
payload: &str,
) -> KubeDiscoveryResult<KubeSliceCacheUpdate> {
let slice: EndpointSlice = serde_json::from_str(payload)?;
map_endpoint_slice(config, &slice)
}
pub fn map_endpoint_slice(
config: &KubeDiscoveryConfig,
slice: &EndpointSlice,
) -> KubeDiscoveryResult<KubeSliceCacheUpdate> {
let service = service_name(slice).ok_or_else(|| {
KubeDiscoveryError::Mapping(format!(
"EndpointSlice `{}` is missing `{SERVICE_NAME_LABEL}` label",
slice.name_any()
))
})?;
if config
.service_name
.as_ref()
.is_some_and(|expected| expected != &service)
{
return Ok(KubeSliceCacheUpdate {
service,
slice: slice.name_any(),
instances: Vec::new(),
});
}
let port = select_port(config, slice)?;
let mut instances = Vec::new();
for (index, endpoint) in slice.endpoints.iter().enumerate() {
if endpoint_is_ready(endpoint) {
instances.extend(endpoint_instances(
config, slice, endpoint, &service, &port, index,
)?);
}
}
instances.sort_by(|left, right| left.id.cmp(&right.id));
Ok(KubeSliceCacheUpdate {
service,
slice: slice.name_any(),
instances,
})
}
fn service_name(slice: &EndpointSlice) -> Option<String> {
slice
.metadata
.labels
.as_ref()
.and_then(|labels| labels.get(SERVICE_NAME_LABEL))
.cloned()
}
fn select_port(
config: &KubeDiscoveryConfig,
slice: &EndpointSlice,
) -> KubeDiscoveryResult<EndpointPort> {
let ports = slice
.ports
.as_deref()
.ok_or(KubeDiscoveryError::MissingPort)?;
let port = if let Some(name) = &config.port_name {
ports
.iter()
.find(|port| port.name.as_ref() == Some(name))
.ok_or(KubeDiscoveryError::MissingPort)?
} else {
ports.first().ok_or(KubeDiscoveryError::MissingPort)?
};
let number = port.port.ok_or(KubeDiscoveryError::MissingPort)?;
if !(1..=u16::MAX as i32).contains(&number) {
return Err(KubeDiscoveryError::Mapping(format!(
"EndpointSlice `{}` has invalid port `{number}`",
slice.name_any()
)));
}
Ok(port.clone())
}
fn endpoint_is_ready(endpoint: &Endpoint) -> bool {
let Some(conditions) = &endpoint.conditions else {
return true;
};
conditions.ready.unwrap_or(true)
&& conditions.serving.unwrap_or(true)
&& !conditions.terminating.unwrap_or(false)
}
fn endpoint_instances(
config: &KubeDiscoveryConfig,
slice: &EndpointSlice,
endpoint: &Endpoint,
service: &str,
port: &EndpointPort,
index: usize,
) -> KubeDiscoveryResult<Vec<ServiceInstance>> {
let port_number = port
.port
.ok_or(KubeDiscoveryError::MissingPort)?
.try_into()
.map_err(|_| KubeDiscoveryError::MissingPort)?;
endpoint
.addresses
.iter()
.enumerate()
.map(|(address_index, address)| {
let endpoint = InstanceEndpoint::new(address.clone(), port_number)
.map_err(|error| KubeDiscoveryError::Backend(error.to_string()))?;
let id = instance_id(slice, address, index, address_index);
let mut instance = ServiceInstance::new(service.to_string(), id, endpoint);
for (key, value) in metadata(config, slice, index, address, port) {
instance = instance.with_metadata(key, value);
}
Ok(instance)
})
.collect()
}
fn instance_id(
slice: &EndpointSlice,
address: &str,
endpoint_index: usize,
address_index: usize,
) -> String {
let target = slice
.endpoints
.get(endpoint_index)
.and_then(|endpoint| endpoint.target_ref.as_ref());
if let Some(uid) = target.and_then(|target| target.uid.as_ref()) {
return uid.clone();
}
if let Some(name) = target.and_then(|target| target.name.as_ref()) {
return name.clone();
}
format!(
"{}-{endpoint_index}-{address_index}-{address}",
slice.name_any()
)
}
fn metadata(
config: &KubeDiscoveryConfig,
slice: &EndpointSlice,
endpoint_index: usize,
address: &str,
port: &EndpointPort,
) -> BTreeMap<String, String> {
let mut metadata = config.metadata.clone();
metadata.insert("kubernetes.namespace".to_string(), config.namespace.clone());
metadata.insert("kubernetes.endpoint_slice".to_string(), slice.name_any());
metadata.insert("kubernetes.address".to_string(), address.to_string());
if let Some(name) = &port.name {
metadata.insert("kubernetes.port_name".to_string(), name.clone());
}
if let Some(protocol) = &port.protocol {
metadata.insert("kubernetes.protocol".to_string(), protocol.clone());
}
if let Some(endpoint) = slice.endpoints.get(endpoint_index) {
if let Some(node) = &endpoint.node_name {
metadata.insert("kubernetes.node".to_string(), node.clone());
}
if let Some(zone) = &endpoint.zone {
metadata.insert("kubernetes.zone".to_string(), zone.clone());
}
}
metadata
}