use crate::deadlock::DeadlockDetector;
use crate::task::{sort_tasks, SortDirection, TaskFilter, TaskId, TaskInfo, TaskSortBy, TaskState};
use crate::timeline::{Event, EventKind, Timeline};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
static GLOBAL_INSPECTOR: once_cell::sync::Lazy<Inspector> =
once_cell::sync::Lazy::new(Inspector::new);
#[derive(Clone)]
pub struct Inspector {
state: Arc<InspectorState>,
}
struct InspectorState {
tasks: RwLock<HashMap<TaskId, TaskInfo>>,
timeline: RwLock<Timeline>,
deadlock_detector: DeadlockDetector,
event_counter: AtomicU64,
enabled: RwLock<bool>,
}
impl Inspector {
#[must_use]
pub fn new() -> Self {
Self {
state: Arc::new(InspectorState {
tasks: RwLock::new(HashMap::new()),
timeline: RwLock::new(Timeline::new()),
deadlock_detector: DeadlockDetector::new(),
event_counter: AtomicU64::new(1),
enabled: RwLock::new(true),
}),
}
}
#[must_use]
pub fn global() -> &'static Self {
&GLOBAL_INSPECTOR
}
#[must_use]
pub fn is_enabled(&self) -> bool {
*self.state.enabled.read()
}
pub fn enable(&self) {
*self.state.enabled.write() = true;
}
pub fn disable(&self) {
*self.state.enabled.write() = false;
}
#[must_use]
pub fn register_task(&self, name: String) -> TaskId {
if !self.is_enabled() {
return TaskId::new();
}
let task = TaskInfo::new(name.clone());
let task_id = task.id;
self.add_event(
task_id,
EventKind::TaskSpawned {
name,
parent: None,
location: None,
},
);
self.state.tasks.write().insert(task_id, task);
task_id
}
#[must_use]
pub fn register_child_task(&self, name: String, parent_id: TaskId) -> TaskId {
if !self.is_enabled() {
return TaskId::new();
}
let mut task = TaskInfo::new(name.clone());
task.parent = Some(parent_id);
let task_id = task.id;
self.add_event(
task_id,
EventKind::TaskSpawned {
name,
parent: Some(parent_id),
location: None,
},
);
self.state.tasks.write().insert(task_id, task);
task_id
}
#[must_use]
pub fn register_task_with_info(&self, task: TaskInfo) -> TaskId {
if !self.is_enabled() {
return task.id;
}
let task_id = task.id;
self.add_event(
task_id,
EventKind::TaskSpawned {
name: task.name.clone(),
parent: task.parent,
location: task.location.clone(),
},
);
self.state.tasks.write().insert(task_id, task);
task_id
}
pub fn update_task_state(&self, task_id: TaskId, new_state: TaskState) {
if !self.is_enabled() {
return;
}
if let Some(task) = self.state.tasks.write().get_mut(&task_id) {
let old_state = task.state.clone();
task.update_state(new_state.clone());
self.add_event(
task_id,
EventKind::StateChanged {
old_state,
new_state,
},
);
}
}
pub fn poll_started(&self, task_id: TaskId) {
if !self.is_enabled() {
return;
}
self.update_task_state(task_id, TaskState::Running);
self.add_event(task_id, EventKind::PollStarted);
}
pub fn poll_ended(&self, task_id: TaskId, duration: Duration) {
if !self.is_enabled() {
return;
}
if let Some(task) = self.state.tasks.write().get_mut(&task_id) {
task.record_poll(duration);
}
self.add_event(task_id, EventKind::PollEnded { duration });
}
pub fn await_started(&self, task_id: TaskId, await_point: String, location: Option<String>) {
if !self.is_enabled() {
return;
}
self.update_task_state(
task_id,
TaskState::Blocked {
await_point: await_point.clone(),
},
);
self.add_event(
task_id,
EventKind::AwaitStarted {
await_point,
location,
},
);
}
pub fn await_ended(&self, task_id: TaskId, await_point: String, duration: Duration) {
if !self.is_enabled() {
return;
}
self.add_event(
task_id,
EventKind::AwaitEnded {
await_point,
duration,
},
);
}
pub fn task_completed(&self, task_id: TaskId) {
if !self.is_enabled() {
return;
}
let duration = {
self.state
.tasks
.read()
.get(&task_id)
.map(super::task::TaskInfo::age)
};
if let Some(duration) = duration {
self.update_task_state(task_id, TaskState::Completed);
self.add_event(task_id, EventKind::TaskCompleted { duration });
}
}
pub fn task_failed(&self, task_id: TaskId, error: Option<String>) {
if !self.is_enabled() {
return;
}
self.update_task_state(task_id, TaskState::Failed);
self.add_event(task_id, EventKind::TaskFailed { error });
}
pub fn inspection_point(&self, task_id: TaskId, label: String, message: Option<String>) {
if !self.is_enabled() {
return;
}
self.add_event(task_id, EventKind::InspectionPoint { label, message });
}
pub fn add_event(&self, task_id: TaskId, kind: EventKind) {
let event_id = self.state.event_counter.fetch_add(1, Ordering::Relaxed);
let event = Event::new(event_id, task_id, kind);
self.state.timeline.write().add_event(event);
}
#[must_use]
pub fn get_task(&self, task_id: TaskId) -> Option<TaskInfo> {
self.state.tasks.read().get(&task_id).cloned()
}
#[must_use]
pub fn get_all_tasks(&self) -> Vec<TaskInfo> {
self.state.tasks.read().values().cloned().collect()
}
#[must_use]
pub fn get_tasks_filtered(&self, filter: &TaskFilter) -> Vec<TaskInfo> {
self.state
.tasks
.read()
.values()
.filter(|t| filter.matches(t))
.cloned()
.collect()
}
#[must_use]
pub fn get_tasks_sorted(
&self,
filter: &TaskFilter,
sort_by: TaskSortBy,
direction: SortDirection,
) -> Vec<TaskInfo> {
let mut tasks = self.get_tasks_filtered(filter);
sort_tasks(&mut tasks, sort_by, direction);
tasks
}
#[must_use]
pub fn get_running_tasks(&self) -> Vec<TaskInfo> {
self.get_tasks_filtered(&TaskFilter::new().with_state(TaskState::Running))
}
#[must_use]
pub fn get_blocked_tasks(&self) -> Vec<TaskInfo> {
self.get_tasks_filtered(&TaskFilter::new().with_state(TaskState::Blocked {
await_point: String::new(),
}))
}
#[must_use]
pub fn get_long_running_tasks(&self, min_duration: Duration) -> Vec<TaskInfo> {
self.get_tasks_filtered(&TaskFilter::new().with_min_duration(min_duration))
}
#[must_use]
pub fn get_root_tasks(&self) -> Vec<TaskInfo> {
self.get_tasks_filtered(&TaskFilter::new().root_only())
}
#[must_use]
pub fn get_child_tasks(&self, parent_id: TaskId) -> Vec<TaskInfo> {
self.get_tasks_filtered(&TaskFilter::new().with_parent(parent_id))
}
#[must_use]
pub fn get_events(&self) -> Vec<Event> {
self.state.timeline.read().events().to_vec()
}
#[must_use]
pub fn get_task_events(&self, task_id: TaskId) -> Vec<Event> {
self.state
.timeline
.read()
.events_for_task(task_id)
.into_iter()
.cloned()
.collect()
}
#[must_use]
pub fn build_profiler(&self) -> crate::profile::Profiler {
use crate::profile::{Profiler, TaskMetrics};
use crate::timeline::EventKind;
let mut profiler = Profiler::new();
let tasks = self.state.tasks.read();
let timeline = self.state.timeline.read();
for task in tasks.values() {
let mut metrics = TaskMetrics::new(task.id, task.name.clone());
metrics.total_duration = task.age();
metrics.running_time = task.total_run_time;
metrics.blocked_time = if metrics.total_duration > task.total_run_time {
metrics.total_duration - task.total_run_time
} else {
Duration::ZERO
};
metrics.poll_count = task.poll_count;
if task.poll_count > 0 {
metrics.avg_poll_duration = task.total_run_time / task.poll_count as u32;
}
metrics.completed = matches!(task.state, TaskState::Completed);
let task_events: Vec<&Event> = timeline
.events()
.iter()
.filter(|e| e.task_id == task.id)
.collect();
let mut await_start_times: HashMap<String, std::time::Instant> = HashMap::new();
for event in task_events {
match &event.kind {
EventKind::AwaitStarted { await_point, .. } => {
await_start_times.insert(await_point.clone(), event.timestamp);
}
EventKind::AwaitEnded { await_point, .. } => {
if let Some(start_time) = await_start_times.remove(&await_point.clone()) {
let duration = event.timestamp.duration_since(start_time);
metrics.await_durations.push(duration);
metrics.await_count += 1;
}
}
_ => {}
}
}
profiler.record_task(metrics);
}
profiler
}
#[must_use]
pub fn stats(&self) -> InspectorStats {
let tasks = self.state.tasks.read();
let timeline = self.state.timeline.read();
let total = tasks.len();
let pending = tasks
.values()
.filter(|t| matches!(t.state, TaskState::Pending))
.count();
let running = tasks
.values()
.filter(|t| matches!(t.state, TaskState::Running))
.count();
let blocked = tasks
.values()
.filter(|t| matches!(t.state, TaskState::Blocked { .. }))
.count();
let completed = tasks
.values()
.filter(|t| matches!(t.state, TaskState::Completed))
.count();
let failed = tasks
.values()
.filter(|t| matches!(t.state, TaskState::Failed))
.count();
InspectorStats {
total_tasks: total,
pending_tasks: pending,
running_tasks: running,
blocked_tasks: blocked,
completed_tasks: completed,
failed_tasks: failed,
total_events: timeline.len(),
timeline_duration: timeline.duration(),
}
}
pub fn clear(&self) {
self.state.tasks.write().clear();
self.state.timeline.write().clear();
self.state.event_counter.store(1, Ordering::Relaxed);
}
pub fn reset(&self) {
self.clear();
self.enable();
}
#[must_use]
pub fn deadlock_detector(&self) -> &DeadlockDetector {
&self.state.deadlock_detector
}
}
impl Default for Inspector {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct InspectorStats {
pub total_tasks: usize,
pub pending_tasks: usize,
pub running_tasks: usize,
pub blocked_tasks: usize,
pub completed_tasks: usize,
pub failed_tasks: usize,
pub total_events: usize,
pub timeline_duration: Duration,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_inspector_creation() {
let inspector = Inspector::new();
assert!(inspector.is_enabled());
}
#[test]
fn test_register_task() {
let inspector = Inspector::new();
let task_id = inspector.register_task("test_task".to_string());
let task = inspector.get_task(task_id).unwrap();
assert_eq!(task.name, "test_task");
}
#[test]
fn test_task_lifecycle() {
let inspector = Inspector::new();
let task_id = inspector.register_task("test".to_string());
inspector.poll_started(task_id);
inspector.poll_ended(task_id, Duration::from_millis(10));
inspector.task_completed(task_id);
let task = inspector.get_task(task_id).unwrap();
assert_eq!(task.state, TaskState::Completed);
assert_eq!(task.poll_count, 1);
}
#[test]
fn test_stats() {
let inspector = Inspector::new();
inspector.register_task("task1".to_string());
inspector.register_task("task2".to_string());
let stats = inspector.stats();
assert_eq!(stats.total_tasks, 2);
}
}