use std::time::Duration;
use actix_web::web::Data;
use athena_scheduler::{SchedulerLoopConfig, SchedulerTickResult, run_scheduler_loop};
use tracing::info;
use crate::AppState;
use crate::api::backup::{enqueue_due_backup_schedules_once, process_one_queued_backup_job};
pub fn spawn_backup_workers(state: Data<AppState>) {
if !state.backup_worker_enabled {
info!("Backup workers disabled");
return;
}
let execution_poll_ms = state.backup_execution_worker_poll_ms.max(250);
let schedule_poll_ms = state.backup_schedule_worker_poll_ms.max(1_000);
info!(
execution_poll_ms,
schedule_poll_ms,
max_attempts = state.backup_worker_max_attempts,
lease_ttl_minutes = state.backup_worker_lease_ttl_minutes,
"Backup workers started"
);
let execution_state = state.clone();
tokio::spawn(async move {
run_scheduler_loop(
SchedulerLoopConfig::new(
"backup_execution_worker",
Duration::from_millis(execution_poll_ms),
)
.busy_interval(Duration::from_millis(500))
.error_interval(Duration::from_millis(execution_poll_ms)),
|| async {
match process_one_queued_backup_job(execution_state.get_ref()).await {
Ok(true) => Ok(SchedulerTickResult::Progress),
Ok(false) => Ok(SchedulerTickResult::Idle),
Err(err) => Err(err),
}
},
)
.await
});
let schedule_state = state.clone();
tokio::spawn(async move {
run_scheduler_loop(
SchedulerLoopConfig::new(
"backup_schedule_worker",
Duration::from_millis(schedule_poll_ms),
)
.busy_interval(Duration::from_millis(1_000))
.error_interval(Duration::from_millis(schedule_poll_ms)),
|| async {
match enqueue_due_backup_schedules_once(schedule_state.get_ref()).await {
Ok(true) => Ok(SchedulerTickResult::Progress),
Ok(false) => Ok(SchedulerTickResult::Idle),
Err(err) => Err(err),
}
},
)
.await
});
}