use std::collections::HashMap;
use async_trait::async_trait;
use futures::{
StreamExt,
TryStreamExt,
};
use kube::runtime::watcher::watcher;
use sk_core::errors::*;
use sk_core::k8s::PodLifecycleData;
use sk_core::prelude::*;
use tokio::sync::mpsc;
use tracing::*;
use crate::watchers::{
EventHandler,
ObjWatcher,
};
#[derive(Debug)]
pub struct Message {
pub(crate) ns_name: String,
pub(crate) maybe_pod: Option<corev1::Pod>,
pub(crate) lifecycle_data: PodLifecycleData,
}
pub type Sender = mpsc::UnboundedSender<Message>;
pub type Receiver = mpsc::UnboundedReceiver<Message>;
pub fn new_with_stream(
client: kube::Client,
pod_tx: Sender,
ready_tx: mpsc::Sender<bool>,
) -> anyhow::Result<ObjWatcher<corev1::Pod>> {
let pod_api: kube::Api<corev1::Pod> = kube::Api::all(client);
let pod_handler = Box::new(PodHandler { owned_pods: HashMap::new(), pod_tx });
let pod_stream = watcher(pod_api, Default::default()).map_err(|e| e.into()).boxed();
Ok(ObjWatcher::new(pod_handler, pod_stream, ready_tx))
}
pub(super) struct PodHandler {
owned_pods: HashMap<String, PodLifecycleData>,
pod_tx: Sender,
}
impl PodHandler {
async fn handle_pod_applied(&mut self, ns_name: &str, pod: corev1::Pod) -> EmptyResult {
let new_lifecycle_data = PodLifecycleData::new_for(&pod)?;
let current_lifecycle_data = self.owned_pods.get(ns_name);
if new_lifecycle_data > current_lifecycle_data {
self.owned_pods.insert(ns_name.into(), new_lifecycle_data.clone());
self.send_pod_lifecycle_data(ns_name, Some(pod), new_lifecycle_data).await?;
} else if !new_lifecycle_data.empty() && new_lifecycle_data != current_lifecycle_data {
warn!(
"new lifecycle data for {} does not match stored data, cowardly refusing to update: {:?} !>= {:?}",
ns_name, new_lifecycle_data, current_lifecycle_data
);
}
Ok(())
}
async fn handle_pod_deleted(
&mut self,
ns_name: &str,
current_lifecycle_data: PodLifecycleData,
ts: i64,
) -> EmptyResult {
self.owned_pods.remove(ns_name);
if current_lifecycle_data.finished() {
return Ok(());
}
let start_ts = current_lifecycle_data.start_ts().unwrap();
let new_lifecycle_data = PodLifecycleData::Finished(start_ts, ts);
self.send_pod_lifecycle_data(ns_name, None, new_lifecycle_data).await
}
async fn send_pod_lifecycle_data(
&self,
ns_name: &str,
maybe_pod: Option<corev1::Pod>,
lifecycle_data: PodLifecycleData,
) -> EmptyResult {
self.pod_tx
.send(Message { ns_name: ns_name.into(), maybe_pod, lifecycle_data })?;
Ok(())
}
}
#[async_trait]
impl EventHandler<corev1::Pod> for PodHandler {
async fn applied(&mut self, pod: corev1::Pod, _ts: i64) -> EmptyResult {
let ns_name = pod.namespaced_name();
self.handle_pod_applied(&ns_name, pod).await
}
async fn deleted(&mut self, ns_name: &str, ts: i64) -> EmptyResult {
let Some(current_lifecycle_data) = self.owned_pods.get(ns_name) else {
warn!("pod {ns_name} deleted but not tracked, may have already been processed");
return Ok(());
};
self.handle_pod_deleted(ns_name, current_lifecycle_data.clone(), ts).await
}
}
#[cfg(test)]
#[cfg_attr(coverage, coverage(off))]
impl PodHandler {
pub(crate) fn new_from_parts(owned_pods: HashMap<String, PodLifecycleData>, pod_tx: Sender) -> PodHandler {
PodHandler { owned_pods, pod_tx }
}
pub(crate) fn get_owned_pod_lifecycle(&self, ns_name: &str) -> Option<&PodLifecycleData> {
self.owned_pods.get(ns_name)
}
}