use std::collections::{BTreeMap, BTreeSet};
use crate::discovery::{DiscoveryError, DiscoveryResult, ServiceInstance};
use crate::discovery_kube::KubeSliceCacheUpdate;
/// Health snapshot reported by the Kubernetes discovery cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KubeDiscoveryStatus {
    /// `true` once the cache has been populated; `discover` refuses to answer while `false`.
    pub synced: bool,
    /// Most recent error recorded through `mark_error`, if any.
    pub last_error: Option<String>,
    /// Name of the watched resource kind; defaults to `"EndpointSlice"`.
    pub resource_kind: String,
}

impl Default for KubeDiscoveryStatus {
    /// Builds the initial status: not synced, no error, resource kind `"EndpointSlice"`.
    fn default() -> Self {
        let resource_kind = String::from("EndpointSlice");
        KubeDiscoveryStatus {
            synced: false,
            last_error: None,
            resource_kind,
        }
    }
}
/// In-memory cache of discovered service instances, grouped by service and then by slice.
///
/// Holds a live map that `discover` reads from, a separate staging map that is filled
/// during an initial listing, and the status reported by `status()`.
#[derive(Debug, Clone, Default)]
pub struct KubeDiscoveryCache {
/// Live view: service name -> slice name -> instances carried by that slice.
slices: BTreeMap<String, BTreeMap<String, Vec<ServiceInstance>>>,
/// Staging area with the same shape as `slices`; filled by `buffer_init` and moved
/// into `slices` by `finish_init`.
init_buffer: BTreeMap<String, BTreeMap<String, Vec<ServiceInstance>>>,
/// Synchronization flag, last recorded error and resource kind.
status: KubeDiscoveryStatus,
}
impl KubeDiscoveryCache {
    /// Starts a fresh initial listing.
    ///
    /// Empties the staging buffer, marks the cache as not synchronized and clears any
    /// recorded error. The live map is left untouched.
    pub fn begin_init(&mut self) {
        self.status.last_error = None;
        self.status.synced = false;
        self.init_buffer.clear();
    }

    /// Records `update` in the staging buffer without touching the live map.
    pub fn buffer_init(&mut self, update: KubeSliceCacheUpdate) {
        let staging = &mut self.init_buffer;
        write_update(staging, update);
    }

    /// Completes an initial listing.
    ///
    /// The staged contents replace the live map wholesale (the staging buffer is left
    /// empty), the cache is marked synchronized and any recorded error is cleared.
    pub fn finish_init(&mut self) {
        let staged = std::mem::take(&mut self.init_buffer);
        self.slices = staged;
        self.status.last_error = None;
        self.status.synced = true;
    }

    /// Applies `update` directly to the live map, marks the cache synchronized and
    /// clears any recorded error.
    pub fn apply_slice(&mut self, update: KubeSliceCacheUpdate) {
        let live = &mut self.slices;
        write_update(live, update);
        self.status.last_error = None;
        self.status.synced = true;
    }

    /// Removes `slice` from `service` in the live map.
    ///
    /// If that leaves the service without slices, the service entry is removed too.
    /// Unknown services or slices are ignored; the status is not modified.
    pub fn delete_slice(&mut self, service: &str, slice: &str) {
        let now_empty = match self.slices.get_mut(service) {
            Some(by_slice) => {
                by_slice.remove(slice);
                by_slice.is_empty()
            }
            None => false,
        };
        if now_empty {
            self.slices.remove(service);
        }
    }

    /// Stores `error` as the last error. The `synced` flag is left as it is.
    pub fn mark_error(&mut self, error: impl Into<String>) {
        let message: String = error.into();
        self.status.last_error = Some(message);
    }

    /// Returns the instances known for `service`, deduplicated by `id` and sorted by `id`.
    ///
    /// When the same `id` appears more than once, the first occurrence wins; slices are
    /// visited in slice-name order and instances in their stored order.
    ///
    /// # Errors
    ///
    /// * `DiscoveryError::Resolve` if the cache is not marked synchronized.
    /// * `DiscoveryError::NoInstances` if no instance is cached for `service`.
    pub fn discover(&self, service: &str) -> DiscoveryResult<Vec<ServiceInstance>> {
        if !self.status.synced {
            return Err(DiscoveryError::Resolve {
                host: "kubernetes".to_string(),
                message: format!("kubernetes watcher for `{service}` has not synchronized"),
            });
        }

        let mut seen = BTreeSet::new();
        let mut found = Vec::new();
        if let Some(by_slice) = self.slices.get(service) {
            for candidate in by_slice.values().flatten() {
                // `insert` returns false for an id we already emitted.
                if seen.insert(candidate.id.clone()) {
                    found.push(candidate.clone());
                }
            }
        }
        found.sort_by(|a, b| a.id.cmp(&b.id));

        if found.is_empty() {
            return Err(DiscoveryError::NoInstances {
                service: service.to_string(),
            });
        }
        Ok(found)
    }

    /// Returns a copy of the current status.
    pub fn status(&self) -> KubeDiscoveryStatus {
        self.status.clone()
    }
}
/// Applies a slice update to a `service -> slice -> instances` map.
///
/// * A non-empty update stores `update.instances` under `update.service` /
///   `update.slice`, creating the service entry if needed and replacing any previous
///   contents of that slice.
/// * An empty update removes the slice. If that was the service's last slice, the
///   service entry is removed as well, matching `KubeDiscoveryCache::delete_slice`.
///   An empty update for an unknown service is a no-op.
fn write_update(
    target: &mut BTreeMap<String, BTreeMap<String, Vec<ServiceInstance>>>,
    update: KubeSliceCacheUpdate,
) {
    if update.instances.is_empty() {
        // Look the service up instead of using `entry().or_default()`: creating an entry
        // just to remove a slice from it would leave an empty service map behind.
        if let Some(slices) = target.get_mut(&update.service) {
            slices.remove(&update.slice);
            if slices.is_empty() {
                target.remove(&update.service);
            }
        }
    } else {
        target
            .entry(update.service)
            .or_default()
            .insert(update.slice, update.instances);
    }
}