// oxana/metrics.rs
//! Sidekiq-style execution metrics for Oxana workers.
2
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

use crate::result_collector::{WorkerResult, WorkerResultKind};
7
/// How long per-minute metric data is retained, in seconds (8 hours).
pub(crate) const METRICS_RETENTION_SECS: i64 = 8 * 60 * 60;
/// Window size used when a query requests zero minutes.
pub(crate) const DEFAULT_METRIC_MINUTES: usize = 60;
/// Largest query window, in minutes; matches the retention period.
pub(crate) const MAX_METRIC_MINUTES: usize = 8 * 60;
/// Minutes of history considered when deriving queue processing rates.
/// NOTE(review): not referenced inside this module — presumably used by callers; confirm.
pub(crate) const QUEUE_RATE_WINDOW_MINUTES: usize = 10;
/// Number of execution-time histogram buckets.
pub(crate) const HISTOGRAM_BUCKET_COUNT: usize = 14;

/// Maximum execution time represented by each histogram bucket.
/// Bounds are exclusive: a duration lands in the first bucket whose bound
/// it is strictly below (see `histogram_bucket_index`).
pub const HISTOGRAM_BUCKET_INTERVALS_MS: [u64; HISTOGRAM_BUCKET_COUNT] = [
    25,
    50,
    100,
    250,
    500,
    1000,
    2500,
    5000,
    10000,
    30000,
    60000,
    120000,
    300000,
    u64::MAX, // catch-all "Slow" bucket
];

/// Display labels for [`HISTOGRAM_BUCKET_INTERVALS_MS`].
pub const HISTOGRAM_BUCKET_LABELS: [&str; HISTOGRAM_BUCKET_COUNT] = [
    "25ms", "50ms", "100ms", "250ms", "500ms", "1s", "2.5s", "5s", "10s", "30s", "60s", "120s",
    "5min", "Slow",
];

// Short suffixes keep stored hash fields compact.
// Per-worker counter suffixes (parsed by `split_metric_field`):
pub(crate) const METRIC_PROCESSED_JOBS: &str = "p";
pub(crate) const METRIC_FAILED_JOBS: &str = "f";
pub(crate) const METRIC_PANICKED_JOBS: &str = "pn";
pub(crate) const METRIC_SUCCESSFUL_EXECUTIONS: &str = "xs";
pub(crate) const METRIC_FAILED_EXECUTIONS: &str = "xf";
pub(crate) const METRIC_PANICKED_EXECUTIONS: &str = "xpn";
pub(crate) const METRIC_EXECUTION_MS: &str = "ms";
// Per-queue counter suffixes (parsed by `split_queue_metric_field`):
pub(crate) const QUEUE_METRIC_PROCESSED_JOBS: &str = "p";
pub(crate) const QUEUE_METRIC_SUCCEEDED_JOBS: &str = "s";
pub(crate) const QUEUE_METRIC_FAILED_JOBS: &str = "f";
/// Identifies the worker a set of metrics belongs to.
///
/// Encoded into hash fields via [`MetricIdentity::field_key`], which
/// length-prefixes the worker name so delimiter characters inside the
/// name cannot corrupt the key.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MetricIdentity {
    // Worker name; may itself contain `:` and `|` (see the round-trip test).
    pub worker: String,
}
53
54impl MetricIdentity {
55    pub(crate) fn from_worker_result(result: &WorkerResult) -> Self {
56        Self {
57            worker: result.worker_name.clone(),
58        }
59    }
60
61    pub(crate) fn field_key(&self) -> String {
62        format!("{}:{}", self.worker.len(), self.worker)
63    }
64
65    pub(crate) fn from_field_key(key: &str) -> Option<Self> {
66        let (worker_len, rest) = key.split_once(':')?;
67        let worker_len = worker_len.parse::<usize>().ok()?;
68
69        if rest.len() < worker_len || !rest.is_char_boundary(worker_len) {
70            return None;
71        }
72
73        let (worker, _) = rest.split_at(worker_len);
74        Some(Self {
75            worker: worker.to_string(),
76        })
77    }
78
79    pub(crate) fn metric_field(&self, metric: &str) -> String {
80        format!("{}|{metric}", self.field_key())
81    }
82}
83
/// Parameters for a metrics lookup.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct JobMetricsQuery {
    // Requested window size in minutes; `0` means "use the default".
    // Clamped to `MAX_METRIC_MINUTES` by `effective_minutes`.
    pub minutes: usize,
}
88
89impl JobMetricsQuery {
90    #[must_use]
91    pub fn new(minutes: usize) -> Self {
92        Self { minutes }
93    }
94
95    #[must_use]
96    pub fn effective_minutes(&self) -> usize {
97        if self.minutes == 0 {
98            DEFAULT_METRIC_MINUTES
99        } else {
100            self.minutes.min(MAX_METRIC_MINUTES)
101        }
102    }
103}
104
105impl Default for JobMetricsQuery {
106    fn default() -> Self {
107        Self {
108            minutes: DEFAULT_METRIC_MINUTES,
109        }
110    }
111}
112
/// Aggregated job/execution counters over a queried window.
///
/// "Jobs" count individual queue entries; "executions" count worker
/// invocations (a batch worker runs once for many jobs).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct JobMetricsTotals {
    /// Jobs completed, including successes, failures, and panics. Batch jobs count individually.
    pub processed: u64,
    /// Jobs completed successfully. Batch jobs count individually.
    pub succeeded: u64,
    /// Jobs that failed or panicked. Batch jobs count individually.
    pub failed: u64,
    /// Jobs that panicked. Batch jobs count individually.
    pub panicked: u64,
    /// Successful worker executions. Batch workers count once per batch.
    pub successful_executions: u64,
    /// Worker executions that failed or panicked. Batch workers count once per batch.
    pub failed_executions: u64,
    /// Panicked worker executions. Batch workers count once per batch.
    pub panicked_executions: u64,
    /// Total duration for successful worker executions. Batch duration counts once per batch.
    pub execution_ms: u64,
}
132
133impl JobMetricsTotals {
134    #[must_use]
135    pub fn average_execution_ms(&self) -> f64 {
136        if self.successful_executions == 0 {
137            0.0
138        } else {
139            self.execution_ms as f64 / self.successful_executions as f64
140        }
141    }
142
143    #[must_use]
144    pub fn execution_seconds(&self) -> f64 {
145        self.execution_ms as f64 / 1000.0
146    }
147
148    #[must_use]
149    pub fn failed_executions_without_panics(&self) -> u64 {
150        self.failed_executions
151            .saturating_sub(self.panicked_executions)
152    }
153
154    fn add_metric(&mut self, metric: &str, value: u64) {
155        match metric {
156            METRIC_PROCESSED_JOBS => self.processed = self.processed.saturating_add(value),
157            METRIC_FAILED_JOBS => self.failed = self.failed.saturating_add(value),
158            METRIC_PANICKED_JOBS => self.panicked = self.panicked.saturating_add(value),
159            METRIC_SUCCESSFUL_EXECUTIONS => {
160                self.successful_executions = self.successful_executions.saturating_add(value);
161            }
162            METRIC_FAILED_EXECUTIONS => {
163                self.failed_executions = self.failed_executions.saturating_add(value);
164            }
165            METRIC_PANICKED_EXECUTIONS => {
166                self.panicked_executions = self.panicked_executions.saturating_add(value);
167            }
168            METRIC_EXECUTION_MS => self.execution_ms = self.execution_ms.saturating_add(value),
169            _ => {}
170        }
171    }
172
173    fn finalize(&mut self) {
174        self.succeeded = self.processed.saturating_sub(self.failed);
175        if self.successful_executions + self.failed_executions + self.panicked_executions == 0 {
176            self.successful_executions = self.succeeded;
177            self.failed_executions = self.failed;
178            self.panicked_executions = self.panicked;
179        }
180    }
181}
182
/// One per-minute bucket of the same counters tracked by `JobMetricsTotals`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct JobMetricsPoint {
    /// Start timestamp for this minute bucket.
    pub timestamp: i64,
    /// Jobs completed, including successes, failures, and panics. Batch jobs count individually.
    pub processed: u64,
    /// Jobs completed successfully. Batch jobs count individually.
    pub succeeded: u64,
    /// Jobs that failed or panicked. Batch jobs count individually.
    pub failed: u64,
    /// Jobs that panicked. Batch jobs count individually.
    pub panicked: u64,
    /// Successful worker executions. Batch workers count once per batch.
    pub successful_executions: u64,
    /// Worker executions that failed or panicked. Batch workers count once per batch.
    pub failed_executions: u64,
    /// Panicked worker executions. Batch workers count once per batch.
    pub panicked_executions: u64,
    /// Total duration for successful worker executions. Batch duration counts once per batch.
    pub execution_ms: u64,
}
204
205impl JobMetricsPoint {
206    #[must_use]
207    pub fn average_execution_ms(&self) -> f64 {
208        if self.successful_executions == 0 {
209            0.0
210        } else {
211            self.execution_ms as f64 / self.successful_executions as f64
212        }
213    }
214
215    #[must_use]
216    pub fn failed_executions_without_panics(&self) -> u64 {
217        self.failed_executions
218            .saturating_sub(self.panicked_executions)
219    }
220
221    fn add_metric(&mut self, metric: &str, value: u64) {
222        match metric {
223            METRIC_PROCESSED_JOBS => self.processed = self.processed.saturating_add(value),
224            METRIC_FAILED_JOBS => self.failed = self.failed.saturating_add(value),
225            METRIC_PANICKED_JOBS => self.panicked = self.panicked.saturating_add(value),
226            METRIC_SUCCESSFUL_EXECUTIONS => {
227                self.successful_executions = self.successful_executions.saturating_add(value);
228            }
229            METRIC_FAILED_EXECUTIONS => {
230                self.failed_executions = self.failed_executions.saturating_add(value);
231            }
232            METRIC_PANICKED_EXECUTIONS => {
233                self.panicked_executions = self.panicked_executions.saturating_add(value);
234            }
235            METRIC_EXECUTION_MS => self.execution_ms = self.execution_ms.saturating_add(value),
236            _ => {}
237        }
238    }
239
240    fn finalize(&mut self) {
241        self.succeeded = self.processed.saturating_sub(self.failed);
242        if self.successful_executions + self.failed_executions + self.panicked_executions == 0 {
243            self.successful_executions = self.succeeded;
244            self.failed_executions = self.failed;
245            self.panicked_executions = self.panicked;
246        }
247    }
248}
249
/// Totals plus per-minute series for one worker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerMetricsSummary {
    /// Worker these metrics belong to.
    pub identity: MetricIdentity,
    /// Totals for this worker over the queried window.
    pub totals: JobMetricsTotals,
    /// Per-minute points for this worker over the queried window.
    pub series: Vec<JobMetricsPoint>,
}
259
/// Full metrics view across all workers for one queried window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMetricsSnapshot {
    /// Inclusive start timestamp for the queried window.
    pub starts_at: i64,
    /// Inclusive end timestamp for the queried window.
    pub ends_at: i64,
    /// Number of minute buckets returned.
    pub minutes: usize,
    /// Totals across all workers in the queried window.
    pub totals: JobMetricsTotals,
    /// Per-minute totals across all workers in the queried window.
    pub series: Vec<JobMetricsPoint>,
    /// Per-worker summaries sorted by total execution time descending.
    pub workers: Vec<WorkerMetricsSummary>,
}
275
/// One execution-time histogram bucket for display.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMetricsHistogramBucket {
    // Human-readable label, e.g. "25ms" or "Slow".
    pub label: String,
    // Exclusive upper bound in milliseconds; `None` for the catch-all bucket.
    pub upper_bound_ms: Option<u64>,
    // Successful executions that fell into this bucket.
    pub count: u64,
}
282
/// Drill-down metrics view for a single worker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMetricsDetail {
    /// Worker these metrics belong to.
    pub identity: MetricIdentity,
    /// Inclusive start timestamp for the queried window.
    pub starts_at: i64,
    /// Inclusive end timestamp for the queried window.
    pub ends_at: i64,
    /// Number of minute buckets returned.
    pub minutes: usize,
    /// Totals for this worker in the queried window.
    pub totals: JobMetricsTotals,
    /// Per-minute points for this worker in the queried window.
    pub series: Vec<JobMetricsPoint>,
    /// Execution-time histogram for successful worker executions.
    pub histogram: Vec<JobMetricsHistogramBucket>,
}
300
/// One queue-length sample for a minute bucket.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueueLengthMetricsPoint {
    /// Start timestamp for this minute bucket.
    pub timestamp: i64,
    /// Jobs enqueued in this queue when the sample was recorded.
    pub enqueued: u64,
}
308
/// Per-minute queue length samples for one queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueLengthMetricsSeries {
    /// Queue name.
    pub queue: String,
    /// Per-minute queue length samples.
    pub series: Vec<QueueLengthMetricsPoint>,
}
316
/// Queue length view across all queues for one queried window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueLengthMetricsSnapshot {
    /// Inclusive start timestamp for the queried window.
    pub starts_at: i64,
    /// Inclusive end timestamp for the queried window.
    pub ends_at: i64,
    /// Number of minute buckets returned.
    pub minutes: usize,
    /// Per-queue length samples sorted by peak queue length descending.
    pub queues: Vec<QueueLengthMetricsSeries>,
}
328
/// Per-queue job counters (counted per job, not per execution).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) struct QueueCounterTotals {
    // Jobs completed on this queue, regardless of outcome.
    pub(crate) processed: u64,
    // Jobs that completed successfully.
    pub(crate) succeeded: u64,
    // Jobs that failed or panicked.
    pub(crate) failed: u64,
}
335
336impl QueueCounterTotals {
337    fn add_metric(&mut self, metric: &str, value: u64) {
338        match metric {
339            QUEUE_METRIC_PROCESSED_JOBS => {
340                self.processed = self.processed.saturating_add(value);
341            }
342            QUEUE_METRIC_SUCCEEDED_JOBS => {
343                self.succeeded = self.succeeded.saturating_add(value);
344            }
345            QUEUE_METRIC_FAILED_JOBS => {
346                self.failed = self.failed.saturating_add(value);
347            }
348            _ => {}
349        }
350    }
351}
352
/// In-memory accumulator of per-minute metrics, drained via
/// `records()` / `queue_records()`.
#[derive(Clone, Default)]
pub(crate) struct JobMetricsBuffer {
    // Per-(minute, worker) counters and execution-time histogram.
    entries: HashMap<(i64, MetricIdentity), PendingJobMetrics>,
    // Per-(minute, queue) job counters.
    queue_entries: HashMap<(i64, String), QueueCounterTotals>,
}
358
359impl JobMetricsBuffer {
360    pub(crate) fn is_empty(&self) -> bool {
361        self.entries.is_empty() && self.queue_entries.is_empty()
362    }
363
364    pub(crate) fn clear(&mut self) {
365        self.entries.clear();
366        self.queue_entries.clear();
367    }
368
369    pub(crate) fn record(&mut self, result: &WorkerResult) {
370        let minute = chrono::Utc::now().timestamp().div_euclid(60);
371        let identity = MetricIdentity::from_worker_result(result);
372        let metrics = self.entries.entry((minute, identity)).or_default();
373        let queue_metrics = self
374            .queue_entries
375            .entry((minute, result.queue.clone()))
376            .or_default();
377
378        metrics.processed = metrics.processed.saturating_add(result.job_count);
379        queue_metrics.processed = queue_metrics.processed.saturating_add(result.job_count);
380
381        match result.kind {
382            WorkerResultKind::Success => {
383                metrics.successful_executions = metrics.successful_executions.saturating_add(1);
384                metrics.execution_ms = metrics.execution_ms.saturating_add(result.execution_ms);
385                queue_metrics.succeeded = queue_metrics.succeeded.saturating_add(result.job_count);
386                let bucket = histogram_bucket_index(result.execution_ms);
387                if let Some(count) = metrics.histogram.get_mut(bucket) {
388                    *count = count.saturating_add(1);
389                }
390            }
391            WorkerResultKind::Panicked => {
392                metrics.failed = metrics.failed.saturating_add(result.job_count);
393                metrics.panicked = metrics.panicked.saturating_add(result.job_count);
394                metrics.failed_executions = metrics.failed_executions.saturating_add(1);
395                metrics.panicked_executions = metrics.panicked_executions.saturating_add(1);
396                queue_metrics.failed = queue_metrics.failed.saturating_add(result.job_count);
397            }
398            WorkerResultKind::Failed => {
399                metrics.failed = metrics.failed.saturating_add(result.job_count);
400                metrics.failed_executions = metrics.failed_executions.saturating_add(1);
401                queue_metrics.failed = queue_metrics.failed.saturating_add(result.job_count);
402            }
403        }
404    }
405
406    pub(crate) fn records(
407        &self,
408    ) -> impl Iterator<Item = (i64, &MetricIdentity, &PendingJobMetrics)> {
409        self.entries
410            .iter()
411            .map(|((minute, identity), metrics)| (*minute, identity, metrics))
412    }
413
414    pub(crate) fn queue_records(&self) -> impl Iterator<Item = (i64, &str, &QueueCounterTotals)> {
415        self.queue_entries
416            .iter()
417            .map(|((minute, queue), metrics)| (*minute, queue.as_str(), metrics))
418    }
419}
420
/// Unflushed per-(minute, worker) counters accumulated by `JobMetricsBuffer`.
#[derive(Clone, Default)]
pub(crate) struct PendingJobMetrics {
    // Jobs completed (all outcomes).
    pub(crate) processed: u64,
    // Jobs that failed or panicked.
    pub(crate) failed: u64,
    // Jobs that panicked.
    pub(crate) panicked: u64,
    // Successful worker executions.
    pub(crate) successful_executions: u64,
    // Failed or panicked worker executions.
    pub(crate) failed_executions: u64,
    // Panicked worker executions.
    pub(crate) panicked_executions: u64,
    // Total duration of successful executions, in milliseconds.
    pub(crate) execution_ms: u64,
    // Successful-execution counts per histogram bucket.
    pub(crate) histogram: [u64; HISTOGRAM_BUCKET_COUNT],
}
432
/// Result of aggregating stored counter hashes over a window.
#[derive(Default)]
pub(crate) struct JobMetricsAggregation {
    // Totals across all workers.
    pub(crate) totals: JobMetricsTotals,
    // Per-minute totals across all workers.
    pub(crate) series: Vec<JobMetricsPoint>,
    // Per-worker summaries, sorted by `aggregate_counter_hashes`.
    pub(crate) workers: Vec<WorkerMetricsSummary>,
}
439
/// Intermediate per-worker accumulator used while aggregating hashes.
struct WorkerMetricsSummaryBuilder {
    totals: JobMetricsTotals,
    series: Vec<JobMetricsPoint>,
}
444
445#[must_use]
446pub(crate) fn metric_minutes(now_ts: i64, query: JobMetricsQuery) -> Vec<i64> {
447    let minutes = query.effective_minutes();
448    let end_minute = now_ts.div_euclid(60);
449    let start_minute = end_minute - i64::try_from(minutes).unwrap_or(i64::MAX) + 1;
450    (start_minute..=end_minute).collect()
451}
452
#[must_use]
/// Aggregates one stored counter hash per minute bucket into window totals,
/// a per-minute series, and per-worker summaries.
///
/// `hashes` is positional: `hashes[idx]` holds the counters for
/// `minutes[idx]`. When `filter` is set, fields for other workers are
/// skipped entirely. Workers are sorted by total execution time descending,
/// then processed count descending, then name ascending.
pub(crate) fn aggregate_counter_hashes(
    minutes: &[i64],
    hashes: Vec<HashMap<String, i64>>,
    filter: Option<&MetricIdentity>,
) -> JobMetricsAggregation {
    let mut aggregation = JobMetricsAggregation {
        series: minutes
            .iter()
            .map(|minute| JobMetricsPoint {
                timestamp: minute * 60,
                ..JobMetricsPoint::default()
            })
            .collect(),
        ..JobMetricsAggregation::default()
    };
    // Fresh (all-zero) series cloned for each worker on first sight, so
    // every worker's series covers the full window.
    let worker_series_template: Vec<JobMetricsPoint> = minutes
        .iter()
        .map(|minute| JobMetricsPoint {
            timestamp: minute * 60,
            ..JobMetricsPoint::default()
        })
        .collect();
    let mut workers: HashMap<MetricIdentity, WorkerMetricsSummaryBuilder> = HashMap::new();

    for (idx, hash) in hashes.into_iter().enumerate() {
        // Extra hashes beyond the requested window are ignored.
        let Some(point) = aggregation.series.get_mut(idx) else {
            continue;
        };

        for (field, raw_value) in hash {
            // Fields that don't parse as "<identity>|<known metric>" are skipped.
            let Some((identity, metric)) = split_metric_field(&field) else {
                continue;
            };
            if filter.is_some_and(|expected| expected != &identity) {
                continue;
            }

            // Negative stored values are clamped to zero.
            let value = u64::try_from(raw_value).unwrap_or_default();
            point.add_metric(metric, value);
            aggregation.totals.add_metric(metric, value);
            let worker_summary =
                workers
                    .entry(identity)
                    .or_insert_with(|| WorkerMetricsSummaryBuilder {
                        totals: JobMetricsTotals::default(),
                        series: worker_series_template.clone(),
                    });
            worker_summary.totals.add_metric(metric, value);
            if let Some(worker_point) = worker_summary.series.get_mut(idx) {
                worker_point.add_metric(metric, value);
            }
        }

        // Derive `succeeded` (and legacy execution backfills) once the
        // minute's raw counters are complete.
        point.finalize();
    }

    aggregation.totals.finalize();
    aggregation.workers = workers
        .into_iter()
        .map(|(identity, mut summary)| {
            summary.totals.finalize();
            for point in &mut summary.series {
                point.finalize();
            }
            WorkerMetricsSummary {
                identity,
                totals: summary.totals,
                series: summary.series,
            }
        })
        .collect();
    aggregation.workers.sort_by(|a, b| {
        b.totals
            .execution_ms
            .cmp(&a.totals.execution_ms)
            .then_with(|| b.totals.processed.cmp(&a.totals.processed))
            .then_with(|| a.identity.worker.cmp(&b.identity.worker))
    });

    aggregation
}
535
#[must_use]
/// Hash field for a queue counter: `"<len>:<queue>|<metric>"`. The length
/// prefix guards against delimiter characters inside the queue name.
pub(crate) fn queue_metric_field(queue: &str, metric: &str) -> String {
    format!("{len}:{queue}|{metric}", len = queue.len())
}
540
541#[must_use]
542pub(crate) fn aggregate_queue_counter_hashes(
543    hashes: Vec<HashMap<String, i64>>,
544) -> HashMap<String, QueueCounterTotals> {
545    let mut queues = HashMap::new();
546
547    for hash in hashes {
548        for (field, raw_value) in hash {
549            let Some((queue, metric)) = split_queue_metric_field(&field) else {
550                continue;
551            };
552            let value = u64::try_from(raw_value).unwrap_or_default();
553            queues
554                .entry(queue)
555                .or_insert_with(QueueCounterTotals::default)
556                .add_metric(metric, value);
557        }
558    }
559
560    queues
561}
562
563#[must_use]
564pub(crate) fn queue_length_series_from_hashes(
565    minutes: &[i64],
566    hashes: Vec<HashMap<String, i64>>,
567) -> Vec<QueueLengthMetricsSeries> {
568    let series_template: Vec<QueueLengthMetricsPoint> = minutes
569        .iter()
570        .map(|minute| QueueLengthMetricsPoint {
571            timestamp: minute * 60,
572            enqueued: 0,
573        })
574        .collect();
575    let mut queues: HashMap<String, Vec<QueueLengthMetricsPoint>> = HashMap::new();
576
577    for (idx, hash) in hashes.into_iter().enumerate() {
578        for (queue, raw_value) in hash {
579            let value = u64::try_from(raw_value).unwrap_or_default();
580            let series = queues
581                .entry(queue)
582                .or_insert_with(|| series_template.clone());
583            if let Some(point) = series.get_mut(idx) {
584                point.enqueued = value;
585            }
586        }
587    }
588
589    let mut queues: Vec<QueueLengthMetricsSeries> = queues
590        .into_iter()
591        .map(|(queue, series)| QueueLengthMetricsSeries { queue, series })
592        .collect();
593
594    queues.sort_by(|a, b| {
595        let a_peak = a
596            .series
597            .iter()
598            .map(|point| point.enqueued)
599            .max()
600            .unwrap_or_default();
601        let b_peak = b
602            .series
603            .iter()
604            .map(|point| point.enqueued)
605            .max()
606            .unwrap_or_default();
607
608        b_peak.cmp(&a_peak).then_with(|| a.queue.cmp(&b.queue))
609    });
610
611    queues
612}
613
614#[must_use]
615pub(crate) fn histogram_bucket_index(duration_ms: u64) -> usize {
616    HISTOGRAM_BUCKET_INTERVALS_MS
617        .iter()
618        .position(|upper| duration_ms < *upper)
619        .unwrap_or(HISTOGRAM_BUCKET_COUNT - 1)
620}
621
622#[must_use]
623pub(crate) fn histogram_bitfield_increment_args(
624    buckets: &[u64; HISTOGRAM_BUCKET_COUNT],
625) -> Vec<String> {
626    let mut args = vec!["OVERFLOW".to_string(), "SAT".to_string()];
627    for (idx, value) in buckets.iter().enumerate() {
628        if *value == 0 {
629            continue;
630        }
631        args.push("INCRBY".to_string());
632        args.push("u16".to_string());
633        args.push(format!("#{idx}"));
634        args.push(value.to_string());
635    }
636    args
637}
638
639#[must_use]
640pub(crate) fn histogram_bitfield_fetch_args() -> Vec<String> {
641    let mut args = Vec::with_capacity(HISTOGRAM_BUCKET_COUNT * 3);
642    for idx in 0..HISTOGRAM_BUCKET_COUNT {
643        args.push("GET".to_string());
644        args.push("u16".to_string());
645        args.push(format!("#{idx}"));
646    }
647    args
648}
649
650#[must_use]
651pub(crate) fn histogram_buckets_from_counts(
652    counts: &[u64; HISTOGRAM_BUCKET_COUNT],
653) -> Vec<JobMetricsHistogramBucket> {
654    HISTOGRAM_BUCKET_LABELS
655        .iter()
656        .zip(counts.iter())
657        .enumerate()
658        .map(|(idx, (label, count))| JobMetricsHistogramBucket {
659            label: (*label).to_string(),
660            upper_bound_ms: (idx < HISTOGRAM_BUCKET_COUNT - 1)
661                .then(|| HISTOGRAM_BUCKET_INTERVALS_MS.get(idx).copied())
662                .flatten(),
663            count: *count,
664        })
665        .collect()
666}
667
668fn split_metric_field(field: &str) -> Option<(MetricIdentity, &str)> {
669    let (identity_key, metric) = field.rsplit_once('|')?;
670    match metric {
671        METRIC_PROCESSED_JOBS
672        | METRIC_FAILED_JOBS
673        | METRIC_PANICKED_JOBS
674        | METRIC_SUCCESSFUL_EXECUTIONS
675        | METRIC_FAILED_EXECUTIONS
676        | METRIC_PANICKED_EXECUTIONS
677        | METRIC_EXECUTION_MS => {
678            MetricIdentity::from_field_key(identity_key).map(|id| (id, metric))
679        }
680        _ => None,
681    }
682}
683
684fn split_queue_metric_field(field: &str) -> Option<(String, &str)> {
685    let (queue_key, metric) = field.rsplit_once('|')?;
686    match metric {
687        QUEUE_METRIC_PROCESSED_JOBS | QUEUE_METRIC_SUCCEEDED_JOBS | QUEUE_METRIC_FAILED_JOBS => {
688            let (queue_len, queue) = queue_key.split_once(':')?;
689            let queue_len = queue_len.parse::<usize>().ok()?;
690
691            if queue.len() != queue_len || !queue.is_char_boundary(queue_len) {
692                return None;
693            }
694
695            Some((queue.to_string(), metric))
696        }
697        _ => None,
698    }
699}
700
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    // Bucket bounds are exclusive: a value equal to a bound lands in the
    // next bucket.
    #[test]
    fn histogram_bucket_boundaries_use_configured_thresholds() {
        assert_eq!(histogram_bucket_index(0), 0);
        assert_eq!(histogram_bucket_index(24), 0);
        assert_eq!(histogram_bucket_index(25), 1);
        assert_eq!(histogram_bucket_index(49), 1);
        assert_eq!(histogram_bucket_index(50), 2);
        assert_eq!(histogram_bucket_index(299_999), 12);
        assert_eq!(histogram_bucket_index(300_000), 13);
    }

    // Only non-zero buckets produce INCRBY groups; saturation ("SAT") is
    // delegated to the store, so 70_000 is emitted verbatim.
    #[test]
    fn bitfield_increment_args_use_saturated_u16_counters() {
        let mut buckets = [0_u64; HISTOGRAM_BUCKET_COUNT];
        *buckets
            .first_mut()
            .expect("histogram should include the first bucket") = 2;
        *buckets
            .get_mut(13)
            .expect("histogram should include the last bucket") = 70_000;

        let args = histogram_bitfield_increment_args(&buckets);

        assert_eq!(
            args,
            vec![
                "OVERFLOW", "SAT", "INCRBY", "u16", "#0", "2", "INCRBY", "u16", "#13", "70000",
            ]
        );
    }

    // Exercises both the clamping of oversized windows and the full
    // aggregation pipeline (totals, per-worker summaries, filtering).
    #[test]
    fn query_aggregation_computes_totals_and_clamps_minutes() {
        let identity = MetricIdentity {
            worker: "WorkerA".to_string(),
        };
        let other = MetricIdentity {
            worker: "WorkerB".to_string(),
        };
        let minutes = metric_minutes(10_000, JobMetricsQuery::new(999));
        assert_eq!(minutes.len(), MAX_METRIC_MINUTES);

        let mut hashes = vec![HashMap::new(); minutes.len()];
        let first_hash = hashes
            .get_mut(0)
            .expect("query should include a first minute bucket");
        first_hash.insert(identity.metric_field(METRIC_PROCESSED_JOBS), 3);
        first_hash.insert(identity.metric_field(METRIC_FAILED_JOBS), 1);
        first_hash.insert(identity.metric_field(METRIC_SUCCESSFUL_EXECUTIONS), 2);
        first_hash.insert(identity.metric_field(METRIC_FAILED_EXECUTIONS), 1);
        first_hash.insert(identity.metric_field(METRIC_EXECUTION_MS), 250);

        let second_hash = hashes
            .get_mut(1)
            .expect("query should include a second minute bucket");
        second_hash.insert(other.metric_field(METRIC_PROCESSED_JOBS), 2);
        second_hash.insert(other.metric_field(METRIC_FAILED_JOBS), 1);
        second_hash.insert(other.metric_field(METRIC_PANICKED_JOBS), 1);
        second_hash.insert(other.metric_field(METRIC_SUCCESSFUL_EXECUTIONS), 1);
        second_hash.insert(other.metric_field(METRIC_FAILED_EXECUTIONS), 1);
        second_hash.insert(other.metric_field(METRIC_PANICKED_EXECUTIONS), 1);
        second_hash.insert(other.metric_field(METRIC_EXECUTION_MS), 100);

        let aggregation = aggregate_counter_hashes(&minutes, hashes.clone(), None);
        assert_eq!(aggregation.totals.processed, 5);
        assert_eq!(aggregation.totals.failed, 2);
        assert_eq!(aggregation.totals.panicked, 1);
        assert_eq!(aggregation.totals.succeeded, 3);
        assert_eq!(aggregation.totals.successful_executions, 3);
        assert_eq!(aggregation.totals.failed_executions, 2);
        assert_eq!(aggregation.totals.panicked_executions, 1);
        assert_eq!(aggregation.totals.failed_executions_without_panics(), 1);
        assert_eq!(aggregation.totals.execution_ms, 350);
        assert_eq!(aggregation.workers.len(), 2);
        // WorkerA sorts first: it has the larger total execution time.
        let first_worker = aggregation
            .workers
            .first()
            .expect("first worker summary should exist");
        let first_point = first_worker
            .series
            .first()
            .expect("first worker should have a first series point");
        assert_eq!(first_worker.series.len(), MAX_METRIC_MINUTES);
        assert_eq!(first_point.processed, 3);
        assert_eq!(first_point.failed, 1);
        assert_eq!(first_point.succeeded, 2);
        assert_eq!(first_point.successful_executions, 2);
        assert_eq!(first_point.execution_ms, 250);

        let second_worker = aggregation
            .workers
            .get(1)
            .expect("second worker summary should exist");
        assert_eq!(second_worker.totals.panicked, 1);
        assert_eq!(second_worker.totals.failed_executions, 1);
        assert_eq!(second_worker.totals.panicked_executions, 1);
        assert_eq!(second_worker.totals.failed_executions_without_panics(), 0);

        // Filtering on WorkerA drops WorkerB's fields entirely.
        let filtered = aggregate_counter_hashes(&minutes, hashes, Some(&identity));
        assert_eq!(filtered.totals.processed, 3);
        assert_eq!(filtered.totals.failed, 1);
        assert_eq!(filtered.totals.succeeded, 2);
        assert_eq!(filtered.totals.successful_executions, 2);
        assert_eq!(filtered.totals.execution_ms, 250);
        assert_eq!(filtered.workers.len(), 1);
        let filtered_worker = filtered
            .workers
            .first()
            .expect("filtered worker summary should exist");
        let empty_point = filtered_worker
            .series
            .get(1)
            .expect("filtered worker should have a second series point");
        assert_eq!(empty_point.processed, 0);
    }

    // The length prefix makes `:` and `|` inside worker names unambiguous.
    #[test]
    fn metric_identity_round_trips_with_delimiters() {
        let identity = MetricIdentity {
            worker: "crate::worker|Name".to_string(),
        };

        assert_eq!(
            MetricIdentity::from_field_key(&identity.field_key()),
            Some(identity)
        );
    }

    #[test]
    fn queue_counter_hashes_aggregate_with_missing_fields_as_zero() {
        // Queue name deliberately contains both delimiters.
        let queue = "critical:tenant|fast";
        let mut hashes = vec![HashMap::new(), HashMap::new()];
        let second_hash = hashes
            .get_mut(1)
            .expect("query should include a second minute bucket");
        second_hash.insert(queue_metric_field(queue, QUEUE_METRIC_PROCESSED_JOBS), 5);
        second_hash.insert(queue_metric_field(queue, QUEUE_METRIC_FAILED_JOBS), 2);

        let counters = aggregate_queue_counter_hashes(hashes);
        let totals = counters
            .get(queue)
            .expect("queue counters should be aggregated");

        assert_eq!(totals.processed, 5);
        assert_eq!(totals.succeeded, 0);
        assert_eq!(totals.failed, 2);
    }

    // Job counts (not execution counts) feed the queue counters.
    #[test]
    fn job_metrics_buffer_records_queue_counters() {
        let mut buffer = JobMetricsBuffer::default();

        buffer.record(&WorkerResult {
            kind: WorkerResultKind::Success,
            worker_name: "Worker".to_string(),
            queue: "default".to_string(),
            execution_ms: 10,
            job_count: 3,
        });
        buffer.record(&WorkerResult {
            kind: WorkerResultKind::Panicked,
            worker_name: "Worker".to_string(),
            queue: "default".to_string(),
            execution_ms: 10,
            job_count: 1,
        });

        let queue_records = buffer.queue_records().collect::<Vec<_>>();
        assert_eq!(queue_records.len(), 1);
        let (_, queue, counters) = queue_records
            .first()
            .expect("queue counter record should exist");
        assert_eq!(*queue, "default");
        assert_eq!(counters.processed, 4);
        assert_eq!(counters.succeeded, 3);
        assert_eq!(counters.failed, 1);
    }

    #[test]
    fn queue_length_series_aggregates_and_sorts_by_peak_length() {
        let minutes = vec![10, 11, 12];
        let mut hashes = vec![HashMap::new(); minutes.len()];
        hashes
            .first_mut()
            .expect("query should include a first minute bucket")
            .insert("default".to_string(), 2);
        hashes
            .get_mut(1)
            .expect("query should include a second minute bucket")
            .insert("critical".to_string(), 7);
        let third_hash = hashes
            .get_mut(2)
            .expect("query should include a third minute bucket");
        third_hash.insert("default".to_string(), 4);
        third_hash.insert("critical".to_string(), 0);

        let queues = queue_length_series_from_hashes(&minutes, hashes);

        assert_eq!(queues.len(), 2);
        // "critical" sorts first because its peak (7) beats "default"'s (4).
        let critical = queues
            .first()
            .expect("critical queue length series should exist");
        assert_eq!(critical.queue, "critical");
        assert_eq!(
            critical
                .series
                .first()
                .expect("critical series should include the first point")
                .timestamp,
            600
        );
        assert_eq!(
            critical
                .series
                .iter()
                .map(|point| point.enqueued)
                .collect::<Vec<_>>(),
            vec![0, 7, 0]
        );

        let default = queues
            .get(1)
            .expect("default queue length series should exist");
        assert_eq!(default.queue, "default");
        assert_eq!(
            default
                .series
                .iter()
                .map(|point| point.enqueued)
                .collect::<Vec<_>>(),
            vec![2, 0, 4]
        );
    }

    // Legacy keys appended the queue name after the worker; the declared
    // length means only "Worker" is read back.
    #[test]
    fn metric_identity_reads_legacy_worker_queue_keys_as_worker_only() {
        let legacy_key = "6:Workerdefault";

        assert_eq!(
            MetricIdentity::from_field_key(legacy_key),
            Some(MetricIdentity {
                worker: "Worker".to_string(),
            })
        );
    }
}