opsqueue/
prometheus.rs

//! Define common metrics exposed via a Prometheus endpoint
//!
//! This allows inspection of how well the queue is performing under production load.
//!
//! Note that we explicitly have a separate endpoint to check the queue health,
//! which is more fine-grained than Prometheus' way to check whether a service is 'up'.
7use axum_prometheus::{
8    metrics::{describe_counter, describe_gauge, describe_histogram, gauge, Unit},
9    metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle},
10    utils::SECONDS_DURATION_BUCKETS,
11    GenericMetricLayer, PrometheusMetricLayer, AXUM_HTTP_REQUESTS_DURATION_SECONDS,
12};
13use tokio_util::sync::CancellationToken;
14
15use crate::db::DBPools;
16
// Metric names, shared between the code that records values and
// `describe_metrics` below. Keep the two in sync when adding a metric.

// -- Submission-level metrics --------------------------------------------
pub const SUBMISSIONS_TOTAL_COUNTER: &str = "submissions_total_count";
pub const SUBMISSIONS_COMPLETED_COUNTER: &str = "submissions_completed_count";
pub const SUBMISSIONS_FAILED_COUNTER: &str = "submissions_failed_count";
pub const SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM: &str = "submissions_complete_duration_seconds";
pub const SUBMISSIONS_DURATION_FAIL_HISTOGRAM: &str = "submissions_fail_duration_seconds";

// -- Chunk-level metrics -------------------------------------------------
pub const CHUNKS_COMPLETED_COUNTER: &str = "chunks_completed_count";
pub const CHUNKS_FAILED_COUNTER: &str = "chunks_failed_count";
pub const CHUNKS_RETRIED_COUNTER: &str = "chunks_retried_count";
pub const CHUNKS_SKIPPED_COUNTER: &str = "chunks_skipped_count";
pub const CHUNKS_TOTAL_COUNTER: &str = "chunks_total_count";
pub const CHUNKS_BACKLOG_GAUGE: &str = "chunks_in_backlog_count";

pub const CHUNKS_DURATION_COMPLETED_HISTOGRAM: &str = "chunks_duration_completed_seconds";
pub const CHUNKS_DURATION_FAILED_HISTOGRAM: &str = "chunks_duration_failure_seconds";

// -- Reserver metrics ----------------------------------------------------
pub const RESERVER_RESERVATIONS_SUCCEEDED_COUNTER: &str = "reserver_reservations_succeeded_count";
pub const RESERVER_RESERVATIONS_FAILED_COUNTER: &str = "reserver_reservations_failed_count";
pub const RESERVER_CHUNKS_RESERVED_GAUGE: &str = "reserver_chunks_reserved_count";

// -- Consumer-facing metrics ---------------------------------------------
pub const CONSUMERS_CONNECTED_GAUGE: &str = "consumers_connected_count";
pub const CONSUMER_FETCH_AND_RESERVE_CHUNKS_HISTOGRAM: &str =
    "consumer_fetch_and_reserve_chunks_duration_seconds";
pub const CONSUMER_COMPLETE_CHUNK_DURATION: &str = "consumer_complete_chunk_duration_seconds";
pub const CONSUMER_FAIL_CHUNK_DURATION: &str = "consumer_fail_chunk_duration_seconds";

// -- Autoscaling metrics -------------------------------------------------
pub const OPERATIONS_BACKLOG_GAUGE: &str = "operations_in_backlog_count";
44
45pub fn describe_metrics() {
46    describe_counter!(SUBMISSIONS_TOTAL_COUNTER, Unit::Count, "Total count of submissions (in backlog + completed + failed), i.e. total that ever entered the system");
47    describe_counter!(
48        SUBMISSIONS_COMPLETED_COUNTER,
49        Unit::Count,
50        "Number of submissions completed successfully"
51    );
52    describe_counter!(
53        SUBMISSIONS_FAILED_COUNTER,
54        Unit::Count,
55        "Number of submissions failed permanently"
56    );
57    describe_histogram!(SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its final chunk being completed. Does not count failed submissions.");
58
59    describe_counter!(
60        CHUNKS_COMPLETED_COUNTER,
61        Unit::Count,
62        "Number of chunks completed"
63    );
64    describe_counter!(
65        CHUNKS_FAILED_COUNTER,
66        Unit::Count,
67        "Number of chunks failed permanently (retries exhausted). Does not include skipped chunks"
68    );
69    describe_counter!(
70        CHUNKS_RETRIED_COUNTER,
71        Unit::Count,
72        "Number of chunks that failed temporarily and will be retried"
73    );
74    describe_counter!(
75        CHUNKS_SKIPPED_COUNTER,
76        Unit::Count,
77        "Number of chunks skipped (because another chunk in the submission failed)"
78    );
79    describe_counter!(CHUNKS_TOTAL_COUNTER, Unit::Count, "Total count of chunks (in backlog + completed + failed), i.e. total that ever entered the system");
80    // We could calculate the backlog size from TOTAL - COMPLETED - FAILED - SKIPPED
81    // but since it will commonly be used for checking whether we should autoscale,
82    // it's much nicer to measure/expose it directly
83    describe_gauge!(CHUNKS_BACKLOG_GAUGE, Unit::Count, "Number of chunks in the backlog. Note that this is a _gauge_ reflecting the accurate state of the DB");
84    describe_histogram!(CHUNKS_DURATION_COMPLETED_HISTOGRAM, Unit::Seconds, "How long it took from the moment a chunk was reserved until 'complete_chunk' was called, as measured from Opsqueue (so including network overhead and reading/writing to the object_store to get the chunk data)");
85    describe_histogram!(CHUNKS_DURATION_FAILED_HISTOGRAM, Unit::Seconds, "How long it took from the moment a chunk was reserved until 'fail_chunk' was called, as measured from Opsqueue (so including network overhead and reading/writing to the object_store to get the chunk data). Includes chunks that are retried.");
86
87    describe_gauge!(RESERVER_CHUNKS_RESERVED_GAUGE, Unit::Count, "Number of chunks currently reserved by the reserver, i.e. being worked on by the consumers");
88
89    describe_gauge!(
90        CONSUMERS_CONNECTED_GAUGE,
91        Unit::Count,
92        "Number of healthy websocket connections between the system and consumers"
93    );
94    describe_histogram!(CONSUMER_FETCH_AND_RESERVE_CHUNKS_HISTOGRAM, Unit::Seconds, "Time spent by Opsqueue (SQLite + reserver) to reserve `limit` chunks for a consumer using strategy `strategy`");
95    describe_histogram!(
96        CONSUMER_COMPLETE_CHUNK_DURATION,
97        Unit::Seconds,
98        "Time spent by Opsqueue to mark a given chunk as completed"
99    );
100    describe_histogram!(
101        CONSUMER_FAIL_CHUNK_DURATION,
102        Unit::Seconds,
103        "Time spent by Opsqueue to mark a given chunk as failed"
104    );
105
106    describe_gauge!(
107        OPERATIONS_BACKLOG_GAUGE,
108        Unit::Count,
109        "Slight overestimation of the number of operations in the backlog (chunk_size * chunk_count for each submission). For simplicity, we always consider the last chunk full. This is a gauge reflecting the rough current DB state."
110    );
111}
112
/// The pair returned by `setup_prometheus`: the Axum middleware layer that
/// records per-request HTTP metrics, plus the handle used to render the
/// current metrics in Prometheus' text exposition format.
pub type PrometheusConfig = (
    GenericMetricLayer<'static, PrometheusHandle, axum_prometheus::Handle>,
    PrometheusHandle,
);
117
118#[must_use]
119pub fn setup_prometheus() -> (
120    GenericMetricLayer<'static, PrometheusHandle, axum_prometheus::Handle>,
121    PrometheusHandle,
122) {
123    // PrometheusMetricLayer::pair()
124    let metric_layer = PrometheusMetricLayer::new();
125    // This is the default if you use `PrometheusMetricLayer::pair`.
126    let metric_handle = PrometheusBuilder::new()
127        .set_buckets_for_metric(
128            Matcher::Full(AXUM_HTTP_REQUESTS_DURATION_SECONDS.to_string()),
129            SECONDS_DURATION_BUCKETS,
130        )
131        .expect("Building Prometheus failed")
132        .install_recorder()
133        .expect("Installing global Prometheus recorder failed");
134
135    describe_metrics();
136
137    (metric_layer, metric_handle)
138}
139
140/// Returns the number of seconds contained by this TimeDelta as f64, with nanosecond precision.
141///
142/// Adapted from <https://doc.rust-lang.org/std/time/struct.Duration.html#method.as_secs_f64>
143pub fn time_delta_as_f64(td: chrono::TimeDelta) -> f64 {
144    const NANOS_PER_SEC: f64 = 1_000_000_000.0;
145    (td.num_seconds() as f64) + (td.subsec_nanos() as f64) / NANOS_PER_SEC
146}
147
148/// Calculates the backlog-size metrics used for autoscaling.
149pub async fn calculate_scaling_metrics(db_pool: &DBPools) -> anyhow::Result<()> {
150    let mut conn = db_pool.reader_conn().await?;
151    let chunks_backlog_count: u64 = crate::common::chunk::db::count_chunks(&mut conn)
152        .await?
153        .into();
154    gauge!(CHUNKS_BACKLOG_GAUGE).set(chunks_backlog_count as f64);
155    let ops_backlog_count: f64 =
156        crate::common::chunk::db::count_ops_in_backlog_estimate(&mut conn).await?;
157    gauge!(OPERATIONS_BACKLOG_GAUGE).set(ops_backlog_count);
158
159    Ok(())
160}
161
162/// Calculate the backlog-size metrics used for autoscaling periodically;
163/// currently every 5 seconds.
164pub async fn periodically_calculate_scaling_metrics(
165    db_pool: &DBPools,
166    cancellation_token: &CancellationToken,
167) {
168    const METRICS_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
169    loop {
170        tokio::select! {
171            _ = cancellation_token.cancelled() => break,
172            _ = tokio::time::sleep(METRICS_INTERVAL) => {
173                if let Err(e) = calculate_scaling_metrics(db_pool).await {
174                    tracing::error!("Error calculating scaling metrics: {}", e);
175                }
176            }
177        }
178    }
179}