use sqlx::{query, query_as, Postgres};
use uuid::Uuid;
use crate::DbStore;
/// Aggregated options for every maintenance routine run by
/// `DbStore::<Postgres>::run_all_maintenance`.
///
/// `Default` combines the defaults of each nested option struct.
#[derive(Debug, Clone, Default)]
pub struct MaintenanceOptions {
    /// Options forwarded to the worker registry maintenance routine.
    pub worker_registry: WorkerRegistryMaintenanceOptions,
    /// Options forwarded to the task maintenance routine.
    pub task: TaskMaintenanceOptions,
    /// Options forwarded to the schedule maintenance routine.
    pub schedule: ScheduleMaintenanceOptions,
}
/// Options controlling how stale workers (and their tasks) are handled.
#[derive(Clone, Debug)]
pub struct WorkerRegistryMaintenanceOptions {
    /// An active worker whose `last_seen` timestamp is older than this
    /// duration is marked as inactive.
    pub worker_inactive_after: time::Duration,
    /// An inactive worker whose `last_seen` timestamp is older than this
    /// duration is deleted from the worker table.
    pub remove_inactive_workers_after: time::Duration,
    /// When `true`, active tasks assigned to workers that were just marked
    /// inactive are marked as failed.
    pub fail_inactive_worker_tasks: bool,
}
impl Default for WorkerRegistryMaintenanceOptions {
    /// Defaults: workers become inactive after 5 minutes, inactive workers
    /// are removed after 60 minutes, and tasks of inactive workers are failed.
    fn default() -> Self {
        let worker_inactive_after = time::Duration::minutes(5);
        let remove_inactive_workers_after = time::Duration::minutes(60);

        Self {
            worker_inactive_after,
            remove_inactive_workers_after,
            fail_inactive_worker_tasks: true,
        }
    }
}
/// Options controlling the removal of old tasks.
#[derive(Clone, Debug, Default)]
pub struct TaskMaintenanceOptions {
    /// When set, tasks that are not active and whose `updated` timestamp is
    /// older than this duration are deleted. `None` (the default) disables
    /// task removal.
    pub remove_inactive_tasks_after: Option<time::Duration>,
}
/// Options controlling the removal of old schedules.
#[derive(Clone, Debug, Default)]
pub struct ScheduleMaintenanceOptions {
    /// When set, schedules that are not active, whose `updated` timestamp is
    /// older than this duration, and that are not referenced by any task are
    /// deleted. `None` (the default) disables schedule removal.
    pub remove_inactive_schedules_after: Option<time::Duration>,
}
impl DbStore<Postgres> {
    /// Runs every maintenance routine in order: worker registry, then tasks,
    /// then schedules.
    ///
    /// Schedules run after tasks because a schedule is only removed once no
    /// task references it.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by any routine; the remaining routines
    /// are not executed.
    #[tracing::instrument(level = "debug", skip_all)]
    pub async fn run_all_maintenance(&self, options: &MaintenanceOptions) -> eyre::Result<()> {
        self.run_worker_registry_maintenance(&options.worker_registry)
            .await?;
        self.run_task_maintenance(&options.task).await?;
        self.run_schedule_maintenance(&options.schedule).await?;
        Ok(())
    }

    /// Deletes tasks that are not active and have not been updated for
    /// `options.remove_inactive_tasks_after`.
    ///
    /// Does nothing when `remove_inactive_tasks_after` is `None`.
    ///
    /// # Errors
    ///
    /// Returns an error if the `DELETE` statement fails.
    #[tracing::instrument(level = "debug", skip_all)]
    pub async fn run_task_maintenance(&self, options: &TaskMaintenanceOptions) -> eyre::Result<()> {
        if let Some(remove_after) = options.remove_inactive_tasks_after {
            query(
                r#"--sql
DELETE FROM "ora"."task"
WHERE
NOT "active"
AND "updated" < NOW() - $1::INTERVAL
"#,
            )
            .bind(remove_after)
            .execute(&self.db)
            .await?;
        }

        Ok(())
    }

    /// Deletes schedules that are not active, have not been updated for
    /// `options.remove_inactive_schedules_after`, and are not referenced by
    /// any remaining task.
    ///
    /// Does nothing when `remove_inactive_schedules_after` is `None`.
    ///
    /// # Errors
    ///
    /// Returns an error if the `DELETE` statement fails.
    #[tracing::instrument(level = "debug", skip_all)]
    pub async fn run_schedule_maintenance(
        &self,
        options: &ScheduleMaintenanceOptions,
    ) -> eyre::Result<()> {
        if let Some(remove_after) = options.remove_inactive_schedules_after {
            query(
                r#"--sql
DELETE FROM "ora"."schedule"
WHERE
NOT "active"
AND "updated" < NOW() - $1::INTERVAL
AND NOT EXISTS (
SELECT 1
FROM "ora"."task"
WHERE "schedule_id" = "ora"."schedule"."id"
)
"#,
            )
            .bind(remove_after)
            .execute(&self.db)
            .await?;
        }

        Ok(())
    }

    /// Maintains the worker registry:
    ///
    /// 1. Marks active workers as inactive when their `last_seen` is older
    ///    than `options.worker_inactive_after`.
    /// 2. If `options.fail_inactive_worker_tasks` is set, marks the active
    ///    tasks of those workers as failed and emits an `ora_task_failed`
    ///    notification per task.
    /// 3. Deletes inactive workers whose `last_seen` is older than
    ///    `options.remove_inactive_workers_after`.
    ///
    /// Steps 1 and 2 run in a single transaction so that a worker is never
    /// left marked inactive while its tasks are still active.
    ///
    /// # Errors
    ///
    /// Returns the underlying [`sqlx::Error`] if any statement fails. If the
    /// failure happens during steps 1 or 2, both are rolled back.
    #[tracing::instrument(level = "debug", skip_all)]
    pub async fn run_worker_registry_maintenance(
        &self,
        options: &WorkerRegistryMaintenanceOptions,
    ) -> Result<(), sqlx::Error> {
        // Deactivation and task failure must be atomic: only workers that are
        // still `active` are returned below, so if the task update failed
        // after the workers were already committed as inactive, a later run
        // would never pick their tasks up again.
        let mut tx = self.db.begin().await?;

        let worker_id_rows: Vec<(Uuid,)> = query_as(
            r#"--sql
UPDATE "ora"."worker"
SET
"active" = FALSE
WHERE
"active" = TRUE
AND "last_seen" < NOW() - $1::INTERVAL
RETURNING "id"
"#,
        )
        .bind(options.worker_inactive_after)
        .fetch_all(&mut *tx)
        .await?;

        // Skip the round-trip entirely when no worker was deactivated; the
        // update would match no rows anyway.
        if options.fail_inactive_worker_tasks && !worker_id_rows.is_empty() {
            let worker_ids = worker_id_rows
                .into_iter()
                .map(|(id,)| id)
                .collect::<Vec<_>>();

            query(
                r#"--sql
UPDATE "ora"."task"
SET
"status" = 'failed',
"failure_reason" = 'worker has became inactive',
"failed_at" = NOW()
WHERE "worker_id" = ANY($1) AND "active"
RETURNING pg_notify('ora_task_failed', "id"::TEXT) AS "notified";
"#,
            )
            .bind(worker_ids)
            .execute(&mut *tx)
            .await?;
        }

        // Notifications queued by `pg_notify` are delivered on commit.
        tx.commit().await?;

        // Independent clean-up of long-gone workers; no need to hold the
        // transaction open for it.
        query(
            r#"--sql
DELETE FROM "ora"."worker"
WHERE
"active" = FALSE
AND "last_seen" < NOW() - $1::INTERVAL
"#,
        )
        .bind(options.remove_inactive_workers_after)
        .execute(&self.db)
        .await?;

        Ok(())
    }
}