ainl-mission 0.1.0

Host-neutral mission engine: state machine, DAG, scheduler, stall detection, and task ledger (zero armaraos-* dependencies)
Documentation
//! Stall detection: insufficient `PROGRESS_FOR` activity within a time window.

use thiserror::Error;

/// Thresholds for orchestrator takeover when workers stop reporting progress.
///
/// The pair is read as "at least K progress events within the trailing T minutes".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StallConfig {
    /// Minimum progress events required within the window (K).
    pub min_progress_events: u32,
    /// Window length in whole minutes (T).
    pub window_minutes: u64,
}

impl Default for StallConfig {
    /// Defaults to one progress event per 30-minute window.
    fn default() -> Self {
        Self {
            min_progress_events: 1,
            window_minutes: 30,
        }
    }
}

impl StallConfig {
    /// Window length in seconds, saturating at `i64::MAX`.
    ///
    /// The result is signed so it can be subtracted directly from a unix timestamp.
    pub fn window_secs(&self) -> i64 {
        // Clamp before casting: a bare `as i64` wraps values above `i64::MAX` to a
        // negative number, which would yield a negative window (a start time in the
        // future). After the clamp the cast is lossless.
        let minutes = self.window_minutes.min(i64::MAX as u64) as i64;
        minutes.saturating_mul(60)
    }
}

/// Host-provided progress counters (typically from `PROGRESS_FOR` edges / episode queries).
///
/// Implementations answer two questions about a mission, both keyed by `mission_id`
/// and both expressed in unix seconds. Either query may fail with a [`StallError`].
pub trait StallProgressSource {
    /// Unix seconds of the most recent progress event for the mission, if any.
    ///
    /// Returns `Ok(None)` when the mission has no recorded progress event.
    fn last_progress_at(&self, mission_id: &str) -> Result<Option<i64>, StallError>;

    /// Count of progress events since `since_ts` (inclusive).
    ///
    /// `since_ts` is a unix timestamp in seconds; an event exactly at `since_ts`
    /// is expected to be counted.
    fn progress_count_since(&self, mission_id: &str, since_ts: i64) -> Result<u32, StallError>;
}

/// Stall query failure.
///
/// Returned by [`StallProgressSource`] methods and passed through unchanged by
/// [`is_stalled`].
#[derive(Debug, Error)]
pub enum StallError {
    /// The progress source could not answer a query. The payload is a free-form
    /// message, displayed as `progress source: <message>`.
    #[error("progress source: {0}")]
    Source(String),
}

/// Decides whether a mission looks stalled at `now` (unix seconds) under `config`.
///
/// The window starts at `now - config.window_secs()` (saturating). The mission is
/// reported as stalled only when **both** hold:
///
/// * fewer than `min_progress_events` progress events were counted since the window start, and
/// * there is no progress event at all, or the latest one predates the window start.
///
/// # Errors
///
/// Any [`StallError`] returned by `source` is passed through. The latest-progress
/// lookup is only performed when the event count is below the threshold.
pub fn is_stalled(
    source: &dyn StallProgressSource,
    mission_id: &str,
    config: &StallConfig,
    now: i64,
) -> Result<bool, StallError> {
    let window_start = now.saturating_sub(config.window_secs());

    let recent_events = source.progress_count_since(mission_id, window_start)?;
    if recent_events < config.min_progress_events {
        // Below the threshold: stalled unless the newest event still falls inside
        // the window (a timestamp equal to the window start counts as inside).
        let stale = match source.last_progress_at(mission_id)? {
            Some(latest) => latest < window_start,
            None => true,
        };
        Ok(stale)
    } else {
        Ok(false)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Canned progress source: returns fixed answers regardless of mission or window.
    struct MockSource {
        last: Option<i64>,
        count: u32,
    }

    impl StallProgressSource for MockSource {
        fn last_progress_at(&self, _mission_id: &str) -> Result<Option<i64>, StallError> {
            Ok(self.last)
        }

        fn progress_count_since(
            &self,
            _mission_id: &str,
            _since_ts: i64,
        ) -> Result<u32, StallError> {
            Ok(self.count)
        }
    }

    /// Progress source whose queries always fail, for checking error propagation.
    struct FailingSource;

    impl StallProgressSource for FailingSource {
        fn last_progress_at(&self, _mission_id: &str) -> Result<Option<i64>, StallError> {
            Err(StallError::Source("unavailable".to_string()))
        }

        fn progress_count_since(
            &self,
            _mission_id: &str,
            _since_ts: i64,
        ) -> Result<u32, StallError> {
            Err(StallError::Source("unavailable".to_string()))
        }
    }

    #[test]
    fn not_stalled_when_enough_progress() {
        let src = MockSource {
            last: Some(1000),
            count: 2,
        };
        let cfg = StallConfig {
            min_progress_events: 2,
            window_minutes: 10,
        };
        assert!(!is_stalled(&src, "m1", &cfg, 1200).unwrap());
    }

    #[test]
    fn stalled_when_no_progress_ever() {
        let src = MockSource {
            last: None,
            count: 0,
        };
        assert!(is_stalled(&src, "m1", &StallConfig::default(), 5000).unwrap());
    }

    #[test]
    fn stalled_when_last_progress_old() {
        let src = MockSource {
            last: Some(100),
            count: 0,
        };
        let cfg = StallConfig {
            min_progress_events: 1,
            window_minutes: 5,
        };
        assert!(is_stalled(&src, "m1", &cfg, 1000).unwrap());
    }

    #[test]
    fn not_stalled_when_below_threshold_but_last_progress_in_window() {
        // One event inside the 10-minute window is below K = 2, but the latest
        // event is not older than the window, so the mission is not stalled.
        let src = MockSource {
            last: Some(900),
            count: 1,
        };
        let cfg = StallConfig {
            min_progress_events: 2,
            window_minutes: 10,
        };
        assert!(!is_stalled(&src, "m1", &cfg, 1000).unwrap());
    }

    #[test]
    fn window_start_is_inclusive() {
        let cfg = StallConfig {
            min_progress_events: 1,
            window_minutes: 5,
        };
        // now = 1000 and a 300 s window put the window start at 700.
        let at_start = MockSource {
            last: Some(700),
            count: 0,
        };
        assert!(!is_stalled(&at_start, "m1", &cfg, 1000).unwrap());

        let just_before = MockSource {
            last: Some(699),
            count: 0,
        };
        assert!(is_stalled(&just_before, "m1", &cfg, 1000).unwrap());
    }

    #[test]
    fn source_errors_propagate() {
        let result = is_stalled(&FailingSource, "m1", &StallConfig::default(), 1000);
        assert!(matches!(result, Err(StallError::Source(_))));
    }
}