use std::sync::Arc;
use std::time::Duration;
use engenho_store::{StoreMesh, WatchEvent};
use tokio::sync::broadcast::error::RecvError;
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
use crate::controller::Controller;
#[derive(Clone, Debug)]
pub enum KindFilter {
All,
Kinds(Vec<String>),
}
impl KindFilter {
#[must_use]
pub fn kind(name: impl Into<String>) -> Self {
Self::Kinds(vec![name.into()])
}
fn matches(&self, ev: &WatchEvent) -> bool {
match self {
Self::All => true,
Self::Kinds(list) => list.iter().any(|k| k == &ev.key.kind),
}
}
}
#[derive(Clone, Debug)]
pub struct WatchDriverConfig {
pub filter: KindFilter,
pub debounce: Duration,
pub fallback_interval: Duration,
}
impl Default for WatchDriverConfig {
fn default() -> Self {
Self {
filter: KindFilter::All,
debounce: Duration::from_millis(50),
fallback_interval: Duration::from_secs(30),
}
}
}
pub struct WatchDriver<C: Controller> {
controller: Arc<C>,
store: Arc<StoreMesh>,
config: WatchDriverConfig,
}
impl<C: Controller + 'static> WatchDriver<C> {
#[must_use]
pub fn new(controller: C, store: Arc<StoreMesh>, config: WatchDriverConfig) -> Self {
Self {
controller: Arc::new(controller),
store,
config,
}
}
pub fn spawn(self) -> JoinHandle<()> {
let controller = self.controller;
let store = self.store;
let config = self.config;
tokio::spawn(async move {
run(controller, store, config).await;
})
}
pub async fn tick_once(&self) -> Result<crate::controller::ReconcileReport, crate::error::ControllerError>
{
self.controller.tick().await
}
}
async fn run<C: Controller + ?Sized>(
controller: Arc<C>,
store: Arc<StoreMesh>,
config: WatchDriverConfig,
) {
let mut rx = store.watch();
info!(
controller = controller.name(),
debounce_ms = config.debounce.as_millis() as u64,
fallback_s = if config.fallback_interval == Duration::MAX {
0
} else {
config.fallback_interval.as_secs()
},
"watch driver started"
);
loop {
let next_event = wait_for_relevant_event(&mut rx, &config).await;
match next_event {
EventOrTimer::Event => {
tokio::time::sleep(config.debounce).await;
while let Ok(ev) = rx.try_recv() {
if config.filter.matches(&ev) {
debug!(
controller = controller.name(),
key = %ev.key.label(),
"coalesced event"
);
}
}
tick_and_log(controller.as_ref()).await;
}
EventOrTimer::FallbackTimer => {
tick_and_log(controller.as_ref()).await;
}
EventOrTimer::Shutdown => {
info!(controller = controller.name(), "watch driver shutting down");
return;
}
}
}
}
#[derive(Debug)]
enum EventOrTimer {
Event,
FallbackTimer,
Shutdown,
}
async fn wait_for_relevant_event(
rx: &mut tokio::sync::broadcast::Receiver<WatchEvent>,
config: &WatchDriverConfig,
) -> EventOrTimer {
let timer = tokio::time::sleep(config.fallback_interval);
tokio::pin!(timer);
loop {
tokio::select! {
biased;
res = rx.recv() => match res {
Ok(ev) => {
if config.filter.matches(&ev) {
return EventOrTimer::Event;
}
}
Err(RecvError::Lagged(n)) => {
warn!(lagged = n, "watch driver fell behind; ticking anyway");
return EventOrTimer::Event;
}
Err(RecvError::Closed) => return EventOrTimer::Shutdown,
},
_ = &mut timer => return EventOrTimer::FallbackTimer,
}
}
}
async fn tick_and_log<C: Controller + ?Sized>(controller: &C) {
match controller.tick().await {
Ok(report) => report.log(controller.name()),
Err(e) => warn!(
controller = controller.name(),
error = %e,
"reconcile failed"
),
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::Value;
fn ev(kind: &str, name: &str) -> WatchEvent {
use engenho_store::ResourceKey;
WatchEvent {
kind: engenho_store::WatchEventKind::Added,
object: Value::Null,
key: ResourceKey::namespaced("", "v1", kind, "default", name),
resource_version: 1,
}
}
#[test]
fn kind_filter_all_matches_anything() {
let f = KindFilter::All;
assert!(f.matches(&ev("Pod", "x")));
assert!(f.matches(&ev("Service", "x")));
}
#[test]
fn kind_filter_specific_matches_only_listed() {
let f = KindFilter::Kinds(vec!["Pod".into(), "Endpoints".into()]);
assert!(f.matches(&ev("Pod", "x")));
assert!(f.matches(&ev("Endpoints", "x")));
assert!(!f.matches(&ev("Service", "x")));
}
#[test]
fn kind_filter_kind_helper() {
let f = KindFilter::kind("Pod");
assert!(f.matches(&ev("Pod", "p")));
assert!(!f.matches(&ev("Node", "n")));
}
#[test]
fn config_default_is_sensible() {
let cfg = WatchDriverConfig::default();
assert_eq!(cfg.debounce, Duration::from_millis(50));
assert_eq!(cfg.fallback_interval, Duration::from_secs(30));
assert!(matches!(cfg.filter, KindFilter::All));
}
}