use std::sync::mpsc::Receiver;
use super::event::EbpfEvent;
/// Errors reported while setting up the eBPF event source.
#[derive(Debug)]
pub enum EbpfError {
    /// The current target is not Linux; the non-Linux constructor always
    /// returns this.
    UnsupportedPlatform,
    /// The embedded BPF object is empty, i.e. it was not built before this
    /// crate was compiled.
    BpfObjectMissing,
    /// Loading the BPF object, one of its programs, or the events map failed.
    /// The payload is a human-readable description of the cause.
    LoadFailed(String),
    /// A program was loaded but could not be attached to its kernel function.
    /// The payload is a human-readable description of the cause.
    AttachFailed(String),
    /// An operating-system level I/O error (for example, failing to spawn the
    /// reader thread).
    Io(std::io::Error),
}

impl std::fmt::Display for EbpfError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedPlatform => {
                f.write_str("eBPF event source is only available on Linux")
            }
            Self::BpfObjectMissing => f.write_str(
                "BPF object not embedded; run scripts/build-ebpf.sh and \
                 rebuild netwatch-sdk with --features ebpf",
            ),
            Self::LoadFailed(reason) => write!(f, "BPF load failed: {reason}"),
            Self::AttachFailed(reason) => write!(f, "BPF attach failed: {reason}"),
            // The inner error is still rendered here (as before) so existing
            // log output does not change, even though it is now also
            // reachable through `source()`.
            Self::Io(err) => write!(f, "io: {err}"),
        }
    }
}

impl std::error::Error for EbpfError {
    /// Returns the wrapped [`std::io::Error`] for the `Io` variant so that
    /// error-chain reporters can reach the underlying cause; every other
    /// variant carries no underlying error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            Self::UnsupportedPlatform
            | Self::BpfObjectMissing
            | Self::LoadFailed(_)
            | Self::AttachFailed(_) => None,
        }
    }
}

impl From<std::io::Error> for EbpfError {
    /// Wraps an I/O error so that `?` can be used on `std::io::Result` values.
    fn from(e: std::io::Error) -> Self {
        Self::Io(e)
    }
}
/// Handle to the running eBPF event source.
///
/// The handle exposes no methods besides [`EventSource::new`]; events are
/// consumed through the `Receiver` returned alongside it. On Linux, dropping
/// the handle drops the platform state, which signals the reader thread to stop.
pub struct EventSource {
// Platform-specific state. Never read; stored so it is dropped with the handle.
_inner: PlatformInner,
}
impl EventSource {
/// Creates the event source and the receiving end of its event channel.
///
/// Delegates to the `platform` module selected at compile time.
///
/// # Errors
/// On non-Linux targets this always returns `EbpfError::UnsupportedPlatform`.
/// On Linux it returns whatever error the Linux constructor reports.
pub fn new() -> Result<(Self, Receiver<EbpfEvent>), EbpfError> {
platform::new()
}
}
// Type of the private state stored inside `EventSource`, chosen per target.
// On Linux it is the state defined in the `linux` module.
#[cfg(target_os = "linux")]
type PlatformInner = linux::Inner;
// On every other target there is nothing to hold: the non-Linux
// `platform::new` only ever returns an error.
#[cfg(not(target_os = "linux"))]
type PlatformInner = ();
// Linux flavour of the `platform` shim used by `EventSource::new`.
#[cfg(target_os = "linux")]
mod platform {
use super::*;
/// Forwards to the real implementation in the `linux` module.
pub fn new() -> Result<(EventSource, Receiver<EbpfEvent>), EbpfError> {
super::linux::new()
}
}
// Fallback flavour of the `platform` shim for every non-Linux target.
#[cfg(not(target_os = "linux"))]
mod platform {
use super::*;
/// Always fails with `EbpfError::UnsupportedPlatform`; no event source is
/// created on these targets.
pub fn new() -> Result<(EventSource, Receiver<EbpfEvent>), EbpfError> {
Err(EbpfError::UnsupportedPlatform)
}
}
#[cfg(target_os = "linux")]
mod linux {
use super::*;
use crate::ebpf::event::{estimate_boot_time, ConnectEvent, Protocol};
use aya::{maps::RingBuf, programs::KProbe, Bpf};
use std::os::fd::AsRawFd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread::{self, JoinHandle};
/// BPF object file embedded at compile time from `$OUT_DIR/netwatch_sdk_ebpf.o`.
/// `new()` treats an empty blob as "not built" and returns
/// `EbpfError::BpfObjectMissing`.
const BPF_OBJECT: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/netwatch_sdk_ebpf.o"));
/// Timeout, in milliseconds, passed to `libc::poll` by the reader thread.
/// It bounds how long the thread can sit in `poll` before it re-checks the
/// shutdown flag.
const POLL_TIMEOUT_MS: i32 = 100;
/// Linux-side state owned by an `EventSource`.
///
/// Dropping it stops the reader thread and waits for it to finish before the
/// BPF object itself is released.
pub struct Inner {
    /// Loaded BPF object. Never read; stored so it is released together with
    /// this value (presumably releasing it detaches the kprobes — confirm
    /// against the aya documentation).
    _bpf: Bpf,
    /// Stop flag shared with the reader thread; set to `true` on drop.
    shutdown: Arc<AtomicBool>,
    /// Handle of the reader thread; taken and joined on drop.
    _reader: Option<JoinHandle<()>>,
}

impl Drop for Inner {
    /// Signals the reader thread to stop and waits for it to exit.
    ///
    /// The thread re-checks the flag after every `poll` timeout and before
    /// every record it forwards, so the wait is bounded by roughly one poll
    /// timeout. Joining here (instead of detaching the thread) guarantees that
    /// no reader is still running once the handle is gone, and that the
    /// thread has finished before the fields — including the BPF object —
    /// are dropped.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Relaxed);
        if let Some(reader) = self._reader.take() {
            // A join error only means the reader panicked; there is nothing
            // useful to do with that while tearing down.
            let _ = reader.join();
        }
    }
}
/// Loads the program called `name` from `bpf` and attaches it as a kprobe to
/// the kernel function of the same name (offset 0).
///
/// # Errors
/// - `EbpfError::LoadFailed` if the object has no program called `name`, the
///   program is not a kprobe, or loading it fails.
/// - `EbpfError::AttachFailed` if attaching the loaded program fails.
fn attach_kprobe(bpf: &mut Bpf, name: &str) -> Result<(), EbpfError> {
    // Step 1: look the program up by name.
    let Some(generic) = bpf.program_mut(name) else {
        return Err(EbpfError::LoadFailed(format!(
            "program {name} not found in BPF object"
        )));
    };

    // Step 2: it must be a kprobe program.
    let probe: &mut KProbe =
        generic
            .try_into()
            .map_err(|cause: aya::programs::ProgramError| {
                EbpfError::LoadFailed(format!("not a kprobe: {cause:?}"))
            })?;

    // Step 3: load it.
    if let Err(cause) = probe.load() {
        return Err(EbpfError::LoadFailed(format!("{cause:?}")));
    }

    // Step 4: attach to the kernel function with the same name as the
    // program; the returned link id is not needed.
    probe
        .attach(name, 0)
        .map(|_link_id| ())
        .map_err(|cause| EbpfError::AttachFailed(format!("{cause:?}")))
}
/// Loads the embedded BPF object, attaches the connect kprobes and starts the
/// reader thread that turns ring-buffer records into `EbpfEvent`s.
///
/// Returns the `EventSource` handle together with the receiving end of the
/// event channel.
///
/// # Errors
/// - `EbpfError::BpfObjectMissing` when the embedded object is empty.
/// - `EbpfError::LoadFailed` when the object cannot be loaded, the
///   `tcp_v4_connect` program is missing or cannot be loaded, or the `EVENTS`
///   map is missing or is not a ring buffer.
/// - `EbpfError::AttachFailed` when the `tcp_v4_connect` kprobe cannot be
///   attached.
/// - `EbpfError::Io` when the reader thread cannot be spawned.
pub fn new() -> Result<(EventSource, Receiver<EbpfEvent>), EbpfError> {
if BPF_OBJECT.is_empty() {
return Err(EbpfError::BpfObjectMissing);
}
// NOTE(review): the copy into a `Vec` looks deliberate — presumably it gives
// the loader a heap-aligned buffer, since `include_bytes!` data carries no
// alignment guarantee. Confirm before removing it.
let object: Vec<u8> = BPF_OBJECT.to_vec();
let mut bpf =
Bpf::load(&object).map_err(|e| EbpfError::LoadFailed(format!("{e:?}")))?;
// IPv4 TCP is mandatory: a failure here aborts construction.
attach_kprobe(&mut bpf, "tcp_v4_connect")?;
// IPv6 TCP is best-effort: the result is deliberately discarded.
let _ = attach_kprobe(&mut bpf, "tcp_v6_connect");
// Connected-UDP probes are best-effort too. Every candidate name is tried and
// the successes are counted.
// NOTE(review): every name that attaches stays attached, not only the first
// one; confirm that attaching both a function and its `__`-prefixed variant
// does not report the same connect twice.
let udp_attached = [
"ip4_datagram_connect",
"__ip4_datagram_connect",
"ip6_datagram_connect",
"__ip6_datagram_connect",
]
.into_iter()
.filter(|name| attach_kprobe(&mut bpf, name).is_ok())
.count();
if udp_attached == 0 {
eprintln!(
"netwatch-sdk: no connected-UDP kprobes attached; UDP/QUIC \
process attribution disabled on this kernel"
);
}
// Take ownership of the `EVENTS` map out of the BPF object so it can be moved
// into the reader thread.
let events_map = bpf
.take_map("EVENTS")
.ok_or_else(|| EbpfError::LoadFailed("EVENTS map not found".into()))?;
let mut ring: RingBuf<_> = RingBuf::try_from(events_map)
.map_err(|e| EbpfError::LoadFailed(format!("EVENTS not a RingBuf: {e:?}")))?;
// Raw fd used for `poll`. `ring` is moved into the reader thread below, so the
// fd stays open for as long as that thread uses it.
let ring_fd = ring.as_raw_fd();
// Unbounded channel: `send` in the reader thread never blocks.
let (tx, rx) = mpsc::channel::<EbpfEvent>();
// Computed once here and handed to every decode call in the reader thread.
let boot = estimate_boot_time();
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_for_thread = shutdown.clone();
let reader = thread::Builder::new()
.name("netwatch-sdk-ebpf-reader".into())
.spawn(move || {
use crate::wire::{ConnectV4Event, ConnectV6Event, EventKind};
let mut pollfd = libc::pollfd {
fd: ring_fd,
events: libc::POLLIN,
revents: 0,
};
loop {
// The flag is checked once per loop turn; the poll timeout below bounds how
// long a turn can take when no events arrive.
if shutdown_for_thread.load(Ordering::Relaxed) {
return;
}
// SAFETY: `pollfd` is a valid, exclusively borrowed `libc::pollfd` and the
// count passed is 1, matching the single entry.
let n = unsafe { libc::poll(&mut pollfd, 1, POLL_TIMEOUT_MS) };
if n < 0 {
let err = std::io::Error::last_os_error();
// An interrupted call is retried; any other poll error ends the thread,
// which drops `tx` and so disconnects the receiver.
if err.raw_os_error() == Some(libc::EINTR) {
continue;
}
return;
}
// Drain everything currently in the ring. This also runs after a timeout
// (`n == 0`), in which case the ring is normally empty.
while let Some(item) = ring.next() {
if shutdown_for_thread.load(Ordering::Relaxed) {
return;
}
let bytes = item.as_ref();
if bytes.is_empty() {
continue;
}
// The first byte of a record is compared against the `EventKind` values to
// pick the layout (v4 or v6) and the protocol (TCP or UDP).
let kind_byte = bytes[0];
let v4_proto = if kind_byte == EventKind::TcpV4Connect as u8 {
Some(Protocol::Tcp)
} else if kind_byte == EventKind::UdpV4Connect as u8 {
Some(Protocol::Udp)
} else {
None
};
let v6_proto = if kind_byte == EventKind::TcpV6Connect as u8 {
Some(Protocol::Tcp)
} else if kind_byte == EventKind::UdpV6Connect as u8 {
Some(Protocol::Udp)
} else {
None
};
let ev = if let Some(proto) = v4_proto {
// Records shorter than the expected layout are skipped.
if bytes.len() < std::mem::size_of::<ConnectV4Event>() {
continue;
}
// SAFETY: the length check above guarantees `bytes` holds at least
// `size_of::<ConnectV4Event>()` readable bytes, and `read_unaligned` places no
// alignment requirement on the pointer. Assumes `ConnectV4Event` is a
// plain-data struct valid for any bit pattern — TODO confirm in `crate::wire`.
let raw = unsafe {
std::ptr::read_unaligned(bytes.as_ptr() as *const ConnectV4Event)
};
ConnectEvent::decode_v4(&raw, boot, proto)
} else if let Some(proto) = v6_proto {
if bytes.len() < std::mem::size_of::<ConnectV6Event>() {
continue;
}
// SAFETY: same reasoning as the v4 branch, with `ConnectV6Event` — TODO
// confirm it is plain data as well.
let raw = unsafe {
std::ptr::read_unaligned(bytes.as_ptr() as *const ConnectV6Event)
};
ConnectEvent::decode_v6(&raw, boot, proto)
} else {
// Unknown kind byte: skip the record.
continue;
};
// `send` fails only once the receiver has been dropped; nobody is listening
// any more, so the thread ends.
if tx.send(EbpfEvent::Connect(ev)).is_err() {
return;
}
}
}
// A spawn failure is an `io::Error`; `?` converts it into `EbpfError::Io`.
})?;
Ok((
EventSource {
_inner: Inner {
_bpf: bpf,
shutdown,
_reader: Some(reader),
},
},
rx,
))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The unsupported-platform message names the one platform that works.
    #[test]
    fn unsupported_platform_error_renders() {
        let rendered = EbpfError::UnsupportedPlatform.to_string();
        assert!(rendered.contains("Linux"));
    }

    /// The missing-object message points the user at the build script.
    #[test]
    fn bpf_object_missing_error_explains_fix() {
        let rendered = EbpfError::BpfObjectMissing.to_string();
        assert!(rendered.contains("scripts/build-ebpf.sh"));
    }

    /// Off Linux, construction must fail with the dedicated variant.
    #[cfg(not(target_os = "linux"))]
    #[test]
    fn new_on_non_linux_returns_unsupported() {
        let outcome = EventSource::new();
        assert!(matches!(outcome, Err(EbpfError::UnsupportedPlatform)));
    }
}