bpf_loader_lib/skeleton/poller/
mod.rs

1//!  SPDX-License-Identifier: MIT
2//!
3//! Copyright (c) 2023, eunomia-bpf
4//! All rights reserved.
5//!
6
7use std::{
8    sync::{
9        atomic::{AtomicBool, Ordering},
10        Arc,
11    },
12    time::Duration,
13};
14
15use crate::{
16    export_event::{
17        EventExporter, ExporterInternalImplementation, InternalBufferValueEventProcessor,
18        InternalSampleMapProcessor,
19    },
20    meta::MapSampleMeta,
21};
22use anyhow::anyhow;
23use anyhow::{bail, Context, Result};
24use libbpf_rs::{Map, MapFlags, PerfBuffer, PerfBufferBuilder, RingBuffer, RingBufferBuilder};
25use log::error;
26
27use super::BpfSkeleton;
/// Drives a polling loop until `$handle` requests termination.
///
/// `$handle` must provide `should_terminate()` and `should_pause()`, both
/// returning `bool`; `$blk` is executed once per loop iteration.
///
/// While the handle reports a pause, the loop sleeps in 1 ms steps instead of
/// running `$blk`. A termination request that arrives during a pause ends the
/// wait right away, so a paused program can still be terminated.
///
/// The expansion imports `log::info` and `std::time::Duration`, so the calling
/// crate needs `log` to be resolvable.
#[macro_export]
macro_rules! program_poll_loop {
    ($handle: expr, $blk: block) => {{
        use log::info;
        use std::time::Duration;
        info!("Running ebpf program...");
        while !$handle.should_terminate() {
            // Keep watching for termination while paused; otherwise terminating
            // a paused program would block until somebody resumes it.
            while $handle.should_pause() && !$handle.should_terminate() {
                std::thread::sleep(Duration::from_millis(1));
            }
            if $handle.should_terminate() {
                info!("Program terminated");
                break;
            }
            $blk;
        }
        info!("Program exited");
    }};
}
48#[ouroboros::self_referencing]
49pub(crate) struct RingBufPollerContext {
50    exporter: Arc<EventExporter>,
51    #[borrows(exporter)]
52    event_processor: &'this dyn InternalBufferValueEventProcessor,
53    #[borrows(event_processor)]
54    #[covariant]
55    ringbuf: RingBuffer<'this>,
56    poll_timeout_ms: u64,
57}
58
59#[ouroboros::self_referencing]
60pub(crate) struct PerfEventPollerContext {
61    exporter: Arc<EventExporter>,
62    #[borrows(exporter)]
63    event_processor: &'this dyn InternalBufferValueEventProcessor,
64    error_flag: AtomicBool,
65    #[borrows(event_processor, error_flag)]
66    #[covariant]
67    perf: PerfBuffer<'this>,
68    poll_timeout_ms: u64,
69}
70#[ouroboros::self_referencing]
71pub(crate) struct SampleMapPollerContext<'a> {
72    map: &'a Map,
73    exporter: Arc<EventExporter>,
74    sample_config: &'a MapSampleMeta,
75    #[borrows(exporter)]
76    event_processor: &'this dyn InternalSampleMapProcessor,
77}
78
79pub(crate) enum Poller<'a> {
80    RingBuf(RingBufPollerContext),
81    PerfEvent(PerfEventPollerContext),
82    SampleMap(SampleMapPollerContext<'a>),
83}
84
85impl<'a> Drop for Poller<'a> {
86    fn drop(&mut self) {
87        if let Poller::SampleMap(ctx) = self {
88            if ctx.borrow_sample_config().clear_map {
89                // Clean up the map
90                let keys = ctx.borrow_map().keys().collect::<Vec<_>>();
91                for key in keys.into_iter() {
92                    ctx.borrow_map().delete(&key).ok();
93                }
94            }
95        }
96    }
97}
98
99impl<'a> Poller<'a> {
100    pub(crate) fn poll(&self) -> Result<()> {
101        match self {
102            Poller::RingBuf(rb) => {
103                rb.borrow_ringbuf()
104                    .poll(Duration::from_millis(*rb.borrow_poll_timeout_ms()))
105                    .map_err(|e| anyhow!("Failed to poll ringbuf: {}, see logs for details", e))?;
106            }
107            Poller::PerfEvent(ctx) => {
108                ctx.borrow_perf()
109                    .poll(Duration::from_millis(*ctx.borrow_poll_timeout_ms()))
110                    .map_err(|e| anyhow!("Failed to poll perf event: {}", e))?;
111                if ctx.borrow_error_flag().load(Ordering::Relaxed) {
112                    bail!("Failed to poll perf event. See log for details");
113                }
114            }
115            Poller::SampleMap(ctx) => {
116                for key in ctx.borrow_map().keys() {
117                    let value = ctx
118                        .borrow_map()
119                        .lookup(&key, MapFlags::empty())
120                        .map_err(|e| {
121                            anyhow!("Failed to lookup value of the key `{:?}`: {}", key, e)
122                        })?
123                        .ok_or_else(|| anyhow!("Value of key `{:?}` should exist", key))?;
124                    ctx.borrow_event_processor()
125                        .handle_event(&key, &value)
126                        .with_context(|| anyhow!("Failed to handle event"))?;
127                }
128                std::thread::sleep(Duration::from_millis(
129                    ctx.borrow_sample_config().interval as u64,
130                ));
131            }
132        };
133        Ok(())
134    }
135}
136
137impl BpfSkeleton {
138    #[inline]
139    pub(crate) fn wait_for_no_export_program(&self) -> Result<()> {
140        program_poll_loop!(self.handle, {
141            std::hint::spin_loop();
142            std::thread::sleep(Duration::from_millis(1));
143        });
144        Ok(())
145    }
146
147    pub(crate) fn build_ringbuf_poller(
148        &self,
149        map: &Map,
150        exporter: Arc<EventExporter>,
151    ) -> Result<RingBufPollerContext> {
152        let ctx = RingBufPollerContextTryBuilder {
153            exporter,
154            event_processor_builder: |v: &Arc<EventExporter>| {
155                let event_processor = match &v.internal_impl {
156                    ExporterInternalImplementation::BufferValueProcessor {
157                        event_processor,
158                        ..
159                    } => &**event_processor,
160                    _ => bail!("Expected the exporter uses ringbuf processor"),
161                };
162                Ok(event_processor)
163            },
164            ringbuf_builder: |event_processor| {
165                let mut builder = RingBufferBuilder::new();
166                builder
167                    .add(map, |data: &[u8]| {
168                        if let Err(e) = event_processor.handle_event(data) {
169                            error!("Failed to process event: \n{:?}", e);
170                            -1
171                        } else {
172                            0
173                        }
174                    })
175                    .with_context(|| anyhow!("Failed to add ringbuf callback"))?;
176
177                let ringbuf = builder
178                    .build()
179                    .with_context(|| anyhow!("Failed to build ringbuf poller"))?;
180                Ok(ringbuf)
181            },
182            poll_timeout_ms: self.meta.poll_timeout_ms as u64,
183        }
184        .try_build()?;
185
186        Ok(ctx)
187    }
188    #[inline]
189    pub(crate) fn build_perfevent_poller(
190        &self,
191        map: &Map,
192        exporter: Arc<EventExporter>,
193    ) -> Result<PerfEventPollerContext> {
194        let ctx = PerfEventPollerContextTryBuilder {
195            exporter,
196            error_flag: AtomicBool::new(false),
197            event_processor_builder: |v: &Arc<EventExporter>| {
198                let event_processor = match &v.internal_impl {
199                    ExporterInternalImplementation::BufferValueProcessor {
200                        event_processor,
201                        ..
202                    } => &**event_processor,
203                    _ => bail!("Expected the exporter uses ringbuf processor"),
204                };
205                Ok(event_processor)
206            },
207            perf_builder: |processor, error_flag: &AtomicBool| {
208                let perf = PerfBufferBuilder::new(map)
209                    .sample_cb(|_cpu: i32, data: &[u8]| {
210                        if let Err(e) = processor.handle_event(data) {
211                            error!("Failed to handle event for perf array: \n{:?}", e);
212                            error_flag.store(true, Ordering::Relaxed);
213                        }
214                    })
215                    .build()
216                    .with_context(|| anyhow!("Failed to build perf event"))?;
217                Ok(perf)
218            },
219            poll_timeout_ms: self.meta.poll_timeout_ms as u64,
220        }
221        .try_build()?;
222        Ok(ctx)
223    }
224    #[inline]
225    pub(crate) fn build_sample_map_poller<'a>(
226        &self,
227        map: &'a Map,
228        exporter: Arc<EventExporter>,
229        sample_config: &'a MapSampleMeta,
230    ) -> Result<SampleMapPollerContext<'a>> {
231        let ctx = SampleMapPollerContextTryBuilder {
232            exporter,
233            event_processor_builder: |v| {
234                let event_processor = match &v.internal_impl {
235                    ExporterInternalImplementation::KeyValueMapProcessor {
236                        event_processor,
237                        ..
238                    } => &**event_processor,
239                    _ => bail!("Expected the exporter uses key-value processor"),
240                };
241                Ok(event_processor)
242            },
243            map,
244            sample_config,
245        }
246        .try_build()?;
247        Ok(ctx)
248    }
249}