use serde::{Deserialize, Serialize};
use crate::job_envelope::JobEnvelope;
/// Bundle of statistics: one [`StatsGlobal`] summary plus lists of
/// [`Process`], [`StatsProcessing`] and [`QueueStats`] entries.
///
/// Derives `Serialize`/`Deserialize`, so fields are emitted in declaration order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stats {
/// Counters that are not tied to a single queue (see [`StatsGlobal`]).
pub global: StatsGlobal,
/// Process entries; presumably the known worker processes — confirm against the producer.
pub processes: Vec<Process>,
/// Pairs of process id and job envelope; presumably jobs currently being worked on — confirm.
pub processing: Vec<StatsProcessing>,
/// Per-queue statistics (see [`QueueStats`]).
pub queues: Vec<QueueStats>,
}
/// Counters reported in [`Stats::global`].
///
/// NOTE(review): the exact meaning of each counter is inferred from its name
/// only; the code that fills this struct is not visible here — confirm there.
/// `processed` and `failed` are signed (`i64`) while the other counts are `usize`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatsGlobal {
/// Presumably the total number of jobs — confirm.
pub jobs: usize,
/// Presumably the number of jobs waiting in queues — confirm.
pub enqueued: usize,
/// Presumably the number of processed jobs — confirm.
pub processed: i64,
/// Presumably the number of failed jobs — confirm.
pub failed: i64,
/// Presumably the number of dead jobs — confirm.
pub dead: usize,
/// Presumably the number of scheduled jobs — confirm.
pub scheduled: usize,
/// Presumably the number of jobs awaiting retry — confirm.
pub retries: usize,
/// Presumably the largest queue latency, in seconds (judging by the `_s` suffix) — confirm.
pub latency_s_max: f64,
}
/// Associates a process identifier with a [`JobEnvelope`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatsProcessing {
/// Identifier of the process; presumably in the `hostname-pid` form produced by
/// [`Process::id`] — confirm against the producer.
pub process_id: String,
/// The job envelope attached to that process.
pub job_envelope: JobEnvelope,
}
/// Per-minute throughput figures for a queue over a time window, plus an
/// estimated time until the queue is empty.
///
/// Values are produced by `QueueRateStats::calculate` (from raw counts) or
/// `QueueRateStats::aggregate` (by summing other rate values). The derived
/// `Default` yields all-zero rates with `eta_s == None`.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct QueueRateStats {
/// Length of the window in minutes; the constructors clamp this to at least 1.
pub window_minutes: usize,
/// Processed count divided by the window length.
pub processed_per_minute: f64,
/// Succeeded count divided by the window length.
pub succeeded_per_minute: f64,
/// Failed count divided by the window length.
pub failed_per_minute: f64,
/// Change in `enqueued` over the window divided by the window length;
/// negative when the queue shrank.
pub growth_per_minute: f64,
/// `max(-growth_per_minute, 0)` when `processed_per_minute` is positive, otherwise `0`.
pub effective_drain_per_minute: f64,
/// Estimated seconds until the queue is empty: `Some(0.0)` for an empty queue,
/// `enqueued / (effective_drain_per_minute / 60)` when the drain is positive,
/// and `None` when no estimate can be made.
pub eta_s: Option<f64>,
}
impl QueueRateStats {
    /// Derives per-minute rates from raw counts observed over a window.
    ///
    /// * `window_minutes` - window length in minutes; `0` is treated as `1`.
    /// * `enqueued` - queue size now; also used for the ETA.
    /// * `window_start_enqueued` - queue size at the start of the window.
    /// * `processed`, `succeeded`, `failed` - counts accumulated over the window.
    ///
    /// Growth is `(enqueued - window_start_enqueued) / window`, so a shrinking
    /// queue has negative growth.
    pub(crate) fn calculate(
        window_minutes: usize,
        enqueued: usize,
        window_start_enqueued: usize,
        processed: u64,
        succeeded: u64,
        failed: u64,
    ) -> Self {
        let window_minutes = window_minutes.max(1);
        let minutes = window_minutes as f64;
        let per_minute = |count: u64| count as f64 / minutes;
        let size_change = enqueued as f64 - window_start_enqueued as f64;

        Self::from_rates(
            window_minutes,
            enqueued,
            per_minute(processed),
            per_minute(succeeded),
            per_minute(failed),
            size_change / minutes,
        )
    }

    /// Combines several rate values into one by summing their per-minute figures.
    ///
    /// The effective drain and ETA are recomputed from the summed rates and the
    /// supplied `enqueued`; they are not summed from the inputs. The inputs'
    /// own `window_minutes` values are ignored in favour of the `window_minutes`
    /// argument (clamped to at least 1).
    pub(crate) fn aggregate(
        window_minutes: usize,
        enqueued: usize,
        rates: impl IntoIterator<Item = Self>,
    ) -> Self {
        // Single pass, accumulating from zero in iteration order.
        let (processed, succeeded, failed, growth) = rates.into_iter().fold(
            (0.0_f64, 0.0_f64, 0.0_f64, 0.0_f64),
            |(processed, succeeded, failed, growth), rate| {
                (
                    processed + rate.processed_per_minute,
                    succeeded + rate.succeeded_per_minute,
                    failed + rate.failed_per_minute,
                    growth + rate.growth_per_minute,
                )
            },
        );

        Self::from_rates(
            window_minutes.max(1),
            enqueued,
            processed,
            succeeded,
            failed,
            growth,
        )
    }

    /// Builds the final value from already-computed per-minute rates, filling
    /// in the effective drain and the ETA.
    fn from_rates(
        window_minutes: usize,
        enqueued: usize,
        processed_per_minute: f64,
        succeeded_per_minute: f64,
        failed_per_minute: f64,
        growth_per_minute: f64,
    ) -> Self {
        let effective_drain_per_minute =
            Self::effective_drain(processed_per_minute, growth_per_minute);
        let eta_s = Self::eta(enqueued, effective_drain_per_minute);

        Self {
            window_minutes,
            processed_per_minute,
            succeeded_per_minute,
            failed_per_minute,
            growth_per_minute,
            effective_drain_per_minute,
            eta_s,
        }
    }

    /// Estimated seconds until the queue is empty.
    ///
    /// An empty queue is done already (`Some(0.0)`); a non-empty queue with no
    /// positive drain has no estimate (`None`).
    fn eta(enqueued: usize, effective_drain_per_minute: f64) -> Option<f64> {
        if enqueued == 0 {
            return Some(0.0);
        }
        (effective_drain_per_minute > 0.0).then(|| {
            let drain_per_second = effective_drain_per_minute / 60.0;
            enqueued as f64 / drain_per_second
        })
    }

    /// Rate at which the queue is actually shrinking, never negative.
    ///
    /// Shrinkage only counts as drain while something is being processed;
    /// otherwise the result is `0.0`.
    fn effective_drain(processed_per_minute: f64, growth_per_minute: f64) -> f64 {
        match processed_per_minute {
            rate if rate > 0.0 => f64::max(-growth_per_minute, 0.0),
            _ => 0.0,
        }
    }
}
/// Statistics for one queue, optionally broken down into sub-queues.
///
/// NOTE(review): the meaning of the individual counters is inferred from their
/// names; confirm against the code that populates this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueStats {
    /// Identifier of the queue.
    pub key: String,
    /// Presumably the number of jobs currently waiting in the queue — confirm.
    pub enqueued: usize,
    /// Presumably the number of processed jobs — confirm.
    pub processed: i64,
    /// Presumably the number of jobs that succeeded — confirm.
    pub succeeded: i64,
    /// Presumably the number of jobs whose handler panicked — confirm.
    pub panicked: i64,
    /// Presumably the number of jobs that failed — confirm.
    pub failed: i64,
    /// Presumably the queue latency in seconds (judging by the `_s` suffix) — confirm.
    pub latency_s: f64,
    /// Rate figures for this queue; falls back to `QueueRateStats::default()`
    /// when the field is absent from the input.
    #[serde(default)]
    pub rate: QueueRateStats,
    /// Breakdown by sub-queue.
    ///
    /// The field is left out of serialized output when empty, so it must also
    /// default to an empty list on input; otherwise a value written by this
    /// type could not be read back ("missing field `queues`").
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub queues: Vec<DynamicQueueStats>,
}
/// Statistics for one sub-queue listed in [`QueueStats::queues`].
///
/// Carries the same counters as [`QueueStats`], identified by a `suffix`
/// instead of a `key` and without a nested breakdown.
///
/// NOTE(review): the meaning of the individual counters is inferred from their
/// names; confirm against the code that populates this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamicQueueStats {
/// Suffix identifying this sub-queue; presumably appended to the parent queue's key — confirm.
pub suffix: String,
/// Presumably the number of jobs currently waiting — confirm.
pub enqueued: usize,
/// Presumably the number of processed jobs — confirm.
pub processed: i64,
/// Presumably the number of jobs that succeeded — confirm.
pub succeeded: i64,
/// Presumably the number of jobs whose handler panicked — confirm.
pub panicked: i64,
/// Presumably the number of jobs that failed — confirm.
pub failed: i64,
/// Presumably the latency in seconds (judging by the `_s` suffix) — confirm.
pub latency_s: f64,
/// Rate figures; falls back to `QueueRateStats::default()` when absent from the input.
#[serde(default)]
pub rate: QueueRateStats,
}
/// Description of a process, identified by host name and process id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Process {
/// Host name the process reports.
pub hostname: String,
/// Process id.
pub pid: u32,
/// Presumably the time of the last heartbeat; unit and epoch are not visible here — confirm.
pub heartbeat_at: i64,
/// Presumably the time the process started; unit and epoch are not visible here — confirm.
pub started_at: i64,
}
impl Process {
    /// Returns the identifier of this process in the form `<hostname>-<pid>`.
    #[must_use]
    pub fn id(&self) -> String {
        let Self { hostname, pid, .. } = self;
        format!("{hostname}-{pid}")
    }
}
#[cfg(test)]
mod tests {
    use super::QueueRateStats;

    /// Fails unless `actual` lies within `1e-9` of `expected`.
    fn assert_close(actual: f64, expected: f64) {
        let delta = (actual - expected).abs();
        assert!(delta < 1e-9, "expected {expected}, got {actual}");
    }

    #[test]
    fn queue_rate_calculates_eta_when_effective_drain_is_positive() {
        // Queue shrank from 30 to 10 over 10 minutes while 50 jobs were processed.
        let stats = QueueRateStats::calculate(10, 10, 30, 50, 40, 10);

        assert_close(stats.processed_per_minute, 5.0);
        assert_close(stats.succeeded_per_minute, 4.0);
        assert_close(stats.failed_per_minute, 1.0);
        assert_close(stats.growth_per_minute, -2.0);
        assert_close(stats.effective_drain_per_minute, 2.0);

        let eta = stats.eta_s.expect("eta should be finite");
        assert_close(eta, 300.0);
    }

    #[test]
    fn queue_rate_eta_is_unknown_when_queue_grows_despite_processing() {
        // Queue grew from 0 to 20 even though 100 jobs were processed.
        let stats = QueueRateStats::calculate(10, 20, 0, 100, 100, 0);

        assert_close(stats.processed_per_minute, 10.0);
        assert_close(stats.growth_per_minute, 2.0);
        assert_close(stats.effective_drain_per_minute, 0.0);
        assert_eq!(stats.eta_s, None);
    }

    #[test]
    fn queue_rate_eta_is_zero_for_empty_queue() {
        // Nothing left in the queue: the ETA is zero regardless of the rates.
        let stats = QueueRateStats::calculate(10, 0, 10, 0, 0, 0);

        assert_close(stats.growth_per_minute, -1.0);
        assert_eq!(Some(0.0), stats.eta_s);
    }

    #[test]
    fn queue_rate_eta_is_unknown_without_processing_rate() {
        // Queue shrank, but nothing was processed, so the shrinkage is not counted as drain.
        let stats = QueueRateStats::calculate(10, 10, 20, 0, 0, 0);

        assert_close(stats.growth_per_minute, -1.0);
        assert_close(stats.effective_drain_per_minute, 0.0);
        assert_eq!(stats.eta_s, None);
    }

    #[test]
    fn queue_rate_aggregates_dynamic_children() {
        let children = [
            QueueRateStats::calculate(10, 10, 20, 30, 25, 5),
            QueueRateStats::calculate(10, 20, 30, 40, 35, 5),
        ];
        let combined = QueueRateStats::aggregate(10, 30, children);

        assert_close(combined.processed_per_minute, 7.0);
        assert_close(combined.succeeded_per_minute, 6.0);
        assert_close(combined.failed_per_minute, 1.0);
        assert_close(combined.growth_per_minute, -2.0);
        assert_close(combined.effective_drain_per_minute, 2.0);

        let eta = combined.eta_s.expect("eta should be finite");
        assert_close(eta, 900.0);
    }
}