1use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5
6use crate::result_collector::{WorkerResult, WorkerResultKind};
7
/// Retention period, in seconds, for stored per-minute metrics (8 hours).
pub(crate) const METRICS_RETENTION_SECS: i64 = 8 * 60 * 60;
/// Window length used when a query requests `0` minutes.
pub(crate) const DEFAULT_METRIC_MINUTES: usize = 60;
/// Largest window a single query may span (8 hours, matching retention).
pub(crate) const MAX_METRIC_MINUTES: usize = 8 * 60;
/// Window used for queue processing-rate calculations.
/// NOTE(review): not referenced in this file — presumably consumed by rate
/// computations elsewhere; verify before changing.
pub(crate) const QUEUE_RATE_WINDOW_MINUTES: usize = 10;
/// Number of execution-duration histogram buckets.
pub(crate) const HISTOGRAM_BUCKET_COUNT: usize = 14;
13
/// Exclusive upper bounds, in milliseconds, for each execution-duration
/// histogram bucket (see `histogram_bucket_index`). The final `u64::MAX`
/// entry is the open-ended catch-all for anything slower than 5 minutes.
pub const HISTOGRAM_BUCKET_INTERVALS_MS: [u64; HISTOGRAM_BUCKET_COUNT] = [
    25,
    50,
    100,
    250,
    500,
    1000,
    2500,
    5000,
    10000,
    30000,
    60000,
    120000,
    300000,
    u64::MAX,
];
31
/// Display labels for the histogram buckets; must stay index-aligned with
/// [`HISTOGRAM_BUCKET_INTERVALS_MS`] ("Slow" labels the open-ended bucket).
pub const HISTOGRAM_BUCKET_LABELS: [&str; HISTOGRAM_BUCKET_COUNT] = [
    "25ms", "50ms", "100ms", "250ms", "500ms", "1s", "2.5s", "5s", "10s", "30s", "60s", "120s",
    "5min", "Slow",
];
37
// Terse hash-field suffixes for worker metrics (kept short to minimize
// per-field storage overhead).
/// Jobs processed by a worker.
pub(crate) const METRIC_PROCESSED_JOBS: &str = "p";
/// Jobs that failed (includes panicked jobs).
pub(crate) const METRIC_FAILED_JOBS: &str = "f";
/// Jobs whose worker panicked.
pub(crate) const METRIC_PANICKED_JOBS: &str = "pn";
/// Worker executions that succeeded.
pub(crate) const METRIC_SUCCESSFUL_EXECUTIONS: &str = "xs";
/// Worker executions that failed (includes panicked executions).
pub(crate) const METRIC_FAILED_EXECUTIONS: &str = "xf";
/// Worker executions that panicked.
pub(crate) const METRIC_PANICKED_EXECUTIONS: &str = "xpn";
/// Milliseconds spent in successful executions.
pub(crate) const METRIC_EXECUTION_MS: &str = "ms";
// Queue counters reuse "p"/"f". NOTE(review): queue and worker fields have
// the same textual shape, so these are presumably stored under distinct
// keys/hashes — verify at the storage call sites.
pub(crate) const QUEUE_METRIC_PROCESSED_JOBS: &str = "p";
pub(crate) const QUEUE_METRIC_SUCCEEDED_JOBS: &str = "s";
pub(crate) const QUEUE_METRIC_FAILED_JOBS: &str = "f";
48
/// Identifies the source of a set of job metrics (currently just the worker
/// name). Used both as an in-memory aggregation key and, via `field_key`,
/// as part of stored hash-field names.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MetricIdentity {
    /// Name of the worker that produced the metrics.
    pub worker: String,
}
53
54impl MetricIdentity {
55 pub(crate) fn from_worker_result(result: &WorkerResult) -> Self {
56 Self {
57 worker: result.worker_name.clone(),
58 }
59 }
60
61 pub(crate) fn field_key(&self) -> String {
62 format!("{}:{}", self.worker.len(), self.worker)
63 }
64
65 pub(crate) fn from_field_key(key: &str) -> Option<Self> {
66 let (worker_len, rest) = key.split_once(':')?;
67 let worker_len = worker_len.parse::<usize>().ok()?;
68
69 if rest.len() < worker_len || !rest.is_char_boundary(worker_len) {
70 return None;
71 }
72
73 let (worker, _) = rest.split_at(worker_len);
74 Some(Self {
75 worker: worker.to_string(),
76 })
77 }
78
79 pub(crate) fn metric_field(&self, metric: &str) -> String {
80 format!("{}|{metric}", self.field_key())
81 }
82}
83
/// Parameters for a metrics lookup window.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct JobMetricsQuery {
    /// Requested window in minutes. `0` selects the default window and
    /// larger values are clamped — see `effective_minutes`.
    pub minutes: usize,
}
88
89impl JobMetricsQuery {
90 #[must_use]
91 pub fn new(minutes: usize) -> Self {
92 Self { minutes }
93 }
94
95 #[must_use]
96 pub fn effective_minutes(&self) -> usize {
97 if self.minutes == 0 {
98 DEFAULT_METRIC_MINUTES
99 } else {
100 self.minutes.min(MAX_METRIC_MINUTES)
101 }
102 }
103}
104
105impl Default for JobMetricsQuery {
106 fn default() -> Self {
107 Self {
108 minutes: DEFAULT_METRIC_MINUTES,
109 }
110 }
111}
112
/// Aggregated job counters over a whole query window.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct JobMetricsTotals {
    /// Jobs processed (succeeded + failed).
    pub processed: u64,
    /// Jobs that completed successfully; derived in `finalize` as
    /// `processed - failed`.
    pub succeeded: u64,
    /// Jobs that failed (includes panicked jobs).
    pub failed: u64,
    /// Jobs whose worker panicked.
    pub panicked: u64,
    /// Worker executions that succeeded (one execution may cover several jobs).
    pub successful_executions: u64,
    /// Worker executions that failed (includes panicked executions).
    pub failed_executions: u64,
    /// Worker executions that panicked.
    pub panicked_executions: u64,
    /// Total time spent in successful executions, in milliseconds.
    pub execution_ms: u64,
}
132
133impl JobMetricsTotals {
134 #[must_use]
135 pub fn average_execution_ms(&self) -> f64 {
136 if self.successful_executions == 0 {
137 0.0
138 } else {
139 self.execution_ms as f64 / self.successful_executions as f64
140 }
141 }
142
143 #[must_use]
144 pub fn execution_seconds(&self) -> f64 {
145 self.execution_ms as f64 / 1000.0
146 }
147
148 #[must_use]
149 pub fn failed_executions_without_panics(&self) -> u64 {
150 self.failed_executions
151 .saturating_sub(self.panicked_executions)
152 }
153
154 fn add_metric(&mut self, metric: &str, value: u64) {
155 match metric {
156 METRIC_PROCESSED_JOBS => self.processed = self.processed.saturating_add(value),
157 METRIC_FAILED_JOBS => self.failed = self.failed.saturating_add(value),
158 METRIC_PANICKED_JOBS => self.panicked = self.panicked.saturating_add(value),
159 METRIC_SUCCESSFUL_EXECUTIONS => {
160 self.successful_executions = self.successful_executions.saturating_add(value);
161 }
162 METRIC_FAILED_EXECUTIONS => {
163 self.failed_executions = self.failed_executions.saturating_add(value);
164 }
165 METRIC_PANICKED_EXECUTIONS => {
166 self.panicked_executions = self.panicked_executions.saturating_add(value);
167 }
168 METRIC_EXECUTION_MS => self.execution_ms = self.execution_ms.saturating_add(value),
169 _ => {}
170 }
171 }
172
173 fn finalize(&mut self) {
174 self.succeeded = self.processed.saturating_sub(self.failed);
175 if self.successful_executions + self.failed_executions + self.panicked_executions == 0 {
176 self.successful_executions = self.succeeded;
177 self.failed_executions = self.failed;
178 self.panicked_executions = self.panicked;
179 }
180 }
181}
182
/// Job counters for a single minute; same counters as [`JobMetricsTotals`]
/// plus the minute's timestamp.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct JobMetricsPoint {
    /// Unix timestamp (seconds) of the start of the minute bucket.
    pub timestamp: i64,
    /// Jobs processed in this minute.
    pub processed: u64,
    /// Jobs that completed successfully; derived in `finalize`.
    pub succeeded: u64,
    /// Jobs that failed (includes panicked jobs).
    pub failed: u64,
    /// Jobs whose worker panicked.
    pub panicked: u64,
    /// Worker executions that succeeded.
    pub successful_executions: u64,
    /// Worker executions that failed (includes panicked executions).
    pub failed_executions: u64,
    /// Worker executions that panicked.
    pub panicked_executions: u64,
    /// Time spent in successful executions this minute, in milliseconds.
    pub execution_ms: u64,
}
204
205impl JobMetricsPoint {
206 #[must_use]
207 pub fn average_execution_ms(&self) -> f64 {
208 if self.successful_executions == 0 {
209 0.0
210 } else {
211 self.execution_ms as f64 / self.successful_executions as f64
212 }
213 }
214
215 #[must_use]
216 pub fn failed_executions_without_panics(&self) -> u64 {
217 self.failed_executions
218 .saturating_sub(self.panicked_executions)
219 }
220
221 fn add_metric(&mut self, metric: &str, value: u64) {
222 match metric {
223 METRIC_PROCESSED_JOBS => self.processed = self.processed.saturating_add(value),
224 METRIC_FAILED_JOBS => self.failed = self.failed.saturating_add(value),
225 METRIC_PANICKED_JOBS => self.panicked = self.panicked.saturating_add(value),
226 METRIC_SUCCESSFUL_EXECUTIONS => {
227 self.successful_executions = self.successful_executions.saturating_add(value);
228 }
229 METRIC_FAILED_EXECUTIONS => {
230 self.failed_executions = self.failed_executions.saturating_add(value);
231 }
232 METRIC_PANICKED_EXECUTIONS => {
233 self.panicked_executions = self.panicked_executions.saturating_add(value);
234 }
235 METRIC_EXECUTION_MS => self.execution_ms = self.execution_ms.saturating_add(value),
236 _ => {}
237 }
238 }
239
240 fn finalize(&mut self) {
241 self.succeeded = self.processed.saturating_sub(self.failed);
242 if self.successful_executions + self.failed_executions + self.panicked_executions == 0 {
243 self.successful_executions = self.succeeded;
244 self.failed_executions = self.failed;
245 self.panicked_executions = self.panicked;
246 }
247 }
248}
249
/// Aggregated metrics for a single worker over the query window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerMetricsSummary {
    /// Which worker the summary belongs to.
    pub identity: MetricIdentity,
    /// Totals across the whole window.
    pub totals: JobMetricsTotals,
    /// One point per minute of the window.
    pub series: Vec<JobMetricsPoint>,
}
259
/// Complete metrics snapshot for a query window across all workers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMetricsSnapshot {
    /// Window start as a Unix timestamp (seconds).
    pub starts_at: i64,
    /// Window end as a Unix timestamp (seconds).
    pub ends_at: i64,
    /// Number of minutes the window covers.
    pub minutes: usize,
    /// Totals aggregated over every worker.
    pub totals: JobMetricsTotals,
    /// Global per-minute series.
    pub series: Vec<JobMetricsPoint>,
    /// Per-worker breakdowns.
    pub workers: Vec<WorkerMetricsSummary>,
}
275
/// One rendered execution-duration histogram bucket.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMetricsHistogramBucket {
    /// Display label, e.g. "25ms" or "Slow".
    pub label: String,
    /// Exclusive upper bound in milliseconds; `None` for the final,
    /// open-ended bucket.
    pub upper_bound_ms: Option<u64>,
    /// Number of executions that fell into this bucket.
    pub count: u64,
}
282
/// Detailed metrics for a single worker, including its duration histogram.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMetricsDetail {
    /// Which worker the detail belongs to.
    pub identity: MetricIdentity,
    /// Window start as a Unix timestamp (seconds).
    pub starts_at: i64,
    /// Window end as a Unix timestamp (seconds).
    pub ends_at: i64,
    /// Number of minutes the window covers.
    pub minutes: usize,
    /// Totals across the window.
    pub totals: JobMetricsTotals,
    /// One point per minute of the window.
    pub series: Vec<JobMetricsPoint>,
    /// Execution-duration histogram buckets.
    pub histogram: Vec<JobMetricsHistogramBucket>,
}
300
/// Queue length sample for a single minute.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueueLengthMetricsPoint {
    /// Unix timestamp (seconds) of the minute bucket.
    pub timestamp: i64,
    /// Number of jobs enqueued at that time.
    pub enqueued: u64,
}
308
/// Per-minute queue length series for one queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueLengthMetricsSeries {
    /// Queue name.
    pub queue: String,
    /// One sample per minute of the window.
    pub series: Vec<QueueLengthMetricsPoint>,
}
316
/// Queue length snapshot for a query window across all queues.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueLengthMetricsSnapshot {
    /// Window start as a Unix timestamp (seconds).
    pub starts_at: i64,
    /// Window end as a Unix timestamp (seconds).
    pub ends_at: i64,
    /// Number of minutes the window covers.
    pub minutes: usize,
    /// One series per queue, sorted by peak length (descending).
    pub queues: Vec<QueueLengthMetricsSeries>,
}
328
/// Per-queue job counters.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) struct QueueCounterTotals {
    /// Jobs processed on the queue.
    pub(crate) processed: u64,
    /// Jobs that succeeded.
    pub(crate) succeeded: u64,
    /// Jobs that failed (includes panicked jobs).
    pub(crate) failed: u64,
}
335
336impl QueueCounterTotals {
337 fn add_metric(&mut self, metric: &str, value: u64) {
338 match metric {
339 QUEUE_METRIC_PROCESSED_JOBS => {
340 self.processed = self.processed.saturating_add(value);
341 }
342 QUEUE_METRIC_SUCCEEDED_JOBS => {
343 self.succeeded = self.succeeded.saturating_add(value);
344 }
345 QUEUE_METRIC_FAILED_JOBS => {
346 self.failed = self.failed.saturating_add(value);
347 }
348 _ => {}
349 }
350 }
351}
352
/// Accumulates worker and queue counters in memory, keyed by the minute
/// they were recorded in; read back via `records` / `queue_records`.
#[derive(Clone, Default)]
pub(crate) struct JobMetricsBuffer {
    // Per-(minute, worker) counters.
    entries: HashMap<(i64, MetricIdentity), PendingJobMetrics>,
    // Per-(minute, queue) counters.
    queue_entries: HashMap<(i64, String), QueueCounterTotals>,
}
358
359impl JobMetricsBuffer {
360 pub(crate) fn is_empty(&self) -> bool {
361 self.entries.is_empty() && self.queue_entries.is_empty()
362 }
363
364 pub(crate) fn clear(&mut self) {
365 self.entries.clear();
366 self.queue_entries.clear();
367 }
368
369 pub(crate) fn record(&mut self, result: &WorkerResult) {
370 let minute = chrono::Utc::now().timestamp().div_euclid(60);
371 let identity = MetricIdentity::from_worker_result(result);
372 let metrics = self.entries.entry((minute, identity)).or_default();
373 let queue_metrics = self
374 .queue_entries
375 .entry((minute, result.queue.clone()))
376 .or_default();
377
378 metrics.processed = metrics.processed.saturating_add(result.job_count);
379 queue_metrics.processed = queue_metrics.processed.saturating_add(result.job_count);
380
381 match result.kind {
382 WorkerResultKind::Success => {
383 metrics.successful_executions = metrics.successful_executions.saturating_add(1);
384 metrics.execution_ms = metrics.execution_ms.saturating_add(result.execution_ms);
385 queue_metrics.succeeded = queue_metrics.succeeded.saturating_add(result.job_count);
386 let bucket = histogram_bucket_index(result.execution_ms);
387 if let Some(count) = metrics.histogram.get_mut(bucket) {
388 *count = count.saturating_add(1);
389 }
390 }
391 WorkerResultKind::Panicked => {
392 metrics.failed = metrics.failed.saturating_add(result.job_count);
393 metrics.panicked = metrics.panicked.saturating_add(result.job_count);
394 metrics.failed_executions = metrics.failed_executions.saturating_add(1);
395 metrics.panicked_executions = metrics.panicked_executions.saturating_add(1);
396 queue_metrics.failed = queue_metrics.failed.saturating_add(result.job_count);
397 }
398 WorkerResultKind::Failed => {
399 metrics.failed = metrics.failed.saturating_add(result.job_count);
400 metrics.failed_executions = metrics.failed_executions.saturating_add(1);
401 queue_metrics.failed = queue_metrics.failed.saturating_add(result.job_count);
402 }
403 }
404 }
405
406 pub(crate) fn records(
407 &self,
408 ) -> impl Iterator<Item = (i64, &MetricIdentity, &PendingJobMetrics)> {
409 self.entries
410 .iter()
411 .map(|((minute, identity), metrics)| (*minute, identity, metrics))
412 }
413
414 pub(crate) fn queue_records(&self) -> impl Iterator<Item = (i64, &str, &QueueCounterTotals)> {
415 self.queue_entries
416 .iter()
417 .map(|((minute, queue), metrics)| (*minute, queue.as_str(), metrics))
418 }
419}
420
/// In-memory per-(minute, worker) counters accumulated by
/// [`JobMetricsBuffer::record`].
#[derive(Clone, Default)]
pub(crate) struct PendingJobMetrics {
    // Jobs processed.
    pub(crate) processed: u64,
    // Jobs that failed (includes panicked jobs).
    pub(crate) failed: u64,
    // Jobs whose worker panicked.
    pub(crate) panicked: u64,
    // Executions that succeeded.
    pub(crate) successful_executions: u64,
    // Executions that failed (includes panicked executions).
    pub(crate) failed_executions: u64,
    // Executions that panicked.
    pub(crate) panicked_executions: u64,
    // Milliseconds spent in successful executions.
    pub(crate) execution_ms: u64,
    // Per-bucket counts of successful execution durations.
    pub(crate) histogram: [u64; HISTOGRAM_BUCKET_COUNT],
}
432
/// Result of [`aggregate_counter_hashes`]: overall totals, the global
/// per-minute series, and one summary per worker.
#[derive(Default)]
pub(crate) struct JobMetricsAggregation {
    pub(crate) totals: JobMetricsTotals,
    pub(crate) series: Vec<JobMetricsPoint>,
    pub(crate) workers: Vec<WorkerMetricsSummary>,
}
439
// Mutable accumulator used while building a WorkerMetricsSummary; totals
// and series points are finalized before conversion.
struct WorkerMetricsSummaryBuilder {
    totals: JobMetricsTotals,
    series: Vec<JobMetricsPoint>,
}
444
445#[must_use]
446pub(crate) fn metric_minutes(now_ts: i64, query: JobMetricsQuery) -> Vec<i64> {
447 let minutes = query.effective_minutes();
448 let end_minute = now_ts.div_euclid(60);
449 let start_minute = end_minute - i64::try_from(minutes).unwrap_or(i64::MAX) + 1;
450 (start_minute..=end_minute).collect()
451}
452
/// Aggregates per-minute counter hashes into overall totals, a global time
/// series, and per-worker summaries.
///
/// `hashes` is positional: hash `idx` holds the fields for `minutes[idx]`;
/// extra hashes beyond `minutes.len()` are skipped. When `filter` is set,
/// only fields belonging to that identity are counted. Worker summaries
/// are sorted busiest-first (total execution time, then processed count,
/// then worker name as a stable tie-break).
#[must_use]
pub(crate) fn aggregate_counter_hashes(
    minutes: &[i64],
    hashes: Vec<HashMap<String, i64>>,
    filter: Option<&MetricIdentity>,
) -> JobMetricsAggregation {
    // Pre-populate the global series with one zeroed point per minute.
    let mut aggregation = JobMetricsAggregation {
        series: minutes
            .iter()
            .map(|minute| JobMetricsPoint {
                timestamp: minute * 60,
                ..JobMetricsPoint::default()
            })
            .collect(),
        ..JobMetricsAggregation::default()
    };
    // Template cloned per worker so every summary spans all minutes, even
    // those where the worker recorded nothing.
    let worker_series_template: Vec<JobMetricsPoint> = minutes
        .iter()
        .map(|minute| JobMetricsPoint {
            timestamp: minute * 60,
            ..JobMetricsPoint::default()
        })
        .collect();
    let mut workers: HashMap<MetricIdentity, WorkerMetricsSummaryBuilder> = HashMap::new();

    for (idx, hash) in hashes.into_iter().enumerate() {
        let Some(point) = aggregation.series.get_mut(idx) else {
            continue;
        };

        for (field, raw_value) in hash {
            // Skip malformed fields or unknown metric names.
            let Some((identity, metric)) = split_metric_field(&field) else {
                continue;
            };
            if filter.is_some_and(|expected| expected != &identity) {
                continue;
            }

            // Negative raw values are clamped to 0.
            let value = u64::try_from(raw_value).unwrap_or_default();
            point.add_metric(metric, value);
            aggregation.totals.add_metric(metric, value);
            let worker_summary =
                workers
                    .entry(identity)
                    .or_insert_with(|| WorkerMetricsSummaryBuilder {
                        totals: JobMetricsTotals::default(),
                        series: worker_series_template.clone(),
                    });
            worker_summary.totals.add_metric(metric, value);
            if let Some(worker_point) = worker_summary.series.get_mut(idx) {
                worker_point.add_metric(metric, value);
            }
        }

        // Derive `succeeded` (and legacy execution counters) once all of
        // this minute's fields are in.
        point.finalize();
    }

    aggregation.totals.finalize();
    aggregation.workers = workers
        .into_iter()
        .map(|(identity, mut summary)| {
            summary.totals.finalize();
            for point in &mut summary.series {
                point.finalize();
            }
            WorkerMetricsSummary {
                identity,
                totals: summary.totals,
                series: summary.series,
            }
        })
        .collect();
    // Busiest workers first; name comparison keeps the order deterministic.
    aggregation.workers.sort_by(|a, b| {
        b.totals
            .execution_ms
            .cmp(&a.totals.execution_ms)
            .then_with(|| b.totals.processed.cmp(&a.totals.processed))
            .then_with(|| a.identity.worker.cmp(&b.identity.worker))
    });

    aggregation
}
535
/// Hash field for a queue counter: `"<len>:<queue>|<metric>"`.
///
/// The length prefix keeps the field parseable even when the queue name
/// itself contains `:` or `|`.
#[must_use]
pub(crate) fn queue_metric_field(queue: &str, metric: &str) -> String {
    let mut field = queue.len().to_string();
    field.push(':');
    field.push_str(queue);
    field.push('|');
    field.push_str(metric);
    field
}
540
541#[must_use]
542pub(crate) fn aggregate_queue_counter_hashes(
543 hashes: Vec<HashMap<String, i64>>,
544) -> HashMap<String, QueueCounterTotals> {
545 let mut queues = HashMap::new();
546
547 for hash in hashes {
548 for (field, raw_value) in hash {
549 let Some((queue, metric)) = split_queue_metric_field(&field) else {
550 continue;
551 };
552 let value = u64::try_from(raw_value).unwrap_or_default();
553 queues
554 .entry(queue)
555 .or_insert_with(QueueCounterTotals::default)
556 .add_metric(metric, value);
557 }
558 }
559
560 queues
561}
562
563#[must_use]
564pub(crate) fn queue_length_series_from_hashes(
565 minutes: &[i64],
566 hashes: Vec<HashMap<String, i64>>,
567) -> Vec<QueueLengthMetricsSeries> {
568 let series_template: Vec<QueueLengthMetricsPoint> = minutes
569 .iter()
570 .map(|minute| QueueLengthMetricsPoint {
571 timestamp: minute * 60,
572 enqueued: 0,
573 })
574 .collect();
575 let mut queues: HashMap<String, Vec<QueueLengthMetricsPoint>> = HashMap::new();
576
577 for (idx, hash) in hashes.into_iter().enumerate() {
578 for (queue, raw_value) in hash {
579 let value = u64::try_from(raw_value).unwrap_or_default();
580 let series = queues
581 .entry(queue)
582 .or_insert_with(|| series_template.clone());
583 if let Some(point) = series.get_mut(idx) {
584 point.enqueued = value;
585 }
586 }
587 }
588
589 let mut queues: Vec<QueueLengthMetricsSeries> = queues
590 .into_iter()
591 .map(|(queue, series)| QueueLengthMetricsSeries { queue, series })
592 .collect();
593
594 queues.sort_by(|a, b| {
595 let a_peak = a
596 .series
597 .iter()
598 .map(|point| point.enqueued)
599 .max()
600 .unwrap_or_default();
601 let b_peak = b
602 .series
603 .iter()
604 .map(|point| point.enqueued)
605 .max()
606 .unwrap_or_default();
607
608 b_peak.cmp(&a_peak).then_with(|| a.queue.cmp(&b.queue))
609 });
610
611 queues
612}
613
614#[must_use]
615pub(crate) fn histogram_bucket_index(duration_ms: u64) -> usize {
616 HISTOGRAM_BUCKET_INTERVALS_MS
617 .iter()
618 .position(|upper| duration_ms < *upper)
619 .unwrap_or(HISTOGRAM_BUCKET_COUNT - 1)
620}
621
622#[must_use]
623pub(crate) fn histogram_bitfield_increment_args(
624 buckets: &[u64; HISTOGRAM_BUCKET_COUNT],
625) -> Vec<String> {
626 let mut args = vec!["OVERFLOW".to_string(), "SAT".to_string()];
627 for (idx, value) in buckets.iter().enumerate() {
628 if *value == 0 {
629 continue;
630 }
631 args.push("INCRBY".to_string());
632 args.push("u16".to_string());
633 args.push(format!("#{idx}"));
634 args.push(value.to_string());
635 }
636 args
637}
638
639#[must_use]
640pub(crate) fn histogram_bitfield_fetch_args() -> Vec<String> {
641 let mut args = Vec::with_capacity(HISTOGRAM_BUCKET_COUNT * 3);
642 for idx in 0..HISTOGRAM_BUCKET_COUNT {
643 args.push("GET".to_string());
644 args.push("u16".to_string());
645 args.push(format!("#{idx}"));
646 }
647 args
648}
649
650#[must_use]
651pub(crate) fn histogram_buckets_from_counts(
652 counts: &[u64; HISTOGRAM_BUCKET_COUNT],
653) -> Vec<JobMetricsHistogramBucket> {
654 HISTOGRAM_BUCKET_LABELS
655 .iter()
656 .zip(counts.iter())
657 .enumerate()
658 .map(|(idx, (label, count))| JobMetricsHistogramBucket {
659 label: (*label).to_string(),
660 upper_bound_ms: (idx < HISTOGRAM_BUCKET_COUNT - 1)
661 .then(|| HISTOGRAM_BUCKET_INTERVALS_MS.get(idx).copied())
662 .flatten(),
663 count: *count,
664 })
665 .collect()
666}
667
668fn split_metric_field(field: &str) -> Option<(MetricIdentity, &str)> {
669 let (identity_key, metric) = field.rsplit_once('|')?;
670 match metric {
671 METRIC_PROCESSED_JOBS
672 | METRIC_FAILED_JOBS
673 | METRIC_PANICKED_JOBS
674 | METRIC_SUCCESSFUL_EXECUTIONS
675 | METRIC_FAILED_EXECUTIONS
676 | METRIC_PANICKED_EXECUTIONS
677 | METRIC_EXECUTION_MS => {
678 MetricIdentity::from_field_key(identity_key).map(|id| (id, metric))
679 }
680 _ => None,
681 }
682}
683
684fn split_queue_metric_field(field: &str) -> Option<(String, &str)> {
685 let (queue_key, metric) = field.rsplit_once('|')?;
686 match metric {
687 QUEUE_METRIC_PROCESSED_JOBS | QUEUE_METRIC_SUCCEEDED_JOBS | QUEUE_METRIC_FAILED_JOBS => {
688 let (queue_len, queue) = queue_key.split_once(':')?;
689 let queue_len = queue_len.parse::<usize>().ok()?;
690
691 if queue.len() != queue_len || !queue.is_char_boundary(queue_len) {
692 return None;
693 }
694
695 Some((queue.to_string(), metric))
696 }
697 _ => None,
698 }
699}
700
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    // Bucket selection must honour the exclusive upper bounds in
    // HISTOGRAM_BUCKET_INTERVALS_MS, with the last bucket as a catch-all.
    #[test]
    fn histogram_bucket_boundaries_use_configured_thresholds() {
        assert_eq!(histogram_bucket_index(0), 0);
        assert_eq!(histogram_bucket_index(24), 0);
        assert_eq!(histogram_bucket_index(25), 1);
        assert_eq!(histogram_bucket_index(49), 1);
        assert_eq!(histogram_bucket_index(50), 2);
        assert_eq!(histogram_bucket_index(299_999), 12);
        assert_eq!(histogram_bucket_index(300_000), 13);
    }

    // Zero buckets are skipped; non-zero buckets become u16 INCRBY args
    // under OVERFLOW SAT (the 70_000 value exceeds u16::MAX on purpose —
    // saturation happens server-side, so the raw value is passed through).
    #[test]
    fn bitfield_increment_args_use_saturated_u16_counters() {
        let mut buckets = [0_u64; HISTOGRAM_BUCKET_COUNT];
        *buckets
            .first_mut()
            .expect("histogram should include the first bucket") = 2;
        *buckets
            .get_mut(13)
            .expect("histogram should include the last bucket") = 70_000;

        let args = histogram_bitfield_increment_args(&buckets);

        assert_eq!(
            args,
            vec![
                "OVERFLOW", "SAT", "INCRBY", "u16", "#0", "2", "INCRBY", "u16", "#13", "70000",
            ]
        );
    }

    // End-to-end aggregation: window clamping, global totals, per-worker
    // summaries (including derived `succeeded`), and identity filtering.
    #[test]
    fn query_aggregation_computes_totals_and_clamps_minutes() {
        let identity = MetricIdentity {
            worker: "WorkerA".to_string(),
        };
        let other = MetricIdentity {
            worker: "WorkerB".to_string(),
        };
        // 999 requested minutes must clamp to MAX_METRIC_MINUTES.
        let minutes = metric_minutes(10_000, JobMetricsQuery::new(999));
        assert_eq!(minutes.len(), MAX_METRIC_MINUTES);

        let mut hashes = vec![HashMap::new(); minutes.len()];
        let first_hash = hashes
            .get_mut(0)
            .expect("query should include a first minute bucket");
        first_hash.insert(identity.metric_field(METRIC_PROCESSED_JOBS), 3);
        first_hash.insert(identity.metric_field(METRIC_FAILED_JOBS), 1);
        first_hash.insert(identity.metric_field(METRIC_SUCCESSFUL_EXECUTIONS), 2);
        first_hash.insert(identity.metric_field(METRIC_FAILED_EXECUTIONS), 1);
        first_hash.insert(identity.metric_field(METRIC_EXECUTION_MS), 250);

        let second_hash = hashes
            .get_mut(1)
            .expect("query should include a second minute bucket");
        second_hash.insert(other.metric_field(METRIC_PROCESSED_JOBS), 2);
        second_hash.insert(other.metric_field(METRIC_FAILED_JOBS), 1);
        second_hash.insert(other.metric_field(METRIC_PANICKED_JOBS), 1);
        second_hash.insert(other.metric_field(METRIC_SUCCESSFUL_EXECUTIONS), 1);
        second_hash.insert(other.metric_field(METRIC_FAILED_EXECUTIONS), 1);
        second_hash.insert(other.metric_field(METRIC_PANICKED_EXECUTIONS), 1);
        second_hash.insert(other.metric_field(METRIC_EXECUTION_MS), 100);

        // Unfiltered aggregation sums both workers.
        let aggregation = aggregate_counter_hashes(&minutes, hashes.clone(), None);
        assert_eq!(aggregation.totals.processed, 5);
        assert_eq!(aggregation.totals.failed, 2);
        assert_eq!(aggregation.totals.panicked, 1);
        assert_eq!(aggregation.totals.succeeded, 3);
        assert_eq!(aggregation.totals.successful_executions, 3);
        assert_eq!(aggregation.totals.failed_executions, 2);
        assert_eq!(aggregation.totals.panicked_executions, 1);
        assert_eq!(aggregation.totals.failed_executions_without_panics(), 1);
        assert_eq!(aggregation.totals.execution_ms, 350);
        assert_eq!(aggregation.workers.len(), 2);
        // WorkerA sorts first: higher execution_ms wins.
        let first_worker = aggregation
            .workers
            .first()
            .expect("first worker summary should exist");
        let first_point = first_worker
            .series
            .first()
            .expect("first worker should have a first series point");
        assert_eq!(first_worker.series.len(), MAX_METRIC_MINUTES);
        assert_eq!(first_point.processed, 3);
        assert_eq!(first_point.failed, 1);
        assert_eq!(first_point.succeeded, 2);
        assert_eq!(first_point.successful_executions, 2);
        assert_eq!(first_point.execution_ms, 250);

        let second_worker = aggregation
            .workers
            .get(1)
            .expect("second worker summary should exist");
        assert_eq!(second_worker.totals.panicked, 1);
        assert_eq!(second_worker.totals.failed_executions, 1);
        assert_eq!(second_worker.totals.panicked_executions, 1);
        assert_eq!(second_worker.totals.failed_executions_without_panics(), 0);

        // Filtering by identity must exclude WorkerB entirely.
        let filtered = aggregate_counter_hashes(&minutes, hashes, Some(&identity));
        assert_eq!(filtered.totals.processed, 3);
        assert_eq!(filtered.totals.failed, 1);
        assert_eq!(filtered.totals.succeeded, 2);
        assert_eq!(filtered.totals.successful_executions, 2);
        assert_eq!(filtered.totals.execution_ms, 250);
        assert_eq!(filtered.workers.len(), 1);
        let filtered_worker = filtered
            .workers
            .first()
            .expect("filtered worker summary should exist");
        let empty_point = filtered_worker
            .series
            .get(1)
            .expect("filtered worker should have a second series point");
        assert_eq!(empty_point.processed, 0);
    }

    // The length prefix makes encode/decode lossless even when the worker
    // name contains the `:` and `|` delimiters.
    #[test]
    fn metric_identity_round_trips_with_delimiters() {
        let identity = MetricIdentity {
            worker: "crate::worker|Name".to_string(),
        };

        assert_eq!(
            MetricIdentity::from_field_key(&identity.field_key()),
            Some(identity)
        );
    }

    // Queues absent from a minute's hash simply contribute nothing; missing
    // metrics default to zero.
    #[test]
    fn queue_counter_hashes_aggregate_with_missing_fields_as_zero() {
        let queue = "critical:tenant|fast";
        let mut hashes = vec![HashMap::new(), HashMap::new()];
        let second_hash = hashes
            .get_mut(1)
            .expect("query should include a second minute bucket");
        second_hash.insert(queue_metric_field(queue, QUEUE_METRIC_PROCESSED_JOBS), 5);
        second_hash.insert(queue_metric_field(queue, QUEUE_METRIC_FAILED_JOBS), 2);

        let counters = aggregate_queue_counter_hashes(hashes);
        let totals = counters
            .get(queue)
            .expect("queue counters should be aggregated");

        assert_eq!(totals.processed, 5);
        assert_eq!(totals.succeeded, 0);
        assert_eq!(totals.failed, 2);
    }

    // record() folds successes and panics for the same queue/minute into a
    // single queue counter entry.
    #[test]
    fn job_metrics_buffer_records_queue_counters() {
        let mut buffer = JobMetricsBuffer::default();

        buffer.record(&WorkerResult {
            kind: WorkerResultKind::Success,
            worker_name: "Worker".to_string(),
            queue: "default".to_string(),
            execution_ms: 10,
            job_count: 3,
        });
        buffer.record(&WorkerResult {
            kind: WorkerResultKind::Panicked,
            worker_name: "Worker".to_string(),
            queue: "default".to_string(),
            execution_ms: 10,
            job_count: 1,
        });

        let queue_records = buffer.queue_records().collect::<Vec<_>>();
        assert_eq!(queue_records.len(), 1);
        let (_, queue, counters) = queue_records
            .first()
            .expect("queue counter record should exist");
        assert_eq!(*queue, "default");
        assert_eq!(counters.processed, 4);
        assert_eq!(counters.succeeded, 3);
        assert_eq!(counters.failed, 1);
    }

    // Series cover every minute (zeros where a queue has no sample) and
    // queues sort by their peak enqueued count, highest first.
    #[test]
    fn queue_length_series_aggregates_and_sorts_by_peak_length() {
        let minutes = vec![10, 11, 12];
        let mut hashes = vec![HashMap::new(); minutes.len()];
        hashes
            .first_mut()
            .expect("query should include a first minute bucket")
            .insert("default".to_string(), 2);
        hashes
            .get_mut(1)
            .expect("query should include a second minute bucket")
            .insert("critical".to_string(), 7);
        let third_hash = hashes
            .get_mut(2)
            .expect("query should include a third minute bucket");
        third_hash.insert("default".to_string(), 4);
        third_hash.insert("critical".to_string(), 0);

        let queues = queue_length_series_from_hashes(&minutes, hashes);

        assert_eq!(queues.len(), 2);
        let critical = queues
            .first()
            .expect("critical queue length series should exist");
        assert_eq!(critical.queue, "critical");
        assert_eq!(
            critical
                .series
                .first()
                .expect("critical series should include the first point")
                .timestamp,
            600
        );
        assert_eq!(
            critical
                .series
                .iter()
                .map(|point| point.enqueued)
                .collect::<Vec<_>>(),
            vec![0, 7, 0]
        );

        let default = queues
            .get(1)
            .expect("default queue length series should exist");
        assert_eq!(default.queue, "default");
        assert_eq!(
            default
                .series
                .iter()
                .map(|point| point.enqueued)
                .collect::<Vec<_>>(),
            vec![2, 0, 4]
        );
    }

    // from_field_key tolerates trailing bytes after the declared length, so
    // an old combined "worker+queue" key still parses as just the worker.
    #[test]
    fn metric_identity_reads_legacy_worker_queue_keys_as_worker_only() {
        let legacy_key = "6:Workerdefault";

        assert_eq!(
            MetricIdentity::from_field_key(legacy_key),
            Some(MetricIdentity {
                worker: "Worker".to_string(),
            })
        );
    }
}