use anyhow::Result;
use libbpf_rs::Link;
use log::{debug, info, warn};
use serde_json::Value as JsonValue;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
/// Command sent over the stats control channel to whatever task owns the
/// receiving end.
#[derive(Debug)]
pub enum StatsControlCommand {
    /// Begin collecting stats; the payload is the collection interval in
    /// milliseconds (as passed by `EventControl::start_stats_collection`).
    Start(u64),
    /// Stop collecting stats.
    Stop,
}
/// Callback used to (re)attach BPF programs.
///
/// It receives a slice of program names and returns the resulting links, or an
/// error if attaching failed. `EventControl::enable_event_tracking` forwards its
/// `program_names` argument unchanged; judging by that method's log message an
/// empty slice is meant as "all programs" — the callback itself decides.
pub type AttachCallback = Box<dyn Fn(&[&str]) -> Result<Vec<Link>> + Send + Sync>;
/// Runtime switchboard for BPF event tracking and BPF stats collection.
///
/// Event tracking is toggled by holding or dropping the BPF [`Link`]s; stats
/// collection is toggled by sending [`StatsControlCommand`]s over a channel.
pub struct EventControl {
    /// Currently held BPF links: `Some` while tracking is enabled, `None`
    /// after `disable_event_tracking` (or before any links were provided).
    bpf_links: Arc<Mutex<Option<Vec<Link>>>>,
    /// Callback invoked by `enable_event_tracking` to produce a new set of
    /// links; `None` until `set_bpf_links` has been called.
    attach_callback: Arc<Mutex<Option<AttachCallback>>>,
    /// Flag updated alongside `bpf_links`. The methods in this file only write
    /// it; `is_event_tracking_enabled` derives its answer from `bpf_links`.
    event_tracking_enabled: Arc<Mutex<bool>>,
    /// Sender half of the stats control channel; `None` until
    /// `set_stats_control_channel` has been called.
    stats_control_tx: Option<mpsc::UnboundedSender<StatsControlCommand>>,
    /// Whether the most recent successfully sent stats command was `Start`.
    stats_collection_running: Arc<Mutex<bool>>,
    /// Interval (ms) from the most recent successful start request; the
    /// constructor initialises it to 100.
    stats_collection_interval_ms: Arc<Mutex<u64>>,
}
// SAFETY — NOTE(review): every field of `EventControl` is either an
// `Arc<Mutex<..>>` or an `Option<mpsc::UnboundedSender<..>>`, so shared state is
// only reached through a mutex. These manual impls are presumably required
// because a contained type (most likely `libbpf_rs::Link`) is not `Send`/`Sync`
// by itself — confirm against the libbpf-rs version in use. If `Link` is
// already `Send`, both impls are redundant and should be removed so the
// compiler checks thread-safety instead of this assertion.
unsafe impl Send for EventControl {}
unsafe impl Sync for EventControl {}
impl EventControl {
    /// Creates a controller with no BPF links, no attach callback and no stats
    /// control channel. Stats collection is marked as not running and the
    /// stored interval starts at 100 ms.
    pub fn new() -> Self {
        Self {
            bpf_links: Arc::new(Mutex::new(None)),
            attach_callback: Arc::new(Mutex::new(None)),
            event_tracking_enabled: Arc::new(Mutex::new(false)),
            stats_control_tx: None,
            stats_collection_running: Arc::new(Mutex::new(false)),
            stats_collection_interval_ms: Arc::new(Mutex::new(100)),
        }
    }

    /// Installs the initial set of BPF links together with the callback that
    /// can recreate them later, and marks event tracking as enabled.
    ///
    /// Any previously stored links or callback are replaced (and therefore
    /// dropped).
    ///
    /// # Panics
    /// Panics if one of the internal mutexes is poisoned.
    pub fn set_bpf_links(&self, links: Vec<Link>, attach_callback: AttachCallback) {
        // Take the count before the vector is moved into the mutex so the log
        // line reports exactly what was installed, without re-locking.
        let attached = links.len();
        *self.bpf_links.lock().unwrap() = Some(links);
        *self.attach_callback.lock().unwrap() = Some(attach_callback);
        *self.event_tracking_enabled.lock().unwrap() = true;
        info!("BPF links initialized ({} programs attached)", attached);
    }

    /// Stores the sender used by `start_stats_collection` and
    /// `stop_stats_collection`.
    ///
    /// Takes `&mut self`, so it has to be called while the caller still has
    /// exclusive access to the controller.
    pub fn set_stats_control_channel(&mut self, tx: mpsc::UnboundedSender<StatsControlCommand>) {
        self.stats_control_tx = Some(tx);
    }

    /// Re-enables event tracking by invoking the attach callback with
    /// `program_names` and storing the links it returns.
    ///
    /// Returns `Ok(())` without doing anything if links are already held.
    ///
    /// # Errors
    /// Fails if no attach callback has been configured or if the callback
    /// itself returns an error.
    ///
    /// # Panics
    /// Panics if one of the internal mutexes is poisoned.
    pub fn enable_event_tracking(&self, program_names: &[&str]) -> Result<()> {
        // Held for the whole call so a concurrent enable/disable cannot
        // interleave with the attach below.
        let mut links_guard = self.bpf_links.lock().unwrap();
        if links_guard.is_some() {
            info!("BPF event tracking already enabled");
            return Ok(());
        }

        let callback_guard = self.attach_callback.lock().unwrap();
        let callback = callback_guard
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("No attach callback configured"))?;

        if program_names.is_empty() {
            info!("Reattaching all BPF programs...");
        } else {
            info!(
                "Reattaching {} BPF programs: {:?}",
                program_names.len(),
                program_names
            );
        }

        let new_links = callback(program_names).map_err(|e| {
            warn!("Failed to reattach BPF programs: {}", e);
            anyhow::anyhow!("Failed to reattach BPF programs: {}", e)
        })?;

        let count = new_links.len();
        *links_guard = Some(new_links);
        *self.event_tracking_enabled.lock().unwrap() = true;
        info!("BPF event tracking enabled ({} programs attached)", count);
        Ok(())
    }

    /// Disables event tracking by dropping every held BPF link.
    ///
    /// Returns `Ok(())` without doing anything if no links are held. The
    /// current implementation never returns an error.
    ///
    /// # Panics
    /// Panics if one of the internal mutexes is poisoned.
    pub fn disable_event_tracking(&self) -> Result<()> {
        let mut links_guard = self.bpf_links.lock().unwrap();
        match links_guard.take() {
            None => {
                info!("BPF event tracking already disabled");
                Ok(())
            }
            Some(links) => {
                let count = links.len();
                // Release the links before flipping the flag and logging.
                drop(links);
                *self.event_tracking_enabled.lock().unwrap() = false;
                info!("BPF event tracking disabled ({} programs detached)", count);
                debug!("BPF programs unloaded, zero overhead from tracepoints");
                Ok(())
            }
        }
    }

    /// Reports whether BPF links are currently held.
    ///
    /// # Panics
    /// Panics if the links mutex is poisoned.
    pub fn is_event_tracking_enabled(&self) -> bool {
        self.bpf_links.lock().unwrap().is_some()
    }

    /// Sends a `Start(interval_ms)` command over the stats control channel and
    /// records the new running state and interval.
    ///
    /// # Errors
    /// Fails if the channel has not been configured or the send fails.
    ///
    /// # Panics
    /// Panics if one of the internal mutexes is poisoned.
    pub fn start_stats_collection(&self, interval_ms: u64) -> Result<()> {
        let tx = self
            .stats_control_tx
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Stats control channel not configured"))?;
        tx.send(StatsControlCommand::Start(interval_ms))
            .map_err(|e| anyhow::anyhow!("Failed to send start command: {}", e))?;

        // Only record the new state once the command was actually handed off.
        *self.stats_collection_running.lock().unwrap() = true;
        *self.stats_collection_interval_ms.lock().unwrap() = interval_ms;
        info!(
            "BPF stats collection started with {}ms interval",
            interval_ms
        );
        Ok(())
    }

    /// Sends a `Stop` command over the stats control channel and records that
    /// collection is no longer running.
    ///
    /// # Errors
    /// Fails if the channel has not been configured or the send fails.
    ///
    /// # Panics
    /// Panics if the running-state mutex is poisoned.
    pub fn stop_stats_collection(&self) -> Result<()> {
        let tx = self
            .stats_control_tx
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Stats control channel not configured"))?;
        tx.send(StatsControlCommand::Stop)
            .map_err(|e| anyhow::anyhow!("Failed to send stop command: {}", e))?;

        *self.stats_collection_running.lock().unwrap() = false;
        info!("BPF stats collection stopped");
        Ok(())
    }

    /// Returns the recorded running state of stats collection.
    ///
    /// # Panics
    /// Panics if the running-state mutex is poisoned.
    pub fn is_stats_collection_running(&self) -> bool {
        *self.stats_collection_running.lock().unwrap()
    }

    /// Returns the recorded stats collection interval in milliseconds.
    ///
    /// # Panics
    /// Panics if the interval mutex is poisoned.
    pub fn get_stats_collection_interval_ms(&self) -> u64 {
        *self.stats_collection_interval_ms.lock().unwrap()
    }

    /// Builds a JSON summary of event-tracking and stats-collection state.
    ///
    /// Each piece of state is read exactly once, so the `enabled` /
    /// `programs_attached` / `description` fields (and likewise the
    /// `running` / `interval_ms` / `description` fields) always agree with
    /// each other even if another thread changes the state concurrently.
    ///
    /// # Panics
    /// Panics if one of the internal mutexes is poisoned.
    pub fn get_status_json(&self) -> JsonValue {
        // Single lock acquisition: `Some(n)` means links are held (n may be 0).
        let attached: Option<usize> = self
            .bpf_links
            .lock()
            .unwrap()
            .as_ref()
            .map(|links| links.len());
        let tracking_enabled = attached.is_some();
        let links_count = attached.unwrap_or(0);
        let tracking_description = if tracking_enabled {
            format!(
                "BPF programs attached ({} active), collecting scheduler events",
                links_count
            )
        } else {
            "BPF programs detached, zero tracepoint overhead".to_string()
        };

        let stats_running = self.is_stats_collection_running();
        let stats_interval_ms = self.get_stats_collection_interval_ms();
        // The interval is only reported while collection is running.
        let reported_interval: Option<u64> = if stats_running {
            Some(stats_interval_ms)
        } else {
            None
        };
        let stats_description = if stats_running {
            format!("BPF stats collected every {}ms", stats_interval_ms)
        } else {
            "BPF stats collection stopped".to_string()
        };

        serde_json::json!({
            "event_tracking": {
                "enabled": tracking_enabled,
                "programs_attached": links_count,
                "description": tracking_description
            },
            "stats_collection": {
                "running": stats_running,
                "interval_ms": reported_interval,
                "description": stats_description
            },
            "note": "Use control_event_tracking and control_stats_collection tools to manage these settings"
        })
    }
}
impl Default for EventControl {
fn default() -> Self {
Self::new()
}
}
/// Reference-counted handle to an [`EventControl`], as returned by
/// `create_event_control`.
pub type SharedEventControl = Arc<EventControl>;
/// Builds a fresh [`EventControl`] via `EventControl::new` and wraps it in an
/// `Arc` so it can be handed out as a [`SharedEventControl`].
pub fn create_event_control() -> SharedEventControl {
    let control = EventControl::new();
    Arc::new(control)
}