1use ahash::HashMap;
2use futures::StreamExt;
3use metrics::{counter, describe_counter, describe_gauge, gauge};
4use ora_storage::{JobQueryFilters, ScheduleQueryFilters, Storage};
5
6use crate::events::EventBus;
7
8pub(crate) async fn collect_metrics(store: &impl Storage, bus: &EventBus) -> eyre::Result<()> {
14 let mut events = bus.subscribe_audit_events();
15
16 describe_gauge!("ora_server_active_jobs", "Number of active jobs.");
17 describe_counter!("ora_server_jobs_added_total", "Number of jobs added.");
18 describe_counter!(
19 "ora_server_jobs_cancelled_total",
20 "Number of jobs cancelled."
21 );
22 describe_counter!("ora_server_jobs_deleted_total", "Number of jobs deleted.");
23 describe_counter!(
24 "ora_server_jobs_succeeded_total",
25 "Number of jobs succeeded."
26 );
27 describe_counter!("ora_server_jobs_failed_total", "Number of jobs failed.");
28
29 describe_gauge!(
30 "ora_server_executors_connected",
31 "Number of connected executors."
32 );
33
34 describe_counter!(
35 "ora_server_snapshots_exported_total",
36 "Number of snapshots exported."
37 );
38 describe_counter!(
39 "ora_server_snapshots_imported_total",
40 "Number of snapshots imported."
41 );
42
43 describe_counter!(
44 "ora_server_schedules_added_total",
45 "Number of schedules added."
46 );
47
48 describe_gauge!("ora_server_active_schedules", "Number of active schedules.");
49
50 describe_counter!(
51 "ora_server_schedules_cancelled_total",
52 "Number of schedules cancelled."
53 );
54
55 describe_counter!(
56 "ora_server_schedules_deleted_total",
57 "Number of schedules deleted."
58 );
59
60 let active_job_count = store
61 .count_jobs(JobQueryFilters {
62 active: Some(true),
63 ..Default::default()
64 })
65 .await?;
66
67 #[allow(clippy::cast_precision_loss)]
68 gauge!("ora_server_active_jobs").set(active_job_count as f64);
69
70 let active_schedules_count = store
71 .count_schedules(ScheduleQueryFilters {
72 active: Some(true),
73 ..Default::default()
74 })
75 .await?;
76
77 #[allow(clippy::cast_precision_loss)]
78 gauge!("ora_server_active_schedules").set(active_schedules_count as f64);
79
80 let mut job_job_types = HashMap::default();
81 let mut execution_jobs = HashMap::default();
82
83 while let Some(event) = events.next().await {
84 match event.kind {
85 crate::AuditEventKind::JobAdded {
86 job_id,
87 job_type_id,
88 } => {
89 counter!(
90 "ora_server_jobs_added_total",
91 "job_type_id" => job_type_id.clone(),
92 )
93 .increment(1);
94
95 gauge!("ora_server_active_jobs").increment(1);
96
97 job_job_types.insert(job_id, job_type_id);
98 }
99 crate::AuditEventKind::JobCancelled { job_id } => {
100 gauge!("ora_server_active_jobs").decrement(1);
101
102 if let Some(job_type_id) = job_job_types.remove(&job_id) {
103 counter!(
104 "ora_server_jobs_cancelled_total",
105 "job_type_id" => job_type_id,
106 )
107 .increment(1);
108 }
109 }
110 crate::AuditEventKind::JobDeleted { .. } => {
111 counter!("ora_server_jobs_deleted_total").increment(1);
112 }
113 crate::AuditEventKind::ExecutionAdded {
114 execution_id,
115 job_id,
116 ..
117 } => {
118 execution_jobs.insert(execution_id, job_id);
119 }
120
121 crate::AuditEventKind::ExecutionSucceeded { execution_id } => {
122 gauge!("ora_server_active_jobs").decrement(1);
123
124 if let Some(job_id) = execution_jobs.remove(&execution_id) {
125 if let Some(job_type_id) = job_job_types.remove(&job_id) {
126 counter!(
127 "ora_server_jobs_succeeded_total",
128 "job_type_id" => job_type_id,
129 )
130 .increment(1);
131 }
132 }
133 }
134 crate::AuditEventKind::ExecutionFailed {
135 execution_id,
136 terminal,
137 } => {
138 if terminal {
139 gauge!("ora_server_active_jobs").decrement(1);
140
141 if let Some(job_id) = execution_jobs.remove(&execution_id) {
142 if let Some(job_type_id) = job_job_types.remove(&job_id) {
143 counter!(
144 "ora_server_jobs_failed_total",
145 "job_type_id" => job_type_id,
146 )
147 .increment(1);
148 }
149 }
150 }
151 }
152 crate::AuditEventKind::ExecutorConnected { .. } => {
153 gauge!("ora_server_executors_connected").increment(1);
154 }
155 crate::AuditEventKind::ExecutorDisconnected { .. } => {
156 gauge!("ora_server_executors_connected").decrement(1);
157 }
158 crate::AuditEventKind::ScheduleAdded { .. } => {
159 counter!("ora_server_schedules_added_total").increment(1);
160 gauge!("ora_server_active_schedules").increment(1);
161 }
162 crate::AuditEventKind::ScheduleCancelled { .. } => {
163 counter!("ora_server_schedules_cancelled_total").increment(1);
164 gauge!("ora_server_active_schedules").decrement(1);
165 }
166 crate::AuditEventKind::ScheduleUnschedulable { .. } => {
167 gauge!("ora_server_active_schedules").decrement(1);
168 }
169 crate::AuditEventKind::ScheduleDeleted { .. } => {
170 counter!("ora_server_schedules_deleted_total").increment(1);
171 }
172 crate::AuditEventKind::SnapshotExported => {
173 counter!("ora_server_snapshots_exported_total").increment(1);
174 }
175 crate::AuditEventKind::SnapshotImported => {
176 counter!("ora_server_snapshots_imported_total").increment(1);
177 }
178 crate::AuditEventKind::ExecutionReady { .. }
179 | crate::AuditEventKind::ExecutionAssigned { .. }
180 | crate::AuditEventKind::ExecutionStarted { .. } => {}
181 }
182 }
183
184 Ok(())
185}