athena-worker 3.18.0

Shared worker runtime primitives for Athena background jobs
Documentation
use std::time::Duration;

use tokio::time::sleep;
use tracing::warn;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerTickResult {
    Progress,
    Idle,
    Busy,
}

#[derive(Debug, Clone, Copy)]
pub struct WorkerLoopConfig {
    pub poll_interval: Duration,
    pub busy_interval: Duration,
    pub error_interval: Duration,
}

impl WorkerLoopConfig {
    pub fn normalized(self) -> Self {
        Self {
            poll_interval: self.poll_interval.max(Duration::from_millis(250)),
            busy_interval: self.busy_interval.max(Duration::from_millis(250)),
            error_interval: self.error_interval.max(Duration::from_millis(250)),
        }
    }
}

impl Default for WorkerLoopConfig {
    fn default() -> Self {
        Self {
            poll_interval: Duration::from_secs(5),
            busy_interval: Duration::from_secs(1),
            error_interval: Duration::from_secs(5),
        }
    }
}

pub async fn run_worker_loop<F, Fut, E>(
    worker_name: &'static str,
    config: WorkerLoopConfig,
    mut tick: F,
) -> !
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<WorkerTickResult, E>>,
    E: std::fmt::Display,
{
    let config = config.normalized();
    loop {
        let sleep_for = match tick().await {
            Ok(WorkerTickResult::Progress) => Duration::from_millis(250),
            Ok(WorkerTickResult::Busy) => config.busy_interval,
            Ok(WorkerTickResult::Idle) => config.poll_interval,
            Err(err) => {
                warn!(worker = worker_name, error = %err, "worker_tick_failed");
                config.error_interval
            }
        };

        sleep(sleep_for).await;
    }
}

#[cfg(test)]
mod tests {
    use super::{WorkerLoopConfig, WorkerTickResult};
    use std::time::Duration;

    #[test]
    fn worker_loop_config_normalizes_small_values() {
        let cfg = WorkerLoopConfig {
            poll_interval: Duration::from_millis(0),
            busy_interval: Duration::from_millis(10),
            error_interval: Duration::from_millis(200),
        }
        .normalized();

        assert_eq!(cfg.poll_interval, Duration::from_millis(250));
        assert_eq!(cfg.busy_interval, Duration::from_millis(250));
        assert_eq!(cfg.error_interval, Duration::from_millis(250));
    }

    #[test]
    fn worker_tick_result_is_copy() {
        let result = WorkerTickResult::Progress;
        let copied = result;
        assert_eq!(copied, WorkerTickResult::Progress);
    }
}