bpf-loader-lib 0.2.1

A library to load json-described ebpf programs, and automatically poll outputs from the program
Documentation
//!  SPDX-License-Identifier: MIT
//!
//! Copyright (c) 2023, eunomia-bpf
//! All rights reserved.
//!

use std::sync::Weak;

use anyhow::{anyhow, bail, Context, Result};
use chrono::Local;
use log::warn;
use serde_json::json;
use std::fmt::Write;

use crate::{
    export_event::{
        data_dumper::{
            json::dump_to_json_with_checked_types,
            plain_text::{dump_to_string, dump_to_string_with_checked_types},
        },
        EventExporter, ExporterInternalImplementation, InternalSampleMapProcessor,
        ReceivedEventData,
    },
    helper::log2_hist::print_log2_hist,
};

pub(crate) struct JsonExportEventHandler {
    pub(crate) exporter: Weak<EventExporter>,
}

impl InternalSampleMapProcessor for JsonExportEventHandler {
    fn handle_event(&self, key_buffer: &[u8], value_buffer: &[u8]) -> Result<()> {
        let exporter = self.exporter.upgrade().unwrap();
        let btf = exporter.btf_container.borrow_btf();
        let (checked_key_types, checked_value_types) =
            if let ExporterInternalImplementation::KeyValueMapProcessor {
                ref checked_key_types,
                ref checked_value_types,
                ..
            } = exporter.internal_impl
            {
                (checked_key_types, checked_value_types)
            } else {
                bail!("Unexpected internal implementation");
            };
        let key_out = dump_to_json_with_checked_types(btf, checked_key_types, key_buffer)
            .with_context(|| anyhow!("Failed to dump key type to json"))?;
        let value_out = dump_to_json_with_checked_types(btf, checked_value_types, value_buffer)
            .with_context(|| anyhow!("Failed to dump value type to json"))?;
        let final_json = json!({
            "key":key_out,
            "value":value_out
        });
        let out_str = serde_json::to_string(&final_json)
            .with_context(|| anyhow!("Failed to serialize json"))?;
        exporter.dump_data_to_user_callback_or_stdout(ReceivedEventData::JsonText(&out_str));
        Ok(())
    }
}

pub(crate) struct RawExportEventHandler {
    pub(crate) exporter: Weak<EventExporter>,
}

impl InternalSampleMapProcessor for RawExportEventHandler {
    fn handle_event(&self, key_buffer: &[u8], value_buffer: &[u8]) -> Result<()> {
        let exporter = self.exporter.upgrade().unwrap();
        if let Some(callback) = exporter.user_export_event_handler.as_ref() {
            callback.handle_event(
                exporter.user_ctx.clone(),
                ReceivedEventData::KeyValueBuffer {
                    key: key_buffer,
                    value: value_buffer,
                },
            );
        } else {
            warn!("Raw map processor expects that a user-provided callback exists, or the data will be dropped");
        }
        Ok(())
    }
}

pub(crate) struct DefaultKVStringExportEventHandler {
    pub(crate) exporter: Weak<EventExporter>,
}

impl InternalSampleMapProcessor for DefaultKVStringExportEventHandler {
    fn handle_event(&self, key_buffer: &[u8], value_buffer: &[u8]) -> Result<()> {
        let exporter = self.exporter.upgrade().unwrap();
        let btf = exporter.btf_container.borrow_btf();
        let (checked_key_types, checked_value_types) =
            if let ExporterInternalImplementation::KeyValueMapProcessor {
                ref checked_key_types,
                ref checked_value_types,
                ..
            } = exporter.internal_impl
            {
                (checked_key_types, checked_value_types)
            } else {
                bail!("Unexpected internal implementation");
            };
        let now_str = Local::now().format("%H:%M:%S").to_string();
        let mut outbuf = String::default();
        write!(outbuf, "{now_str:<8} ").unwrap();
        write!(
            outbuf,
            "{} {}",
            dump_to_json_with_checked_types(btf, checked_key_types, key_buffer)?,
            dump_to_json_with_checked_types(btf, checked_value_types, value_buffer)?
        )
        .unwrap();
        exporter
            .dump_data_to_user_callback_or_stdout(ReceivedEventData::PlainText(outbuf.as_str()));
        Ok(())
    }
}

pub(crate) struct Log2HistExportEventHandler {
    pub(crate) exporter: Weak<EventExporter>,
}

impl InternalSampleMapProcessor for Log2HistExportEventHandler {
    fn handle_event(&self, key_buffer: &[u8], value_buffer: &[u8]) -> Result<()> {
        let exporter = self.exporter.upgrade().unwrap();
        let btf = exporter.btf_container.borrow_btf();
        let (checked_key_types, checked_value_types, sample_map_config) =
            if let ExporterInternalImplementation::KeyValueMapProcessor {
                ref checked_key_types,
                ref checked_value_types,
                ref sample_map_config,
                ..
            } = exporter.internal_impl
            {
                (checked_key_types, checked_value_types, sample_map_config)
            } else {
                bail!("Unexpected internal implementation");
            };
        let mut outbuf = String::default();
        write!(outbuf, "key = ").unwrap();
        dump_to_string_with_checked_types(btf, checked_key_types, key_buffer, &mut outbuf)?;
        writeln!(outbuf).unwrap();
        struct SlotsDef {
            offset: u32,
            length_in_u32: u32,
        }
        let mut slots: Option<SlotsDef> = None;

        for member in checked_value_types.iter() {
            let offset = member.bit_offset / 8;
            if member.bit_offset % 8 != 0 {
                bail!("bit fields are not supported now");
            }
            if member.field_name == "slots" {
                slots = Some(SlotsDef {
                    offset,
                    length_in_u32: (member.size as u32) / 4,
                });
            } else {
                write!(outbuf, "{} = ", member.field_name).unwrap();
                dump_to_string(
                    btf,
                    member.type_id,
                    &value_buffer[offset as usize..offset as usize + member.size],
                    &mut outbuf,
                )?;
                writeln!(outbuf).unwrap();
            }
        }
        if let Some(SlotsDef {
            offset: slot_offset,
            length_in_u32: slot_size,
        }) = slots
        {
            exporter.dump_data_to_user_callback_or_stdout(ReceivedEventData::PlainText(&outbuf));
            let mut val_buf = vec![];
            for i in 0..slot_size {
                val_buf.push(u32::from_le_bytes(
                    value_buffer
                        [(slot_offset + i * 4) as usize..(slot_offset + (i + 1) * 4) as usize]
                        .try_into()?,
                ))
            }
            outbuf.clear();
            print_log2_hist(&val_buf[..], &sample_map_config.unit, &mut outbuf);
            exporter.dump_data_to_user_callback_or_stdout(ReceivedEventData::PlainText(&outbuf));
        } else {
            bail!("No slots found!");
        }
        Ok(())
    }
}