ora_server/
metrics.rs

1use ahash::HashMap;
2use futures::StreamExt;
3use metrics::{counter, describe_counter, describe_gauge, gauge};
4use ora_storage::{JobQueryFilters, ScheduleQueryFilters, Storage};
5
6use crate::events::EventBus;
7
8/// Collect and expose metrics of the server.
9///
10/// Note that the metrics collected here are not detailed
11/// and are only meant to provide a high-level overview of
12/// the server's activity.
13pub(crate) async fn collect_metrics(store: &impl Storage, bus: &EventBus) -> eyre::Result<()> {
14    let mut events = bus.subscribe_audit_events();
15
16    describe_gauge!("ora_server_active_jobs", "Number of active jobs.");
17    describe_counter!("ora_server_jobs_added_total", "Number of jobs added.");
18    describe_counter!(
19        "ora_server_jobs_cancelled_total",
20        "Number of jobs cancelled."
21    );
22    describe_counter!("ora_server_jobs_deleted_total", "Number of jobs deleted.");
23    describe_counter!(
24        "ora_server_jobs_succeeded_total",
25        "Number of jobs succeeded."
26    );
27    describe_counter!("ora_server_jobs_failed_total", "Number of jobs failed.");
28
29    describe_gauge!(
30        "ora_server_executors_connected",
31        "Number of connected executors."
32    );
33
34    describe_counter!(
35        "ora_server_snapshots_exported_total",
36        "Number of snapshots exported."
37    );
38    describe_counter!(
39        "ora_server_snapshots_imported_total",
40        "Number of snapshots imported."
41    );
42
43    describe_counter!(
44        "ora_server_schedules_added_total",
45        "Number of schedules added."
46    );
47
48    describe_gauge!("ora_server_active_schedules", "Number of active schedules.");
49
50    describe_counter!(
51        "ora_server_schedules_cancelled_total",
52        "Number of schedules cancelled."
53    );
54
55    describe_counter!(
56        "ora_server_schedules_deleted_total",
57        "Number of schedules deleted."
58    );
59
60    let active_job_count = store
61        .count_jobs(JobQueryFilters {
62            active: Some(true),
63            ..Default::default()
64        })
65        .await?;
66
67    #[allow(clippy::cast_precision_loss)]
68    gauge!("ora_server_active_jobs").set(active_job_count as f64);
69
70    let active_schedules_count = store
71        .count_schedules(ScheduleQueryFilters {
72            active: Some(true),
73            ..Default::default()
74        })
75        .await?;
76
77    #[allow(clippy::cast_precision_loss)]
78    gauge!("ora_server_active_schedules").set(active_schedules_count as f64);
79
80    let mut job_job_types = HashMap::default();
81    let mut execution_jobs = HashMap::default();
82
83    while let Some(event) = events.next().await {
84        match event.kind {
85            crate::AuditEventKind::JobAdded {
86                job_id,
87                job_type_id,
88            } => {
89                counter!(
90                    "ora_server_jobs_added_total",
91                    "job_type_id" => job_type_id.clone(),
92                )
93                .increment(1);
94
95                gauge!("ora_server_active_jobs").increment(1);
96
97                job_job_types.insert(job_id, job_type_id);
98            }
99            crate::AuditEventKind::JobCancelled { job_id } => {
100                gauge!("ora_server_active_jobs").decrement(1);
101
102                if let Some(job_type_id) = job_job_types.remove(&job_id) {
103                    counter!(
104                        "ora_server_jobs_cancelled_total",
105                        "job_type_id" => job_type_id,
106                    )
107                    .increment(1);
108                }
109            }
110            crate::AuditEventKind::JobDeleted { .. } => {
111                counter!("ora_server_jobs_deleted_total").increment(1);
112            }
113            crate::AuditEventKind::ExecutionAdded {
114                execution_id,
115                job_id,
116                ..
117            } => {
118                execution_jobs.insert(execution_id, job_id);
119            }
120
121            crate::AuditEventKind::ExecutionSucceeded { execution_id } => {
122                gauge!("ora_server_active_jobs").decrement(1);
123
124                if let Some(job_id) = execution_jobs.remove(&execution_id) {
125                    if let Some(job_type_id) = job_job_types.remove(&job_id) {
126                        counter!(
127                            "ora_server_jobs_succeeded_total",
128                            "job_type_id" => job_type_id,
129                        )
130                        .increment(1);
131                    }
132                }
133            }
134            crate::AuditEventKind::ExecutionFailed {
135                execution_id,
136                terminal,
137            } => {
138                if terminal {
139                    gauge!("ora_server_active_jobs").decrement(1);
140
141                    if let Some(job_id) = execution_jobs.remove(&execution_id) {
142                        if let Some(job_type_id) = job_job_types.remove(&job_id) {
143                            counter!(
144                                "ora_server_jobs_failed_total",
145                                "job_type_id" => job_type_id,
146                            )
147                            .increment(1);
148                        }
149                    }
150                }
151            }
152            crate::AuditEventKind::ExecutorConnected { .. } => {
153                gauge!("ora_server_executors_connected").increment(1);
154            }
155            crate::AuditEventKind::ExecutorDisconnected { .. } => {
156                gauge!("ora_server_executors_connected").decrement(1);
157            }
158            crate::AuditEventKind::ScheduleAdded { .. } => {
159                counter!("ora_server_schedules_added_total").increment(1);
160                gauge!("ora_server_active_schedules").increment(1);
161            }
162            crate::AuditEventKind::ScheduleCancelled { .. } => {
163                counter!("ora_server_schedules_cancelled_total").increment(1);
164                gauge!("ora_server_active_schedules").decrement(1);
165            }
166            crate::AuditEventKind::ScheduleUnschedulable { .. } => {
167                gauge!("ora_server_active_schedules").decrement(1);
168            }
169            crate::AuditEventKind::ScheduleDeleted { .. } => {
170                counter!("ora_server_schedules_deleted_total").increment(1);
171            }
172            crate::AuditEventKind::SnapshotExported => {
173                counter!("ora_server_snapshots_exported_total").increment(1);
174            }
175            crate::AuditEventKind::SnapshotImported => {
176                counter!("ora_server_snapshots_imported_total").increment(1);
177            }
178            crate::AuditEventKind::ExecutionReady { .. }
179            | crate::AuditEventKind::ExecutionAssigned { .. }
180            | crate::AuditEventKind::ExecutionStarted { .. } => {}
181        }
182    }
183
184    Ok(())
185}