use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use futures::StreamExt;
use k8s_openapi::api::coordination::v1::{Lease, LeaseSpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::MicroTime;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::Client;
use tracing::{debug, error, info, warn};
use crate::error::{KubeGenericError, Result};
use crate::observability::{Metrics, serve_metrics};
use crate::traits::KubeResource;
pub use kube_runtime::controller::Action;
pub use kube_runtime::reflector::ObjectRef;
pub use kube_runtime::watcher;
/// Shared state handed to a [`Reconciler`] on every `reconcile` and
/// `error_policy` call.
///
/// `T` is caller-defined extra data stored next to the Kubernetes client; it
/// defaults to `()` for controllers that need nothing beyond the client.
pub struct Context<T = ()> {
/// Kubernetes API client.
pub client: Client,
/// Caller-supplied data of type `T`.
pub data: T,
}
impl Context<()> {
    /// Builds a reference-counted context that carries only `client` and no
    /// extra data.
    pub fn new(client: Client) -> Arc<Self> {
        let context = Self { client, data: () };
        Arc::new(context)
    }
}
impl<T> Context<T> {
    /// Builds a reference-counted context that carries `client` together with
    /// the caller-supplied `data`.
    pub fn with_data(client: Client, data: T) -> Arc<Self> {
        let context = Self { client, data };
        Arc::new(context)
    }
}
/// Reconciliation logic for resources of type `CR`.
///
/// `T` is the type of the extra data stored in the shared [`Context`].
pub trait Reconciler<CR: KubeResource, T: Send + Sync + 'static = ()>:
    Send + Sync + 'static
{
    /// Error produced by a failed reconciliation.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Reconciles `cr` using the shared `ctx`.
    ///
    /// Resolves to the follow-up [`Action`] on success or to `Self::Error`
    /// on failure.
    fn reconcile(
        &self,
        cr: Arc<CR>,
        ctx: Arc<Context<T>>,
    ) -> impl std::future::Future<Output = std::result::Result<Action, Self::Error>> + Send;

    /// Chooses the follow-up [`Action`] for an error reported for `_cr`.
    ///
    /// The default implementation ignores its arguments and requeues after
    /// 30 seconds.
    fn error_policy(&self, _cr: Arc<CR>, _err: &Self::Error, _ctx: Arc<Context<T>>) -> Action {
        let retry_after = Duration::from_secs(30);
        Action::requeue(retry_after)
    }
}
pub(crate) async fn serve_health(listener: tokio::net::TcpListener, ready: Arc<AtomicBool>) {
use bytes::Bytes;
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
loop {
let Ok((stream, _)) = listener.accept().await else {
break;
};
let ready = ready.clone();
tokio::spawn(async move {
let io = TokioIo::new(stream);
let svc = service_fn(move |req: Request<Incoming>| {
let ready = ready.clone();
async move {
let (status, body): (StatusCode, &'static str) =
if req.uri().path() == "/readyz" {
if ready.load(Ordering::Acquire) {
(StatusCode::OK, "ok")
} else {
(StatusCode::SERVICE_UNAVAILABLE, "not ready")
}
} else {
(StatusCode::OK, "ok")
};
Ok::<_, std::convert::Infallible>(
Response::builder()
.status(status)
.header("content-type", "text/plain")
.body(Full::new(Bytes::from_static(body.as_bytes())))
.unwrap(),
)
}
});
let _ = http1::Builder::new().serve_connection(io, svc).await;
});
}
}
/// Completes when the process receives Ctrl+C or, on Unix, SIGTERM.
///
/// # Panics
///
/// Panics if a signal handler cannot be installed.
async fn shutdown_signal() {
    let interrupt = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        let mut terminate =
            signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
        tokio::select! {
            _ = interrupt => {},
            _ = terminate.recv() => {},
        }
    }

    #[cfg(not(unix))]
    interrupt.await;
}
/// Settings for `Lease`-based leader election.
pub(crate) struct LeaderElectionConfig {
/// Namespace of the `Lease` object.
pub(crate) namespace: String,
/// Name of the `Lease` object.
pub(crate) name: String,
/// Value written to the lease's `holder_identity` to mark this replica as the holder.
pub(crate) identity: String,
/// Value written to the lease's `lease_duration_seconds`; also used as the
/// expiry window when an existing lease does not specify one.
pub(crate) lease_duration_secs: i32,
/// Pause between two renewal attempts while leading.
pub(crate) renew_period: Duration,
/// Pause between two acquisition attempts while waiting to become leader.
pub(crate) retry_period: Duration,
}
pub(crate) async fn try_acquire_or_renew(
config: &LeaderElectionConfig,
client: &Client,
) -> Result<bool> {
let api: kube::Api<Lease> = kube::Api::namespaced(client.clone(), &config.namespace);
let now = chrono::Utc::now();
let now_micro = MicroTime(now);
let existing = api
.get(&config.name)
.await
.map(Some)
.or_else(|e| match &e {
kube::Error::Api(ae) if ae.code == 404 => Ok(None),
_ => Err(KubeGenericError::Kube(e)),
})?;
match existing {
None => {
let lease = build_lease(
&config.name,
&config.namespace,
&config.identity,
config.lease_duration_secs,
now_micro.clone(),
now_micro,
None,
Some(0),
);
match api.create(&kube::api::PostParams::default(), &lease).await {
Ok(_) => Ok(true),
Err(kube::Error::Api(e)) if e.code == 409 => Ok(false),
Err(e) => Err(KubeGenericError::Kube(e)),
}
}
Some(existing) => {
let spec = existing.spec.as_ref();
let holder = spec.and_then(|s| s.holder_identity.as_deref());
let renew_time = spec.and_then(|s| s.renew_time.as_ref()).map(|t| t.0);
let duration_secs = spec
.and_then(|s| s.lease_duration_seconds)
.unwrap_or(config.lease_duration_secs);
let is_expired = renew_time.map_or(true, |rt| {
now > rt + chrono::Duration::seconds(duration_secs as i64)
});
if holder != Some(config.identity.as_str()) && !is_expired {
return Ok(false);
}
let transitioning = holder != Some(config.identity.as_str());
let acquire_time = if transitioning {
now_micro.clone()
} else {
spec.and_then(|s| s.acquire_time.clone())
.unwrap_or_else(|| now_micro.clone())
};
let transitions = spec
.and_then(|s| s.lease_transitions)
.map(|t| if transitioning { t + 1 } else { t });
let new_lease = build_lease(
&config.name,
&config.namespace,
&config.identity,
config.lease_duration_secs,
acquire_time,
now_micro,
existing.metadata.resource_version.clone(),
transitions,
);
match api
.replace(&config.name, &kube::api::PostParams::default(), &new_lease)
.await
{
Ok(_) => Ok(true),
Err(kube::Error::Api(e)) if e.code == 409 => Ok(false),
Err(e) => Err(KubeGenericError::Kube(e)),
}
}
}
}
pub(crate) fn build_lease(
name: &str,
namespace: &str,
identity: &str,
lease_duration_secs: i32,
acquire_time: MicroTime,
renew_time: MicroTime,
resource_version: Option<String>,
lease_transitions: Option<i32>,
) -> Lease {
Lease {
metadata: ObjectMeta {
name: Some(name.to_string()),
namespace: Some(namespace.to_string()),
resource_version,
..Default::default()
},
spec: Some(LeaseSpec {
holder_identity: Some(identity.to_string()),
lease_duration_seconds: Some(lease_duration_secs),
acquire_time: Some(acquire_time),
renew_time: Some(renew_time),
lease_transitions,
..Default::default()
}),
}
}
async fn acquire_leader_lease(config: &LeaderElectionConfig, client: &Client) -> Result<()> {
info!(
identity = %config.identity,
lease = %config.name,
ns = %config.namespace,
"Waiting to become leader"
);
loop {
match try_acquire_or_renew(config, client).await {
Ok(true) => {
info!(identity = %config.identity, "Acquired leader lease");
return Ok(());
}
Ok(false) => {
debug!(identity = %config.identity, "Lease held by another, retrying");
tokio::time::sleep(config.retry_period).await;
}
Err(e) => {
warn!(error = %e, "Error acquiring leader lease, retrying");
tokio::time::sleep(config.retry_period).await;
}
}
}
}
async fn renew_leader_lease_loop(
config: LeaderElectionConfig,
client: Client,
stop_tx: tokio::sync::watch::Sender<bool>,
) {
let mut consecutive_errors: u32 = 0;
loop {
tokio::time::sleep(config.renew_period).await;
match try_acquire_or_renew(&config, &client).await {
Ok(true) => {
consecutive_errors = 0;
debug!(identity = %config.identity, "Leader lease renewed");
}
Ok(false) => {
error!(
identity = %config.identity,
"Leader lease taken by another replica — stopping controller"
);
stop_tx.send(true).ok();
return;
}
Err(e) => {
consecutive_errors += 1;
warn!(
error = %e,
consecutive_errors,
"Failed to renew leader lease"
);
if consecutive_errors >= 3 {
error!(
identity = %config.identity,
"Could not renew leader lease after 3 attempts — stopping controller"
);
stop_tx.send(true).ok();
return;
}
}
}
}
}
/// Boxed one-shot closure that takes a `kube_runtime::Controller<CR>` and
/// returns it, typically after registering additional watches on it.
type ConfigureFn<CR> =
Box<dyn FnOnce(kube_runtime::Controller<CR>) -> kube_runtime::Controller<CR> + Send + 'static>;
/// Builder that configures and runs a `kube_runtime` controller for resources
/// of type `CR`, with `T` as the extra data type of the shared [`Context`].
pub struct ControllerBuilder<CR, T = ()>
where
CR: KubeResource,
{
/// API handle for the primary resource.
api: kube::Api<CR>,
/// Watcher configuration for the primary resource.
pub(crate) watcher_config: watcher::Config,
/// Accumulated customisations applied to the controller before it runs.
pub(crate) configure: Option<ConfigureFn<CR>>,
/// Port of the health server; `None` disables it.
pub(crate) health_port: Option<u16>,
/// Port of the metrics server; `None` disables metrics.
pub(crate) metrics_port: Option<u16>,
/// Whether the controller stops when a shutdown signal is received.
pub(crate) graceful_shutdown: bool,
/// Upper bound for a single reconcile call; `None` means unbounded.
pub(crate) reconcile_timeout: Option<Duration>,
/// Leader-election settings; `None` disables leader election.
pub(crate) leader_election: Option<LeaderElectionConfig>,
/// Concurrency setting handed to the `kube_runtime` controller config when set.
pub(crate) concurrency: Option<u16>,
/// Ties the builder to the context data type `T` without storing a value.
_phantom: PhantomData<T>,
}
impl<CR, T> ControllerBuilder<CR, T>
where
CR: KubeResource,
T: Send + Sync + 'static,
{
pub fn new(api: kube::Api<CR>) -> Self {
Self {
api,
watcher_config: watcher::Config::default(),
configure: None,
health_port: None,
metrics_port: None,
graceful_shutdown: false,
reconcile_timeout: None,
leader_election: None,
concurrency: None,
_phantom: PhantomData,
}
}
pub fn health_port(mut self, port: u16) -> Self {
self.health_port = Some(port);
self
}
pub fn metrics_port(mut self, port: u16) -> Self {
self.metrics_port = Some(port);
self
}
pub fn graceful_shutdown(mut self) -> Self {
self.graceful_shutdown = true;
self
}
pub fn leader_election(
mut self,
namespace: impl Into<String>,
name: impl Into<String>,
) -> Self {
let identity = std::env::var("POD_NAME")
.or_else(|_| std::env::var("HOSTNAME"))
.unwrap_or_else(|_| "unknown".to_string());
self.leader_election = Some(LeaderElectionConfig {
namespace: namespace.into(),
name: name.into(),
identity,
lease_duration_secs: 15,
renew_period: Duration::from_secs(5),
retry_period: Duration::from_secs(2),
});
self
}
pub fn reconcile_timeout(mut self, timeout: Duration) -> Self {
self.reconcile_timeout = Some(timeout);
self
}
pub fn label_selector(mut self, selector: impl Into<String>) -> Self {
let s = selector.into();
self.watcher_config = self.watcher_config.labels(&s);
self
}
pub fn watcher_config(mut self, config: watcher::Config) -> Self {
self.watcher_config = config;
self
}
pub fn concurrency(mut self, n: u16) -> Self {
self.concurrency = Some(n);
self
}
pub fn leader_election_timings(
mut self,
lease_duration: Duration,
renew_period: Duration,
retry_period: Duration,
) -> Self {
let le = self
.leader_election
.as_mut()
.expect("leader_election_timings must be called after leader_election");
le.lease_duration_secs = lease_duration.as_secs() as i32;
le.renew_period = renew_period;
le.retry_period = retry_period;
self
}
pub fn owns<Child>(mut self, api: kube::Api<Child>, config: watcher::Config) -> Self
where
Child: crate::traits::KubeResource,
{
let existing = self.configure.take();
self.configure = Some(Box::new(move |ctl| {
let ctl = ctl.owns(api, config);
match existing {
Some(f) => f(ctl),
None => ctl,
}
}));
self
}
pub fn watch<Other, I, F>(
mut self,
api: kube::Api<Other>,
config: watcher::Config,
mapper: F,
) -> Self
where
Other: crate::traits::KubeResource,
I: IntoIterator<Item = kube_runtime::reflector::ObjectRef<CR>> + Send + 'static,
I::IntoIter: Send,
F: Fn(Other) -> I + Send + Sync + 'static,
{
let existing = self.configure.take();
self.configure = Some(Box::new(move |ctl| {
let ctl = ctl.watches(api, config, mapper);
match existing {
Some(f) => f(ctl),
None => ctl,
}
}));
self
}
pub fn with_watches<F>(mut self, configure: F) -> Self
where
F: FnOnce(kube_runtime::Controller<CR>) -> kube_runtime::Controller<CR> + Send + 'static,
{
let existing = self.configure.take();
self.configure = Some(Box::new(move |ctl| {
let ctl = configure(ctl);
match existing {
Some(f) => f(ctl),
None => ctl,
}
}));
self
}
pub async fn run<R>(self, reconciler: R, context: Arc<Context<T>>) -> Result<()>
where
R: Reconciler<CR, T>,
{
let kind = CR::kind(&());
let client = context.client.clone();
info!(%kind, "Starting controller");
let ready = Arc::new(AtomicBool::new(false));
if let Some(port) = self.health_port {
let listener = tokio::net::TcpListener::bind(("0.0.0.0", port)).await?;
info!(port, "Health server listening");
tokio::spawn(serve_health(listener, ready.clone()));
}
let metrics = if let Some(port) = self.metrics_port {
let registry = prometheus::Registry::new();
let metrics = Arc::new(Metrics::new_registered(®istry)?);
let listener = tokio::net::TcpListener::bind(("0.0.0.0", port)).await?;
info!(port, "Metrics server listening");
tokio::spawn(serve_metrics(listener, registry));
Some(metrics)
} else {
None
};
let (stop_tx, mut stop_rx) = tokio::sync::watch::channel(false);
let has_stop = self.graceful_shutdown || self.leader_election.is_some();
if self.graceful_shutdown {
let tx = stop_tx.clone();
let kind_owned = kind.to_string();
tokio::spawn(async move {
shutdown_signal().await;
info!(kind = %kind_owned, "Shutdown signal received");
tx.send(true).ok();
});
}
if let Some(le) = self.leader_election {
acquire_leader_lease(&le, &client).await?;
let tx = stop_tx.clone();
let client_le = client.clone();
tokio::spawn(async move {
renew_leader_lease_loop(le, client_le, tx).await;
});
}
let mut ctl = kube_runtime::Controller::new(self.api, self.watcher_config);
if let Some(n) = self.concurrency {
ctl = ctl.with_config(kube_runtime::controller::Config::default().concurrency(n));
}
if let Some(configure) = self.configure {
ctl = configure(ctl);
}
let reconciler = Arc::new(reconciler);
let error_policy_r = reconciler.clone();
let ready_ref = ready.clone();
let reconcile_timeout = self.reconcile_timeout;
let kind_owned = kind.to_string();
let run_loop = ctl
.run(
move |cr, ctx| {
let r = reconciler.clone();
let metrics = metrics.clone();
let kind = kind_owned.clone();
async move {
let started = std::time::Instant::now();
let result = if let Some(t) = reconcile_timeout {
match tokio::time::timeout(t, r.reconcile(cr, ctx)).await {
Ok(result) => result,
Err(_) => {
warn!("Reconcile timed out after {t:?}, requeueing");
Ok(Action::requeue(t))
}
}
} else {
r.reconcile(cr, ctx).await
};
if let Some(m) = &metrics {
match &result {
Ok(_) => m.record_success(&kind, started.elapsed()),
Err(e) => {
m.record_failure(&kind, &e.to_string(), started.elapsed())
}
}
}
result
}
},
move |cr, err, ctx| error_policy_r.error_policy(cr, err, ctx),
context,
)
.for_each(move |result| {
ready_ref.store(true, Ordering::Release);
async move {
match result {
Ok((obj, _)) => info!(name = %obj.name, "Reconcile succeeded"),
Err(e) => warn!(error = %e, "Reconcile failed"),
}
}
});
if has_stop {
tokio::select! {
_ = run_loop => {},
_ = async move { stop_rx.wait_for(|v| *v).await.ok(); } => {},
}
} else {
run_loop.await;
}
Ok(())
}
}