engenho-controllers 0.1.4

engenho-controllers — the engenho K8s controller suite. Hosts the Controller trait + canonical implementations: ReplicaSetController (R9), DeploymentController (R9.5), ServiceController (R9.6), GC (R9.7). Each is a thin reconcile loop on engenho-store. Same shape as engenho-scheduler — the second-site for the controller pattern.
//! `ControllerRuntime` — runs N [`Controller`] impls on a shared
//! tokio runtime with per-controller intervals.
//!
//! Operator code instantiates one [`ControllerRuntime`], registers
//! controllers, then calls [`ControllerRuntime::spawn`], which starts
//! one tick loop per controller. After every tick the loop logs the
//! [`ReconcileReport`](crate::controller::ReconcileReport) on success
//! or the error on failure.
//!
//! At R9 the runtime is the simplest possible thing: a Vec of
//! controllers + per-controller intervals. R9.5+ may add leader
//! election (only one runtime instance ticks; others stand by),
//! priority queues, and shared work queues — but the trait surface
//! doesn't change.

use std::sync::Arc;
use std::time::Duration;

use tokio::task::JoinHandle;

use crate::controller::Controller;

/// Settings shared by every controller hosted in one runtime.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Pause between ticks for controllers that are registered without
    /// an explicit interval of their own.
    pub default_interval: Duration,
}

impl Default for RuntimeConfig {
    /// Falls back to a ten-second reconcile interval.
    fn default() -> Self {
        let default_interval = Duration::from_secs(10);
        Self { default_interval }
    }
}

/// Builds a [`RuntimeConfig`] from engenho-config's top-level
/// `ControllersConfig`. Only `fallback_interval_seconds` is read here:
/// it is interpreted as whole seconds and becomes `default_interval`.
/// Controllers registered with an explicit interval still override it.
impl From<&engenho_config::ControllersConfig> for RuntimeConfig {
    fn from(top: &engenho_config::ControllersConfig) -> Self {
        // Lossless widening to the u64 that `Duration::from_secs` expects.
        let fallback_secs = u64::from(top.fallback_interval_seconds);
        let default_interval = Duration::from_secs(fallback_secs);
        Self { default_interval }
    }
}

/// Owned-value convenience: consumes the config and reuses the
/// by-reference conversion, so both forms yield the same result.
impl From<engenho_config::ControllersConfig> for RuntimeConfig {
    fn from(top: engenho_config::ControllersConfig) -> Self {
        Self::from(&top)
    }
}

/// Holds the registered controllers and the interval each one ticks at.
///
/// Controllers are stored as `Arc<dyn Controller>` so that `spawn` can
/// move each one into its own task.
pub struct ControllerRuntime {
    /// Supplies the interval used by `register`.
    config: RuntimeConfig,
    /// One `(controller, interval)` pair per registration, in
    /// registration order.
    controllers: Vec<(Arc<dyn Controller>, Duration)>,
}

impl ControllerRuntime {
    #[must_use]
    pub fn new(config: RuntimeConfig) -> Self {
        Self {
            config,
            controllers: Vec::new(),
        }
    }

    /// Register a controller with the default interval.
    pub fn register<C: Controller + 'static>(&mut self, controller: C) -> &mut Self {
        let interval = self.config.default_interval;
        self.controllers.push((Arc::new(controller), interval));
        self
    }

    /// Register a controller with an explicit interval.
    pub fn register_with_interval<C: Controller + 'static>(
        &mut self,
        controller: C,
        interval: Duration,
    ) -> &mut Self {
        self.controllers.push((Arc::new(controller), interval));
        self
    }

    /// Spawn the per-controller tick loops. Returns one JoinHandle
    /// per controller. The caller can abort each handle to stop a
    /// controller individually.
    #[must_use]
    pub fn spawn(self) -> Vec<JoinHandle<()>> {
        let mut handles = Vec::new();
        for (controller, interval) in self.controllers {
            let handle = tokio::spawn(async move {
                loop {
                    match controller.tick().await {
                        Ok(report) => report.log(controller.name()),
                        Err(e) => tracing::error!(
                            controller = controller.name(),
                            error = %e,
                            "reconcile failed"
                        ),
                    }
                    tokio::time::sleep(interval).await;
                }
            });
            handles.push(handle);
        }
        handles
    }

    /// Number of registered controllers (for testing + telemetry).
    #[must_use]
    pub fn len(&self) -> usize {
        self.controllers.len()
    }

    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.controllers.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::controller::{Controller, ReconcileReport};
    use crate::error::ControllerError;
    use async_trait::async_trait;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal controller fixture: always reconciles successfully and
    /// records how many times it was ticked so tests can observe the loop.
    struct Counter {
        name: &'static str,
        ticks: Arc<AtomicUsize>,
    }

    impl Counter {
        /// Fixture with a fresh, unshared tick counter.
        fn new(name: &'static str) -> Self {
            Self {
                name,
                ticks: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    #[async_trait]
    impl Controller for Counter {
        fn name(&self) -> &'static str {
            self.name
        }
        async fn tick(&self) -> Result<ReconcileReport, ControllerError> {
            self.ticks.fetch_add(1, Ordering::SeqCst);
            Ok(ReconcileReport {
                objects_examined: 1,
                ..Default::default()
            })
        }
    }

    #[test]
    fn runtime_starts_empty() {
        let rt = ControllerRuntime::new(RuntimeConfig::default());
        assert!(rt.is_empty());
        assert_eq!(rt.len(), 0);
    }

    #[test]
    fn runtime_config_from_engenho_config_carries_fallback_interval() {
        use engenho_config::{ControllerEnable, ControllersConfig};
        let top = ControllersConfig {
            enable: ControllerEnable {
                replicaset: true,
                deployment: true,
                endpoints: true,
                gc: true,
            },
            namespace: String::new(),
            fallback_interval_seconds: 45,
            debounce_milliseconds: 100,
        };
        let runtime: RuntimeConfig = (&top).into();
        assert_eq!(runtime.default_interval, Duration::from_secs(45));
        // From-owned variant works too.
        let runtime_owned: RuntimeConfig = top.into();
        assert_eq!(runtime_owned.default_interval, Duration::from_secs(45));
    }

    #[test]
    fn runtime_registers_multiple_controllers() {
        let mut rt = ControllerRuntime::new(RuntimeConfig::default());
        rt.register(Counter::new("a"));
        rt.register(Counter::new("b"));
        rt.register_with_interval(Counter::new("c"), Duration::from_secs(60));
        assert_eq!(rt.len(), 3);
        assert!(!rt.is_empty());

        // Registration order is preserved, and each entry carries the
        // interval it was registered with.
        let default_interval = RuntimeConfig::default().default_interval;
        assert_eq!(rt.controllers[0].0.name(), "a");
        assert_eq!(rt.controllers[0].1, default_interval);
        assert_eq!(rt.controllers[1].0.name(), "b");
        assert_eq!(rt.controllers[1].1, default_interval);
        assert_eq!(rt.controllers[2].0.name(), "c");
        assert_eq!(rt.controllers[2].1, Duration::from_secs(60));
    }

    #[tokio::test]
    async fn runtime_spawn_runs_at_least_one_tick() {
        let ticks = Arc::new(AtomicUsize::new(0));
        let mut rt = ControllerRuntime::new(RuntimeConfig {
            default_interval: Duration::from_millis(10),
        });
        rt.register(Counter {
            name: "t",
            ticks: Arc::clone(&ticks),
        });
        let handles = rt.spawn();
        assert_eq!(handles.len(), 1);
        // Let it tick a few times.
        tokio::time::sleep(Duration::from_millis(50)).await;
        for h in handles {
            h.abort();
        }
        // The loop ticks before its first sleep, so at least one tick
        // must have been recorded by now.
        assert!(
            ticks.load(Ordering::SeqCst) >= 1,
            "spawned controller loop never ticked"
        );
    }
}