use std::sync::Arc;
use async_trait::async_trait;
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{Api, Client, Config, ResourceExt, config::KubeConfigOptions, runtime::watcher};
use tokio::{
sync::RwLock,
task::JoinHandle,
time::{sleep, timeout},
};
use tokio_stream::StreamExt;
use crate::discovery::{Discovery, DiscoveryError, DiscoveryResult, ServiceInstance};
use crate::discovery_kube::{
KubeDiscoveryCache, KubeDiscoveryConfig, KubeDiscoveryError, KubeDiscoveryResult,
KubeDiscoveryStatus, map_endpoint_slice, map_endpoints,
};
/// Where a `KubeDiscovery` obtains its service instances from.
enum KubeDiscoverySource {
/// A stored snapshot string, answered only for the one `service` it was registered for.
/// The snapshot is parsed at lookup time (see `discover_snapshot`).
Snapshot { service: String, snapshot: String },
/// Handle to the spawned task running `run_endpoint_slice_watcher`, which writes
/// watch results into the shared cache.
Watch { task: Arc<JoinHandle<()>> },
}
/// Service discovery backed either by a live EndpointSlice watch or by a static snapshot.
///
/// Clones share the same cache and source because both fields are `Arc`s.
#[derive(Clone)]
pub struct KubeDiscovery {
/// Caller-supplied settings; validated by `validate_config` in the watch constructors.
config: KubeDiscoveryConfig,
/// Shared instance cache; in watch mode the background watcher task writes to it.
cache: Arc<RwLock<KubeDiscoveryCache>>,
/// Origin of the data: a static snapshot or a background watch task.
source: Arc<KubeDiscoverySource>,
}
impl std::fmt::Debug for KubeDiscovery {
    /// Prints only the configuration; the cache and source are elided and shown as `..`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut debug = formatter.debug_struct("KubeDiscovery");
        debug.field("config", &self.config);
        debug.finish_non_exhaustive()
    }
}
impl KubeDiscovery {
    /// Builds a discovery that answers from a static snapshot instead of a live watch.
    ///
    /// `service` and `snapshot` are stored as given; the snapshot is only parsed when
    /// `discover` is called. No Kubernetes client or background task is created, and the
    /// configuration is not validated here.
    pub fn from_snapshot(
        config: KubeDiscoveryConfig,
        service: impl Into<String>,
        snapshot: impl Into<String>,
    ) -> Self {
        Self {
            config,
            cache: Arc::new(RwLock::new(KubeDiscoveryCache::default())),
            source: Arc::new(KubeDiscoverySource::Snapshot {
                service: service.into(),
                snapshot: snapshot.into(),
            }),
        }
    }

    /// Creates a Kubernetes client from the environment and starts watching EndpointSlices.
    ///
    /// When `config.context` is set, that kubeconfig context is loaded; otherwise the
    /// client configuration is inferred. The rest of the setup is delegated to
    /// [`KubeDiscovery::from_client`].
    ///
    /// # Errors
    ///
    /// Returns `KubeDiscoveryError::InvalidConfig` for a rejected configuration,
    /// `KubeDiscoveryError::Client` when the client configuration cannot be loaded or the
    /// client cannot be built, and any error produced by `from_client`.
    pub async fn connect(config: KubeDiscoveryConfig) -> KubeDiscoveryResult<Self> {
        validate_config(&config)?;
        let mut kube_config = if config.context.is_some() {
            Config::from_kubeconfig(&KubeConfigOptions {
                context: config.context.clone(),
                cluster: None,
                user: None,
            })
            .await
            .map_err(|error| KubeDiscoveryError::Client(error.to_string()))?
        } else {
            Config::infer()
                .await
                .map_err(|error| KubeDiscoveryError::Client(error.to_string()))?
        };
        kube_config.apply_debug_overrides();
        let client = Client::try_from(kube_config)
            .map_err(|error| KubeDiscoveryError::Client(error.to_string()))?;
        Self::from_client(config, client).await
    }

    /// Starts the EndpointSlice watcher on `client` and waits for the first full sync.
    ///
    /// # Errors
    ///
    /// Returns `KubeDiscoveryError::InvalidConfig` for a rejected configuration and
    /// `KubeDiscoveryError::Timeout` when the cache does not report itself synced within
    /// `config.sync_timeout`. On failure the watcher task is stopped rather than left
    /// running in the background.
    pub async fn from_client(
        config: KubeDiscoveryConfig,
        client: Client,
    ) -> KubeDiscoveryResult<Self> {
        validate_config(&config)?;
        let cache = Arc::new(RwLock::new(KubeDiscoveryCache::default()));
        let task = tokio::spawn(run_endpoint_slice_watcher(
            config.clone(),
            client,
            Arc::clone(&cache),
        ));
        // Hand the task to its owner *before* waiting. Dropping a bare `JoinHandle` only
        // detaches the task, so bailing out with `?` (or having this future cancelled)
        // while still holding the raw handle would leak a watcher that runs forever.
        // Owned by `discovery`, the task is aborted by the drop logic on every early exit.
        let discovery = Self {
            config,
            cache,
            source: Arc::new(KubeDiscoverySource::Watch {
                task: Arc::new(task),
            }),
        };
        wait_for_initial_sync(&discovery.config, &discovery.cache).await?;
        Ok(discovery)
    }

    /// Returns the status currently reported by the shared cache.
    pub async fn status(&self) -> KubeDiscoveryStatus {
        self.cache.read().await.status()
    }
}
impl Drop for KubeDiscovery {
fn drop(&mut self) {
if let KubeDiscoverySource::Watch { task } = self.source.as_ref()
&& Arc::strong_count(task) == 1
{
task.abort();
}
}
}
#[async_trait]
impl Discovery for KubeDiscovery {
    /// Resolves `service` from the shared cache in watch mode, or by parsing the stored
    /// snapshot in snapshot mode.
    async fn discover(&self, service: &str) -> DiscoveryResult<Vec<ServiceInstance>> {
        match &*self.source {
            KubeDiscoverySource::Watch { .. } => {
                let cache = self.cache.read().await;
                cache.discover(service)
            }
            KubeDiscoverySource::Snapshot {
                service: expected,
                snapshot,
            } => discover_snapshot(&self.config, expected, snapshot, service),
        }
    }
}
/// Polls the cache every 50 ms until it reports `synced`, giving up after
/// `config.sync_timeout`.
///
/// # Errors
///
/// Returns `KubeDiscoveryError::Timeout` with operation `"initial_sync"` when the
/// deadline passes first.
async fn wait_for_initial_sync(
    config: &KubeDiscoveryConfig,
    cache: &Arc<RwLock<KubeDiscoveryCache>>,
) -> KubeDiscoveryResult<()> {
    let poll_interval = std::time::Duration::from_millis(50);
    let until_synced = async {
        // The read guard is a temporary of the loop condition, so it is released
        // before each sleep.
        while !cache.read().await.status().synced {
            sleep(poll_interval).await;
        }
    };
    match timeout(config.sync_timeout, until_synced).await {
        Ok(()) => Ok(()),
        Err(_) => Err(KubeDiscoveryError::Timeout {
            operation: "initial_sync",
        }),
    }
}
async fn run_endpoint_slice_watcher(
config: KubeDiscoveryConfig,
client: Client,
cache: Arc<RwLock<KubeDiscoveryCache>>,
) {
let api: Api<EndpointSlice> = Api::namespaced(client, &config.namespace);
let mut stream = Box::pin(watcher(api, watcher_config(&config)));
while let Some(event) = stream.next().await {
match event {
Ok(event) => apply_watcher_event(&config, &cache, event).await,
Err(error) => {
cache.write().await.mark_error(error.to_string());
}
}
}
cache
.write()
.await
.mark_error("kubernetes EndpointSlice watch stream ended");
}
/// Applies a single watcher event to the shared cache.
///
/// Slices are mapped with `map_endpoint_slice` before the write lock is taken; a mapping
/// failure is recorded on the cache via `mark_error` instead of being applied.
async fn apply_watcher_event(
    config: &KubeDiscoveryConfig,
    cache: &Arc<RwLock<KubeDiscoveryCache>>,
    event: watcher::Event<EndpointSlice>,
) {
    match event {
        watcher::Event::Init => {
            cache.write().await.begin_init();
        }
        watcher::Event::InitApply(slice) => {
            let mapped = map_endpoint_slice(config, &slice);
            let mut state = cache.write().await;
            match mapped {
                Ok(update) => state.buffer_init(update),
                Err(error) => state.mark_error(error.to_string()),
            }
        }
        watcher::Event::InitDone => {
            cache.write().await.finish_init();
        }
        watcher::Event::Apply(slice) => {
            let mapped = map_endpoint_slice(config, &slice);
            let mut state = cache.write().await;
            match mapped {
                Ok(update) => state.apply_slice(update),
                Err(error) => state.mark_error(error.to_string()),
            }
        }
        watcher::Event::Delete(slice) => {
            let owner = service_name(config, &slice);
            let slice_name = slice.name_any();
            cache.write().await.delete_slice(&owner, &slice_name);
        }
    }
}
/// Translates the discovery settings into a `watcher::Config`.
///
/// Label selector, field selector and watch timeout are applied only when set; the page
/// size is always copied over from `config`.
fn watcher_config(config: &KubeDiscoveryConfig) -> watcher::Config {
    let base = watcher::Config::default();
    let base = match &config.label_selector {
        Some(labels) => base.labels(labels),
        None => base,
    };
    let base = match &config.field_selector {
        Some(fields) => base.fields(fields),
        None => base,
    };
    let mut built = match config.watch_timeout {
        Some(seconds) => base.timeout(seconds),
        None => base,
    };
    built.page_size = config.page_size;
    built
}
/// Determines which service an EndpointSlice belongs to.
///
/// Prefers the slice's `SERVICE_NAME_LABEL` label, then `config.service_name`, and
/// finally falls back to an empty string.
fn service_name(config: &KubeDiscoveryConfig, slice: &EndpointSlice) -> String {
    let from_label = slice
        .metadata
        .labels
        .as_ref()
        .and_then(|labels| labels.get(crate::discovery_kube::SERVICE_NAME_LABEL));
    match from_label {
        Some(name) => name.clone(),
        None => config.service_name.clone().unwrap_or_default(),
    }
}
fn discover_snapshot(
config: &KubeDiscoveryConfig,
expected: &str,
snapshot: &str,
service: &str,
) -> DiscoveryResult<Vec<ServiceInstance>> {
if service != expected {
return Err(DiscoveryError::NoInstances {
service: service.to_string(),
});
}
let instances = map_endpoints(config, snapshot).map_err(|error| DiscoveryError::Resolve {
host: "kubernetes".to_string(),
message: error.to_string(),
})?;
if instances.is_empty() {
Err(DiscoveryError::NoInstances {
service: service.to_string(),
})
} else {
Ok(instances)
}
}
/// Rejects configurations the watcher cannot work with.
///
/// # Errors
///
/// Returns `KubeDiscoveryError::InvalidConfig` for the first failing check, in this
/// order: blank namespace, a `service_name` that is set but blank, zero `sync_timeout`.
fn validate_config(config: &KubeDiscoveryConfig) -> KubeDiscoveryResult<()> {
    let blank_service_name = matches!(
        config.service_name.as_ref(),
        Some(name) if name.trim().is_empty()
    );
    let problem = if config.namespace.trim().is_empty() {
        Some("namespace must not be empty")
    } else if blank_service_name {
        Some("service_name must not be empty")
    } else if config.sync_timeout.is_zero() {
        Some("sync_timeout must be greater than zero")
    } else {
        None
    };
    match problem {
        Some(message) => Err(KubeDiscoveryError::InvalidConfig(message.to_string())),
        None => Ok(()),
    }
}