use futures::TryStreamExt;
use kube::{Api, Client, ResourceExt};
use kube_runtime::WatchStreamExt;
use kube_runtime::watcher::{Config, watcher};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{error, info};
use crate::error::Result;
use crate::scope::ApiScope;
use crate::traits::KubeResource;
/// Change notification sent over the channel by `watch_events`.
///
/// `T` is the watched resource type; each variant carries the full object
/// exactly as the watcher stream yielded it.
#[derive(Debug, Clone)]
pub enum WatchEvent<T> {
/// The object was applied. `watch_events` produces this for both
/// `Event::Apply` and `Event::InitApply` watcher events.
Applied(T),
/// The object was deleted. `watch_events` produces this for
/// `Event::Delete` watcher events.
Deleted(T),
}
async fn spawn_watcher<T>(
api: Api<T>,
config: Config,
tx: mpsc::Sender<()>,
) -> Result<JoinHandle<()>>
where
T: KubeResource,
{
let handle = tokio::task::spawn(async move {
let result = watcher(api, config)
.applied_objects()
.default_backoff()
.try_for_each(|resource: T| {
let tx = tx.clone();
async move {
info!(name = %resource.name_any(), "Resource applied");
tx.send(()).await.ok();
Ok(())
}
})
.await;
if let Err(e) = result {
error!(error = %e, "Resource watcher failed");
}
});
Ok(handle)
}
/// Starts a "signal" watcher for resources of type `T` within `scope`.
///
/// Logs the resource kind (and the label selector, if one was supplied),
/// builds the watcher `Config`, converts `scope` into an `Api<T>` using
/// `client`, and hands everything to `spawn_watcher`, which notifies `tx`
/// with a `()` for each applied object.
///
/// # Parameters
///
/// * `client` - Kubernetes client used to build the API handle.
/// * `scope` - consumed via `into_api` to produce the `Api<T>` to watch.
/// * `label_selector` - when `Some`, applied to the watcher config with
///   `Config::labels`; when `None`, the default config is used.
/// * `tx` - channel that receives one `()` per applied object.
///
/// # Returns
///
/// Whatever `spawn_watcher` returns: the `JoinHandle` of the background task.
pub async fn watch<T, Scope>(
    client: Client,
    scope: Scope,
    label_selector: Option<&str>,
    tx: mpsc::Sender<()>,
) -> Result<JoinHandle<()>>
where
    T: KubeResource,
    Scope: ApiScope<T>,
{
    let kind = T::kind(&());

    // Announce the watcher and derive its configuration in a single pass over
    // the optional selector.
    let config = if let Some(selector) = label_selector {
        info!(%kind, label_selector = %selector, "Starting signal watcher");
        Config::default().labels(selector)
    } else {
        info!(%kind, "Starting signal watcher");
        Config::default()
    };

    let api = scope.into_api(client);
    spawn_watcher(api, config, tx).await
}
pub async fn watch_objects<T, Scope>(
client: Client,
scope: Scope,
label_selector: Option<&str>,
tx: mpsc::Sender<T>,
) -> Result<JoinHandle<()>>
where
T: KubeResource,
Scope: ApiScope<T>,
{
let kind = T::kind(&());
match label_selector {
Some(sel) => info!(%kind, label_selector = %sel, "Starting object watcher"),
None => info!(%kind, "Starting object watcher"),
}
let config = match label_selector {
Some(sel) => Config::default().labels(sel),
None => Config::default(),
};
let api = scope.into_api(client);
let handle = tokio::task::spawn(async move {
let result = watcher(api, config)
.applied_objects()
.default_backoff()
.try_for_each(|resource: T| {
let tx = tx.clone();
async move {
info!(name = %resource.name_any(), "Resource applied — sending object");
tx.send(resource).await.ok();
Ok(())
}
})
.await;
if let Err(e) = result {
error!(error = %e, "Object watcher failed");
}
});
Ok(handle)
}
pub async fn watch_events<T, Scope>(
client: Client,
scope: Scope,
label_selector: Option<&str>,
tx: mpsc::Sender<WatchEvent<T>>,
) -> Result<JoinHandle<()>>
where
T: KubeResource,
Scope: ApiScope<T>,
{
let kind = T::kind(&());
match label_selector {
Some(sel) => info!(%kind, label_selector = %sel, "Starting event watcher"),
None => info!(%kind, "Starting event watcher"),
}
let config = match label_selector {
Some(sel) => Config::default().labels(sel),
None => Config::default(),
};
let api = scope.into_api(client);
let handle = tokio::task::spawn(async move {
let result = watcher(api, config)
.default_backoff()
.try_for_each(|event| {
let tx = tx.clone();
async move {
let msg = match event {
kube_runtime::watcher::Event::Apply(r) => {
info!(name = %r.name_any(), "Resource applied");
Some(WatchEvent::Applied(r))
}
kube_runtime::watcher::Event::Delete(r) => {
info!(name = %r.name_any(), "Resource deleted");
Some(WatchEvent::Deleted(r))
}
kube_runtime::watcher::Event::InitApply(r) => {
info!(name = %r.name_any(), "Resource observed (init)");
Some(WatchEvent::Applied(r))
}
kube_runtime::watcher::Event::Init
| kube_runtime::watcher::Event::InitDone => None,
};
if let Some(msg) = msg {
tx.send(msg).await.ok();
}
Ok(())
}
})
.await;
if let Err(e) = result {
error!(error = %e, "Event watcher failed");
}
});
Ok(handle)
}