bpf-tracing 0.0.1

An eBPF tracing facility that integrates neatly into Rust's tracing
Documentation
use crate::event::{Event, Kind};
use nix::sys::statfs::{FsType, statfs};
use std::{
    collections::VecDeque,
    fs::File,
    io::{self, BufRead, BufReader},
    path::Path,
    thread::{self, JoinHandle},
};
use tracing::{self, Level, span::EnteredSpan};

mod event;

type Spans = Vec<VecDeque<EnteredSpan>>;

pub fn try_init() -> io::Result<JoinHandle<()>> {
    let pipe = get_trace_pipe()?;
    let new_cx = || {
        let cpus = thread::available_parallelism().unwrap().get();
        let mut spans: Spans = Vec::new();
        for _ in 0..cpus {
            spans.push(VecDeque::new());
        }
        spans
    };

    observe(&pipe, new_cx, move |val, spans| {
        if let Ok(event) = val.parse() {
            emit(event, spans);
        }
    })
}

fn get_trace_pipe() -> io::Result<impl AsRef<Path>> {
    fn validate(path: &Path) -> bool {
        const TRACEFS_MAGIC: FsType = FsType(0x74726163);
        if let Ok(stat) = statfs(path) {
            return stat.filesystem_type() == TRACEFS_MAGIC;
        }

        false
    }
    let known_mounts = [
        Path::new("/sys/kernel/tracing"),
        Path::new("/sys/kernel/debug/tracing"),
    ];
    if let Some(path) = known_mounts.into_iter().find(|p| validate(p)) {
        let path = path.to_path_buf();
        return Ok(path.join("trace_pipe"));
    }

    let file = File::open("/proc/mounts")?;
    let mut lines = BufReader::new(file).lines();
    while let Some(Ok(line)) = lines.next() {
        if line.starts_with("tracefs") {
            let mount = line.split_whitespace().nth(1).map(Path::new);
            if let Some(mount) = mount {
                if validate(mount) {
                    return Ok(mount.join("trace_pipe"));
                }
            }
        }
    }

    Err(io::Error::new(
        io::ErrorKind::NotFound,
        "trace_pipe not found",
    ))
}

fn observe<P: AsRef<Path>, C>(
    path: P,
    new_cx: impl FnOnce() -> C + Send + Sync + 'static,
    mut callback: impl FnMut(String, &mut C) + Send + Sync + 'static,
) -> io::Result<JoinHandle<()>> {
    let path = path.as_ref().to_path_buf();
    let file = File::open(&path)?;

    let handle = thread::spawn(move || {
        let mut cx = new_cx();
        let mut lines = BufReader::new(file).lines();
        while let Some(Ok(line)) = lines.next() {
            callback(line, &mut cx);
        }
    });

    Ok(handle)
}

fn emit(event: Event, spans: &mut Spans) {
    match event.kind {
        Kind::Message(msg, level) => {
            match level {
                Level::TRACE => tracing::trace!(target: "bpf", "{msg}"),
                Level::DEBUG => tracing::debug!(target: "bpf", "{msg}"),
                Level::INFO => tracing::info!(target: "bpf", "{msg}"),
                Level::WARN => tracing::warn!(target: "bpf", "{msg}"),
                Level::ERROR => tracing::error!(target: "bpf", "{msg}"),
            };
        }
        Kind::StartSpan(name, level) => {
            let parent = spans[event.cpu].back().and_then(|p| p.id());
            let span = match level {
                Level::TRACE => tracing::trace_span!(target: "bpf", parent: parent, "span", name),
                Level::DEBUG => tracing::debug_span!(target: "bpf", parent: parent, "span", name),
                Level::INFO => tracing::info_span!(target: "bpf", parent: parent, "span", name),
                Level::WARN => tracing::warn_span!(target: "bpf", parent: parent, "span", name),
                Level::ERROR => tracing::error_span!(target: "bpf", parent: parent, "span", name),
            };
            spans[event.cpu].push_back(span.entered());
        }
        Kind::EndSpan(name) => _ = spans[event.cpu].pop_back(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::{io::Write, sync::mpsc, thread::sleep, time::Duration};

    const TEST_INTERVAL: Duration = Duration::from_millis(100);

    #[test]
    fn observe_nonexistent_file() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("test.log");

        fn no_cx() -> Option<()> {
            None
        }
        fn callback(_: String, _: &mut Option<()>) {}

        assert!(observe(path, no_cx, callback).is_err());
    }

    #[test]
    fn observe_file_changes() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().join("test.log");
        let mut file = File::create(&path).unwrap();
        let (tx, rx) = mpsc::channel();

        fn no_cx() -> Option<()> {
            None
        }
        let callback = move |val: String, _: &mut Option<()>| {
            tx.send(val).ok();
        };

        observe(path, no_cx, callback).expect("observe");

        file.write_all(b"hello\n").unwrap();
        file.write_all(b"world\n").unwrap();

        sleep(TEST_INTERVAL);

        assert_eq!(rx.recv().unwrap(), "hello".to_string());
        assert_eq!(rx.recv().unwrap(), "world".to_string());
    }
}