ghostscope_loader/
lib.rs

1//! GhostScope eBPF Loader
2//!
3//! This crate provides the `GhostScopeLoader` which manages the lifecycle of eBPF programs:
4//! - Loading eBPF bytecode into the kernel
5//! - Attaching/detaching uprobes to target binaries
6//! - Reading trace events from RingBuf or PerfEventArray
7//! - Managing BPF maps for process module offsets
8//!
9//! ## Architecture
10//!
11//! The loader supports two event output mechanisms:
12//! - **RingBuf**: Modern kernel (>= 5.8) continuous byte stream
13//! - **PerfEventArray**: Legacy kernel (< 5.8) per-CPU independent events
14//!
15//! Event parsing is handled by `ghostscope_protocol::StreamingTraceParser` which
16//! adapts to the event source type automatically.
17
18use aya::{
19    maps::{perf::PerfEventArray, MapData, RingBuf},
20    programs::{uprobe::UProbeLinkId, ProgramError, UProbe},
21    Ebpf, EbpfLoader, VerifierLogLevel,
22};
23use ghostscope_protocol::{ParsedTraceEvent, StreamingTraceParser, TraceContext};
24use log::log_enabled;
25use log::Level as LogLevel;
26use std::convert::TryInto;
27use std::future::poll_fn;
28use std::os::unix::io::AsRawFd;
29use std::os::unix::io::RawFd;
30use std::path::Path;
31use std::task::Poll;
32use tokio::io::unix::AsyncFd;
33use tokio::io::Interest;
34use tracing::{debug, error, info, warn};
35
36// Export kernel capabilities detection
37mod kernel_caps;
38pub use kernel_caps::{KernelCapabilities, KernelCapabilityError};
39
40// Export error types
41mod error;
42pub use error::{LoaderError, Result};
43
44// Internal uprobe module
45mod uprobe;
46use uprobe::UprobeAttachmentParams;
47
48// Use shared map types from ghostscope-process
49use ghostscope_process::maps::{proc_offsets_pin_dir, proc_offsets_pin_path};
50
51/// Event output map type wrapper
52enum EventMap {
53    RingBuf(RingBuf<MapData>),
54    PerfEventArray {
55        _map: PerfEventArray<MapData>,
56        cpu_buffers: Vec<PerfEventCpuBuffer>,
57    },
58}
59
/// Thin, copyable wrapper around a raw file descriptor so it can be handed to
/// APIs that require an [`AsRawFd`] implementor. It does not own the
/// descriptor: nothing here closes it.
#[derive(Clone, Copy, Debug)]
struct PerfBufferFd(RawFd);

impl AsRawFd for PerfBufferFd {
    /// Returns the wrapped descriptor unchanged.
    fn as_raw_fd(&self) -> RawFd {
        let Self(fd) = *self;
        fd
    }
}
68
69struct PerfEventCpuBuffer {
70    cpu_id: u32,
71    buffer: aya::maps::perf::PerfEventArrayBuffer<MapData>,
72    readiness: AsyncFd<PerfBufferFd>,
73}
74
75/// Compatibility shim that mimics Aya's newer attach location helper so we can keep
76/// a single call-site regardless of which `UProbe::attach` signature we compile against.
77enum UProbeAttachLocation<'a> {
78    AbsoluteOffset(u64),
79    Function(&'a str),
80}
81
82impl<'a> UProbeAttachLocation<'a> {
83    fn attach<T: AsRef<Path>>(
84        self,
85        program: &mut UProbe,
86        target: T,
87        pid: Option<i32>,
88    ) -> std::result::Result<UProbeLinkId, ProgramError> {
89        match self {
90            Self::AbsoluteOffset(offset) => program.attach(None, offset, target, pid),
91            Self::Function(fn_name) => program.attach(Some(fn_name), 0, target, pid),
92        }
93    }
94}
95
96pub fn hello() -> String {
97    format!("Loader: {}", ghostscope_compiler::hello())
98}
99
100/// Main eBPF program loader and manager
101///
102/// Manages the lifecycle of eBPF programs and provides methods for:
103/// - Loading eBPF bytecode
104/// - Attaching/detaching uprobes
105/// - Reading trace events
106/// - Managing BPF maps
107pub struct GhostScopeLoader {
108    /// Loaded eBPF program
109    bpf: Ebpf,
110    /// Event output map (RingBuf or PerfEventArray)
111    event_map: Option<EventMap>,
112    /// Active uprobe link
113    uprobe_link: Option<UProbeLinkId>,
114    /// Stored parameters for re-attaching uprobe
115    attachment_params: Option<UprobeAttachmentParams>,
116    /// Streaming parser for trace events
117    parser: StreamingTraceParser,
118    /// String table and metadata for parsing trace events
119    trace_context: Option<TraceContext>,
120    /// Optional override for PerfEventArray page count (per CPU buffer size in pages)
121    perf_page_count: Option<usize>,
122}
123
124impl std::fmt::Debug for GhostScopeLoader {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        f.debug_struct("GhostScopeLoader")
127            .field("bpf", &"<eBPF object>")
128            .field("event_map", &self.event_map.is_some())
129            .field("uprobe_attached", &self.uprobe_link.is_some())
130            .field("attachment_params", &self.attachment_params.is_some())
131            .finish()
132    }
133}
134
135impl GhostScopeLoader {
136    // ============================================================================
137    // Lifecycle Management
138    // ============================================================================
139
140    /// Create a new loader instance from eBPF bytecode
141    pub fn new(bytecode: &[u8]) -> Result<Self> {
142        info!(
143            "Loading eBPF program from bytecode ({} bytes)",
144            bytecode.len()
145        );
146
147        // Enforce: proc_module_offsets must be provided as a pinned global map by the process layer
148        let pin_path = proc_offsets_pin_path();
149        if !pin_path.exists() {
150            return Err(LoaderError::Generic(format!(
151                "Pinned map '{}' not found. Please call ghostscope-process to create it first.",
152                pin_path.display()
153            )));
154        }
155
156        let mut loader = EbpfLoader::new();
157        let use_verbose = cfg!(debug_assertions)
158            || log_enabled!(LogLevel::Trace)
159            || log_enabled!(LogLevel::Debug);
160        if use_verbose {
161            loader.verifier_log_level(VerifierLogLevel::VERBOSE | VerifierLogLevel::STATS);
162            tracing::info!("BPF verifier logs: VERBOSE (debug build/log)");
163        } else {
164            loader.verifier_log_level(VerifierLogLevel::DEBUG | VerifierLogLevel::STATS);
165            tracing::info!("BPF verifier logs: DEBUG (release/info)");
166        }
167        // Configure Aya loader to reuse pinned maps by name under our per-process pin directory.
168        // This makes @proc_module_offsets in the eBPF object bind to the already pinned map
169        // created by ghostscope-process instead of creating a new private map.
170        let pin_dir = proc_offsets_pin_dir();
171        if pin_dir.exists() {
172            loader.map_pin_path(&pin_dir);
173            tracing::info!(
174                "Configured map pin directory for reuse: {}",
175                pin_dir.display()
176            );
177        }
178        match loader.load(bytecode) {
179            Ok(bpf) => {
180                info!("Successfully loaded eBPF program");
181                Ok(Self {
182                    bpf,
183                    event_map: None,
184                    uprobe_link: None,
185                    attachment_params: None,
186                    parser: StreamingTraceParser::new(),
187                    trace_context: None,
188                    perf_page_count: None,
189                })
190            }
191            Err(e) => {
192                error!("Failed to load BPF program: {:?}", e);
193                // Try to provide more specific error information
194                match &e {
195                    aya::EbpfError::ParseError(parse_err) => {
196                        error!("Parse error details: {:?}", parse_err);
197                    }
198                    aya::EbpfError::BtfError(btf_err) => {
199                        error!("BTF error details: {:?}", btf_err);
200                    }
201                    _ => {
202                        error!("Other BPF error: {:?}", e);
203                    }
204                }
205                Err(LoaderError::Aya(e))
206            }
207        }
208    }
209
210    // ============================================================================
211    // Uprobe Management
212    // ============================================================================
213
214    /// Attach to a uprobe at the specified function offset
215    pub fn attach_uprobe(
216        &mut self,
217        target_binary: &str,
218        function_name: &str,
219        offset: Option<u64>,
220        pid: Option<i32>,
221    ) -> Result<()> {
222        self.attach_uprobe_with_program_name(target_binary, function_name, offset, pid, None)
223    }
224
225    /// Set PerfEventArray page count override (applies when using Perf backend)
226    pub fn set_perf_page_count(&mut self, pages: u32) {
227        self.perf_page_count = Some(pages as usize);
228    }
229
230    /// Attach to a uprobe with a specific eBPF program name
231    pub fn attach_uprobe_with_program_name(
232        &mut self,
233        target_binary: &str,
234        function_name: &str,
235        offset: Option<u64>,
236        pid: Option<i32>,
237        program_name: Option<&str>,
238    ) -> Result<()> {
239        info!("attach_uprobe called with offset: {:?}", offset);
240        if let Some(offset) = offset {
241            info!(
242                "Using offset-based attachment: {} at 0x{:x} ({}) (pid: {:?})",
243                target_binary, offset, function_name, pid
244            );
245        } else {
246            info!(
247                "Using function name-based attachment: {}:{} (pid: {:?})",
248                target_binary, function_name, pid
249            );
250        }
251
252        // Collect all available program names first to avoid borrowing conflicts
253        let available_programs: Vec<String> = self
254            .bpf
255            .programs()
256            .map(|(name, _)| name.to_string())
257            .collect();
258
259        // Debug: Print all available programs
260        info!("Available programs:");
261        for name in &available_programs {
262            info!("  - {}", name);
263        }
264
265        // Get the program from the BPF object
266        let program_name: String = if let Some(name) = program_name {
267            // Use the specified program name
268            info!("Using specified program name: {}", name);
269            if available_programs.contains(&name.to_string()) {
270                name.to_string()
271            } else {
272                return Err(LoaderError::Generic(format!(
273                    "Specified program '{name}' not found in eBPF object"
274                )));
275            }
276        } else {
277            // Try different program names: section name first, then function name, then any program
278            let program_names = ["uprobe", "main"];
279            let mut found_program_name: Option<String> = None;
280
281            for name in &program_names {
282                info!("Checking if program exists: {}", name);
283                if available_programs.contains(&name.to_string()) {
284                    info!("Found program: {}", name);
285                    found_program_name = Some(name.to_string());
286                    break;
287                }
288            }
289
290            // If no standard names found, use the first available program
291            if found_program_name.is_none() {
292                if let Some(first_name) = available_programs.first() {
293                    info!(
294                        "No standard program names found, using first available: {}",
295                        first_name
296                    );
297                    found_program_name = Some(first_name.clone());
298                }
299            }
300
301            found_program_name
302                .ok_or_else(|| LoaderError::Generic("No suitable program found".to_string()))?
303        };
304
305        info!("Attempting to load program: {}", program_name);
306
307        let program_ref = self
308            .bpf
309            .program_mut(&program_name)
310            .ok_or_else(|| LoaderError::Generic(format!("Program '{program_name}' not found")))?;
311
312        info!("Found program, attempting to convert to UProbe");
313        info!("Program type: {:?}", program_ref.prog_type());
314
315        // Check what type of program this actually is
316        match program_ref {
317            aya::programs::Program::UProbe(_) => {
318                info!("Program is correctly recognized as UProbe");
319            }
320            aya::programs::Program::KProbe(_) => {
321                error!("Program is incorrectly recognized as KProbe, should be UProbe");
322            }
323            ref _other => {
324                error!("Program is unexpected type (not UProbe or KProbe)");
325            }
326        }
327
328        let program: &mut UProbe = program_ref.try_into().map_err(|e| {
329            LoaderError::Generic(format!("Program '{program_name}' is not a UProbe: {e:?}"))
330        })?;
331
332        // Load the program
333        info!("About to load eBPF program");
334        match program.load() {
335            Ok(()) => {
336                info!("Program loaded successfully");
337            }
338            Err(e) => {
339                error!("eBPF program load failed: {}", e);
340                error!("This typically indicates eBPF verifier rejection");
341
342                // Check for specific verifier errors
343                if let ProgramError::SyscallError(syscall_error) = &e {
344                    error!(
345                        "Syscall '{}' failed: {}",
346                        syscall_error.call, syscall_error.io_error
347                    );
348
349                    // Check for common error codes
350                    if let Some(errno) = syscall_error.io_error.raw_os_error() {
351                        match errno {
352                            22 => error!(
353                                "EINVAL (22): Invalid argument - likely eBPF verifier rejection"
354                            ),
355                            7 => error!("E2BIG (7): Program too large"),
356                            13 => error!("EACCES (13): Permission denied"),
357                            95 => error!("EOPNOTSUPP (95): Operation not supported"),
358                            _ => error!("Unknown errno: {}", errno),
359                        }
360                    }
361                }
362
363                // Log additional debugging info
364                error!("Program name: {}", program_name);
365                error!("Program type: {:?}", program_ref.prog_type());
366
367                return Err(LoaderError::Program(e));
368            }
369        }
370
371        // Attach the uprobe using Aya API via a compatibility helper
372        // so argument ordering stays explicit regardless of Aya version.
373        let attach_location = match offset {
374            Some(offset) => UProbeAttachLocation::AbsoluteOffset(offset),
375            None => UProbeAttachLocation::Function(function_name),
376        };
377        let attach_result = attach_location.attach(program, target_binary, pid);
378
379        match attach_result {
380            Ok(link) => {
381                if let Some(offset) = offset {
382                    info!(
383                        "Uprobe attached successfully to {} at offset 0x{:x}",
384                        target_binary, offset
385                    );
386                } else {
387                    info!(
388                        "Uprobe attached successfully to {}:{}",
389                        target_binary, function_name
390                    );
391                }
392
393                // Store the link handle and attachment parameters for later use
394                self.uprobe_link = Some(link);
395                self.attachment_params = Some(UprobeAttachmentParams {
396                    target_binary: target_binary.to_string(),
397                    function_name: function_name.to_string(),
398                    offset,
399                    pid,
400                    program_name,
401                });
402            }
403            Err(e) => {
404                if let Some(offset) = offset {
405                    error!(
406                        "Failed to attach uprobe to {} at offset 0x{:x}: {}",
407                        target_binary, offset, e
408                    );
409                    error!("Detailed error: {:#?}", e);
410                } else {
411                    error!(
412                        "Failed to attach uprobe to {}:{}: {}",
413                        target_binary, function_name, e
414                    );
415                    error!("Detailed error: {:#?}", e);
416                }
417
418                // Try to provide more helpful error information
419                if let ProgramError::SyscallError(syscall_error) = &e {
420                    error!(
421                        "Syscall '{}' failed: {}",
422                        syscall_error.call, syscall_error.io_error
423                    );
424                    if let Some(13) = syscall_error.io_error.raw_os_error() {
425                        error!("Permission denied - make sure to run with sudo");
426                    }
427                }
428
429                return Err(LoaderError::Program(e));
430            }
431        }
432
433        // Initialize event map after successful attachment
434        // Try RingBuf first, fall back to PerfEventArray
435        let event_map = if let Some(map) = self.bpf.take_map("ringbuf") {
436            info!("Initializing RingBuf event map");
437            let ringbuf: RingBuf<_> = map
438                .try_into()
439                .map_err(|e| LoaderError::Generic(format!("Failed to convert ringbuf map: {e}")))?;
440            EventMap::RingBuf(ringbuf)
441        } else if let Some(map) = self.bpf.take_map("events") {
442            info!("Initializing PerfEventArray event map");
443            let mut perf_array: PerfEventArray<_> = map.try_into().map_err(|e| {
444                LoaderError::Generic(format!("Failed to convert perf event array map: {e}"))
445            })?;
446
447            // Get online CPUs
448            let online_cpus = aya::util::online_cpus().map_err(|(_, e)| {
449                LoaderError::Generic(format!("Failed to get online CPUs: {e}"))
450            })?;
451
452            info!(
453                "Opening PerfEventArray buffers for {} online CPUs",
454                online_cpus.len()
455            );
456
457            // Open buffers for all online CPUs
458            let mut cpu_buffers = Vec::new();
459
460            for cpu_id in online_cpus {
461                let pages = self.perf_page_count;
462                match perf_array.open(cpu_id, pages) {
463                    Ok(buffer) => {
464                        if let Some(p) = pages {
465                            info!(
466                                "Opened PerfEventArray buffer for CPU {} with {} pages",
467                                cpu_id, p
468                            );
469                        } else {
470                            info!(
471                                "Opened PerfEventArray buffer for CPU {} (default pages)",
472                                cpu_id
473                            );
474                        }
475                        let fd = buffer.as_raw_fd();
476                        let readiness =
477                            AsyncFd::with_interest(PerfBufferFd(fd), Interest::READABLE).map_err(
478                                |err| {
479                                    LoaderError::Generic(format!(
480                                        "Failed to register perf buffer fd for CPU {cpu_id}: {err}"
481                                    ))
482                                },
483                            )?;
484                        cpu_buffers.push(PerfEventCpuBuffer {
485                            cpu_id,
486                            buffer,
487                            readiness,
488                        });
489                    }
490                    Err(e) => {
491                        warn!("Failed to open perf buffer for CPU {}: {}", cpu_id, e);
492                    }
493                }
494            }
495
496            if cpu_buffers.is_empty() {
497                return Err(LoaderError::Generic(
498                    "Failed to open any perf event buffers".to_string(),
499                ));
500            }
501
502            EventMap::PerfEventArray {
503                _map: perf_array,
504                cpu_buffers,
505            }
506        } else {
507            return Err(LoaderError::MapNotFound(
508                "Neither 'ringbuf' nor 'events' map found".to_string(),
509            ));
510        };
511
512        // Set parser event source based on map type
513        let event_source = match &event_map {
514            EventMap::RingBuf(_) => {
515                info!("Using RingBuf mode for parser");
516                ghostscope_protocol::EventSource::RingBuf
517            }
518            EventMap::PerfEventArray { .. } => {
519                info!("Using PerfEventArray mode for parser");
520                ghostscope_protocol::EventSource::PerfEventArray
521            }
522        };
523        self.parser = StreamingTraceParser::with_event_source(event_source);
524
525        self.event_map = Some(event_map);
526        info!("Event map initialized");
527
528        Ok(())
529    }
530
531    /// Detach the uprobe (disable tracing) while keeping eBPF resources loaded
532    /// This allows the trace to be quickly re-enabled later
533    pub fn detach_uprobe(&mut self) -> Result<()> {
534        if let Some(link_id) = self.uprobe_link.take() {
535            if let Some(params) = &self.attachment_params {
536                info!("Detaching uprobe...");
537
538                // Get the program to detach the link
539                let program_ref = self.bpf.program_mut(&params.program_name).ok_or_else(|| {
540                    let program_name = &params.program_name;
541                    LoaderError::Generic(format!("Program '{program_name}' not found"))
542                })?;
543
544                let program: &mut UProbe = program_ref.try_into().map_err(|e| {
545                    let program_name = &params.program_name;
546                    LoaderError::Generic(format!("Program '{program_name}' is not a UProbe: {e:?}"))
547                })?;
548
549                // Detach the uprobe using the link ID
550                program.detach(link_id).map_err(LoaderError::Program)?;
551
552                info!("Uprobe detached successfully");
553                Ok(())
554            } else {
555                error!("No attachment parameters stored");
556                Err(LoaderError::Generic(
557                    "No attachment parameters stored".to_string(),
558                ))
559            }
560        } else {
561            warn!("No uprobe attached, nothing to detach");
562            Ok(())
563        }
564    }
565
566    /// Reattach the uprobe (re-enable tracing) using previously stored parameters
567    /// This requires that attach_uprobe was called previously to store the parameters
568    pub fn reattach_uprobe(&mut self) -> Result<()> {
569        if self.uprobe_link.is_some() {
570            info!("Uprobe already attached");
571            return Ok(());
572        }
573
574        let params = self
575            .attachment_params
576            .as_ref()
577            .ok_or_else(|| {
578                LoaderError::Generic(
579                    "No attachment parameters stored. Call attach_uprobe first.".to_string(),
580                )
581            })?
582            .clone();
583
584        info!("Reattaching uprobe with stored parameters...");
585
586        // Get the program directly (it's already loaded)
587        let program_ref = self.bpf.program_mut(&params.program_name).ok_or_else(|| {
588            LoaderError::Generic(format!("Program '{}' not found", params.program_name))
589        })?;
590
591        let program: &mut UProbe = program_ref.try_into().map_err(|e| {
592            LoaderError::Generic(format!(
593                "Program '{}' is not a UProbe: {:?}",
594                params.program_name, e
595            ))
596        })?;
597
598        // Attach the uprobe directly (don't load - it's already loaded)
599        let attach_location = match params.offset {
600            Some(offset) => UProbeAttachLocation::AbsoluteOffset(offset),
601            None => UProbeAttachLocation::Function(params.function_name.as_str()),
602        };
603        let attach_result = attach_location.attach(program, &params.target_binary, params.pid);
604
605        match attach_result {
606            Ok(link) => {
607                if let Some(offset) = params.offset {
608                    info!(
609                        "Uprobe reattached successfully to {} at offset 0x{:x}",
610                        params.target_binary, offset
611                    );
612                } else {
613                    info!(
614                        "Uprobe reattached successfully to {}:{}",
615                        params.target_binary, params.function_name
616                    );
617                }
618
619                // Store the new link handle
620                self.uprobe_link = Some(link);
621                Ok(())
622            }
623            Err(e) => {
624                error!("Failed to reattach uprobe: {:?}", e);
625                Err(LoaderError::Program(e))
626            }
627        }
628    }
629
630    /// Check if the uprobe is currently attached
631    pub fn is_uprobe_attached(&self) -> bool {
632        self.uprobe_link.is_some()
633    }
634
635    /// Completely destroy this loader and all associated resources
636    /// This detaches any attached uprobes and clears all eBPF resources
637    /// After calling this, the loader cannot be reused
638    pub fn destroy(&mut self) -> Result<()> {
639        info!("Destroying GhostScopeLoader and all associated resources");
640
641        // First detach uprobe if attached
642        if self.uprobe_link.is_some() {
643            if let Err(e) = self.detach_uprobe() {
644                warn!("Failed to detach uprobe during destroy: {}", e);
645                // Continue with destruction even if detach fails
646            }
647        }
648
649        // Clear attachment parameters
650        self.attachment_params = None;
651
652        // Clear event map reference (this doesn't destroy the actual eBPF map,
653        // but removes our handle to it)
654        self.event_map = None;
655
656        // Note: The eBPF programs and maps will be automatically cleaned up
657        // when the `bpf` field is dropped (when this struct is dropped)
658
659        info!("GhostScopeLoader destroyed successfully");
660        Ok(())
661    }
662
663    /// Get current attachment status information
664    pub fn get_attachment_info(&self) -> Option<String> {
665        if let Some(params) = &self.attachment_params {
666            if let Some(offset) = params.offset {
667                Some(format!(
668                    "{}:{} (offset: 0x{:x}, pid: {:?}) - {}",
669                    params.target_binary,
670                    params.function_name,
671                    offset,
672                    params.pid,
673                    if self.is_uprobe_attached() {
674                        "attached"
675                    } else {
676                        "detached"
677                    }
678                ))
679            } else {
680                Some(format!(
681                    "{}:{} (pid: {:?}) - {}",
682                    params.target_binary,
683                    params.function_name,
684                    params.pid,
685                    if self.is_uprobe_attached() {
686                        "attached"
687                    } else {
688                        "detached"
689                    }
690                ))
691            }
692        } else {
693            None
694        }
695    }
696
697    // ============================================================================
698    // Event Reading
699    // ============================================================================
700
701    /// Wait for events asynchronously using AsyncFd
702    pub async fn wait_for_events_async(&mut self) -> Result<Vec<ParsedTraceEvent>> {
703        let trace_context = self.trace_context.as_ref().ok_or_else(|| {
704            LoaderError::Generic(
705                "No trace context available - cannot parse trace events".to_string(),
706            )
707        })?;
708
709        let event_map = self.event_map.as_mut().ok_or_else(|| {
710            LoaderError::Generic("Event map not initialized. Call attach_uprobe first.".to_string())
711        })?;
712
713        let mut events = Vec::new();
714
715        match event_map {
716            EventMap::RingBuf(ringbuf) => {
717                // Create AsyncFd and wait for readable; clear readiness to avoid spin
718                let async_fd = AsyncFd::new(ringbuf.as_raw_fd())
719                    .map_err(|e| LoaderError::Generic(format!("Failed to create AsyncFd: {e}")))?;
720                let mut guard = async_fd
721                    .readable()
722                    .await
723                    .map_err(|e| LoaderError::Generic(format!("AsyncFd error: {e}")))?;
724                guard.clear_ready();
725
726                // Read all available events
727                while let Some(item) = ringbuf.next() {
728                    match self.parser.process_segment(&item, trace_context) {
729                        Ok(Some(parsed_event)) => events.push(parsed_event),
730                        Ok(None) => {}
731                        Err(e) => {
732                            return Err(LoaderError::Generic(format!(
733                                "Fatal: Failed to parse trace event from RingBuf (async): {e}"
734                            )));
735                        }
736                    }
737                }
738            }
739            EventMap::PerfEventArray { cpu_buffers, .. } => {
740                use bytes::BytesMut;
741
742                let parser = &mut self.parser;
743
744                let mut drain_buffer = |entry: &mut PerfEventCpuBuffer| -> Result<bool> {
745                    let mut produced = false;
746                    let mut read_bufs = vec![BytesMut::with_capacity(4096)];
747
748                    match entry.buffer.read_events(&mut read_bufs) {
749                        Ok(result) => {
750                            if result.read > 0 {
751                                produced = true;
752                                info!(
753                                    "Read {} events from CPU {} buffer",
754                                    result.read, entry.cpu_id
755                                );
756                            }
757                            if result.lost > 0 {
758                                warn!(
759                                    "Lost {} events from CPU {} buffer",
760                                    result.lost, entry.cpu_id
761                                );
762                            }
763
764                            for (i, data) in read_bufs.iter().enumerate().take(result.read) {
765                                debug!(
766                                    "PerfEvent {}: {} bytes - {:02x?}",
767                                    i,
768                                    data.len(),
769                                    &data[..data.len().min(32)]
770                                );
771
772                                match parser.process_segment(data, trace_context) {
773                                    Ok(Some(parsed_event)) => events.push(parsed_event),
774                                    Ok(None) => {}
775                                    Err(e) => {
776                                        let cpu = entry.cpu_id;
777                                        return Err(LoaderError::Generic(format!(
778                                            "Fatal: Failed to parse trace event from PerfEventArray CPU {cpu}: {e}"
779                                        )));
780                                    }
781                                }
782                            }
783                        }
784                        Err(e) => {
785                            warn!("Failed to read from CPU {} buffer: {}", entry.cpu_id, e);
786                        }
787                    }
788
789                    Ok(produced)
790                };
791
792                loop {
793                    // Drain any buffers that already report data without waiting.
794                    let mut made_progress = false;
795                    for entry in cpu_buffers.iter_mut() {
796                        if entry.buffer.readable() {
797                            made_progress |= drain_buffer(entry)?;
798                        }
799                    }
800
801                    if made_progress {
802                        break;
803                    }
804
805                    // Wait for at least one buffer to become readable.
806                    let ready_idx = poll_fn(|cx| {
807                        for (idx, entry) in cpu_buffers.iter().enumerate() {
808                            match entry.readiness.poll_read_ready(cx) {
809                                Poll::Ready(Ok(mut guard)) => {
810                                    guard.clear_ready();
811                                    return Poll::Ready(Ok(idx));
812                                }
813                                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
814                                Poll::Pending => {}
815                            }
816                        }
817                        Poll::Pending
818                    })
819                    .await
820                    .map_err(|e| {
821                        LoaderError::Generic(format!(
822                            "AsyncFd error while waiting for perf events: {e}"
823                        ))
824                    })?;
825
826                    // Drain the buffer that triggered readiness.
827                    made_progress |= drain_buffer(
828                        cpu_buffers
829                            .get_mut(ready_idx)
830                            .expect("ready index should be valid"),
831                    )?;
832
833                    // Drain any other buffers now advertising data.
834                    for (idx, entry) in cpu_buffers.iter_mut().enumerate() {
835                        if idx == ready_idx || !entry.buffer.readable() {
836                            continue;
837                        }
838                        made_progress |= drain_buffer(entry)?;
839                    }
840
841                    if made_progress {
842                        break;
843                    }
844                    // No events were produced despite readiness (eg. lost event markers).
845                    // Loop back and wait again.
846                }
847            }
848        }
849
850        Ok(events)
851    }
852
853    /// Set the trace context for parsing trace events
854    pub fn set_trace_context(&mut self, trace_context: TraceContext) {
855        info!("Setting trace context for trace event parsing");
856        self.trace_context = Some(trace_context);
857    }
858
859    // ============================================================================
860    // Information and Debugging
861    // ============================================================================
862
863    /// Get information about loaded maps
864    pub fn get_map_info(&self) -> Vec<String> {
865        self.bpf
866            .maps()
867            .map(|(name, _map)| format!("Map: {name}"))
868            .collect()
869    }
870
871    /// Get information about loaded programs
872    pub fn get_program_info(&self) -> Vec<String> {
873        self.bpf
874            .programs()
875            .map(|(name, _prog)| format!("Program: {name}"))
876            .collect()
877    }
878}