use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use crate::config::OrphanWatchdogConfig;
use crate::domain::repos::{MessageRepository, TurnRepository};
use crate::infra::leader::LeaderElector;
use crate::infra::workers::WorkerHandles;
use crate::infra::workers::orphan_watchdog::OrphanWatchdogDeps;
pub struct WorkerConfigs {
pub(crate) orphan_watchdog: OrphanWatchdogConfig,
}
pub const WORKER_STOP_TIMEOUT: Duration = Duration::from_secs(5);
pub async fn prepare_worker_runtime(
configs: &WorkerConfigs,
) -> anyhow::Result<Option<Arc<dyn LeaderElector>>> {
if leader_workers_enabled(configs) {
Ok(Some(create_leader_elector().await?))
} else {
Ok(None)
}
}
pub fn spawn_workers<TR, MR>(
configs: &WorkerConfigs,
parent_cancel: &CancellationToken,
leader_elector: Option<&Arc<dyn LeaderElector>>,
orphan_deps: Option<OrphanWatchdogDeps<TR, MR>>,
) -> anyhow::Result<(WorkerHandles, CancellationToken)>
where
TR: TurnRepository + 'static,
MR: MessageRepository + 'static,
{
let worker_cancel = parent_cancel.child_token();
let mut handles = WorkerHandles::new();
if configs.orphan_watchdog.enabled {
let elector = Arc::clone(
leader_elector
.ok_or_else(|| anyhow::anyhow!("leader elector required for orphan_watchdog"))?,
);
let deps = orphan_deps
.ok_or_else(|| anyhow::anyhow!("orphan watchdog deps required when enabled"))?;
let cancel = worker_cancel.child_token();
handles.spawn(
"orphan_watchdog",
cancel.clone(),
crate::infra::workers::orphan_watchdog::run(
elector,
configs.orphan_watchdog.clone(),
deps,
cancel,
),
);
}
Ok((handles, worker_cancel))
}
fn leader_workers_enabled(configs: &WorkerConfigs) -> bool {
configs.orphan_watchdog.enabled
}
#[allow(
clippy::unused_async,
reason = "async needed when k8s feature is enabled"
)]
async fn create_leader_elector() -> anyhow::Result<Arc<dyn LeaderElector>> {
#[cfg(feature = "k8s")]
{
use crate::infra::leader::k8s_lease::{K8sLeaseConfig, K8sLeaseElector};
use anyhow::Context;
let config = K8sLeaseConfig::from_env("mini-chat")
.context("k8s feature enabled: POD_NAMESPACE and POD_NAME are required")?;
let elector = K8sLeaseElector::from_default(config).await.context(
"k8s feature enabled: kube client init failed; Kubernetes runtime access is required",
)?;
info!("Using K8s Lease leader election");
Ok(Arc::new(elector))
}
#[cfg(not(feature = "k8s"))]
{
info!("Using noop leader election (single-process mode)");
Ok(crate::infra::leader::noop())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn disabled_configs() -> WorkerConfigs {
WorkerConfigs {
orphan_watchdog: OrphanWatchdogConfig {
enabled: false,
..Default::default()
},
}
}
#[tokio::test]
async fn all_workers_disabled_skips_leader_preflight() {
let configs = disabled_configs();
let elector = prepare_worker_runtime(&configs).await.unwrap();
assert!(elector.is_none());
let parent_cancel = CancellationToken::new();
let (handles, worker_cancel) = spawn_workers::<
crate::infra::db::repo::turn_repo::TurnRepository,
crate::infra::db::repo::message_repo::MessageRepository,
>(&configs, &parent_cancel, elector.as_ref(), None)
.unwrap();
assert_eq!(handles.len(), 0);
worker_cancel.cancel();
handles
.join_all(CancellationToken::new(), Duration::from_millis(10))
.await;
}
#[cfg(not(feature = "k8s"))]
#[tokio::test]
async fn leader_workers_preflight_with_noop_when_k8s_feature_is_disabled() {
let mut configs = disabled_configs();
configs.orphan_watchdog.enabled = true;
let elector = prepare_worker_runtime(&configs).await.unwrap();
assert!(elector.is_some());
}
#[cfg(feature = "k8s")]
#[tokio::test(flavor = "current_thread")]
async fn k8s_preflight_fails_without_required_env() {
temp_env::async_with_vars(
[("POD_NAMESPACE", None::<&str>), ("POD_NAME", None::<&str>)],
async {
let mut configs = disabled_configs();
configs.orphan_watchdog.enabled = true;
let err = prepare_worker_runtime(&configs).await.unwrap_err();
assert!(
err.to_string().contains("POD_NAMESPACE")
|| err.to_string().contains("POD_NAME")
);
},
)
.await;
}
}