aya_metrics/
lib.rs

1//! Gets metrics from an eBPF program!
2//!
3//! This is a generalized user space implementation to collect custom metrics from an eBPF program.
4//!
5//! The module provides the [EbpfMetrics] type, which reads counters created in eBPF and emits them using the [metrics]
6//! crate. Any implementation of the [metrics::recorder::Recorder] trait can be used once it is set as the global recorder.
7//!
8//! # Example:
9//!
10//! Define counters:
11//!
12//! ```
13//! # use std::time::Duration;
14//! # use aya_metrics::{EbpfMetrics, Dimension, Metric};
15//! # use metrics::{Label, Unit};
16//!
17//! #[derive(Copy, Clone)]
18//! enum MyCounter {
19//!     Packets,
20//!     Bytes,
21//! }
22//!
23//! impl aya_metrics_common::Counter for MyCounter {
24//!     fn name(self) -> String {
25//!         match self {
26//!             MyCounter::Packets => "packets_counter".to_string(),
27//!             MyCounter::Bytes => "bytes_counter".to_string(),
28//!         }
29//!     }
30//!
31//!     fn index(&self) -> u32 {
32//!         match self {
33//!             MyCounter::Packets => 0,
34//!             MyCounter::Bytes => 1,
35//!         }
36//!     }
37//! }
38//!
39//! let metrics = vec![
40//!     Metric::new(
41//!         MyCounter::Packets,
42//!         Unit::Count,
43//!         vec![
44//!             Dimension::By(vec![]),
45//!             Dimension::By(vec![Label::new("hostname", "test.hostname")]),
46//!         ]
47//!     )
48//! ];
49//! ```
50//!
51//! Emit metrics:
52//!
53//! ```ignore
54//! # let mut bpf = aya::Ebpf::load(&[]).unwrap();
//! // create the collector, then run it to start emitting metrics using the global recorder
//! EbpfMetrics::new(&mut bpf, metrics, Duration::from_secs(60)).unwrap().run().await.unwrap();
57//! ```
58//!
59//! With the following eBPF code:
60//!
61//! ```ignore
62//! use aya_metrics_common::metrics::{counter, Counter};
63//! use my_crate::MyCounter;
64//!
65//! counter(MyCounter::Packets, 1);
66//! ```
67//!
68use std::{io, sync::Arc};
69
70#[cfg(not(feature = "mocks"))]
71use aya::Ebpf;
72use aya::{
73    maps::MapError,
74    util::{nr_cpus, online_cpus},
75};
76use aya_metrics_common::Meter;
77#[cfg(feature = "mocks")]
78use aya_metrics_mocks::{Ebpf, PerCpuArray};
79use futures::{lock::Mutex, stream::FuturesUnordered, StreamExt};
80use metrics::{Counter, Label, Unit};
81use thiserror::Error;
82use tokio::time::{self, Duration};
83
/// Per-CPU array map type used to read the counters; the `mocks` feature swaps in the mock
/// `PerCpuArray` imported from `aya_metrics_mocks` instead.
#[cfg(not(feature = "mocks"))]
type PerCpuArray<V> = aya::maps::PerCpuArray<aya::maps::MapData, V>;

/// Extra labels attached to a metric for a given [`Dimension`].
type AdditionalLabels = Vec<Label>;

/// Label key under which the CPU id is reported for [`Dimension::ByCpu`].
const METRIC_LABEL_CPU: &str = "cpu";
90
91/// Defines the dimension of a particular [`Metric`].
92#[derive(Clone, Debug)]
93pub enum Dimension {
94    /// Dimension with additional labels.
95    By(AdditionalLabels),
96    /// Dimension with cpu and additional labels.
97    ByCpu(AdditionalLabels),
98}
99
100type Dimensions = Vec<Dimension>;
101
102/// Defines a metric that [`EbpfMetrics`] can report on.
103#[derive(Debug)]
104pub struct Metric<M: Meter> {
105    /// The meter to take values from.
106    meter: M,
107    /// The unit with which to emit the metric.
108    unit: Unit,
109    /// The dimensions with which to emit the metric.
110    dimensions: Dimensions,
111}
112
113impl<M: Meter> Metric<M> {
114    /// Create a new [`Metric`]
115    pub fn new(meter: M, unit: Unit, dimensions: Dimensions) -> Self {
116        Metric {
117            meter,
118            unit,
119            dimensions,
120        }
121    }
122}
123
124/// Emits custom metrics generated by an eBPF program using the [metrics] crate.
125pub struct EbpfMetrics<M: Meter> {
126    counters: PerCpuArray<u64>,
127    metrics: Vec<Metric<M>>,
128    period: Duration,
129}
130
131impl<M: Meter> EbpfMetrics<M> {
132    /// Create [`EbpfMetrics<M>`] from [`Ebpf`] for specific metrics.
133    ///
134    /// When `EbpfMetrics<M>::run()` is invoked metrics will be periodically emitted with the given recorder.
135    pub fn new(bpf: &mut Ebpf, metrics: Vec<Metric<M>>, period: Duration) -> Result<EbpfMetrics<M>, Error> {
136        // Take ownership of the BPF counters map
137        let counters = bpf
138            .take_map(M::kind().map_name())
139            .ok_or(aya::maps::MapError::InvalidName {
140                name: M::kind().map_name().to_string(),
141            })
142            .and_then(PerCpuArray::try_from)
143            .map_err(Error::MapError)?;
144
145        Ok(EbpfMetrics {
146            counters,
147            metrics,
148            period,
149        })
150    }
151
152    /// Periodically emit metrics
153    pub async fn run(self) -> Result<(), Error> {
154        // Share the counters amongst the futures.
155        let counters = Arc::new(Mutex::new(self.counters));
156
157        // Create a future for each metric.
158        let mut futures = FuturesUnordered::new();
159        for metric in self.metrics {
160            futures.push(EbpfMetrics::emit_metrics(counters.clone(), metric, self.period))
161        }
162
163        // Gracefully terminate if any future unexpectedly terminates and propagate any errors.
164        // Remaining futures will be cancelled when dropped.
165        futures.select_next_some().await
166    }
167
168    async fn emit_metrics(
169        bpf_counters: Arc<Mutex<PerCpuArray<u64>>>,
170        metric: Metric<M>,
171        period: Duration,
172    ) -> Result<(), Error> {
173        metrics::describe_counter!(metric.meter.name(), metric.unit.clone(), metric.meter.description());
174
175        let mut interval = time::interval(period);
176        let cpu_count = nr_cpus().map_err(|(_, err)| Error::InvalidPossibleCpu(err))?;
177        let cpus = online_cpus().map_err(|(_, err)| Error::InvalidOnlineCpu(err))?;
178
179        // Pre-register all counters and store their handles for better performance
180        let mut counter_handles = Vec::new();
181        let mut counter_handles_by_cpu = Vec::new();
182        for dimension in &metric.dimensions {
183            match dimension {
184                Dimension::By(labels) => {
185                    let handle = metrics::counter!(metric.meter.name(), labels.clone());
186                    counter_handles.push(handle);
187                }
188                Dimension::ByCpu(labels) => {
189                    let mut handles = vec![Counter::noop(); cpu_count];
190                    for cpu_id in &cpus {
191                        let cpu_label = Label::new(METRIC_LABEL_CPU, cpu_id.to_string());
192                        let mut labels = labels.clone();
193                        labels.push(cpu_label);
194                        let handle = metrics::counter!(metric.meter.name(), labels.clone());
195                        handles[*cpu_id as usize] = handle;
196                    }
197                    counter_handles_by_cpu.push(handles);
198                }
199            }
200        }
201
202        // Store the previous state of the counters to calculate the delta for the next period
203        let mut prev_values = vec![0u64; cpu_count];
204
205        loop {
206            interval.tick().await;
207
208            // Get counter values per CPU
209            let counter_values = {
210                let guard = bpf_counters.lock().await;
211                guard.get(&metric.meter.index(), 0).map_err(Error::MapError)?
212            };
213
214            // Keep a sum across CPUs
215            let mut delta_sum = 0;
216
217            // Iterate over each CPU
218            for cpu_id in &cpus {
219                let cpu_id = *cpu_id as usize;
220                // Get the latest value for this CPU
221                if let Some(value) = counter_values.get::<usize>(cpu_id) {
222                    let value = *value;
223                    let prev_value = prev_values[cpu_id];
224                    let delta = value - prev_value;
225
226                    // Update the sum across CPUs
227                    delta_sum += delta;
228                    // Store the state for the next period
229                    prev_values[cpu_id] = value;
230
231                    // Emit metric by cpu number with any additional labels
232                    for handles in &mut counter_handles_by_cpu {
233                        handles[cpu_id].increment(delta);
234                    }
235                } // GRCOV_IGNORE_LINE (apparently there is a hidden else block!)
236            }
237
238            // Emit metric with any additional labels
239            for handle in &counter_handles {
240                handle.increment(delta_sum);
241            }
242        }
243    }
244}
245
246/// Errors occuring from working with EbpfMetrics
247#[derive(Error, Debug)]
248pub enum Error {
249    /// Errors occuring while reading maps
250    #[error("error opening metric array")]
251    MapError(#[from] MapError),
252
253    /// Errors occuring while listing possible CPUs
254    #[error("invalid /sys/devices/system/cpu/possible format")]
255    InvalidPossibleCpu(#[source] io::Error),
256
257    /// Errors occuring while listing online CPUs
258    #[error("invalid /sys/devices/system/cpu/online format")]
259    InvalidOnlineCpu(#[source] io::Error),
260}
261
262// Only compile mocks when testing!
263#[cfg(test)]
264mod mocks;
265
266// GRCOV_STOP_COVERAGE
#[cfg(test)]
mod test {
    use super::*;
    use aya::maps::PerCpuValues;
    use metrics::{Key, Label, Unit};

    use mocks::metrics::MockRecorder;

    const HOSTNAME: &str = "this.hostname.test";
    const INTERFACE: &str = "tst0";
    const METRIC_LABEL_HOSTNAME: &str = "hostname";
    const METRIC_LABEL_INTERFACE: &str = "interface";

    /// The only counter exercised by these tests.
    #[derive(Copy, Clone, Debug)]
    enum MockCounter {
        Packets,
    }

    impl aya_metrics_common::Counter for MockCounter {
        fn name(self) -> String {
            match self {
                MockCounter::Packets => "packets".to_string(),
            }
        }

        fn index(&self) -> u32 {
            match self {
                MockCounter::Packets => 0,
            }
        }
    }

    /// Packets metric emitted unlabelled, by hostname, and per CPU by hostname and interface.
    fn get_packets_metric() -> Metric<MockCounter> {
        let hostname_label = || Label::new(METRIC_LABEL_HOSTNAME, HOSTNAME.to_string());
        let interface_label = Label::new(METRIC_LABEL_INTERFACE, INTERFACE.to_string());

        let dimensions = vec![
            Dimension::By(vec![]),
            Dimension::By(vec![hostname_label()]),
            Dimension::ByCpu(vec![hostname_label(), interface_label]),
        ];

        Metric::new(MockCounter::Packets, Unit::Count, dimensions)
    }

    #[tokio::test(start_paused = true)]
    async fn test_run_registers_counters() -> Result<(), anyhow::Error> {
        let recorder = MockRecorder::new();
        let _guard = metrics::set_default_local_recorder(&recorder);

        let mut bpf = Ebpf {};
        let ebpf_metrics = EbpfMetrics::new(&mut bpf, vec![get_packets_metric()], Duration::from_secs(60))?;
        tokio::spawn(ebpf_metrics.run());

        // Let the spawned task run up to its first await point
        tokio::task::yield_now().await;

        // The first tick fires immediately, so every counter exists with a zero value
        expect_counters(&recorder, 0)?;

        Ok(())
    }

    #[tokio::test(start_paused = true)]
    async fn test_run_failure_when_empty_map() {
        let ebpf_metrics = EbpfMetrics {
            counters: PerCpuArray::new(0, 0u64),
            metrics: vec![get_packets_metric()],
            period: Duration::from_secs(60),
        };
        let task = tokio::spawn(ebpf_metrics.run());

        // Let the spawned task run up to its first await point
        tokio::task::yield_now().await;

        let outcome = task.await.expect("Task should complete");
        outcome.expect_err("Expected error opening metric array");
    }

    #[tokio::test(start_paused = true)]
    async fn test_emit_metrics_registers_counters() -> Result<(), anyhow::Error> {
        let recorder = MockRecorder::new();
        let _guard = metrics::set_default_local_recorder(&recorder);

        let counters = Arc::new(Mutex::new(PerCpuArray::new(1, 0u64)));
        let period = Duration::from_secs(60);
        tokio::spawn(EbpfMetrics::emit_metrics(counters, get_packets_metric(), period));

        // Let the spawned task run up to its first await point
        tokio::task::yield_now().await;

        // The first tick fires immediately, so every counter exists with a zero value
        expect_counters(&recorder, 0)?;

        Ok(())
    }

    #[tokio::test(start_paused = true)]
    async fn test_emit_metrics_increments_counters() -> Result<(), anyhow::Error> {
        let recorder = MockRecorder::new();
        let _guard = metrics::set_default_local_recorder(&recorder);

        let period = Duration::from_secs(60);
        let mut per_cpu_array = PerCpuArray::new(1, 0u64);

        tokio::spawn(EbpfMetrics::emit_metrics(
            Arc::new(Mutex::new(per_cpu_array.clone())),
            get_packets_metric(),
            period,
        ));

        // Let the spawned task run up to its first await point
        tokio::task::yield_now().await;
        // Initial registration and increment (time=0s)
        expect_counters(&recorder, 0)?;

        let possible_cpus = nr_cpus().map_err(|(_, err)| err)?;

        // Each step writes a new raw value on every CPU, moves the clock one period forward and
        // checks the cumulative count per CPU (time=60s, then time=120s).
        for (raw_value, expected_per_cpu) in [(42u64, 42u64), (50, 42 + 8)] {
            per_cpu_array.set(0, PerCpuValues::try_from(vec![raw_value; possible_cpus])?, 0)?;
            time::advance(period).await;
            // Let the spawned task process the tick
            tokio::task::yield_now().await;
            expect_counters(&recorder, expected_per_cpu)?;
        }

        Ok(())
    }

    /// Checks that every expected series is registered and holds `packets` per online CPU.
    fn expect_counters(recorder: &MockRecorder, packets: u64) -> Result<(), anyhow::Error> {
        let online = online_cpus().map_err(|(_, err)| err)?;
        let expected_total = packets * online.len() as u64;

        let by_hostname = Key::from_parts(
            MockCounter::Packets.name(),
            vec![Label::new(METRIC_LABEL_HOSTNAME, HOSTNAME)],
        );
        let actual = recorder
            .get_counter(&by_hostname)
            .expect("Packet counter should be registered with hostname label");
        assert_eq!(actual, expected_total);

        let unlabelled = Key::from_parts(MockCounter::Packets.name(), vec![]);
        let actual = recorder
            .get_counter(&unlabelled)
            .expect("Packet counter should be registered with no labels");
        assert_eq!(actual, expected_total);

        for cpu_id in &online {
            let by_cpu = Key::from_parts(
                MockCounter::Packets.name(),
                vec![
                    Label::new(METRIC_LABEL_HOSTNAME, HOSTNAME),
                    Label::new(METRIC_LABEL_INTERFACE, INTERFACE),
                    Label::new(METRIC_LABEL_CPU, cpu_id.to_string()),
                ],
            );
            let actual = recorder
                .get_counter(&by_cpu)
                .expect("Packet counter should be registered with hostname, interface and cpu labels");
            assert_eq!(actual, packets);
        }

        Ok(())
    }
}