use std::sync::mpsc::Receiver;
use super::event::EbpfEvent;
/// Errors reported while setting up the eBPF event source.
#[derive(Debug)]
pub enum EbpfError {
    /// The event source was requested on a target that is not Linux.
    UnsupportedPlatform,
    /// No BPF object is embedded in this build; the `Display` text explains
    /// how to produce one.
    BpfObjectMissing,
    /// Loading or inspecting the BPF object failed; the payload is a
    /// human-readable description of the cause.
    LoadFailed(String),
    /// Attaching the BPF program failed; the payload is a human-readable
    /// description of the cause.
    AttachFailed(String),
    /// An operating-system level I/O error, also exposed through
    /// [`std::error::Error::source`].
    Io(std::io::Error),
}

impl std::fmt::Display for EbpfError {
    /// Renders a short, user-facing description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedPlatform => {
                write!(f, "eBPF event source is only available on Linux")
            }
            Self::BpfObjectMissing => write!(
                f,
                "BPF object not embedded; run scripts/build-ebpf.sh and \
                 rebuild netwatch-sdk with --features ebpf"
            ),
            Self::LoadFailed(s) => write!(f, "BPF load failed: {s}"),
            Self::AttachFailed(s) => write!(f, "BPF attach failed: {s}"),
            Self::Io(e) => write!(f, "io: {e}"),
        }
    }
}

impl std::error::Error for EbpfError {
    /// Returns the wrapped [`std::io::Error`] for the `Io` variant so callers
    /// walking the error chain can reach the OS-level cause; every other
    /// variant carries only text and has no underlying source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Io(e) => Some(e),
            Self::UnsupportedPlatform
            | Self::BpfObjectMissing
            | Self::LoadFailed(_)
            | Self::AttachFailed(_) => None,
        }
    }
}

impl From<std::io::Error> for EbpfError {
    /// Wraps an I/O error so `?` can be used on `std::io::Result` values in
    /// functions returning `EbpfError`.
    fn from(e: std::io::Error) -> Self {
        Self::Io(e)
    }
}
/// Handle to the eBPF event source.
///
/// The handle carries only the platform-specific state (`PlatformInner`);
/// nothing in this file reads that state back, it is stored so that it lives
/// exactly as long as the handle does.
pub struct EventSource {
// Platform state kept alive for the lifetime of the handle. The leading
// underscore marks it as intentionally never read.
_inner: PlatformInner,
}
impl EventSource {
/// Creates the event source together with the receiving end of the channel
/// on which events are delivered.
///
/// This is a thin wrapper: it delegates to the `platform` module that was
/// selected at compile time by `target_os`.
///
/// # Errors
///
/// Propagates whatever `platform::new` returns. On non-Linux targets that is
/// always [`EbpfError::UnsupportedPlatform`].
pub fn new() -> Result<(Self, Receiver<EbpfEvent>), EbpfError> {
platform::new()
}
}
// Platform-specific state stored inside `EventSource`.
// On Linux this is the real backend state defined in the `linux` module.
#[cfg(target_os = "linux")]
type PlatformInner = linux::Inner;
// On every other target there is no backend, so the state is the unit type.
#[cfg(not(target_os = "linux"))]
type PlatformInner = ();
// Linux flavour of the `platform` facade: compiled only when
// `target_os = "linux"`.
#[cfg(target_os = "linux")]
mod platform {
use super::*;
/// Builds the event source by forwarding to the Linux implementation in
/// `super::linux`; the result (including any error) is returned unchanged.
pub fn new() -> Result<(EventSource, Receiver<EbpfEvent>), EbpfError> {
super::linux::new()
}
}
// Fallback flavour of the `platform` facade: compiled on every target that
// is not Linux.
#[cfg(not(target_os = "linux"))]
mod platform {
use super::*;
/// Always fails with [`EbpfError::UnsupportedPlatform`]; no event source or
/// channel is created on this target.
pub fn new() -> Result<(EventSource, Receiver<EbpfEvent>), EbpfError> {
Err(EbpfError::UnsupportedPlatform)
}
}
#[cfg(target_os = "linux")]
mod linux {
use super::*;
use crate::ebpf::event::{estimate_boot_time, ConnectEvent};
use aya::{maps::RingBuf, programs::KProbe, Bpf};
use std::os::fd::AsRawFd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread::{self, JoinHandle};
// Bytes of `netwatch_sdk_ebpf.o` taken from the build's `OUT_DIR` and embedded
// at compile time. `new` treats an empty slice as "no object available".
// NOTE(review): presumably the build script writes an empty placeholder file
// when the eBPF object was not built — confirm against build.rs.
const BPF_OBJECT: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/netwatch_sdk_ebpf.o"));
// Timeout, in milliseconds, passed to `libc::poll` by the reader thread. The
// reader re-checks its shutdown flag after every poll call, so this also
// bounds how long it takes to notice a shutdown request while idle.
const POLL_TIMEOUT_MS: i32 = 100;
/// Linux backend state owned by `EventSource`.
pub struct Inner {
// The loaded BPF handle. It is never read after construction.
// NOTE(review): presumably holding it is what keeps the program loaded and
// attached — confirm against the aya documentation for `Bpf`.
_bpf: Bpf,
// Flag shared with the reader thread; the reader exits once it observes
// `true`.
shutdown: Arc<AtomicBool>,
// Join handle of the reader thread spawned in `new`.
_reader: Option<JoinHandle<()>>,
}
impl Drop for Inner {
    /// Asks the reader thread to stop and waits until it has exited.
    ///
    /// The reader re-checks the shutdown flag after every `poll` call (which
    /// times out after `POLL_TIMEOUT_MS`) and before every ring-buffer record,
    /// so the wait is bounded by roughly one poll timeout. Joining here, rather
    /// than letting the handle drop and detach the thread, means the reader is
    /// finished before the remaining fields (including the BPF handle) are
    /// dropped.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Relaxed);
        if let Some(reader) = self._reader.take() {
            // `join` returns `Err` only if the reader panicked. There is
            // nothing useful to do about that while dropping, and panicking
            // here could abort the process during unwinding.
            let _ = reader.join();
        }
    }
}
/// Loads the embedded BPF object, attaches its `tcp_v4_connect` kprobe and
/// starts a background thread that forwards decoded connect events.
///
/// On success returns the `EventSource` handle (which owns the BPF handle, the
/// shutdown flag and the reader's join handle) and the receiving end of the
/// channel the reader thread sends `EbpfEvent::Connect` values on.
///
/// # Errors
///
/// * `EbpfError::BpfObjectMissing` when the embedded object is empty.
/// * `EbpfError::LoadFailed` when the object cannot be loaded, the
///   `tcp_v4_connect` program is missing or is not a kprobe, the program fails
///   to load, or the `EVENTS` map is missing or is not a ring buffer.
/// * `EbpfError::AttachFailed` when attaching the kprobe fails.
/// * `EbpfError::Io` when the reader thread cannot be spawned.
pub fn new() -> Result<(EventSource, Receiver<EbpfEvent>), EbpfError> {
if BPF_OBJECT.is_empty() {
return Err(EbpfError::BpfObjectMissing);
}
// NOTE(review): the copy into a heap `Vec` presumably exists to hand the
// loader a better-aligned buffer than the `include_bytes!` static provides —
// confirm before removing it.
let object: Vec<u8> = BPF_OBJECT.to_vec();
let mut bpf =
Bpf::load(&object).map_err(|e| EbpfError::LoadFailed(format!("{e:?}")))?;
// Look the program up by name and require it to be a kprobe.
let program: &mut KProbe = bpf
.program_mut("tcp_v4_connect")
.ok_or_else(|| {
EbpfError::LoadFailed("program tcp_v4_connect not found in BPF object".into())
})?
.try_into()
.map_err(|e: aya::programs::ProgramError| {
EbpfError::LoadFailed(format!("not a kprobe: {e:?}"))
})?;
program
.load()
.map_err(|e| EbpfError::LoadFailed(format!("{e:?}")))?;
// NOTE(review): the second argument is presumably the offset into the probed
// function — confirm against the aya version in use.
program
.attach("tcp_v4_connect", 0)
.map_err(|e| EbpfError::AttachFailed(format!("{e:?}")))?;
// Take ownership of the `EVENTS` map out of `bpf` so the ring buffer can be
// moved into the reader thread independently of the `Bpf` handle.
let events_map = bpf
.take_map("EVENTS")
.ok_or_else(|| EbpfError::LoadFailed("EVENTS map not found".into()))?;
let mut ring: RingBuf<_> = RingBuf::try_from(events_map)
.map_err(|e| EbpfError::LoadFailed(format!("EVENTS not a RingBuf: {e:?}")))?;
// Capture the raw fd before `ring` is moved into the closure below; the
// closure owns `ring` for as long as it uses this fd.
let ring_fd = ring.as_raw_fd();
let (tx, rx) = mpsc::channel::<EbpfEvent>();
// Computed once here and passed to every `ConnectEvent::decode` call.
let boot = estimate_boot_time();
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_for_thread = shutdown.clone();
let reader = thread::Builder::new()
.name("netwatch-sdk-ebpf-reader".into())
.spawn(move || {
use crate::wire::{ConnectV4Event, EventKind};
let mut pollfd = libc::pollfd {
fd: ring_fd,
events: libc::POLLIN,
revents: 0,
};
loop {
// Checked once per iteration, i.e. at least once per poll timeout.
if shutdown_for_thread.load(Ordering::Relaxed) {
return;
}
// SAFETY: `pollfd` is a fully initialised `libc::pollfd` living on this
// thread's stack for the duration of the call, and the count of 1 matches
// the single struct passed.
let n = unsafe { libc::poll(&mut pollfd, 1, POLL_TIMEOUT_MS) };
if n < 0 {
let err = std::io::Error::last_os_error();
// An interrupted call is retried; any other poll failure ends the reader,
// which drops `tx`.
if err.raw_os_error() == Some(libc::EINTR) {
continue;
}
return;
}
// A return of 0 (timeout) also reaches this loop, which then simply finds
// no records. Otherwise drain every record currently available.
while let Some(item) = ring.next() {
if shutdown_for_thread.load(Ordering::Relaxed) {
return;
}
let bytes = item.as_ref();
if bytes.is_empty() {
continue;
}
// The first byte of a record selects its kind; records of any kind other
// than `TcpV4Connect` are ignored.
let kind_byte = bytes[0];
if kind_byte == EventKind::TcpV4Connect as u8 {
// Skip records too short to contain a whole `ConnectV4Event`.
if bytes.len() < std::mem::size_of::<ConnectV4Event>() {
continue;
}
// SAFETY: the length check above guarantees that at least
// `size_of::<ConnectV4Event>()` bytes are readable from `bytes.as_ptr()`,
// and `read_unaligned` places no alignment requirement on the pointer.
// Assumes `ConnectV4Event` is plain data that is valid for any bit
// pattern — TODO confirm against `crate::wire`.
let raw = unsafe {
std::ptr::read_unaligned(bytes.as_ptr() as *const ConnectV4Event)
};
let ev = ConnectEvent::decode(&raw, boot);
// A send error means the receiver was dropped, so stop reading.
if tx.send(EbpfEvent::Connect(ev)).is_err() {
return;
}
}
}
}
// A spawn failure is a `std::io::Error`; `?` converts it to `EbpfError::Io`.
})?;
Ok((
EventSource {
_inner: Inner {
_bpf: bpf,
shutdown,
_reader: Some(reader),
},
},
rx,
))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The unsupported-platform message must name the one supported OS.
    #[test]
    fn unsupported_platform_error_renders() {
        let rendered = EbpfError::UnsupportedPlatform.to_string();
        assert!(rendered.contains("Linux"));
    }

    /// The missing-object message must point at the script that builds it.
    #[test]
    fn bpf_object_missing_error_explains_fix() {
        let rendered = EbpfError::BpfObjectMissing.to_string();
        assert!(rendered.contains("scripts/build-ebpf.sh"));
    }

    /// Off Linux, construction must fail with `UnsupportedPlatform`.
    #[cfg(not(target_os = "linux"))]
    #[test]
    fn new_on_non_linux_returns_unsupported() {
        assert!(matches!(
            EventSource::new(),
            Err(EbpfError::UnsupportedPlatform)
        ));
    }
}