athena_rs 3.18.0

Hyper performant polyglot Database driver
Documentation
use std::time::Duration;

use actix_web::web::Data;
use athena_scheduler::{SchedulerLoopConfig, SchedulerTickResult, run_scheduler_loop};
use tracing::info;

use crate::AppState;
use crate::api::backup::{enqueue_due_backup_schedules_once, process_one_queued_backup_job};

pub fn spawn_backup_workers(state: Data<AppState>) {
    if !state.backup_worker_enabled {
        info!("Backup workers disabled");
        return;
    }

    let execution_poll_ms = state.backup_execution_worker_poll_ms.max(250);
    let schedule_poll_ms = state.backup_schedule_worker_poll_ms.max(1_000);
    info!(
        execution_poll_ms,
        schedule_poll_ms,
        max_attempts = state.backup_worker_max_attempts,
        lease_ttl_minutes = state.backup_worker_lease_ttl_minutes,
        "Backup workers started"
    );

    let execution_state = state.clone();
    tokio::spawn(async move {
        run_scheduler_loop(
            SchedulerLoopConfig::new(
                "backup_execution_worker",
                Duration::from_millis(execution_poll_ms),
            )
            .busy_interval(Duration::from_millis(500))
            .error_interval(Duration::from_millis(execution_poll_ms)),
            || async {
                match process_one_queued_backup_job(execution_state.get_ref()).await {
                    Ok(true) => Ok(SchedulerTickResult::Progress),
                    Ok(false) => Ok(SchedulerTickResult::Idle),
                    Err(err) => Err(err),
                }
            },
        )
        .await
    });

    let schedule_state = state.clone();
    tokio::spawn(async move {
        run_scheduler_loop(
            SchedulerLoopConfig::new(
                "backup_schedule_worker",
                Duration::from_millis(schedule_poll_ms),
            )
            .busy_interval(Duration::from_millis(1_000))
            .error_interval(Duration::from_millis(schedule_poll_ms)),
            || async {
                match enqueue_due_backup_schedules_once(schedule_state.get_ref()).await {
                    Ok(true) => Ok(SchedulerTickResult::Progress),
                    Ok(false) => Ok(SchedulerTickResult::Idle),
                    Err(err) => Err(err),
                }
            },
        )
        .await
    });
}