forge-jobs 0.2.0

Sidekiq-style job queue with embedded SQLite and pluggable Postgres. Per-queue workers + cron + cluster-wide rate-limit budget + cancellation that survives across replicas.
Documentation
//! Cluster worker rebalancing.
//!
//! `queue.max_workers` is a *cluster total*. Without coordination, N
//! replicas would each run `max_workers` workers (3 pods × 10 = 30, not
//! the 10 the operator asked for). The rebalancer — run only by the
//! elected coordinator (it reuses the cron leadership lease) — splits
//! each queue's total fairly across live pods and writes each pod's
//! share to `pod_slot_assignment`. Every supervisor reads its own row
//! and scales local workers to match (10 over 3 pods → 4 / 3 / 3).
//!
//! Two background pieces live here:
//!  - [`pod_heartbeat_loop`]: every pod stamps its liveness so the
//!    rebalancer can see it even when it holds 0 slots.
//!  - [`rebalance_loop`]: the coordinator recomputes assignments.
//!
//! On `SQLite` (single process) the lone pod wins the lease and is
//! handed every queue's full total — same effective behavior as before.

use std::time::Duration;

use chrono::Utc;
use tokio_util::sync::CancellationToken;

use super::cron::CRON_LEASE_TTL;
use crate::storage::Storage;

/// How often the coordinator recomputes pod→slot assignments.
///
/// This is the period of the interval timer that drives `rebalance_loop`.
/// A rebalance pass only runs on ticks where this pod's cron-lease check
/// succeeds; on other ticks the loop does nothing.
pub const REBALANCE_TICK: Duration = Duration::from_secs(5);

/// Divide `total` slots as evenly as possible among `pods` pods.
///
/// Every pod receives `total / pods`; the `total % pods` leftover slots
/// are handed out one apiece to the leading entries, so position in the
/// caller's pod ordering decides who gets the extra.
/// `total = 10, pods = 3` → `[4, 3, 3]`. With `pods = 0` the result is
/// empty.
fn fair_shares(total: usize, pods: usize) -> Vec<usize> {
    if pods == 0 {
        return Vec::new();
    }
    let (floor, leftover) = (total / pods, total % pods);
    let mut shares = vec![floor; pods];
    // `leftover < pods`, so this only ever bumps the leading entries.
    for share in shares.iter_mut().take(leftover) {
        *share += 1;
    }
    shares
}

/// Per-pod liveness heartbeat. Independent of workers so a pod assigned
/// 0 slots stays visible to the rebalancer and can be handed slots when
/// totals grow.
pub(super) async fn pod_heartbeat_loop(
    storage: Storage,
    host_id: String,
    shutdown: CancellationToken,
) {
    let mut tick = tokio::time::interval(super::SUPERVISOR_TICK);
    tick.tick().await;
    loop {
        tokio::select! {
            biased;
            () = shutdown.cancelled() => return,
            _ = tick.tick() => {
                if let Err(e) = storage.procs.pod_heartbeat(&host_id).await {
                    tracing::warn!(?e, %host_id, "rebalance: pod heartbeat failed");
                }
            }
        }
    }
}

/// Coordinator loop. Only the cron-lease holder rebalances, so exactly
/// one pod writes assignments per tick.
pub(super) async fn rebalance_loop(storage: Storage, host_id: String, shutdown: CancellationToken) {
    let mut tick = tokio::time::interval(REBALANCE_TICK);
    tick.tick().await;
    loop {
        tokio::select! {
            biased;
            () = shutdown.cancelled() => return,
            _ = tick.tick() => {
                match storage.cron.try_cron_lease(&host_id, CRON_LEASE_TTL).await {
                    Ok(true) => {}
                    Ok(false) => continue,
                    Err(e) => {
                        tracing::warn!(?e, %host_id, "rebalance: lease check failed");
                        continue;
                    }
                }
                if let Err(e) = rebalance_once(&storage).await {
                    tracing::warn!(?e, "rebalance: tick failed");
                }
            }
        }
    }
}

/// Run a single rebalance pass.
///
/// Lists the pods whose heartbeat is newer than `now - STALE_THRESHOLD`,
/// then for every queue splits `max_workers` across them with
/// `fair_shares` and writes each pod's share via `set_slots`. With no
/// live pods the pass is a no-op. Called by the coordinator loop and
/// public so tests and ops tooling can trigger a pass directly.
///
/// A `max_workers` value that does not fit in `usize` (e.g. a negative
/// one) is treated as 0.
///
/// # Errors
///
/// Returns storage errors from listing pods or queues. A failing
/// `set_slots` for an individual pod is logged and skipped rather than
/// propagated.
pub async fn rebalance_once(storage: &Storage) -> crate::storage::error::Result<()> {
    let cutoff = Utc::now() - super::STALE_THRESHOLD;
    let live_pods = storage.procs.list_live_pods(cutoff).await?;
    if live_pods.is_empty() {
        return Ok(());
    }
    let queues = storage.config.list_queues().await?;
    for queue in queues {
        let total = usize::try_from(queue.max_workers).unwrap_or(0);
        let shares = fair_shares(total, live_pods.len());
        // Shares are paired with pods by position, so the pods listed first
        // receive any remainder slot.
        // NOTE(review): assumes `list_live_pods` returns a stable order
        // between passes; otherwise the extra slots can move around — confirm.
        for (host, share) in live_pods.iter().zip(shares) {
            let slots = i32::try_from(share).unwrap_or(0);
            if let Err(e) = storage.procs.set_slots(&queue.name, host, slots).await {
                tracing::warn!(?e, queue = %queue.name, %host, "rebalance: set_slots failed");
            }
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::fair_shares;

    /// Leading pods absorb the remainder; even splits and a zero total
    /// follow from the same rule.
    #[test]
    fn fair_shares_distributes_remainder_to_leaders() {
        let cases = [
            (10, 3, vec![4, 3, 3]),
            (9, 3, vec![3, 3, 3]),
            (2, 3, vec![1, 1, 0]),
            (0, 3, vec![0, 0, 0]),
            (7, 1, vec![7]),
        ];
        for (total, pods, expected) in cases {
            assert_eq!(fair_shares(total, pods), expected, "total={total} pods={pods}");
        }
    }

    /// No pods means nothing to hand out, whatever the total.
    #[test]
    fn fair_shares_zero_pods_is_empty() {
        assert_eq!(fair_shares(10, 0), Vec::<usize>::new());
    }

    /// The shares always add back up to the requested total.
    #[test]
    fn fair_shares_conserves_the_total() {
        for total in 0..50 {
            for pods in 1..8 {
                let handed_out = fair_shares(total, pods).into_iter().sum::<usize>();
                assert_eq!(handed_out, total, "total={total} pods={pods}");
            }
        }
    }
}