openlatch-provider 0.2.1

Self-service onboarding CLI + runtime daemon for OpenLatch Editors and Providers
//! Background auto-update worker (P3.T2c).
//!
//! Spawned by `listen` daemon mode at startup when `auto_update == true`,
//! polls the npm registry on a configurable cadence, and feeds the
//! result through [`super::should_apply_now`] before kicking off
//! [`crate::runtime::admin::run_apply_in_daemon`].
//!
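//! Cadence: 6 h normal poll / 1 h retry after a failed check when the
//! last known update severity is critical / 5 min re-check while an apply
//! is deferred / 10 s settle delay before the first poll. Only the normal
//! poll interval is configurable, via
//! `OPENLATCH_PROVIDER_UPDATE_CHECK_INTERVAL_SECS`. Activity deferral is
//! tuned with `OPENLATCH_PROVIDER_UPDATE_QUIET_WINDOW_SECS` (default 60 s)
//! and `OPENLATCH_PROVIDER_UPDATE_MAX_DEFER_SECS` (default 24 h); the
//! registry origin and download timeout with `OPENLATCH_PROVIDER_NPM_REGISTRY`
//! and `OPENLATCH_PROVIDER_UPDATE_DOWNLOAD_TIMEOUT_SECS` (default 60 s).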
//!
//! Auto-disable signals:
//! - `OPENLATCH_NO_AUTO_UPDATE=1` (presence-only kill switch).
//! - `OPENLATCH_PROVIDER_AUTO_UPDATE` set to `false`, `0`, or `no`.
//! - CI environment detection (any of `CI`, `GITHUB_ACTIONS`, `GITLAB_CI`,
//!   `CIRCLECI`, `JENKINS_URL`).
//! - `cargo install` install method (notify-once `info!` log per daemon
//!   invocation).

use std::sync::atomic::Ordering;
use std::time::Duration;

use crate::runtime::admin::AdminState;
use crate::runtime::server::RuntimeContext;
use crate::update::{self, ApplyMode, ApplyOptions, CheckResult, Severity};

/// Pause after the worker starts before the very first registry poll.
const SETTLE_DELAY_SECS: u64 = 10;
/// Default normal poll interval (6 hours); see `WorkerConfig::check_interval_secs`.
const DEFAULT_CHECK_INTERVAL_SECS: u64 = 21_600;
/// Retry delay (1 hour) after a failed cycle while the last known severity is critical.
const CRITICAL_RETRY_SECS: u64 = 3_600;
/// Re-check delay (5 minutes) while an available update is being deferred.
const DEFER_RECHECK_SECS: u64 = 300;
/// Default for `WorkerConfig::quiet_window_secs` (1 minute).
const DEFAULT_QUIET_WINDOW_SECS: u64 = 60;
/// Default for `WorkerConfig::max_defer_secs` (24 hours).
const DEFAULT_MAX_DEFER_SECS: u64 = 86_400;

/// Result of one auto-update cycle; the worker loop maps it to the length
/// of the sleep before the next cycle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WorkerOutcome {
    /// Nothing further to do this cycle (up to date, blocked, or applied).
    Idle,
    /// An update is available but was held back; re-check soon.
    Deferred,
    /// The cycle could not complete (e.g. the manifest probe failed).
    Failed,
}

/// Tunables for the auto-update worker. [`WorkerConfig::from_env`] lists
/// the environment variable and default behind each field.
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// npm registry base URL, passed to both the check and the apply step.
    pub registry_origin: String,
    /// Update download timeout in seconds (clamped to at least 1 s at use).
    pub download_timeout_secs: u64,
    /// Normal delay between registry checks, in seconds.
    pub check_interval_secs: u64,
    /// Activity quiet window in seconds, passed to `update::should_apply_now`.
    pub quiet_window_secs: u64,
    /// Deferral limit in seconds, passed to `update::should_apply_now`
    /// alongside the age of the pending update.
    pub max_defer_secs: u64,
}

impl WorkerConfig {
    /// Build a worker config from the process environment.
    ///
    /// | Field                   | Environment variable                              | Default                      |
    /// |-------------------------|---------------------------------------------------|------------------------------|
    /// | `registry_origin`       | `OPENLATCH_PROVIDER_NPM_REGISTRY`                 | `https://registry.npmjs.org` |
    /// | `download_timeout_secs` | `OPENLATCH_PROVIDER_UPDATE_DOWNLOAD_TIMEOUT_SECS` | 60                           |
    /// | `check_interval_secs`   | `OPENLATCH_PROVIDER_UPDATE_CHECK_INTERVAL_SECS`   | 6 h                          |
    /// | `quiet_window_secs`     | `OPENLATCH_PROVIDER_UPDATE_QUIET_WINDOW_SECS`     | 60                           |
    /// | `max_defer_secs`        | `OPENLATCH_PROVIDER_UPDATE_MAX_DEFER_SECS`        | 24 h                         |
    ///
    /// Numeric variables are whole seconds. Surrounding whitespace is ignored;
    /// an unset, non-UTF-8, empty, negative, or otherwise unparseable value
    /// falls back to the default. A UTF-8 registry origin is used verbatim.
    pub fn from_env() -> Self {
        const DEFAULT_DOWNLOAD_TIMEOUT_SECS: u64 = 60;

        fn env_secs(key: &str, default: u64) -> u64 {
            // Trim first: values coming from `.env` or unit files often carry
            // stray whitespace (e.g. "60 "), which `parse` would reject and
            // silently replace with the default.
            std::env::var(key)
                .ok()
                .and_then(|raw| raw.trim().parse::<u64>().ok())
                .unwrap_or(default)
        }

        Self {
            registry_origin: std::env::var("OPENLATCH_PROVIDER_NPM_REGISTRY")
                .unwrap_or_else(|_| "https://registry.npmjs.org".into()),
            download_timeout_secs: env_secs(
                "OPENLATCH_PROVIDER_UPDATE_DOWNLOAD_TIMEOUT_SECS",
                DEFAULT_DOWNLOAD_TIMEOUT_SECS,
            ),
            check_interval_secs: env_secs(
                "OPENLATCH_PROVIDER_UPDATE_CHECK_INTERVAL_SECS",
                DEFAULT_CHECK_INTERVAL_SECS,
            ),
            quiet_window_secs: env_secs(
                "OPENLATCH_PROVIDER_UPDATE_QUIET_WINDOW_SECS",
                DEFAULT_QUIET_WINDOW_SECS,
            ),
            max_defer_secs: env_secs(
                "OPENLATCH_PROVIDER_UPDATE_MAX_DEFER_SECS",
                DEFAULT_MAX_DEFER_SECS,
            ),
        }
    }
}

/// Returns `true` when the auto-update worker should NOT be spawned.
///
/// Disable signals, checked in this order:
/// 1. `OPENLATCH_NO_AUTO_UPDATE` is present with any value (presence-only
///    kill switch; non-UTF-8 values count too).
/// 2. `OPENLATCH_PROVIDER_AUTO_UPDATE` is `false`, `0`, `no`, or `off`,
///    compared case-insensitively after trimming whitespace.
/// 3. A CI environment is detected.
/// 4. The binary was installed with `cargo install`.
pub fn should_disable_worker() -> bool {
    // `var_os`, not `var`: `var` reports a present-but-non-UTF-8 value as an
    // error, which would silently defeat a presence-only kill switch.
    if std::env::var_os("OPENLATCH_NO_AUTO_UPDATE").is_some() {
        return true;
    }
    if let Ok(raw) = std::env::var("OPENLATCH_PROVIDER_AUTO_UPDATE") {
        let value = raw.trim();
        // Accept common spellings (`FALSE`, `False`, ` no`) that a strict
        // exact-match would ignore, leaving auto-update unexpectedly on.
        if ["false", "0", "no", "off"]
            .iter()
            .any(|opt_out| value.eq_ignore_ascii_case(opt_out))
        {
            return true;
        }
    }
    if is_ci_environment() {
        return true;
    }
    matches!(
        update::install_state::detect_install_method(),
        update::install_state::InstallMethod::CargoInstall
    )
}

/// Returns `true` if any well-known CI marker variable is present.
///
/// Only presence matters; the value is ignored, so even `CI=false` counts.
/// `var_os` is used so a present-but-non-UTF-8 value is still detected;
/// `std::env::var` would return an error for it and the check would miss it.
fn is_ci_environment() -> bool {
    const CI_MARKERS: [&str; 5] = [
        "CI",
        "GITHUB_ACTIONS",
        "GITLAB_CI",
        "CIRCLECI",
        "JENKINS_URL",
    ];
    CI_MARKERS
        .iter()
        .any(|marker| std::env::var_os(marker).is_some())
}

/// Start [`run_auto_update_worker`] as a Tokio task and hand back its
/// [`tokio::task::JoinHandle`].
///
/// This function does not consult [`should_disable_worker`]; deciding
/// whether the worker should run at all is the caller's responsibility.
pub fn spawn(state: AdminState, config: WorkerConfig) -> tokio::task::JoinHandle<()> {
    let worker = run_auto_update_worker(state, config);
    tokio::spawn(worker)
}

/// Long-running auto-update loop; never returns.
///
/// After an initial `SETTLE_DELAY_SECS` pause, it alternates between one
/// update cycle and a sleep whose length depends on the cycle's outcome:
///
/// - `Idle`: clear the deferral timer and wait the normal check interval.
/// - `Deferred`: start the deferral timer (if not already running) and
///   re-check after `DEFER_RECHECK_SECS`.
/// - `Failed`: wait `CRITICAL_RETRY_SECS` when the last known severity is
///   critical, otherwise the normal check interval.
///
/// The normal check interval is `config.check_interval_secs`, clamped to at
/// least one second.
pub async fn run_auto_update_worker(state: AdminState, config: WorkerConfig) {
    // A zero interval (e.g. `OPENLATCH_PROVIDER_UPDATE_CHECK_INTERVAL_SECS=0`)
    // would make Idle/Failed sleeps zero-length and turn this loop into
    // back-to-back registry polls. Clamp it, mirroring the `.max(1)` applied
    // to the download timeout.
    let check_interval_secs = config.check_interval_secs.max(1);

    tokio::time::sleep(Duration::from_secs(SETTLE_DELAY_SECS)).await;

    // When the currently pending update was first deferred; its age is fed
    // to `update::should_apply_now` by each cycle.
    let mut pending_since: Option<std::time::Instant> = None;
    // Severity reported by the latest cycle; selects the Failed retry cadence.
    let mut last_severity: Severity = Severity::Normal;

    loop {
        let outcome = match worker_iteration(&state, &config, pending_since, last_severity).await {
            Ok((outcome, severity)) => {
                last_severity = severity;
                outcome
            }
            Err(reason) => {
                tracing::warn!(target: "update", error = %reason, "worker iteration error");
                WorkerOutcome::Failed
            }
        };

        let sleep_secs = match outcome {
            WorkerOutcome::Idle => {
                pending_since = None;
                check_interval_secs
            }
            WorkerOutcome::Deferred => {
                // Keep the first timestamp across repeated deferrals so the
                // pending age keeps growing instead of resetting.
                pending_since.get_or_insert_with(std::time::Instant::now);
                DEFER_RECHECK_SECS
            }
            WorkerOutcome::Failed if last_severity == Severity::Critical => CRITICAL_RETRY_SECS,
            WorkerOutcome::Failed => check_interval_secs,
        };

        tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
    }
}

/// One update cycle: probe the registry manifest, run the decision matrix,
/// and, if everything allows it, apply the update in-process.
///
/// `pending_since` is when the currently pending update was first deferred;
/// its age is passed to `update::should_apply_now`. `last_severity` is the
/// severity the caller remembered from the previous cycle.
///
/// Returns the outcome plus the severity the caller should remember:
/// - up to date: `(Idle, Normal)`;
/// - manifest probe failed: `(Failed, last_severity)`. No fresh severity is
///   available, so the previous one is carried forward. Returning `Normal`
///   here would make the caller's critical-retry cadence unreachable,
///   because this is the only path that reports `Failed`;
/// - blocked by `min_supported`: `(Idle, severity)`;
/// - held back by activity or by an in-flight manual apply: `(Deferred, severity)`;
/// - apply attempted: `(Idle, severity)`.
///
/// # Errors
///
/// No path currently returns `Err`; the `Result` keeps the caller's error
/// branch meaningful if a fallible step is added later.
async fn worker_iteration(
    state: &AdminState,
    config: &WorkerConfig,
    pending_since: Option<std::time::Instant>,
    last_severity: Severity,
) -> Result<(WorkerOutcome, Severity), String> {
    let current = env!("CARGO_PKG_VERSION").to_string();
    let check = update::check(&current, &config.registry_origin).await;
    let (latest, severity, min_supported) = match check {
        CheckResult::UpToDate { .. } => return Ok((WorkerOutcome::Idle, Severity::Normal)),
        CheckResult::Failed { reason } => {
            tracing::debug!(target: "update", reason = %reason, "manifest probe failed");
            // Carry the previous severity forward so a pending critical
            // update keeps the shorter retry cadence across registry errors.
            return Ok((WorkerOutcome::Failed, last_severity));
        }
        CheckResult::Available {
            latest,
            severity,
            min_supported,
            ..
        } => (latest, severity, min_supported),
    };

    // This build is too old to self-update; only a manual reinstall helps.
    if let Some(ref min) = min_supported {
        if !update::version_at_least(&current, min) {
            tracing::warn!(
                target: "update",
                current = %current,
                latest = %latest,
                min_supported = %min,
                "auto-update blocked by min_supported_provider; manual `npm install -g @openlatch/provider@latest` required"
            );
            return Ok((WorkerOutcome::Idle, severity));
        }
    }

    let pending_age = pending_since.map(|t| t.elapsed()).unwrap_or(Duration::ZERO);
    let apply_now = update::should_apply_now(
        severity,
        &state.ctx.events_processed_atomic_proxy(),
        &state.ctx.events_in_flight_atomic_proxy(),
        pending_age,
        config.quiet_window_secs,
        config.max_defer_secs,
    );
    if !apply_now {
        return Ok((WorkerOutcome::Deferred, severity));
    }

    // Claim the update slot; if another apply already holds it, back off.
    // NOTE(review): the flag is set here but never cleared in this function;
    // this assumes `run_apply_in_daemon` resets `update_in_progress` when it
    // finishes. Confirm.
    if state
        .ctx
        .update_in_progress
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_err()
    {
        tracing::info!(
            target: "update",
            "auto-update worker yielding to in-flight manual apply"
        );
        return Ok((WorkerOutcome::Deferred, severity));
    }

    let opts = ApplyOptions {
        current_version: current,
        registry_origin: config.registry_origin.clone(),
        download_timeout: Duration::from_secs(config.download_timeout_secs.max(1)),
        force_cargo_install: false,
        mode: ApplyMode::Rpc,
    };
    crate::runtime::admin::run_apply_in_daemon(state.clone(), opts, severity).await;
    Ok((WorkerOutcome::Idle, severity))
}

/// Read-only stand-ins for the activity signals consumed by
/// `update::should_apply_now`.
///
/// Each helper builds a fresh atomic holding a snapshot value rather than
/// exposing shared runtime state. Neither is backed by a live hot-path
/// signal yet; see the notes on each method.
impl RuntimeContext {
    /// In-flight event count.
    ///
    /// Placeholder: always `0`, because the runtime does not yet maintain a
    /// live in-flight counter on the hot path.
    fn events_in_flight_atomic_proxy(&self) -> std::sync::atomic::AtomicU32 {
        std::sync::atomic::AtomicU32::new(0)
    }

    /// Stand-in for "last event received at", as Unix seconds.
    ///
    /// Placeholder: returns `now - uptime`, i.e. presumably the daemon start
    /// time (assumes `uptime_ms` is milliseconds since start; confirm). Idle
    /// time is therefore measured from startup, not from the latest event.
    /// A system clock before the Unix epoch is treated as `now = 0`, and the
    /// subtraction saturates at `0`.
    ///
    /// NOTE(review): despite its name, this does not read `events_processed`.
    /// The planned replacement is a real last-event timestamp, in the spirit
    /// of openlatch-client's `EventActivityGuard`.
    fn events_processed_atomic_proxy(&self) -> std::sync::atomic::AtomicU64 {
        let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
        let now_secs = match since_epoch {
            Ok(elapsed) => elapsed.as_secs(),
            Err(_) => 0,
        };
        let uptime_secs = self.uptime_ms() / 1000;
        std::sync::atomic::AtomicU64::new(now_secs.saturating_sub(uptime_secs))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn worker_config_defaults_match_spec() {
        let cfg = WorkerConfig::from_env();
        assert_eq!(cfg.check_interval_secs, DEFAULT_CHECK_INTERVAL_SECS);
        assert_eq!(cfg.quiet_window_secs, DEFAULT_QUIET_WINDOW_SECS);
        assert_eq!(cfg.max_defer_secs, DEFAULT_MAX_DEFER_SECS);
        assert_eq!(cfg.download_timeout_secs, 60);
    }

    #[test]
    fn should_disable_worker_honours_kill_switch() {
        // CI detection can't be toggled reliably here, but the kill-switch
        // path runs first and is deterministic. Capture the result before
        // cleanup so a failing assert can't leak the variable into other tests.
        std::env::set_var("OPENLATCH_NO_AUTO_UPDATE", "1");
        let disabled = should_disable_worker();
        std::env::remove_var("OPENLATCH_NO_AUTO_UPDATE");
        assert!(disabled);
    }

    #[test]
    fn should_disable_worker_honours_provider_auto_update_opt_out() {
        // Every value is checked in one test: they share one env var, and
        // separate tests running in parallel would race on it. Any opt-out
        // value returns `true` before CI detection is consulted, so the
        // assertion holds both inside and outside CI.
        for value in ["false", "0", "no"].iter() {
            std::env::set_var("OPENLATCH_PROVIDER_AUTO_UPDATE", value);
            let disabled = should_disable_worker();
            std::env::remove_var("OPENLATCH_PROVIDER_AUTO_UPDATE");
            assert!(
                disabled,
                "OPENLATCH_PROVIDER_AUTO_UPDATE={} should disable the worker",
                value
            );
        }
    }
}