use crate::tracing::instance::{TraceInstance, TraceInstanceArgs};
use crate::tracing::snapshot::{TraceSnapshot, TraceSummary};
use anyhow::Result;
use futures::future::{select_all, BoxFuture};
use futures::FutureExt;
use ghostscope_loader::GhostScopeLoader;
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::Notify;
use tracing::{debug, error, info, warn};
/// Owns every registered trace together with the bookkeeping needed to look
/// traces up, hand out ids and wake tasks waiting for an active trace.
#[derive(Debug)]
pub struct TraceManager {
    /// Registered traces, keyed by trace id.
    traces: HashMap<u32, TraceInstance>,
    /// Id reported by `get_next_trace_id`; bumped past any id that gets added.
    next_trace_id: u32,
    /// Maps the unique key `"<target>#<trace_id>"` to the trace id it belongs to.
    target_to_trace_id: HashMap<String, u32>,
    /// Signalled when the number of enabled traces goes from zero to one.
    no_trace_wait_notify: Notify,
    /// When each trace was added, in whole seconds since the Unix epoch.
    trace_created_times: HashMap<u32, u64>,
}
/// Everything needed to register one trace via `TraceManager::add_trace_with_id`.
///
/// `trace_id` and `target` are also used for the manager's own bookkeeping;
/// all fields end up in the `TraceInstanceArgs` used to build the trace.
#[derive(Debug)]
pub struct AddTraceParams {
    /// Id under which the trace is registered.
    pub trace_id: u32,
    /// Target name; combined with `trace_id` to form the manager's lookup key.
    pub target: String,
    /// Script text stored on the trace instance.
    pub script_content: String,
    /// Forwarded as the trace's `pc`.
    // NOTE(review): presumably the probe's program counter / address — confirm.
    pub pc: u64,
    /// Forwarded as the trace's `binary_path`.
    pub binary_path: String,
    /// Forwarded as the trace's `target_display`.
    // NOTE(review): presumably a human-readable form of `target` — confirm.
    pub target_display: String,
    /// Optional pid forwarded to the trace instance.
    pub target_pid: Option<u32>,
    /// Optional loader handed over to the trace instance.
    pub loader: Option<GhostScopeLoader>,
    /// Forwarded as the trace's `ebpf_function_name`.
    pub ebpf_function_name: String,
    /// Forwarded as the trace's `address_global_index`.
    pub address_global_index: Option<usize>,
}
impl TraceManager {
pub fn new() -> Self {
Self {
traces: HashMap::new(),
next_trace_id: ghostscope_protocol::consts::DEFAULT_TRACE_ID as u32,
target_to_trace_id: HashMap::new(),
no_trace_wait_notify: Notify::new(),
trace_created_times: HashMap::new(),
}
}
pub fn get_next_trace_id(&self) -> u32 {
self.next_trace_id
}
pub fn add_trace_with_id(&mut self, params: AddTraceParams) -> u32 {
if params.trace_id >= self.next_trace_id {
self.next_trace_id = params.trace_id + 1;
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.trace_created_times.insert(params.trace_id, now);
let unique_target = format!("{}#{}", params.target, params.trace_id);
let trace_instance = TraceInstance::new(TraceInstanceArgs {
trace_id: params.trace_id,
target: params.target.clone(),
script_content: params.script_content,
pc: params.pc,
binary_path: params.binary_path,
target_display: params.target_display,
target_pid: params.target_pid,
loader: params.loader,
ebpf_function_name: params.ebpf_function_name,
address_global_index: params.address_global_index,
});
self.traces.insert(params.trace_id, trace_instance);
self.target_to_trace_id
.insert(unique_target, params.trace_id);
debug!(
"Added trace {} to manager with target '{}', next_trace_id updated to {}",
params.trace_id, params.target, self.next_trace_id
);
params.trace_id
}
pub fn delete_trace(&mut self, trace_id: u32) -> Result<()> {
if let Some(trace) = self.traces.remove(&trace_id) {
let unique_target = format!("{}#{}", trace.target, trace_id);
self.target_to_trace_id.remove(&unique_target);
self.trace_created_times.remove(&trace_id);
info!("Deleted trace {} with target '{}'", trace_id, trace.target);
Ok(())
} else {
Err(anyhow::anyhow!("Trace {} not found", trace_id))
}
}
pub fn delete_all_traces(&mut self) -> Result<usize> {
let count = self.traces.len();
self.traces.clear();
self.target_to_trace_id.clear();
self.trace_created_times.clear();
info!("Deleted all {} traces", count);
Ok(count)
}
pub fn active_trace_count(&self) -> usize {
self.traces.values().filter(|t| t.is_enabled).count()
}
pub fn get_all_trace_ids(&self) -> Vec<u32> {
self.traces.keys().cloned().collect()
}
pub fn enable_trace(&mut self, trace_id: u32) -> Result<()> {
let was_no_active_traces = self.active_trace_count() == 0;
if let Some(trace) = self.traces.get_mut(&trace_id) {
trace.enable()?;
if was_no_active_traces && self.active_trace_count() == 1 {
self.no_trace_wait_notify.notify_waiters();
}
Ok(())
} else {
Err(anyhow::anyhow!("Trace {} not found", trace_id))
}
}
pub fn disable_trace(&mut self, trace_id: u32) -> Result<()> {
if let Some(trace) = self.traces.get_mut(&trace_id) {
trace.disable()
} else {
Err(anyhow::anyhow!("Trace {} not found", trace_id))
}
}
pub fn enable_all_traces(&mut self) -> Result<()> {
let trace_ids: Vec<u32> = self.traces.keys().cloned().collect();
for trace_id in trace_ids {
if let Err(e) = self.enable_trace(trace_id) {
warn!("Failed to enable trace {}: {}", trace_id, e);
}
}
Ok(())
}
pub fn disable_all_traces(&mut self) -> Result<()> {
let trace_ids: Vec<u32> = self.traces.keys().cloned().collect();
for trace_id in trace_ids {
if let Err(e) = self.disable_trace(trace_id) {
warn!("Failed to disable trace {}: {}", trace_id, e);
}
}
Ok(())
}
pub fn get_trace_snapshot(&self, trace_id: u32) -> Option<TraceSnapshot> {
self.traces.get(&trace_id).map(|trace| TraceSnapshot {
trace_id: trace.trace_id,
target: trace.target.clone(),
script_content: trace.script_content.clone(),
binary_path: trace.binary_path.clone(),
target_display: trace.target_display.clone(),
target_pid: trace.target_pid,
is_enabled: trace.is_enabled,
pc: trace.pc,
ebpf_function_name: trace.ebpf_function_name.clone(),
address_global_index: trace.address_global_index,
})
}
pub fn get_summary(&self) -> TraceSummary {
let total = self.traces.len();
let active = self.active_trace_count();
let disabled = total - active;
TraceSummary {
total,
active,
disabled,
}
}
pub async fn wait_for_first_trace(&self) {
self.no_trace_wait_notify.notified().await;
}
pub async fn wait_for_all_events_async(
&mut self,
) -> anyhow::Result<Vec<ghostscope_protocol::ParsedTraceEvent>> {
loop {
let futures: Vec<
BoxFuture<
'_,
(
u32,
anyhow::Result<Vec<ghostscope_protocol::ParsedTraceEvent>>,
),
>,
> = self
.traces
.iter_mut()
.filter_map(|(&trace_id, trace)| {
if trace.is_enabled {
Some(async move { (trace_id, trace.wait_for_events_async().await) }.boxed())
} else {
None
}
})
.collect();
if futures.is_empty() {
drop(futures);
self.wait_for_first_trace().await;
continue;
}
let ((trace_id, result), _index, remaining) = select_all(futures).await;
let mut aggregated_events = Vec::new();
match result {
Ok(events) => {
aggregated_events.extend(events);
}
Err(e) => {
error!(
"Fatal error waiting for events from trace {}: {}",
trace_id, e
);
return Err(e);
}
}
for future in remaining {
if let Some((trace_id, result)) = future.now_or_never() {
match result {
Ok(events) => {
aggregated_events.extend(events);
}
Err(e) => {
error!(
"Fatal error waiting for events from trace {}: {}",
trace_id, e
);
return Err(e);
}
}
}
}
if !aggregated_events.is_empty() {
return Ok(aggregated_events);
}
}
}
}
impl Default for TraceManager {
fn default() -> Self {
Self::new()
}
}