use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use crate::controller::Controller;
#[derive(Clone, Debug)]
pub struct RuntimeConfig {
pub default_interval: Duration,
}
impl Default for RuntimeConfig {
fn default() -> Self {
Self {
default_interval: Duration::from_secs(10),
}
}
}
impl From<&engenho_config::ControllersConfig> for RuntimeConfig {
fn from(top: &engenho_config::ControllersConfig) -> Self {
Self {
default_interval: Duration::from_secs(u64::from(top.fallback_interval_seconds)),
}
}
}
impl From<engenho_config::ControllersConfig> for RuntimeConfig {
fn from(top: engenho_config::ControllersConfig) -> Self {
(&top).into()
}
}
pub struct ControllerRuntime {
config: RuntimeConfig,
controllers: Vec<(Arc<dyn Controller>, Duration)>,
}
impl ControllerRuntime {
#[must_use]
pub fn new(config: RuntimeConfig) -> Self {
Self {
config,
controllers: Vec::new(),
}
}
pub fn register<C: Controller + 'static>(&mut self, controller: C) -> &mut Self {
let interval = self.config.default_interval;
self.controllers.push((Arc::new(controller), interval));
self
}
pub fn register_with_interval<C: Controller + 'static>(
&mut self,
controller: C,
interval: Duration,
) -> &mut Self {
self.controllers.push((Arc::new(controller), interval));
self
}
#[must_use]
pub fn spawn(self) -> Vec<JoinHandle<()>> {
let mut handles = Vec::new();
for (controller, interval) in self.controllers {
let handle = tokio::spawn(async move {
loop {
match controller.tick().await {
Ok(report) => report.log(controller.name()),
Err(e) => tracing::error!(
controller = controller.name(),
error = %e,
"reconcile failed"
),
}
tokio::time::sleep(interval).await;
}
});
handles.push(handle);
}
handles
}
#[must_use]
pub fn len(&self) -> usize {
self.controllers.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.controllers.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::controller::{Controller, ReconcileReport};
use crate::error::ControllerError;
use async_trait::async_trait;
struct Counter {
name: &'static str,
}
#[async_trait]
impl Controller for Counter {
fn name(&self) -> &'static str {
self.name
}
async fn tick(&self) -> Result<ReconcileReport, ControllerError> {
Ok(ReconcileReport {
objects_examined: 1,
..Default::default()
})
}
}
#[test]
fn runtime_starts_empty() {
let rt = ControllerRuntime::new(RuntimeConfig::default());
assert!(rt.is_empty());
assert_eq!(rt.len(), 0);
}
#[test]
fn runtime_config_from_engenho_config_carries_fallback_interval() {
use engenho_config::{ControllerEnable, ControllersConfig};
let top = ControllersConfig {
enable: ControllerEnable {
replicaset: true,
deployment: true,
endpoints: true,
gc: true,
},
namespace: String::new(),
fallback_interval_seconds: 45,
debounce_milliseconds: 100,
};
let runtime: RuntimeConfig = (&top).into();
assert_eq!(runtime.default_interval, Duration::from_secs(45));
let runtime_owned: RuntimeConfig = top.into();
assert_eq!(runtime_owned.default_interval, Duration::from_secs(45));
}
#[test]
fn runtime_registers_multiple_controllers() {
let mut rt = ControllerRuntime::new(RuntimeConfig::default());
rt.register(Counter { name: "a" });
rt.register(Counter { name: "b" });
rt.register_with_interval(Counter { name: "c" }, Duration::from_secs(60));
assert_eq!(rt.len(), 3);
}
#[tokio::test]
async fn runtime_spawn_runs_at_least_one_tick() {
let mut rt = ControllerRuntime::new(RuntimeConfig {
default_interval: Duration::from_millis(10),
});
rt.register(Counter { name: "t" });
let handles = rt.spawn();
tokio::time::sleep(Duration::from_millis(50)).await;
for h in handles {
h.abort();
}
}
}