use solti_model::{
AdmissionPolicy, BackoffPolicy, JitterPolicy, RestartPolicy, TaskKind, TaskSpec,
};
use taskvisor::{TaskError, TaskFn, TaskRef};
use tokio_util::sync::CancellationToken;
use tracing::debug;
use super::{StateConfig, TaskState};
pub const SWEEP_SLOT: &str = "solti-state-sweep";
const SWEEP_TIMEOUT_MS: u64 = 30_000;
const BACKOFF_FIRST_MS: u64 = 5_000;
const BACKOFF_MAX_MS: u64 = 60_000;
const BACKOFF_FACTOR: f64 = 2.0;
pub fn state_sweep(state: TaskState, config: StateConfig) -> (TaskRef, TaskSpec) {
let sweep_interval_ms = config.sweep_interval.as_millis() as u64;
let task: TaskRef = TaskFn::arc(SWEEP_SLOT, move |ctx: CancellationToken| {
let state = state.clone();
let config = config.clone();
async move {
if ctx.is_cancelled() {
return Err(TaskError::Canceled);
}
let (runs, tasks) = state.sweep(&config);
debug!(
runs_removed = runs,
tasks_removed = tasks,
"state sweep completed"
);
Ok(())
}
});
let backoff = BackoffPolicy {
jitter: JitterPolicy::Equal,
first_ms: BACKOFF_FIRST_MS,
max_ms: BACKOFF_MAX_MS,
factor: BACKOFF_FACTOR,
};
let spec = TaskSpec::builder(SWEEP_SLOT, TaskKind::Embedded, SWEEP_TIMEOUT_MS)
.restart(RestartPolicy::periodic(sweep_interval_ms))
.backoff(backoff)
.admission(AdmissionPolicy::Replace)
.build()
.expect("state sweep spec must be valid");
(task, spec)
}