use std::collections::HashSet;
use chrono::{DateTime, Utc};
use forge_jobs::{
JobRecord, PodRecord, ProcessRecord, QueueConfigRow, QueueCounts, SlotAssignment,
};
use serde::{Deserialize, Serialize};
/// Dashboard summary of a single queue: its configuration, per-status job
/// counts and registered processes. Built by [`overview_dto`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct QueueOverviewDto {
    /// Queue name, taken from the queue's config row.
    pub name: String,
    /// Whether the queue is paused (config flag, passed through).
    pub paused: bool,
    /// Configured worker limit, passed through unchanged from config.
    pub max_workers: i32,
    /// Job counts per status.
    pub counts: StatusCountsDto,
    /// Processes registered for this queue.
    pub processes: Vec<QueueProcessDto>,
    /// Retention for `done` jobs in days; see `overview_dto` for the fallback
    /// used when the stored value does not fit in `u32`.
    pub retain_done_days: u32,
    /// Retention for `dead` jobs in days (same fallback handling as above).
    pub retain_dead_days: u32,
    /// Whether retry backoff is enabled for this queue.
    pub backoff_enabled: bool,
    /// Backoff base delay in seconds.
    pub backoff_base_seconds: u32,
    /// Backoff upper bound in seconds.
    pub backoff_max_seconds: u32,
    /// Throttle deadline copied from config; `None` when not set.
    pub throttled_until: Option<DateTime<Utc>>,
    /// Age of the oldest pending job in seconds, as supplied by the caller.
    pub oldest_pending_age_seconds: u64,
}
/// Number of jobs in each status bucket for a queue.
///
/// Filled field-for-field from `forge_jobs::QueueCounts` via the `From` impl
/// below.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[non_exhaustive]
pub struct StatusCountsDto {
    pub pending: u64,
    // NOTE(review): there is no "scheduled" entry in `JOB_STATUSES`; presumably
    // this count is derived (e.g. pending with a future run time) rather than a
    // stored status — confirm against `QueueCounts`.
    pub scheduled: u64,
    pub in_progress: u64,
    pub done: u64,
    pub failed: u64,
    pub dead: u64,
}
impl From<QueueCounts> for StatusCountsDto {
fn from(c: QueueCounts) -> Self {
Self {
pending: c.pending,
scheduled: c.scheduled,
in_progress: c.in_progress,
done: c.done,
failed: c.failed,
dead: c.dead,
}
}
}
/// A worker process registered against a queue, converted from
/// `forge_jobs::ProcessRecord`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct QueueProcessDto {
    pub process_id: String,
    pub queue_name: String,
    /// Host the process runs on; matched against `PodRecord::host_id` when
    /// building the workers overview.
    pub host_id: String,
    pub started_at: DateTime<Utc>,
    /// Last heartbeat reported by the process.
    pub heartbeat_at: DateTime<Utc>,
    /// Id of the job the process currently holds, rendered as a string;
    /// `None` when idle.
    pub current_job_id: Option<String>,
}
impl From<ProcessRecord> for QueueProcessDto {
fn from(p: ProcessRecord) -> Self {
Self {
process_id: p.process_id,
queue_name: p.queue_name,
host_id: p.host_id,
started_at: p.started_at,
heartbeat_at: p.heartbeat_at,
current_job_id: p.current_job.map(|id| id.as_str().to_owned()),
}
}
}
#[must_use]
pub fn overview_dto(
cfg: QueueConfigRow,
counts: QueueCounts,
processes: Vec<ProcessRecord>,
oldest_pending_age_seconds: u64,
) -> QueueOverviewDto {
QueueOverviewDto {
name: cfg.name,
paused: cfg.paused,
max_workers: cfg.max_workers,
counts: counts.into(),
processes: processes.into_iter().map(Into::into).collect(),
retain_done_days: u32::try_from(cfg.retain_done_for_days).unwrap_or(7),
retain_dead_days: u32::try_from(cfg.retain_dead_for_days).unwrap_or(30),
backoff_enabled: cfg.backoff_enabled,
backoff_base_seconds: u32::try_from(cfg.backoff_base_seconds).unwrap_or(60),
backoff_max_seconds: u32::try_from(cfg.backoff_max_seconds).unwrap_or(1800),
throttled_until: cfg.throttled_until,
oldest_pending_age_seconds,
}
}
/// One worker host (pod) with its live-process counts and slot assignments.
/// Built by [`workers_overview_dto`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct WorkerDto {
    pub host_id: String,
    pub worker_name: Option<String>,
    /// Queues the pod declares, passed through from `PodRecord::queues`.
    pub queues: Vec<String>,
    /// Slot assignments on this host with a positive slot count.
    pub slots: Vec<WorkerSlotDto>,
    /// Processes on this host whose heartbeat is at or after the staleness
    /// cutoff (saturates at `u32::MAX`).
    pub workers_live: u32,
    /// Of those live processes, how many hold a current job.
    pub in_flight: u32,
    /// Pod-level heartbeat.
    pub heartbeat_at: DateTime<Utc>,
    /// Whole seconds from `heartbeat_at` to "now", clamped to 0 when the
    /// heartbeat lies in the future.
    pub heartbeat_age_seconds: u64,
}
/// Number of slots a host has assigned to one queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct WorkerSlotDto {
    pub queue_name: String,
    pub slots: i32,
}
/// Workers page payload: every known pod plus the queues no pod serves.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct WorkersOverviewDto {
    pub workers: Vec<WorkerDto>,
    /// Queue names (from the caller's list) with no positive slot assignment
    /// on any listed worker.
    pub unassigned_queues: Vec<String>,
}
#[must_use]
pub fn workers_overview_dto(
pods: Vec<PodRecord>,
processes: &[ProcessRecord],
slots: &[SlotAssignment],
queue_names: &[String],
now: DateTime<Utc>,
stale_before: DateTime<Utc>,
) -> WorkersOverviewDto {
let workers = pods
.into_iter()
.map(|pod| {
let host_procs = processes
.iter()
.filter(|p| p.host_id == pod.host_id && p.heartbeat_at >= stale_before);
let workers_live = host_procs.clone().count();
let in_flight = host_procs.filter(|p| p.current_job.is_some()).count();
let slots = slots
.iter()
.filter(|s| s.host_id == pod.host_id && s.slots > 0)
.map(|s| WorkerSlotDto {
queue_name: s.queue_name.clone(),
slots: s.slots,
})
.collect();
let heartbeat_age_seconds =
u64::try_from((now - pod.heartbeat_at).num_seconds().max(0)).unwrap_or(0);
WorkerDto {
host_id: pod.host_id,
worker_name: pod.worker_name,
queues: pod.queues,
slots,
workers_live: u32::try_from(workers_live).unwrap_or(u32::MAX),
in_flight: u32::try_from(in_flight).unwrap_or(u32::MAX),
heartbeat_at: pod.heartbeat_at,
heartbeat_age_seconds,
}
})
.collect::<Vec<_>>();
let served: HashSet<&str> = workers
.iter()
.flat_map(|w| w.slots.iter().map(|s| s.queue_name.as_str()))
.collect();
let unassigned_queues = queue_names
.iter()
.filter(|name| !served.contains(name.as_str()))
.cloned()
.collect();
WorkersOverviewDto {
workers,
unassigned_queues,
}
}
/// Description of the storage backend: a backend name plus an ordered list of
/// key/value detail fields, converted from `forge_jobs::StorageInfo`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct StorageInfoDto {
    pub backend: String,
    pub fields: Vec<(String, String)>,
}
impl From<forge_jobs::StorageInfo> for StorageInfoDto {
    /// Moves the backend name and detail fields across unchanged.
    fn from(source: forge_jobs::StorageInfo) -> Self {
        let forge_jobs::StorageInfo { backend, fields, .. } = source;
        Self { backend, fields }
    }
}
/// Flattened view of a `forge_jobs::JobRecord` used for job lists
/// ([`JobsPageDto`]) and the inspect view ([`JobInspectDto`]).
///
/// Ids and status are rendered as strings; everything else is copied or cloned
/// as-is.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct JobRowDto {
    pub id: String,
    pub queue_name: String,
    pub kind: String,
    /// Status rendered through the record status's `as_str()`.
    pub status: String,
    pub priority: i32,
    pub attempts: i32,
    pub max_attempts: i32,
    pub enqueued_at: DateTime<Utc>,
    pub scheduled_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub last_error: Option<String>,
    pub process_id: Option<String>,
    pub dedupe_key: Option<String>,
    pub heartbeat_at: Option<DateTime<Utc>>,
}
impl From<&JobRecord> for JobRowDto {
fn from(row: &JobRecord) -> Self {
Self {
id: row.id.as_str().to_owned(),
queue_name: row.queue_name.clone(),
kind: row.kind.clone(),
status: row.status.as_str().to_owned(),
priority: row.priority,
attempts: row.attempts,
max_attempts: row.max_attempts,
enqueued_at: row.enqueued_at,
scheduled_at: row.scheduled_at,
started_at: row.started_at,
completed_at: row.completed_at,
last_error: row.last_error.clone(),
process_id: row.process_id.clone(),
dedupe_key: row.dedupe_key.clone(),
heartbeat_at: row.heartbeat_at,
}
}
}
/// Detailed view of a single job: its row plus the raw payload and error
/// history as untyped JSON values.
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct JobInspectDto {
    pub row: JobRowDto,
    pub payload: serde_json::Value,
    pub error_history: Vec<serde_json::Value>,
}
/// Filter for listing jobs.
///
/// Every field may be omitted from the incoming JSON (`#[serde(default)]`),
/// yielding an empty list or `None`. Presumably empty/`None` means "no
/// constraint on this dimension" — confirm against the query builder.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct JobsFilterDto {
    #[serde(default)]
    pub queues: Vec<String>,
    #[serde(default)]
    pub kinds: Vec<String>,
    /// Status strings; presumably drawn from `JOB_STATUSES`.
    #[serde(default)]
    pub statuses: Vec<String>,
    #[serde(default)]
    pub from: Option<DateTime<Utc>>,
    #[serde(default)]
    pub to: Option<DateTime<Utc>>,
    /// Free-text search over job payloads (matching semantics defined by the
    /// storage layer, not here).
    #[serde(default)]
    pub payload_search: Option<String>,
}
/// Arguments for a paginated job listing.
///
/// `filter` may be omitted (defaults to an empty filter). `limit` and `offset`
/// have no serde default and are therefore required when deserializing.
#[derive(Debug, Default, Deserialize)]
pub struct JobsListArgs {
    #[serde(default)]
    pub filter: JobsFilterDto,
    pub limit: u32,
    pub offset: u32,
}
/// One page of job rows together with the paging parameters that produced it.
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct JobsPageDto {
    pub rows: Vec<JobRowDto>,
    /// Presumably the number of rows matching the filter, independent of
    /// `limit`/`offset` — confirm with the producer.
    pub total: u64,
    pub limit: u32,
    pub offset: u32,
}
/// One time bucket of aggregate job activity.
///
/// `at` identifies the bucket (presumably its start). The plain counters
/// presumably count events that fall in the bucket. The `*_pN_ms` fields are
/// latency percentiles, presumably in milliseconds per the suffix. The split
/// between `processing_*` and `total_*` is defined by the producer — confirm
/// which interval each measures.
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct TimelineBucket {
    pub at: DateTime<Utc>,
    pub enqueued: u64,
    pub started: u64,
    pub retried: u64,
    pub completed: u64,
    pub failed: u64,
    pub processing_p50_ms: u64,
    pub processing_p95_ms: u64,
    pub processing_p99_ms: u64,
    pub total_p50_ms: u64,
    pub total_p95_ms: u64,
    pub total_p99_ms: u64,
}
/// One time bucket of a metric series.
///
/// Same shape as [`TimelineBucket`] without the `started`/`retried` counters,
/// and with `proc_*` in place of `processing_*`. Percentile units are
/// presumably milliseconds per the `_ms` suffix.
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct MetricSeriesBucket {
    pub at: DateTime<Utc>,
    pub enqueued: u64,
    pub completed: u64,
    pub failed: u64,
    pub proc_p50_ms: u64,
    pub proc_p95_ms: u64,
    pub proc_p99_ms: u64,
    pub total_p50_ms: u64,
    pub total_p95_ms: u64,
    pub total_p99_ms: u64,
}
/// One time bucket of host resource usage.
///
/// Units follow the field-name suffixes (`_pct` presumably 0–100, `_bytes`
/// bytes) — confirm with the producer.
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ResourceBucket {
    pub at: DateTime<Utc>,
    pub cpu_pct: f64,
    pub rss_bytes: u64,
    pub disk_read_bytes: u64,
    pub disk_write_bytes: u64,
    pub disk_used_pct: f64,
}
/// Resource-usage series for one host: its id, optional display name and
/// buckets.
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ResourceHostSeries {
    pub host: String,
    pub name: Option<String>,
    pub buckets: Vec<ResourceBucket>,
}
/// One time bucket of database health metrics.
///
/// This bucket holds:
/// - read/write latency percentiles (presumably ms per the `_ms` suffix);
/// - read/write throughput (per minute, per the field names);
/// - connection-pool occupancy;
/// - storage sizes in bytes.
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct DbHealthBucket {
    pub at: DateTime<Utc>,
    pub read_p50_ms: u64,
    pub read_p95_ms: u64,
    pub read_p99_ms: u64,
    pub reads_per_min: u64,
    pub write_p50_ms: u64,
    pub write_p95_ms: u64,
    pub write_p99_ms: u64,
    pub writes_per_min: u64,
    pub pool_active: u64,
    pub pool_idle: u64,
    pub pool_max: u64,
    pub pool_used_pct: f64,
    pub db_size_bytes: u64,
    pub wal_bytes: u64,
}
/// Database-health series for one host: its id, optional display name and
/// buckets.
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct DbHealthHostSeries {
    pub host: String,
    pub name: Option<String>,
    pub buckets: Vec<DbHealthBucket>,
}
/// A cron schedule definition together with its firing state, converted from
/// `forge_jobs::CronScheduleRecord`.
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct CronScheduleDto {
    pub name: String,
    /// Job kind enqueued when the schedule fires.
    pub kind: String,
    pub payload: serde_json::Value,
    pub queue_name: Option<String>,
    pub cron_expr: String,
    pub enabled: bool,
    pub max_attempts: Option<i32>,
    /// Optional in incoming JSON (`#[serde(default)]` → `None` when absent).
    #[serde(default)]
    pub dedupe_key: Option<String>,
    pub last_fired_at: Option<DateTime<Utc>>,
    pub next_fire_at: Option<DateTime<Utc>>,
    pub last_error: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}
impl From<forge_jobs::CronScheduleRecord> for CronScheduleDto {
    /// Moves every schedule field across unchanged.
    fn from(record: forge_jobs::CronScheduleRecord) -> Self {
        let forge_jobs::CronScheduleRecord {
            name,
            kind,
            payload,
            queue_name,
            cron_expr,
            enabled,
            max_attempts,
            dedupe_key,
            last_fired_at,
            next_fire_at,
            last_error,
            created_at,
            updated_at,
            ..
        } = record;
        Self {
            name,
            kind,
            payload,
            queue_name,
            cron_expr,
            enabled,
            max_attempts,
            dedupe_key,
            last_fired_at,
            next_fire_at,
            last_error,
            created_at,
            updated_at,
        }
    }
}
/// Result of a cleanup run: how many `done` and `dead` jobs were deleted.
#[derive(Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct CleanupReportDto {
    pub done_deleted: u64,
    pub dead_deleted: u64,
}
/// Job status names exposed as a fixed list.
///
/// Presumably these are the values `JobRowDto::status` can take and that
/// status-based requests accept — confirm against the `forge_jobs` status type.
// NOTE(review): `StatusCountsDto` also reports a `scheduled` count, which has no
// entry here; presumably "scheduled" is derived rather than stored — confirm.
pub const JOB_STATUSES: &[&str] = &["pending", "in_progress", "done", "failed", "dead"];
/// Request body for updating a queue's retry backoff settings (counterpart of
/// the `backoff_*` fields on `QueueOverviewDto`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetBackoffRequest {
    pub enabled: bool,
    /// Signed here, whereas the overview reports `u32`; presumably the handler
    /// validates the range.
    pub base_seconds: i32,
    pub max_seconds: i32,
}
/// Request body carrying a new worker limit (presumably `max_workers`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetMaxWorkersRequest {
    pub n: i32,
}
/// Request body setting a queue's `paused` flag.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetPausedRequest {
    pub paused: bool,
}
/// Request body setting done/dead retention in days (counterpart of the
/// `retain_*_days` fields on `QueueOverviewDto`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SetRetentionRequest {
    pub done_days: i32,
    pub dead_days: i32,
}
/// Request body carrying a list of ids (presumably job ids in the string form
/// used by `JobRowDto::id`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IdsRequest {
    pub ids: Vec<String>,
}
/// Request body naming a single status (presumably one of `JOB_STATUSES`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusRequest {
    pub status: String,
}
/// Request body for deleting jobs by status, optionally limited to one queue
/// (`queue_name` is `None` when omitted).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteByStatusRequest {
    pub status: String,
    #[serde(default)]
    pub queue_name: Option<String>,
}
/// Request body for deleting `done` jobs older than `days` days, optionally
/// limited to one queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteDoneOlderThanRequest {
    pub days: u32,
    #[serde(default)]
    pub queue_name: Option<String>,
}
/// Request body enabling or disabling a cron schedule.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronSetEnabledRequest {
    pub enabled: bool,
}
/// Request body replacing a cron schedule's expression.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronSetExprRequest {
    pub expr: String,
}
/// Request body toggling deduplication for a cron schedule (presumably whether
/// fired jobs carry a dedupe key — confirm with the handler).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronSetDedupeRequest {
    pub dedupe: bool,
}
/// Request body for enqueueing a job manually.
///
/// Only `kind` and `payload` are required; every other field defaults to
/// `None` when absent from the JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobsEnqueueRequest {
    pub kind: String,
    pub payload: serde_json::Value,
    #[serde(default)]
    pub queue_name: Option<String>,
    #[serde(default)]
    pub dedupe_key: Option<String>,
    #[serde(default)]
    pub run_at: Option<DateTime<Utc>>,
    #[serde(default)]
    pub priority: Option<i32>,
    #[serde(default)]
    pub max_attempts: Option<i32>,
}
/// Request body for the demo-enqueue action; `payload` is optional.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EnqueueDemoRequest {
    #[serde(default)]
    pub payload: Option<serde_json::Value>,
}
/// Query parameters for listing processes, optionally for a single queue.
#[derive(Debug, Clone, Deserialize)]
pub struct ProcessesQuery {
    #[serde(default)]
    pub queue_name: Option<String>,
}
/// Query parameters for listing job kinds, optionally for a single queue.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct KindsQuery {
    #[serde(default)]
    pub queue_name: Option<String>,
}
/// Query parameters for listing failed jobs; `limit` is required.
#[derive(Debug, Clone, Deserialize)]
pub struct FailedQuery {
    pub limit: u32,
}
/// Query parameters for listing scheduled jobs, optionally for a single queue.
#[derive(Debug, Clone, Deserialize)]
pub struct ScheduledQuery {
    #[serde(default)]
    pub queue_name: Option<String>,
}
/// Time range and bucket width (seconds, per the name) for the activity
/// timeline ([`TimelineBucket`]).
#[derive(Debug, Clone, Deserialize)]
pub struct TimelineQuery {
    pub from: DateTime<Utc>,
    pub to: DateTime<Utc>,
    pub bucket_secs: u32,
}
/// Like [`TimelineQuery`] but scoped to one queue; presumably used for
/// [`MetricSeriesBucket`] series.
#[derive(Debug, Clone, Deserialize)]
pub struct MetricSeriesQuery {
    pub queue: String,
    pub from: DateTime<Utc>,
    pub to: DateTime<Utc>,
    pub bucket_secs: u32,
}
/// Generic time range and bucket width, presumably for the per-host resource
/// and DB-health series.
#[derive(Debug, Clone, Deserialize)]
pub struct SeriesQuery {
    pub from: DateTime<Utc>,
    pub to: DateTime<Utc>,
    pub bucket_secs: u32,
}
#[cfg(test)]
mod tests {
    use super::*;
    use forge_jobs::JobId;

    /// Builds a pod on `host` declaring `queues`, last heartbeat at `hb`.
    fn pod(host: &str, queues: &[&str], hb: DateTime<Utc>) -> PodRecord {
        PodRecord {
            host_id: host.to_owned(),
            worker_name: None,
            queues: queues.iter().map(|q| (*q).to_owned()).collect(),
            heartbeat_at: hb,
        }
    }

    /// Builds a process for `queue` on `host`; `running` gives it a current job.
    fn proc(host: &str, queue: &str, hb: DateTime<Utc>, running: bool) -> ProcessRecord {
        ProcessRecord {
            process_id: format!("{queue}-0-{host}"),
            queue_name: queue.to_owned(),
            host_id: host.to_owned(),
            started_at: hb,
            heartbeat_at: hb,
            current_job: running.then(|| JobId::new("01HXXXXXXXXXXXXXXXXXXXXXXXX")),
        }
    }

    /// Builds an assignment of `slots` slots for `queue` on `host`.
    fn slot(host: &str, queue: &str, slots: i32) -> SlotAssignment {
        SlotAssignment {
            queue_name: queue.to_owned(),
            host_id: host.to_owned(),
            slots,
        }
    }

    #[test]
    fn stale_worker_rows_are_excluded_from_counts() {
        let now = Utc::now();
        let stale_before = now - chrono::Duration::seconds(60);
        let pods = vec![pod("h1", &["gh"], now)];
        let procs = vec![
            proc("h1", "gh", now, true),
            // Heartbeat older than the cutoff: neither live nor in flight.
            proc("h1", "gh", now - chrono::Duration::seconds(120), true),
        ];
        let slots = vec![slot("h1", "gh", 2)];
        let queue_names = vec!["gh".to_owned()];
        let dto = workers_overview_dto(pods, &procs, &slots, &queue_names, now, stale_before);
        assert_eq!(dto.workers.len(), 1);
        assert_eq!(dto.workers[0].workers_live, 1, "only the fresh slot counts");
        assert_eq!(
            dto.workers[0].in_flight, 1,
            "stale slot's job not counted in-flight"
        );
    }

    #[test]
    fn unassigned_covers_declared_but_unserved_queues() {
        let now = Utc::now();
        let stale_before = now - chrono::Duration::seconds(60);
        let pods = vec![pod("h1", &["gh", "idle"], now)];
        let procs = vec![proc("h1", "gh", now, false)];
        let slots = vec![slot("h1", "gh", 3)];
        let queue_names = vec!["gh".to_owned(), "idle".to_owned(), "paused".to_owned()];
        let dto = workers_overview_dto(pods, &procs, &slots, &queue_names, now, stale_before);
        assert!(!dto.unassigned_queues.contains(&"gh".to_owned()));
        assert!(
            dto.unassigned_queues.contains(&"idle".to_owned()),
            "declared but 0 slots → unassigned"
        );
        assert!(
            dto.unassigned_queues.contains(&"paused".to_owned()),
            "undeclared → unassigned"
        );
    }

    #[test]
    fn heartbeat_age_is_clamped_and_zero_slots_are_dropped() {
        let now = Utc::now();
        let stale_before = now - chrono::Duration::seconds(60);
        let pods = vec![
            pod("h1", &["gh"], now - chrono::Duration::seconds(30)),
            // Heartbeat slightly ahead of `now` (clock skew) must not underflow.
            pod("h2", &["gh"], now + chrono::Duration::seconds(5)),
        ];
        let slots = vec![slot("h1", "gh", 0), slot("h2", "gh", 0)];
        let queue_names = vec!["gh".to_owned()];
        let dto = workers_overview_dto(pods, &[], &slots, &queue_names, now, stale_before);
        assert_eq!(dto.workers.len(), 2);
        assert_eq!(dto.workers[0].heartbeat_age_seconds, 30);
        assert_eq!(dto.workers[1].heartbeat_age_seconds, 0, "future heartbeat clamps to 0");
        assert_eq!(dto.workers[0].workers_live, 0, "no processes supplied");
        assert!(
            dto.workers.iter().all(|w| w.slots.is_empty()),
            "zero-slot assignments are not listed"
        );
        assert_eq!(
            dto.unassigned_queues,
            vec!["gh".to_owned()],
            "zero slots do not serve a queue"
        );
    }
}