1use axum_prometheus::{
8 metrics::{describe_counter, describe_gauge, describe_histogram, gauge, Unit},
9 metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle},
10 utils::SECONDS_DURATION_BUCKETS,
11 GenericMetricLayer, PrometheusMetricLayer, AXUM_HTTP_REQUESTS_DURATION_SECONDS,
12};
13use tokio_util::sync::CancellationToken;
14
15use crate::db::DBPools;
16
/// Total submissions that ever entered the system (backlog + completed + failed).
pub const SUBMISSIONS_TOTAL_COUNTER: &str = "submissions_total_count";
/// Submissions that completed successfully.
pub const SUBMISSIONS_COMPLETED_COUNTER: &str = "submissions_completed_count";
/// Submissions that failed permanently.
pub const SUBMISSIONS_FAILED_COUNTER: &str = "submissions_failed_count";
/// Seconds between a submission entering the system and its final chunk completing.
pub const SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM: &str = "submissions_complete_duration_seconds";
/// Presumably seconds between a submission entering the system and its permanent
/// failure (counterpart of the "complete" histogram) — TODO(review): confirm.
pub const SUBMISSIONS_DURATION_FAIL_HISTOGRAM: &str = "submissions_fail_duration_seconds";

/// Chunks completed successfully.
pub const CHUNKS_COMPLETED_COUNTER: &str = "chunks_completed_count";
/// Chunks that failed permanently (retries exhausted); excludes skipped chunks.
pub const CHUNKS_FAILED_COUNTER: &str = "chunks_failed_count";
/// Chunks that failed temporarily and will be retried.
pub const CHUNKS_RETRIED_COUNTER: &str = "chunks_retried_count";
/// Chunks skipped because another chunk in the same submission failed.
pub const CHUNKS_SKIPPED_COUNTER: &str = "chunks_skipped_count";
/// Total chunks that ever entered the system (backlog + completed + failed).
pub const CHUNKS_TOTAL_COUNTER: &str = "chunks_total_count";
/// Gauge: number of chunks currently in the backlog (reflects DB state).
pub const CHUNKS_BACKLOG_GAUGE: &str = "chunks_in_backlog_count";

/// Seconds from a chunk's reservation until `complete_chunk` was called.
pub const CHUNKS_DURATION_COMPLETED_HISTOGRAM: &str = "chunks_duration_completed_seconds";
/// Seconds from a chunk's reservation until `fail_chunk` was called (includes retried chunks).
pub const CHUNKS_DURATION_FAILED_HISTOGRAM: &str = "chunks_duration_failure_seconds";

/// Presumably counts reservation attempts that succeeded — TODO(review): confirm.
pub const RESERVER_RESERVATIONS_SUCCEEDED_COUNTER: &str = "reserver_reservations_succeeded_count";
/// Presumably counts reservation attempts that failed — TODO(review): confirm.
pub const RESERVER_RESERVATIONS_FAILED_COUNTER: &str = "reserver_reservations_failed_count";
/// Gauge: chunks currently reserved by the reserver (being worked on by consumers).
pub const RESERVER_CHUNKS_RESERVED_GAUGE: &str = "reserver_chunks_reserved_count";

/// Gauge: healthy websocket connections between the system and consumers.
pub const CONSUMERS_CONNECTED_GAUGE: &str = "consumers_connected_count";
/// Seconds spent (SQLite + reserver) reserving a batch of chunks for a consumer.
pub const CONSUMER_FETCH_AND_RESERVE_CHUNKS_HISTOGRAM: &str =
    "consumer_fetch_and_reserve_chunks_duration_seconds";
/// Seconds spent marking a given chunk as completed.
pub const CONSUMER_COMPLETE_CHUNK_DURATION: &str = "consumer_complete_chunk_duration_seconds";
/// Seconds spent marking a given chunk as failed.
pub const CONSUMER_FAIL_CHUNK_DURATION: &str = "consumer_fail_chunk_duration_seconds";

/// Gauge: slight overestimation of the number of operations in the backlog.
pub const OPERATIONS_BACKLOG_GAUGE: &str = "operations_in_backlog_count";
44
45pub fn describe_metrics() {
46 describe_counter!(SUBMISSIONS_TOTAL_COUNTER, Unit::Count, "Total count of submissions (in backlog + completed + failed), i.e. total that ever entered the system");
47 describe_counter!(
48 SUBMISSIONS_COMPLETED_COUNTER,
49 Unit::Count,
50 "Number of submissions completed successfully"
51 );
52 describe_counter!(
53 SUBMISSIONS_FAILED_COUNTER,
54 Unit::Count,
55 "Number of submissions failed permanently"
56 );
57 describe_histogram!(SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its final chunk being completed. Does not count failed submissions.");
58
59 describe_counter!(
60 CHUNKS_COMPLETED_COUNTER,
61 Unit::Count,
62 "Number of chunks completed"
63 );
64 describe_counter!(
65 CHUNKS_FAILED_COUNTER,
66 Unit::Count,
67 "Number of chunks failed permanently (retries exhausted). Does not include skipped chunks"
68 );
69 describe_counter!(
70 CHUNKS_RETRIED_COUNTER,
71 Unit::Count,
72 "Number of chunks that failed temporarily and will be retried"
73 );
74 describe_counter!(
75 CHUNKS_SKIPPED_COUNTER,
76 Unit::Count,
77 "Number of chunks skipped (because another chunk in the submission failed)"
78 );
79 describe_counter!(CHUNKS_TOTAL_COUNTER, Unit::Count, "Total count of chunks (in backlog + completed + failed), i.e. total that ever entered the system");
80 describe_gauge!(CHUNKS_BACKLOG_GAUGE, Unit::Count, "Number of chunks in the backlog. Note that this is a _gauge_ reflecting the accurate state of the DB");
84 describe_histogram!(CHUNKS_DURATION_COMPLETED_HISTOGRAM, Unit::Seconds, "How long it took from the moment a chunk was reserved until 'complete_chunk' was called, as measured from Opsqueue (so including network overhead and reading/writing to the object_store to get the chunk data)");
85 describe_histogram!(CHUNKS_DURATION_FAILED_HISTOGRAM, Unit::Seconds, "How long it took from the moment a chunk was reserved until 'fail_chunk' was called, as measured from Opsqueue (so including network overhead and reading/writing to the object_store to get the chunk data). Includes chunks that are retried.");
86
87 describe_gauge!(RESERVER_CHUNKS_RESERVED_GAUGE, Unit::Count, "Number of chunks currently reserved by the reserver, i.e. being worked on by the consumers");
88
89 describe_gauge!(
90 CONSUMERS_CONNECTED_GAUGE,
91 Unit::Count,
92 "Number of healthy websocket connections between the system and consumers"
93 );
94 describe_histogram!(CONSUMER_FETCH_AND_RESERVE_CHUNKS_HISTOGRAM, Unit::Seconds, "Time spent by Opsqueue (SQLite + reserver) to reserve `limit` chunks for a consumer using strategy `strategy`");
95 describe_histogram!(
96 CONSUMER_COMPLETE_CHUNK_DURATION,
97 Unit::Seconds,
98 "Time spent by Opsqueue to mark a given chunk as completed"
99 );
100 describe_histogram!(
101 CONSUMER_FAIL_CHUNK_DURATION,
102 Unit::Seconds,
103 "Time spent by Opsqueue to mark a given chunk as failed"
104 );
105
106 describe_gauge!(
107 OPERATIONS_BACKLOG_GAUGE,
108 Unit::Count,
109 "Slight overestimation of the number of operations in the backlog (chunk_size * chunk_count for each submission). For simplicity, we always consider the last chunk full. This is a gauge reflecting the rough current DB state."
110 );
111}
112
/// What `setup_prometheus` produces: the axum middleware layer that records
/// per-request HTTP metrics, plus the `PrometheusHandle` for the installed
/// recorder.
pub type PrometheusConfig = (
    GenericMetricLayer<'static, PrometheusHandle, axum_prometheus::Handle>,
    PrometheusHandle,
);
117
118#[must_use]
119pub fn setup_prometheus() -> (
120 GenericMetricLayer<'static, PrometheusHandle, axum_prometheus::Handle>,
121 PrometheusHandle,
122) {
123 let metric_layer = PrometheusMetricLayer::new();
125 let metric_handle = PrometheusBuilder::new()
127 .set_buckets_for_metric(
128 Matcher::Full(AXUM_HTTP_REQUESTS_DURATION_SECONDS.to_string()),
129 SECONDS_DURATION_BUCKETS,
130 )
131 .expect("Building Prometheus failed")
132 .install_recorder()
133 .expect("Installing global Prometheus recorder failed");
134
135 describe_metrics();
136
137 (metric_layer, metric_handle)
138}
139
140pub fn time_delta_as_f64(td: chrono::TimeDelta) -> f64 {
144 const NANOS_PER_SEC: f64 = 1_000_000_000.0;
145 (td.num_seconds() as f64) + (td.subsec_nanos() as f64) / NANOS_PER_SEC
146}
147
148pub async fn calculate_scaling_metrics(db_pool: &DBPools) -> anyhow::Result<()> {
150 let mut conn = db_pool.reader_conn().await?;
151 let chunks_backlog_count: u64 = crate::common::chunk::db::count_chunks(&mut conn)
152 .await?
153 .into();
154 gauge!(CHUNKS_BACKLOG_GAUGE).set(chunks_backlog_count as f64);
155 let ops_backlog_count: f64 =
156 crate::common::chunk::db::count_ops_in_backlog_estimate(&mut conn).await?;
157 gauge!(OPERATIONS_BACKLOG_GAUGE).set(ops_backlog_count);
158
159 Ok(())
160}
161
162pub async fn periodically_calculate_scaling_metrics(
165 db_pool: &DBPools,
166 cancellation_token: &CancellationToken,
167) {
168 const METRICS_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
169 loop {
170 tokio::select! {
171 _ = cancellation_token.cancelled() => break,
172 _ = tokio::time::sleep(METRICS_INTERVAL) => {
173 if let Err(e) = calculate_scaling_metrics(db_pool).await {
174 tracing::error!("Error calculating scaling metrics: {}", e);
175 }
176 }
177 }
178 }
179}