use std::collections::HashMap;
use std::mem::take;
use std::sync::{
Arc,
Mutex,
};
use async_trait::async_trait;
use futures::{
StreamExt,
TryStreamExt,
};
use kube::runtime::watcher::watcher;
use sk_core::errors::*;
use sk_core::k8s::{
ApiSet,
OwnersCache,
PodLifecycleData,
};
use sk_core::prelude::*;
use tracing::*;
use crate::watchers::{
EventHandler,
ObjStream,
};
use crate::TraceStorable;
/// Event handler for pod watch events.
///
/// It tracks the lifecycle data of every pod it has seen and records that data, together with
/// each pod's owner chain, into a `TraceStorable` trace store.
pub struct PodHandler {
// Lifecycle data for every pod currently tracked, keyed by the pod's namespaced name.
owned_pods: HashMap<String, PodLifecycleData>,
// Resolves (and caches lookups of) the owner chain for a pod by namespaced name.
owners_cache: OwnersCache,
}
impl PodHandler {
    /// Creates a new `PodHandler` together with the pod watch stream it should consume.
    ///
    /// The stream is built from `kube::Api::all`, so it covers pods in every namespace. Watcher
    /// errors are converted into the crate's error type.
    pub fn new_with_stream(client: kube::Client, apiset: ApiSet) -> (Box<PodHandler>, ObjStream<corev1::Pod>) {
        let pod_api: kube::Api<corev1::Pod> = kube::Api::all(client);
        (
            Box::new(PodHandler {
                owned_pods: HashMap::new(),
                owners_cache: OwnersCache::new(apiset),
            }),
            watcher(pod_api, Default::default()).map_err(|e| e.into()).boxed(),
        )
    }

    /// Handles a pod that was created or modified.
    ///
    /// Lifecycle data is computed from `pod`. If it compares greater than the data currently
    /// tracked for `ns_name`, it replaces the tracked data and is written to `store`. Non-empty
    /// data that is neither greater than nor equal to the tracked data is logged and ignored.
    ///
    /// # Errors
    ///
    /// Returns an error in two cases:
    /// - the lifecycle data cannot be computed from `pod`;
    /// - storing the updated data fails.
    async fn handle_pod_applied(
        &mut self,
        ns_name: &str,
        pod: &corev1::Pod,
        store: Arc<Mutex<dyn TraceStorable + Send>>,
    ) -> EmptyResult {
        let new_lifecycle_data = PodLifecycleData::new_for(pod)?;
        let current_lifecycle_data = self.owned_pods.get(ns_name);

        if new_lifecycle_data > current_lifecycle_data {
            // NOTE(review): the in-memory tracker is updated before the store write. If the write
            // fails, a later event carrying identical data will not be "greater" and so will not
            // retry the write.
            self.owned_pods.insert(ns_name.into(), new_lifecycle_data.clone());
            self.store_pod_lifecycle_data(ns_name, Some(pod), &new_lifecycle_data, store)
                .await?;
        } else if !new_lifecycle_data.empty() && new_lifecycle_data != current_lifecycle_data {
            warn!(
                "new lifecycle data for {} does not match stored data, cowardly refusing to update: {:?} !>= {:?}",
                ns_name, new_lifecycle_data, current_lifecycle_data
            );
        }
        Ok(())
    }

    /// Handles a pod that was deleted.
    ///
    /// The pod is always removed from the tracker. If its tracked lifecycle data is not already
    /// finished, a finished lifecycle ending at `ts` is computed and written to `store`.
    ///
    /// `maybe_pod` is `None` when the pod object itself is unavailable; `initialized` passes
    /// `None` for pods that vanished while the watcher was restarting. In that case only the
    /// tracked start timestamp and `ts` are used.
    ///
    /// # Errors
    ///
    /// Returns an error in three cases:
    /// - `maybe_pod` is `None` and the tracked data has no start timestamp;
    /// - guessing the finished lifecycle fails;
    /// - storing the data fails.
    async fn handle_pod_deleted(
        &mut self,
        ns_name: &str,
        maybe_pod: Option<&corev1::Pod>,
        current_lifecycle_data: PodLifecycleData,
        store: Arc<Mutex<dyn TraceStorable + Send>>,
        ts: i64,
    ) -> EmptyResult {
        // Stop tracking the pod regardless of what happens below.
        self.owned_pods.remove(ns_name);

        // Finished data was presumably already written when it was applied; nothing to do.
        if current_lifecycle_data.finished() {
            return Ok(());
        }

        let new_lifecycle_data = match maybe_pod {
            None => {
                // Without the pod object, end the lifecycle at the deletion timestamp. Report an
                // error (instead of panicking) if we never saw the pod start.
                let Some(start_ts) = current_lifecycle_data.start_ts() else {
                    bail!("pod {} has no recorded start time, cannot compute finished lifecycle", ns_name);
                };
                PodLifecycleData::Finished(start_ts, ts)
            },
            Some(pod) => PodLifecycleData::guess_finished_lifecycle(pod, &current_lifecycle_data, ts)?,
        };
        self.store_pod_lifecycle_data(ns_name, maybe_pod, &new_lifecycle_data, store)
            .await
    }

    /// Writes `lifecycle_data` for `ns_name`, together with the pod's owner chain, to `store`.
    ///
    /// The owner chain comes from the owners cache when it has an entry for `ns_name`. Otherwise
    /// it is computed from `maybe_pod`.
    ///
    /// # Errors
    ///
    /// Returns an error in three cases:
    /// - there is no cached owner chain and no pod to compute one from;
    /// - computing the owner chain fails;
    /// - the store rejects the record.
    ///
    /// # Panics
    ///
    /// Panics if the trace store mutex is poisoned.
    async fn store_pod_lifecycle_data(
        &mut self,
        ns_name: &str,
        maybe_pod: Option<&corev1::Pod>,
        lifecycle_data: &PodLifecycleData,
        store: Arc<Mutex<dyn TraceStorable + Send>>,
    ) -> EmptyResult {
        let owners = match (self.owners_cache.lookup(ns_name), maybe_pod) {
            (Some(o), _) => o.clone(),
            (None, Some(pod)) => self.owners_cache.compute_owner_chain(pod).await?,
            _ => bail!("could not determine owner chain for {}", ns_name),
        };

        // The lock is taken only after the last `.await`, so the guard never lives across a
        // suspension point.
        let mut s = store.lock().expect("trace store mutex poisoned");
        s.record_pod_lifecycle(ns_name, maybe_pod.cloned(), owners, lifecycle_data)
    }
}
#[async_trait]
impl EventHandler<corev1::Pod> for PodHandler {
    /// Forwards a pod create/update event to `handle_pod_applied`, keyed by namespaced name.
    async fn applied(
        &mut self,
        pod: &corev1::Pod,
        _ts: i64,
        store: Arc<Mutex<dyn TraceStorable + Send>>,
    ) -> EmptyResult {
        let pod_key = pod.namespaced_name();
        self.handle_pod_applied(&pod_key, pod, store).await
    }

    /// Forwards a pod deletion event to `handle_pod_deleted`.
    ///
    /// Pods that are not currently tracked are logged and skipped.
    async fn deleted(
        &mut self,
        pod: &corev1::Pod,
        ts: i64,
        store: Arc<Mutex<dyn TraceStorable + Send>>,
    ) -> EmptyResult {
        let pod_key = pod.namespaced_name();
        // Take an owned copy up front so no borrow of `owned_pods` outlives this statement.
        let Some(tracked) = self.owned_pods.get(&pod_key).cloned() else {
            warn!("pod {pod_key} deleted but not tracked, may have already been processed");
            return Ok(());
        };
        self.handle_pod_deleted(&pod_key, Some(pod), tracked, store, ts).await
    }

    /// Reconciles tracked state against a full pod listing, e.g. after a watcher restart.
    ///
    /// Every listed pod keeps any lifecycle data we already had for it and is then re-applied.
    /// Previously tracked pods missing from the listing are treated as deleted at `ts`.
    /// Per-pod failures are logged and do not abort the reconciliation.
    async fn initialized(
        &mut self,
        pods: &[corev1::Pod],
        ts: i64,
        store: Arc<Mutex<dyn TraceStorable + Send>>,
    ) -> EmptyResult {
        // Everything left in here after the first loop has disappeared from the cluster.
        let mut stale_pods = take(&mut self.owned_pods);

        for pod in pods {
            let pod_key = pod.namespaced_name();
            if let Some(previous) = stale_pods.remove(&pod_key) {
                self.owned_pods.insert(pod_key.clone(), previous);
            }
            if let Err(err) = self.handle_pod_applied(&pod_key, pod, store.clone()).await {
                skerr!(err, "(watcher restart) applied pod {} lifecycle data could not be stored", pod_key);
            }
        }

        for (pod_key, previous) in stale_pods {
            if let Err(err) = self
                .handle_pod_deleted(&pod_key, None, previous, store.clone(), ts)
                .await
            {
                skerr!(err, "(watcher restart) deleted pod {} lifecycle data could not be stored", pod_key);
            }
        }

        Ok(())
    }
}
#[cfg(test)]
impl PodHandler {
    /// Test-only constructor that assembles a handler directly from its parts.
    pub(crate) fn new_from_parts(
        owned_pods: HashMap<String, PodLifecycleData>,
        owners_cache: OwnersCache,
    ) -> PodHandler {
        Self { owned_pods, owners_cache }
    }

    /// Test-only accessor for the lifecycle data tracked for `ns_name`, if any.
    pub(crate) fn get_owned_pod_lifecycle(&self, ns_name: &str) -> Option<&PodLifecycleData> {
        let tracked = &self.owned_pods;
        tracked.get(ns_name)
    }
}