1use std::{io, sync::Arc};
69
70#[cfg(not(feature = "mocks"))]
71use aya::Ebpf;
72use aya::{
73 maps::MapError,
74 util::{nr_cpus, online_cpus},
75};
76use aya_metrics_common::Meter;
77#[cfg(feature = "mocks")]
78use aya_metrics_mocks::{Ebpf, PerCpuArray};
79use futures::{lock::Mutex, stream::FuturesUnordered, StreamExt};
80use metrics::{Counter, Label, Unit};
81use thiserror::Error;
82use tokio::time::{self, Duration};
83
/// Per-CPU array backed by the real aya map type. With the `mocks` feature
/// enabled, `PerCpuArray` is imported from `aya_metrics_mocks` instead.
#[cfg(not(feature = "mocks"))]
type PerCpuArray<V> = aya::maps::PerCpuArray<aya::maps::MapData, V>;

/// Extra labels attached to every counter registered for a dimension.
type AdditionalLabels = Vec<Label>;

/// Label key that carries the CPU id on per-CPU counters.
const METRIC_LABEL_CPU: &str = "cpu";
90
91#[derive(Clone, Debug)]
93pub enum Dimension {
94 By(AdditionalLabels),
96 ByCpu(AdditionalLabels),
98}
99
100type Dimensions = Vec<Dimension>;
101
102#[derive(Debug)]
104pub struct Metric<M: Meter> {
105 meter: M,
107 unit: Unit,
109 dimensions: Dimensions,
111}
112
113impl<M: Meter> Metric<M> {
114 pub fn new(meter: M, unit: Unit, dimensions: Dimensions) -> Self {
116 Metric {
117 meter,
118 unit,
119 dimensions,
120 }
121 }
122}
123
124pub struct EbpfMetrics<M: Meter> {
126 counters: PerCpuArray<u64>,
127 metrics: Vec<Metric<M>>,
128 period: Duration,
129}
130
131impl<M: Meter> EbpfMetrics<M> {
132 pub fn new(bpf: &mut Ebpf, metrics: Vec<Metric<M>>, period: Duration) -> Result<EbpfMetrics<M>, Error> {
136 let counters = bpf
138 .take_map(M::kind().map_name())
139 .ok_or(aya::maps::MapError::InvalidName {
140 name: M::kind().map_name().to_string(),
141 })
142 .and_then(PerCpuArray::try_from)
143 .map_err(Error::MapError)?;
144
145 Ok(EbpfMetrics {
146 counters,
147 metrics,
148 period,
149 })
150 }
151
152 pub async fn run(self) -> Result<(), Error> {
154 let counters = Arc::new(Mutex::new(self.counters));
156
157 let mut futures = FuturesUnordered::new();
159 for metric in self.metrics {
160 futures.push(EbpfMetrics::emit_metrics(counters.clone(), metric, self.period))
161 }
162
163 futures.select_next_some().await
166 }
167
168 async fn emit_metrics(
169 bpf_counters: Arc<Mutex<PerCpuArray<u64>>>,
170 metric: Metric<M>,
171 period: Duration,
172 ) -> Result<(), Error> {
173 metrics::describe_counter!(metric.meter.name(), metric.unit.clone(), metric.meter.description());
174
175 let mut interval = time::interval(period);
176 let cpu_count = nr_cpus().map_err(|(_, err)| Error::InvalidPossibleCpu(err))?;
177 let cpus = online_cpus().map_err(|(_, err)| Error::InvalidOnlineCpu(err))?;
178
179 let mut counter_handles = Vec::new();
181 let mut counter_handles_by_cpu = Vec::new();
182 for dimension in &metric.dimensions {
183 match dimension {
184 Dimension::By(labels) => {
185 let handle = metrics::counter!(metric.meter.name(), labels.clone());
186 counter_handles.push(handle);
187 }
188 Dimension::ByCpu(labels) => {
189 let mut handles = vec![Counter::noop(); cpu_count];
190 for cpu_id in &cpus {
191 let cpu_label = Label::new(METRIC_LABEL_CPU, cpu_id.to_string());
192 let mut labels = labels.clone();
193 labels.push(cpu_label);
194 let handle = metrics::counter!(metric.meter.name(), labels.clone());
195 handles[*cpu_id as usize] = handle;
196 }
197 counter_handles_by_cpu.push(handles);
198 }
199 }
200 }
201
202 let mut prev_values = vec![0u64; cpu_count];
204
205 loop {
206 interval.tick().await;
207
208 let counter_values = {
210 let guard = bpf_counters.lock().await;
211 guard.get(&metric.meter.index(), 0).map_err(Error::MapError)?
212 };
213
214 let mut delta_sum = 0;
216
217 for cpu_id in &cpus {
219 let cpu_id = *cpu_id as usize;
220 if let Some(value) = counter_values.get::<usize>(cpu_id) {
222 let value = *value;
223 let prev_value = prev_values[cpu_id];
224 let delta = value - prev_value;
225
226 delta_sum += delta;
228 prev_values[cpu_id] = value;
230
231 for handles in &mut counter_handles_by_cpu {
233 handles[cpu_id].increment(delta);
234 }
235 } }
237
238 for handle in &counter_handles {
240 handle.increment(delta_sum);
241 }
242 }
243 }
244}
245
246#[derive(Error, Debug)]
248pub enum Error {
249 #[error("error opening metric array")]
251 MapError(#[from] MapError),
252
253 #[error("invalid /sys/devices/system/cpu/possible format")]
255 InvalidPossibleCpu(#[source] io::Error),
256
257 #[error("invalid /sys/devices/system/cpu/online format")]
259 InvalidOnlineCpu(#[source] io::Error),
260}
261
// Test-only mock implementations; the tests below use `mocks::metrics::MockRecorder`.
#[cfg(test)]
mod mocks;

#[cfg(test)]
mod test {
    use super::*;
    use aya::maps::PerCpuValues;
    use metrics::Unit;
    use metrics::{Key, Label};

    use mocks::metrics::MockRecorder;

    const HOSTNAME: &str = "this.hostname.test";
    const INTERFACE: &str = "tst0";
    const METRIC_LABEL_HOSTNAME: &str = "hostname";
    const METRIC_LABEL_INTERFACE: &str = "interface";

    /// Single-variant meter used to drive the emitter in tests.
    // NOTE(review): `Metric` requires `M: Meter`. This presumably relies on
    // `aya_metrics_common::Counter` implying `Meter`; confirm in that crate.
    #[derive(Copy, Clone, Debug)]
    enum MockCounter {
        Packets,
    }

    impl aya_metrics_common::Counter for MockCounter {
        fn name(self) -> String {
            match self {
                MockCounter::Packets => "packets".to_string(),
            }
        }

        fn index(&self) -> u32 {
            match self {
                MockCounter::Packets => 0,
            }
        }
    }

    /// Builds the "packets" metric with three dimensions:
    /// - aggregate, no labels;
    /// - aggregate, hostname label;
    /// - per-CPU, hostname and interface labels.
    fn get_packets_metric() -> Metric<MockCounter> {
        Metric::new(
            MockCounter::Packets,
            Unit::Count,
            vec![
                Dimension::By(vec![]),
                Dimension::By(vec![Label::new(METRIC_LABEL_HOSTNAME, HOSTNAME.to_string())]),
                Dimension::ByCpu(vec![
                    Label::new(METRIC_LABEL_HOSTNAME, HOSTNAME.to_string()),
                    Label::new(METRIC_LABEL_INTERFACE, INTERFACE.to_string()),
                ]),
            ],
        )
    }

    /// `run` registers every counter (value 0) before the first period elapses.
    #[tokio::test(start_paused = true)]
    async fn test_run_registers_counters() -> Result<(), anyhow::Error> {
        let recorder = MockRecorder::new();
        // NOTE(review): the recorder is installed per thread. This presumably relies
        // on the default current-thread test runtime, so the spawned task sees it.
        let _guard = metrics::set_default_local_recorder(&recorder);

        let metrics = EbpfMetrics::new(&mut Ebpf {}, vec![get_packets_metric()], Duration::from_secs(60))?;
        tokio::spawn(async move { metrics.run().await });

        // Give the spawned task a chance to run and register its counters.
        tokio::task::yield_now().await;

        expect_counters(&recorder, 0)?;

        Ok(())
    }

    /// `run` returns an error when the counter map cannot be read.
    #[tokio::test(start_paused = true)]
    async fn test_run_failure_when_empty_map() {
        // Presumably `new(0, ..)` creates a mock array with no entries, so reading
        // index 0 fails. TODO confirm against `aya_metrics_mocks`.
        let empty_per_cpu_array = PerCpuArray::new(0, 0u64);
        let metrics = EbpfMetrics {
            counters: empty_per_cpu_array,
            metrics: vec![get_packets_metric()],
            period: Duration::from_secs(60),
        };
        let handle = tokio::spawn(async move { metrics.run().await });

        tokio::task::yield_now().await;
        handle
            .await
            .expect("Task should complete")
            .expect_err("Expected error opening metric array");
    }

    /// `emit_metrics` registers every counter (value 0) before the first period elapses.
    #[tokio::test(start_paused = true)]
    async fn test_emit_metrics_registers_counters() -> Result<(), anyhow::Error> {
        let recorder = MockRecorder::new();
        let _guard = metrics::set_default_local_recorder(&recorder);

        tokio::spawn(EbpfMetrics::emit_metrics(
            Arc::new(Mutex::new(PerCpuArray::new(1, 0u64))),
            get_packets_metric(),
            Duration::from_secs(60),
        ));

        tokio::task::yield_now().await;

        expect_counters(&recorder, 0)?;

        Ok(())
    }

    /// Each period, counters grow by the per-CPU increase since the previous read.
    #[tokio::test(start_paused = true)]
    async fn test_emit_metrics_increments_counters() -> Result<(), anyhow::Error> {
        let recorder = MockRecorder::new();
        let _guard = metrics::set_default_local_recorder(&recorder);

        let mut per_cpu_array = PerCpuArray::new(1, 0u64);

        // NOTE(review): assumes the mock's `clone()` shares storage with the original,
        // so the later `set` calls are visible to the emitter. Confirm in the mocks crate.
        tokio::spawn(EbpfMetrics::emit_metrics(
            Arc::new(Mutex::new(per_cpu_array.clone())),
            get_packets_metric(),
            Duration::from_secs(60),
        ));

        tokio::task::yield_now().await;
        expect_counters(&recorder, 0)?;

        // Every CPU now reports 42; after one period each CPU contributes +42.
        per_cpu_array.set(0, PerCpuValues::try_from(vec![42u64; nr_cpus().map_err(|(_, err)| err)?])?, 0)?;
        time::advance(Duration::from_secs(60)).await;
        tokio::task::yield_now().await;
        expect_counters(&recorder, 42)?;

        // Raw value goes 42 -> 50, so each CPU contributes a further delta of 8.
        per_cpu_array.set(0, PerCpuValues::try_from(vec![50u64; nr_cpus().map_err(|(_, err)| err)?])?, 0)?;
        time::advance(Duration::from_secs(60)).await;
        tokio::task::yield_now().await;
        expect_counters(&recorder, 42 + 8)?;

        Ok(())
    }

    /// Asserts the recorded values after `packets` increments on every online CPU:
    /// - both aggregate counters hold `packets * number of online CPUs`;
    /// - each per-CPU counter holds `packets`.
    fn expect_counters(recorder: &MockRecorder, packets: u64) -> Result<(), anyhow::Error> {
        let actual = recorder
            .get_counter(&Key::from_parts(
                MockCounter::Packets.name(),
                vec![Label::new(METRIC_LABEL_HOSTNAME, HOSTNAME)],
            ))
            .expect("Packet counter should be registered with hostname label");
        assert_eq!(actual, packets * online_cpus().map_err(|(_, err)| err)?.len() as u64);

        let actual = recorder
            .get_counter(&Key::from_parts(MockCounter::Packets.name(), vec![]))
            .expect("Packet counter should be registered with no labels");
        assert_eq!(actual, packets * online_cpus().map_err(|(_, err)| err)?.len() as u64);

        online_cpus().map_err(|(_, err)| err)?.iter().for_each(|cpu_id| {
            let actual = recorder
                .get_counter(&Key::from_parts(
                    MockCounter::Packets.name(),
                    vec![
                        Label::new(METRIC_LABEL_HOSTNAME, HOSTNAME),
                        Label::new(METRIC_LABEL_INTERFACE, INTERFACE),
                        Label::new(METRIC_LABEL_CPU, cpu_id.to_string()),
                    ],
                ))
                .expect("Packet counter should be registered with hostname, interface and cpu labels");
            assert_eq!(actual, packets);
        });

        Ok(())
    }
}