graphile_worker_queries 0.1.1

Database query helpers for graphile_worker
Documentation
use std::time::Duration;

use chrono::{DateTime, Utc};
use graphile_worker_database::{DbExecutorArg, DbParams, Schema};
use indoc::formatdoc;

use crate::errors::Result;

use crate::rows::get_required;
use crate::schema_names::PrivateTable;

#[derive(Debug, Clone, serde::Serialize)]
pub struct ActiveWorkerRow {
    pub worker_id: String,
    pub last_heartbeat_at: DateTime<Utc>,
    pub started_at: DateTime<Utc>,
    pub metadata: Option<serde_json::Value>,
    pub is_stale: bool,
}

pub async fn list_active_workers(
    mut executor: impl DbExecutorArg,
    schema: impl Into<Schema>,
    sweep_threshold: Duration,
) -> Result<Vec<ActiveWorkerRow>> {
    let schema = schema.into();
    let workers = PrivateTable::Workers.qualified(&schema);
    let sql = formatdoc!(
        r#"
            SELECT
                workers.id AS worker_id,
                workers.last_heartbeat_at,
                workers.started_at,
                workers.metadata
            FROM {workers} AS workers
            ORDER BY workers.last_heartbeat_at DESC;
        "#
    );

    let rows = executor.fetch_all(&sql, DbParams::new()).await?;
    let now = Utc::now();

    rows.iter()
        .map(|row| {
            let last_heartbeat_at = get_required(row, "last_heartbeat_at")?;
            let is_stale = now
                .signed_duration_since(last_heartbeat_at)
                .to_std()
                .ok()
                .is_some_and(|elapsed| elapsed >= sweep_threshold);
            Ok(ActiveWorkerRow {
                worker_id: get_required(row, "worker_id")?,
                last_heartbeat_at,
                started_at: get_required(row, "started_at")?,
                metadata: get_required(row, "metadata")?,
                is_stale,
            })
        })
        .collect()
}