use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::runtime::admin::AdminState;
use crate::runtime::server::RuntimeContext;
use crate::update::{self, ApplyMode, ApplyOptions, CheckResult, Severity};
/// Seconds the worker sleeps once, before running its first iteration.
const SETTLE_DELAY_SECS: u64 = 10;
/// Fallback for `WorkerConfig::check_interval_secs` when the environment does
/// not provide a parseable value (6 hours).
const DEFAULT_CHECK_INTERVAL_SECS: u64 = 6 * 60 * 60;
/// Sleep applied after a `Failed` iteration while the last recorded severity
/// is `Severity::Critical` (1 hour).
const CRITICAL_RETRY_SECS: u64 = 60 * 60;
/// Sleep applied after a `Deferred` iteration before re-evaluating (5 minutes).
const DEFER_RECHECK_SECS: u64 = 5 * 60;
/// Fallback for `WorkerConfig::quiet_window_secs` (60 seconds).
const DEFAULT_QUIET_WINDOW_SECS: u64 = 60;
/// Fallback for `WorkerConfig::max_defer_secs` (24 hours).
const DEFAULT_MAX_DEFER_SECS: u64 = 24 * 60 * 60;
/// Result of a single worker iteration; the run loop uses it to pick the next
/// sleep duration and to maintain its "pending since" timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerOutcome {
/// Nothing left to do this round. The run loop clears its pending timestamp
/// and sleeps for the configured check interval.
Idle,
/// An update is available but was not applied this round. The run loop
/// records when deferral started (if not already recorded) and re-checks
/// after `DEFER_RECHECK_SECS`.
Deferred,
/// The iteration could not complete. The run loop retries after
/// `CRITICAL_RETRY_SECS` when the last recorded severity is critical,
/// otherwise after the configured check interval.
Failed,
}
/// Tunables for the auto-update worker. `WorkerConfig::from_env` builds one
/// from `OPENLATCH_PROVIDER_*` environment variables with built-in fallbacks.
#[derive(Debug, Clone)]
pub struct WorkerConfig {
/// Registry origin handed to `update::check` and to the apply options.
pub registry_origin: String,
/// Download timeout in seconds for an apply; the worker floors it at 1
/// before converting it to a `Duration`.
pub download_timeout_secs: u64,
/// Seconds the run loop sleeps after an `Idle` iteration (and after a
/// non-critical `Failed` one).
pub check_interval_secs: u64,
/// Passed through to `update::should_apply_now`.
/// NOTE(review): presumably the idle period required before applying — confirm there.
pub quiet_window_secs: u64,
/// Passed through to `update::should_apply_now`.
/// NOTE(review): presumably the longest an update may stay deferred — confirm there.
pub max_defer_secs: u64,
}
impl WorkerConfig {
    /// Builds the worker configuration from the process environment.
    ///
    /// Each numeric setting is read from its `OPENLATCH_PROVIDER_UPDATE_*`
    /// variable; a variable that is unset, not valid Unicode, or not a valid
    /// `u64` silently falls back to the built-in default. The registry origin
    /// comes from `OPENLATCH_PROVIDER_NPM_REGISTRY`, defaulting to the public
    /// npm registry.
    pub fn from_env() -> Self {
        // Reads `key` as whole seconds, using `fallback` on any failure.
        let read_secs = |key: &str, fallback: u64| -> u64 {
            match std::env::var(key) {
                Ok(raw) => raw.parse::<u64>().unwrap_or(fallback),
                Err(_) => fallback,
            }
        };

        let registry_origin = match std::env::var("OPENLATCH_PROVIDER_NPM_REGISTRY") {
            Ok(origin) => origin,
            Err(_) => String::from("https://registry.npmjs.org"),
        };
        let download_timeout_secs =
            read_secs("OPENLATCH_PROVIDER_UPDATE_DOWNLOAD_TIMEOUT_SECS", 60);
        let check_interval_secs = read_secs(
            "OPENLATCH_PROVIDER_UPDATE_CHECK_INTERVAL_SECS",
            DEFAULT_CHECK_INTERVAL_SECS,
        );
        let quiet_window_secs = read_secs(
            "OPENLATCH_PROVIDER_UPDATE_QUIET_WINDOW_SECS",
            DEFAULT_QUIET_WINDOW_SECS,
        );
        let max_defer_secs = read_secs(
            "OPENLATCH_PROVIDER_UPDATE_MAX_DEFER_SECS",
            DEFAULT_MAX_DEFER_SECS,
        );

        Self {
            registry_origin,
            download_timeout_secs,
            check_interval_secs,
            quiet_window_secs,
            max_defer_secs,
        }
    }
}
/// Reports whether the auto-update worker must stay off for this process.
///
/// The checks run in order and stop at the first hit:
/// 1. `OPENLATCH_NO_AUTO_UPDATE` is set (any value).
/// 2. `OPENLATCH_PROVIDER_AUTO_UPDATE` is exactly `false`, `0`, or `no`.
/// 3. A CI environment is detected.
/// 4. The detected install method is `CargoInstall`.
pub fn should_disable_worker() -> bool {
    // Each probe is a closure so `||` keeps the original short-circuit order.
    let kill_switch_set = || std::env::var("OPENLATCH_NO_AUTO_UPDATE").is_ok();
    let opted_out = || {
        let raw = std::env::var("OPENLATCH_PROVIDER_AUTO_UPDATE");
        matches!(raw.as_deref(), Ok("false" | "0" | "no"))
    };
    let installed_via_cargo = || {
        let method = update::install_state::detect_install_method();
        matches!(method, update::install_state::InstallMethod::CargoInstall)
    };

    kill_switch_set() || opted_out() || is_ci_environment() || installed_via_cargo()
}
/// Returns `true` when any well-known CI marker variable is present in the
/// environment. Only presence is checked; the variable's value is ignored.
fn is_ci_environment() -> bool {
    const CI_MARKERS: [&str; 5] = [
        "CI",
        "GITHUB_ACTIONS",
        "GITLAB_CI",
        "CIRCLECI",
        "JENKINS_URL",
    ];

    for marker in &CI_MARKERS {
        if std::env::var(marker).is_ok() {
            return true;
        }
    }
    false
}
/// Starts the auto-update worker as a detached tokio task and returns its
/// join handle.
pub fn spawn(state: AdminState, config: WorkerConfig) -> tokio::task::JoinHandle<()> {
    let worker = run_auto_update_worker(state, config);
    tokio::spawn(worker)
}
/// Runs the auto-update loop; this future never completes on its own.
///
/// After an initial settle delay the loop repeatedly calls
/// `worker_iteration` and sleeps according to the outcome:
///
/// * `Idle` — clear the deferral timestamp, sleep `config.check_interval_secs`.
/// * `Deferred` — remember when deferral began (first time only), sleep
///   `DEFER_RECHECK_SECS`.
/// * `Failed` — sleep `CRITICAL_RETRY_SECS` if the last known severity is
///   critical, otherwise `config.check_interval_secs`.
///
/// An `Err` from the iteration is logged and treated as `Failed`, keeping the
/// previously recorded severity.
pub async fn run_auto_update_worker(state: AdminState, config: WorkerConfig) {
    tokio::time::sleep(Duration::from_secs(SETTLE_DELAY_SECS)).await;

    // When the currently pending update was first deferred, if any.
    let mut pending_since: Option<std::time::Instant> = None;
    let mut last_severity: Severity = Severity::Normal;

    loop {
        let outcome = match worker_iteration(&state, &config, pending_since, last_severity).await {
            Ok((outcome, severity)) => {
                last_severity = severity;
                outcome
            }
            Err(reason) => {
                tracing::warn!(target: "update", error = %reason, "worker iteration error");
                WorkerOutcome::Failed
            }
        };

        let sleep_secs = match outcome {
            WorkerOutcome::Idle => {
                pending_since = None;
                config.check_interval_secs
            }
            WorkerOutcome::Deferred => {
                pending_since.get_or_insert_with(std::time::Instant::now);
                DEFER_RECHECK_SECS
            }
            WorkerOutcome::Failed => {
                if last_severity == Severity::Critical {
                    CRITICAL_RETRY_SECS
                } else {
                    config.check_interval_secs
                }
            }
        };

        // Floor the pause at one second: a configured interval of 0 would
        // otherwise turn this loop into a back-to-back stream of registry
        // probes. Mirrors the `.max(1)` applied to the download timeout.
        tokio::time::sleep(Duration::from_secs(sleep_secs.max(1))).await;
    }
}
/// Performs one check-and-maybe-apply pass.
///
/// Steps, in order:
/// 1. Probe the registry for a newer version than the compiled-in one.
/// 2. If the manifest declares a `min_supported` version that the current
///    build does not meet, log a warning and stop (manual upgrade needed).
/// 3. Ask `update::should_apply_now` whether to apply; if not, defer.
/// 4. Claim the `update_in_progress` flag; if it is already set, defer.
/// 5. Run the apply inside the daemon.
///
/// `pending_since` is when the pending update was first deferred, if ever.
/// `last_severity` is the severity recorded by the previous pass; it is
/// returned unchanged when the probe fails, so a transient registry error
/// does not downgrade a known-critical update and the caller can keep using
/// its shorter critical retry cadence.
///
/// Returns the outcome plus the severity the caller should remember. Every
/// path currently returns `Ok`; the `Err` arm of the signature is unused here.
async fn worker_iteration(
    state: &AdminState,
    config: &WorkerConfig,
    pending_since: Option<std::time::Instant>,
    last_severity: Severity,
) -> Result<(WorkerOutcome, Severity), String> {
    let current = env!("CARGO_PKG_VERSION").to_string();
    let check = update::check(&current, &config.registry_origin).await;
    let (latest, severity, min_supported) = match check {
        CheckResult::UpToDate { .. } => return Ok((WorkerOutcome::Idle, Severity::Normal)),
        CheckResult::Failed { reason } => {
            tracing::debug!(target: "update", reason = %reason, "manifest probe failed");
            // Keep whatever severity we last saw rather than resetting to Normal.
            return Ok((WorkerOutcome::Failed, last_severity));
        }
        CheckResult::Available {
            latest,
            severity,
            min_supported,
            ..
        } => (latest, severity, min_supported),
    };

    if let Some(ref min) = min_supported {
        if !update::version_at_least(&current, min) {
            tracing::warn!(
                target: "update",
                current = %current,
                latest = %latest,
                min_supported = %min,
                "auto-update blocked by min_supported_provider; manual `npm install -g @openlatch/provider@latest` required"
            );
            return Ok((WorkerOutcome::Idle, severity));
        }
    }

    // How long this update has already been deferred (zero if never).
    let pending_age = pending_since.map(|t| t.elapsed()).unwrap_or(Duration::ZERO);
    let apply_now = update::should_apply_now(
        severity,
        &state.ctx.events_processed_atomic_proxy(),
        &state.ctx.events_in_flight_atomic_proxy(),
        pending_age,
        config.quiet_window_secs,
        config.max_defer_secs,
    );
    if !apply_now {
        return Ok((WorkerOutcome::Deferred, severity));
    }

    // Atomically claim the flag; failure means another apply already holds it.
    // NOTE(review): presumably `run_apply_in_daemon` clears the flag when it
    // finishes — confirm, since it is not reset in this function.
    if state
        .ctx
        .update_in_progress
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_err()
    {
        tracing::info!(
            target: "update",
            "auto-update worker yielding to in-flight manual apply"
        );
        return Ok((WorkerOutcome::Deferred, severity));
    }

    let opts = ApplyOptions {
        current_version: current,
        registry_origin: config.registry_origin.clone(),
        // Never hand the downloader a zero timeout.
        download_timeout: Duration::from_secs(config.download_timeout_secs.max(1)),
        force_cargo_install: false,
        mode: ApplyMode::Rpc,
    };
    crate::runtime::admin::run_apply_in_daemon(state.clone(), opts, severity).await;
    Ok((WorkerOutcome::Idle, severity))
}
impl RuntimeContext {
    /// Returns a freshly constructed counter that always holds `0`.
    ///
    /// NOTE(review): looks like a stand-in until real in-flight tracking
    /// exists — confirm against `update::should_apply_now`.
    fn events_in_flight_atomic_proxy(&self) -> std::sync::atomic::AtomicU32 {
        let none_in_flight: u32 = 0;
        std::sync::atomic::AtomicU32::new(none_in_flight)
    }

    /// Returns a freshly constructed counter holding the current Unix time in
    /// seconds minus this context's uptime in seconds (saturating at 0). A
    /// clock set before the Unix epoch is treated as second 0.
    ///
    /// NOTE(review): assumes `uptime_ms()` reports milliseconds, which makes
    /// the value the approximate start time as a Unix timestamp — confirm.
    fn events_processed_atomic_proxy(&self) -> std::sync::atomic::AtomicU64 {
        let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
        let now_secs = match since_epoch {
            Ok(elapsed) => elapsed.as_secs(),
            Err(_) => 0,
        };
        let uptime_secs = self.uptime_ms() / 1000;
        std::sync::atomic::AtomicU64::new(now_secs.saturating_sub(uptime_secs))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// With none of the tuning variables set, `from_env` yields the built-in
    /// defaults.
    #[test]
    fn worker_config_defaults_match_spec() {
        let cfg = WorkerConfig::from_env();
        assert_eq!(cfg.check_interval_secs, DEFAULT_CHECK_INTERVAL_SECS);
        assert_eq!(cfg.quiet_window_secs, DEFAULT_QUIET_WINDOW_SECS);
        assert_eq!(cfg.max_defer_secs, DEFAULT_MAX_DEFER_SECS);
        assert_eq!(cfg.download_timeout_secs, 60);
    }

    /// Setting the global kill switch disables the worker.
    #[test]
    fn should_disable_worker_honours_kill_switch() {
        std::env::set_var("OPENLATCH_NO_AUTO_UPDATE", "1");
        let disabled = should_disable_worker();
        // Clean up before asserting so a failure cannot leak the variable
        // into other tests in this process.
        std::env::remove_var("OPENLATCH_NO_AUTO_UPDATE");
        assert!(disabled);
    }

    /// Setting the provider opt-out to `false` disables the worker.
    #[test]
    fn should_disable_worker_honours_provider_auto_update_false() {
        std::env::set_var("OPENLATCH_PROVIDER_AUTO_UPDATE", "false");
        let disabled = should_disable_worker();
        std::env::remove_var("OPENLATCH_PROVIDER_AUTO_UPDATE");
        // Previously the result was discarded, so this test could never fail.
        assert!(disabled);
    }
}