bpf_loader_lib/skeleton/poller/
mod.rs1use std::{
8 sync::{
9 atomic::{AtomicBool, Ordering},
10 Arc,
11 },
12 time::Duration,
13};
14
15use crate::{
16 export_event::{
17 EventExporter, ExporterInternalImplementation, InternalBufferValueEventProcessor,
18 InternalSampleMapProcessor,
19 },
20 meta::MapSampleMeta,
21};
22use anyhow::anyhow;
23use anyhow::{bail, Context, Result};
24use libbpf_rs::{Map, MapFlags, PerfBuffer, PerfBufferBuilder, RingBuffer, RingBufferBuilder};
25use log::error;
26
27use super::BpfSkeleton;
/// Runs `$blk` repeatedly until `$handle` requests termination.
///
/// `$handle` must be an expression exposing `should_terminate()` and
/// `should_pause()`, both returning `bool`. The expression is evaluated
/// several times per iteration, so it should be cheap and free of side effects.
///
/// While `should_pause()` is true the loop idles in 1 ms sleeps instead of
/// running `$blk`. A termination request is honored even while paused.
///
/// The expansion uses `log::info!`, so the calling crate needs the `log` crate.
#[macro_export]
macro_rules! program_poll_loop {
    ($handle: expr, $blk: block) => {{
        use log::info;
        use std::time::Duration;
        info!("Running ebpf program...");
        while !$handle.should_terminate() {
            // Idle while paused, but keep watching the terminate flag: without
            // the second condition a terminate request issued during a pause
            // would be ignored until the pause is lifted.
            while $handle.should_pause() && !$handle.should_terminate() {
                // The sleep already yields the CPU; no spin-loop hint is needed.
                std::thread::sleep(Duration::from_millis(1));
            }
            if $handle.should_terminate() {
                info!("Program terminated");
                break;
            }
            $blk;
        }
        info!("Program exited");
    }};
}
/// Self-referencing state needed to poll a ring buffer.
///
/// The struct owns the exporter and stores, alongside it, a reference to the
/// exporter's event processor and the `RingBuffer` that borrows that
/// processor. The `'this` lifetime is supplied by `ouroboros`.
#[ouroboros::self_referencing]
pub(crate) struct RingBufPollerContext {
    // Owner of the data the following fields borrow from.
    exporter: Arc<EventExporter>,
    #[borrows(exporter)]
    // Event processor borrowed out of `exporter`.
    event_processor: &'this dyn InternalBufferValueEventProcessor,
    #[borrows(event_processor)]
    #[covariant]
    // Ring buffer whose lifetime is tied to `event_processor`.
    ringbuf: RingBuffer<'this>,
    // Timeout, in milliseconds, used for each call to the ring buffer's `poll`.
    poll_timeout_ms: u64,
}
58
/// Self-referencing state needed to poll a perf event buffer.
///
/// The struct owns the exporter and an error flag, and stores alongside them
/// a reference to the exporter's event processor and the `PerfBuffer` that
/// borrows both the processor and the flag. The `'this` lifetime is supplied
/// by `ouroboros`.
#[ouroboros::self_referencing]
pub(crate) struct PerfEventPollerContext {
    // Owner of the data the event processor reference borrows from.
    exporter: Arc<EventExporter>,
    #[borrows(exporter)]
    // Event processor borrowed out of `exporter`.
    event_processor: &'this dyn InternalBufferValueEventProcessor,
    // Set to `true` by the sample callback when handling an event fails;
    // read after each poll to turn that failure into an error.
    error_flag: AtomicBool,
    #[borrows(event_processor, error_flag)]
    #[covariant]
    // Perf buffer whose lifetime is tied to `event_processor` and `error_flag`.
    perf: PerfBuffer<'this>,
    // Timeout, in milliseconds, used for each call to the perf buffer's `poll`.
    poll_timeout_ms: u64,
}
/// Self-referencing state needed to sample a BPF map.
///
/// `'a` is the lifetime of the externally owned map and sampling
/// configuration; `'this` is supplied by `ouroboros` for the reference into
/// the owned exporter.
#[ouroboros::self_referencing]
pub(crate) struct SampleMapPollerContext<'a> {
    // Map whose keys and values are read on every poll.
    map: &'a Map,
    // Owner of the data the event processor reference borrows from.
    exporter: Arc<EventExporter>,
    // Sampling settings; `interval` and `clear_map` are read by the poller.
    sample_config: &'a MapSampleMeta,
    #[borrows(exporter)]
    // Key-value processor borrowed out of `exporter`.
    event_processor: &'this dyn InternalSampleMapProcessor,
}
78
/// One of the three polling backends handled by `Poller::poll`.
pub(crate) enum Poller<'a> {
    /// Polls a ring buffer.
    RingBuf(RingBufPollerContext),
    /// Polls a perf event buffer.
    PerfEvent(PerfEventPollerContext),
    /// Periodically reads every entry of a map.
    SampleMap(SampleMapPollerContext<'a>),
}
84
85impl<'a> Drop for Poller<'a> {
86 fn drop(&mut self) {
87 if let Poller::SampleMap(ctx) = self {
88 if ctx.borrow_sample_config().clear_map {
89 let keys = ctx.borrow_map().keys().collect::<Vec<_>>();
91 for key in keys.into_iter() {
92 ctx.borrow_map().delete(&key).ok();
93 }
94 }
95 }
96 }
97}
98
99impl<'a> Poller<'a> {
100 pub(crate) fn poll(&self) -> Result<()> {
101 match self {
102 Poller::RingBuf(rb) => {
103 rb.borrow_ringbuf()
104 .poll(Duration::from_millis(*rb.borrow_poll_timeout_ms()))
105 .map_err(|e| anyhow!("Failed to poll ringbuf: {}, see logs for details", e))?;
106 }
107 Poller::PerfEvent(ctx) => {
108 ctx.borrow_perf()
109 .poll(Duration::from_millis(*ctx.borrow_poll_timeout_ms()))
110 .map_err(|e| anyhow!("Failed to poll perf event: {}", e))?;
111 if ctx.borrow_error_flag().load(Ordering::Relaxed) {
112 bail!("Failed to poll perf event. See log for details");
113 }
114 }
115 Poller::SampleMap(ctx) => {
116 for key in ctx.borrow_map().keys() {
117 let value = ctx
118 .borrow_map()
119 .lookup(&key, MapFlags::empty())
120 .map_err(|e| {
121 anyhow!("Failed to lookup value of the key `{:?}`: {}", key, e)
122 })?
123 .ok_or_else(|| anyhow!("Value of key `{:?}` should exist", key))?;
124 ctx.borrow_event_processor()
125 .handle_event(&key, &value)
126 .with_context(|| anyhow!("Failed to handle event"))?;
127 }
128 std::thread::sleep(Duration::from_millis(
129 ctx.borrow_sample_config().interval as u64,
130 ));
131 }
132 };
133 Ok(())
134 }
135}
136
137impl BpfSkeleton {
138 #[inline]
139 pub(crate) fn wait_for_no_export_program(&self) -> Result<()> {
140 program_poll_loop!(self.handle, {
141 std::hint::spin_loop();
142 std::thread::sleep(Duration::from_millis(1));
143 });
144 Ok(())
145 }
146
147 pub(crate) fn build_ringbuf_poller(
148 &self,
149 map: &Map,
150 exporter: Arc<EventExporter>,
151 ) -> Result<RingBufPollerContext> {
152 let ctx = RingBufPollerContextTryBuilder {
153 exporter,
154 event_processor_builder: |v: &Arc<EventExporter>| {
155 let event_processor = match &v.internal_impl {
156 ExporterInternalImplementation::BufferValueProcessor {
157 event_processor,
158 ..
159 } => &**event_processor,
160 _ => bail!("Expected the exporter uses ringbuf processor"),
161 };
162 Ok(event_processor)
163 },
164 ringbuf_builder: |event_processor| {
165 let mut builder = RingBufferBuilder::new();
166 builder
167 .add(map, |data: &[u8]| {
168 if let Err(e) = event_processor.handle_event(data) {
169 error!("Failed to process event: \n{:?}", e);
170 -1
171 } else {
172 0
173 }
174 })
175 .with_context(|| anyhow!("Failed to add ringbuf callback"))?;
176
177 let ringbuf = builder
178 .build()
179 .with_context(|| anyhow!("Failed to build ringbuf poller"))?;
180 Ok(ringbuf)
181 },
182 poll_timeout_ms: self.meta.poll_timeout_ms as u64,
183 }
184 .try_build()?;
185
186 Ok(ctx)
187 }
188 #[inline]
189 pub(crate) fn build_perfevent_poller(
190 &self,
191 map: &Map,
192 exporter: Arc<EventExporter>,
193 ) -> Result<PerfEventPollerContext> {
194 let ctx = PerfEventPollerContextTryBuilder {
195 exporter,
196 error_flag: AtomicBool::new(false),
197 event_processor_builder: |v: &Arc<EventExporter>| {
198 let event_processor = match &v.internal_impl {
199 ExporterInternalImplementation::BufferValueProcessor {
200 event_processor,
201 ..
202 } => &**event_processor,
203 _ => bail!("Expected the exporter uses ringbuf processor"),
204 };
205 Ok(event_processor)
206 },
207 perf_builder: |processor, error_flag: &AtomicBool| {
208 let perf = PerfBufferBuilder::new(map)
209 .sample_cb(|_cpu: i32, data: &[u8]| {
210 if let Err(e) = processor.handle_event(data) {
211 error!("Failed to handle event for perf array: \n{:?}", e);
212 error_flag.store(true, Ordering::Relaxed);
213 }
214 })
215 .build()
216 .with_context(|| anyhow!("Failed to build perf event"))?;
217 Ok(perf)
218 },
219 poll_timeout_ms: self.meta.poll_timeout_ms as u64,
220 }
221 .try_build()?;
222 Ok(ctx)
223 }
224 #[inline]
225 pub(crate) fn build_sample_map_poller<'a>(
226 &self,
227 map: &'a Map,
228 exporter: Arc<EventExporter>,
229 sample_config: &'a MapSampleMeta,
230 ) -> Result<SampleMapPollerContext<'a>> {
231 let ctx = SampleMapPollerContextTryBuilder {
232 exporter,
233 event_processor_builder: |v| {
234 let event_processor = match &v.internal_impl {
235 ExporterInternalImplementation::KeyValueMapProcessor {
236 event_processor,
237 ..
238 } => &**event_processor,
239 _ => bail!("Expected the exporter uses key-value processor"),
240 };
241 Ok(event_processor)
242 },
243 map,
244 sample_config,
245 }
246 .try_build()?;
247 Ok(ctx)
248 }
249}