1use crate::deadlock::DeadlockDetector;
7use crate::task::{sort_tasks, SortDirection, TaskFilter, TaskId, TaskInfo, TaskSortBy, TaskState};
8use crate::timeline::{Event, EventKind, Timeline};
9use parking_lot::RwLock;
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
/// Process-wide [`Inspector`] instance.
///
/// The value is built by [`Inspector::new`] the first time the static is
/// dereferenced; [`Inspector::global`] hands out a reference to it.
static GLOBAL_INSPECTOR: once_cell::sync::Lazy<Inspector> =
    once_cell::sync::Lazy::new(Inspector::new);
18
/// Handle to the inspector's shared state.
///
/// The handle only holds an `Arc`, so cloning it does not copy any task or
/// event data: every clone points at the same underlying state.
#[derive(Clone)]
pub struct Inspector {
    /// Reference-counted state shared by all clones of this handle.
    state: Arc<InspectorState>,
}
25
/// State shared by every clone of an [`Inspector`].
struct InspectorState {
    /// Registered tasks, keyed by their id.
    tasks: RwLock<HashMap<TaskId, TaskInfo>>,

    /// Recorded events.
    timeline: RwLock<Timeline>,

    /// Deadlock detector exposed through `Inspector::deadlock_detector`.
    deadlock_detector: DeadlockDetector,

    /// Source of event ids; incremented once per recorded event.
    event_counter: AtomicU64,

    /// Whether the inspector currently records tasks and events.
    enabled: RwLock<bool>,
}
42
43impl Inspector {
44 #[must_use]
46 pub fn new() -> Self {
47 Self {
48 state: Arc::new(InspectorState {
49 tasks: RwLock::new(HashMap::new()),
50 timeline: RwLock::new(Timeline::new()),
51 deadlock_detector: DeadlockDetector::new(),
52 event_counter: AtomicU64::new(1),
53 enabled: RwLock::new(true),
54 }),
55 }
56 }
57
58 #[must_use]
60 pub fn global() -> &'static Self {
61 &GLOBAL_INSPECTOR
62 }
63
64 #[must_use]
66 pub fn is_enabled(&self) -> bool {
67 *self.state.enabled.read()
68 }
69
70 pub fn enable(&self) {
72 *self.state.enabled.write() = true;
73 }
74
75 pub fn disable(&self) {
77 *self.state.enabled.write() = false;
78 }
79
80 #[must_use]
82 pub fn register_task(&self, name: String) -> TaskId {
83 if !self.is_enabled() {
84 return TaskId::new();
85 }
86
87 let task = TaskInfo::new(name.clone());
88 let task_id = task.id;
89
90 self.add_event(
92 task_id,
93 EventKind::TaskSpawned {
94 name,
95 parent: None,
96 location: None,
97 },
98 );
99
100 self.state.tasks.write().insert(task_id, task);
102
103 task_id
104 }
105
106 #[must_use]
108 pub fn register_child_task(&self, name: String, parent_id: TaskId) -> TaskId {
109 if !self.is_enabled() {
110 return TaskId::new();
111 }
112
113 let mut task = TaskInfo::new(name.clone());
114 task.parent = Some(parent_id);
115 let task_id = task.id;
116
117 self.add_event(
119 task_id,
120 EventKind::TaskSpawned {
121 name,
122 parent: Some(parent_id),
123 location: None,
124 },
125 );
126
127 self.state.tasks.write().insert(task_id, task);
129
130 task_id
131 }
132
133 #[must_use]
135 pub fn register_task_with_info(&self, task: TaskInfo) -> TaskId {
136 if !self.is_enabled() {
137 return task.id;
138 }
139
140 let task_id = task.id;
141
142 self.add_event(
144 task_id,
145 EventKind::TaskSpawned {
146 name: task.name.clone(),
147 parent: task.parent,
148 location: task.location.clone(),
149 },
150 );
151
152 self.state.tasks.write().insert(task_id, task);
154
155 task_id
156 }
157
158 pub fn update_task_state(&self, task_id: TaskId, new_state: TaskState) {
160 if !self.is_enabled() {
161 return;
162 }
163
164 if let Some(task) = self.state.tasks.write().get_mut(&task_id) {
165 let old_state = task.state.clone();
166 task.update_state(new_state.clone());
167
168 self.add_event(
170 task_id,
171 EventKind::StateChanged {
172 old_state,
173 new_state,
174 },
175 );
176 }
177 }
178
179 pub fn poll_started(&self, task_id: TaskId) {
181 if !self.is_enabled() {
182 return;
183 }
184
185 self.update_task_state(task_id, TaskState::Running);
186 self.add_event(task_id, EventKind::PollStarted);
187 }
188
189 pub fn poll_ended(&self, task_id: TaskId, duration: Duration) {
191 if !self.is_enabled() {
192 return;
193 }
194
195 if let Some(task) = self.state.tasks.write().get_mut(&task_id) {
196 task.record_poll(duration);
197 }
198
199 self.add_event(task_id, EventKind::PollEnded { duration });
200 }
201
202 pub fn await_started(&self, task_id: TaskId, await_point: String, location: Option<String>) {
204 if !self.is_enabled() {
205 return;
206 }
207
208 self.update_task_state(
209 task_id,
210 TaskState::Blocked {
211 await_point: await_point.clone(),
212 },
213 );
214
215 self.add_event(
216 task_id,
217 EventKind::AwaitStarted {
218 await_point,
219 location,
220 },
221 );
222 }
223
224 pub fn await_ended(&self, task_id: TaskId, await_point: String, duration: Duration) {
226 if !self.is_enabled() {
227 return;
228 }
229
230 self.add_event(
231 task_id,
232 EventKind::AwaitEnded {
233 await_point,
234 duration,
235 },
236 );
237 }
238
239 pub fn task_completed(&self, task_id: TaskId) {
241 if !self.is_enabled() {
242 return;
243 }
244
245 let duration = {
247 self.state
248 .tasks
249 .read()
250 .get(&task_id)
251 .map(super::task::TaskInfo::age)
252 };
253
254 if let Some(duration) = duration {
255 self.update_task_state(task_id, TaskState::Completed);
256 self.add_event(task_id, EventKind::TaskCompleted { duration });
257 }
258 }
259
260 pub fn task_failed(&self, task_id: TaskId, error: Option<String>) {
262 if !self.is_enabled() {
263 return;
264 }
265
266 self.update_task_state(task_id, TaskState::Failed);
267 self.add_event(task_id, EventKind::TaskFailed { error });
268 }
269
270 pub fn inspection_point(&self, task_id: TaskId, label: String, message: Option<String>) {
272 if !self.is_enabled() {
273 return;
274 }
275
276 self.add_event(task_id, EventKind::InspectionPoint { label, message });
277 }
278
279 pub fn add_event(&self, task_id: TaskId, kind: EventKind) {
281 let event_id = self.state.event_counter.fetch_add(1, Ordering::Relaxed);
282 let event = Event::new(event_id, task_id, kind);
283 self.state.timeline.write().add_event(event);
284 }
285
286 #[must_use]
288 pub fn get_task(&self, task_id: TaskId) -> Option<TaskInfo> {
289 self.state.tasks.read().get(&task_id).cloned()
290 }
291
292 #[must_use]
294 pub fn get_all_tasks(&self) -> Vec<TaskInfo> {
295 self.state.tasks.read().values().cloned().collect()
296 }
297
298 #[must_use]
316 pub fn get_tasks_filtered(&self, filter: &TaskFilter) -> Vec<TaskInfo> {
317 self.state
318 .tasks
319 .read()
320 .values()
321 .filter(|t| filter.matches(t))
322 .cloned()
323 .collect()
324 }
325
326 #[must_use]
340 pub fn get_tasks_sorted(
341 &self,
342 filter: &TaskFilter,
343 sort_by: TaskSortBy,
344 direction: SortDirection,
345 ) -> Vec<TaskInfo> {
346 let mut tasks = self.get_tasks_filtered(filter);
347 sort_tasks(&mut tasks, sort_by, direction);
348 tasks
349 }
350
351 #[must_use]
353 pub fn get_running_tasks(&self) -> Vec<TaskInfo> {
354 self.get_tasks_filtered(&TaskFilter::new().with_state(TaskState::Running))
355 }
356
357 #[must_use]
359 pub fn get_blocked_tasks(&self) -> Vec<TaskInfo> {
360 self.get_tasks_filtered(&TaskFilter::new().with_state(TaskState::Blocked {
361 await_point: String::new(),
362 }))
363 }
364
365 #[must_use]
367 pub fn get_long_running_tasks(&self, min_duration: Duration) -> Vec<TaskInfo> {
368 self.get_tasks_filtered(&TaskFilter::new().with_min_duration(min_duration))
369 }
370
371 #[must_use]
373 pub fn get_root_tasks(&self) -> Vec<TaskInfo> {
374 self.get_tasks_filtered(&TaskFilter::new().root_only())
375 }
376
377 #[must_use]
379 pub fn get_child_tasks(&self, parent_id: TaskId) -> Vec<TaskInfo> {
380 self.get_tasks_filtered(&TaskFilter::new().with_parent(parent_id))
381 }
382
383 #[must_use]
385 pub fn get_events(&self) -> Vec<Event> {
386 self.state.timeline.read().events().to_vec()
387 }
388
389 #[must_use]
391 pub fn get_task_events(&self, task_id: TaskId) -> Vec<Event> {
392 self.state
393 .timeline
394 .read()
395 .events_for_task(task_id)
396 .into_iter()
397 .cloned()
398 .collect()
399 }
400
401 #[must_use]
403 pub fn build_profiler(&self) -> crate::profile::Profiler {
404 use crate::profile::{Profiler, TaskMetrics};
405 use crate::timeline::EventKind;
406
407 let mut profiler = Profiler::new();
408 let tasks = self.state.tasks.read();
409 let timeline = self.state.timeline.read();
410
411 for task in tasks.values() {
412 let mut metrics = TaskMetrics::new(task.id, task.name.clone());
413
414 metrics.total_duration = task.age();
416 metrics.running_time = task.total_run_time;
417 metrics.blocked_time = if metrics.total_duration > task.total_run_time {
418 metrics.total_duration - task.total_run_time
419 } else {
420 Duration::ZERO
421 };
422
423 metrics.poll_count = task.poll_count;
425
426 if task.poll_count > 0 {
428 metrics.avg_poll_duration = task.total_run_time / task.poll_count as u32;
429 }
430
431 metrics.completed = matches!(task.state, TaskState::Completed);
433
434 let task_events: Vec<&Event> = timeline
436 .events()
437 .iter()
438 .filter(|e| e.task_id == task.id)
439 .collect();
440
441 let mut await_start_times: HashMap<String, std::time::Instant> = HashMap::new();
442
443 for event in task_events {
444 match &event.kind {
445 EventKind::AwaitStarted { await_point, .. } => {
446 await_start_times.insert(await_point.clone(), event.timestamp);
447 }
448 EventKind::AwaitEnded { await_point, .. } => {
449 if let Some(start_time) = await_start_times.remove(&await_point.clone()) {
450 let duration = event.timestamp.duration_since(start_time);
451 metrics.await_durations.push(duration);
452 metrics.await_count += 1;
453 }
454 }
455 _ => {}
456 }
457 }
458
459 profiler.record_task(metrics);
460 }
461
462 profiler
463 }
464
465 #[must_use]
467 pub fn stats(&self) -> InspectorStats {
468 let tasks = self.state.tasks.read();
469 let timeline = self.state.timeline.read();
470
471 let total = tasks.len();
472 let pending = tasks
473 .values()
474 .filter(|t| matches!(t.state, TaskState::Pending))
475 .count();
476 let running = tasks
477 .values()
478 .filter(|t| matches!(t.state, TaskState::Running))
479 .count();
480 let blocked = tasks
481 .values()
482 .filter(|t| matches!(t.state, TaskState::Blocked { .. }))
483 .count();
484 let completed = tasks
485 .values()
486 .filter(|t| matches!(t.state, TaskState::Completed))
487 .count();
488 let failed = tasks
489 .values()
490 .filter(|t| matches!(t.state, TaskState::Failed))
491 .count();
492
493 InspectorStats {
494 total_tasks: total,
495 pending_tasks: pending,
496 running_tasks: running,
497 blocked_tasks: blocked,
498 completed_tasks: completed,
499 failed_tasks: failed,
500 total_events: timeline.len(),
501 timeline_duration: timeline.duration(),
502 }
503 }
504
505 pub fn clear(&self) {
507 self.state.tasks.write().clear();
508 self.state.timeline.write().clear();
509 self.state.event_counter.store(1, Ordering::Relaxed);
510 }
511
512 pub fn reset(&self) {
514 self.clear();
515 self.enable();
516 }
517
518 #[must_use]
523 pub fn deadlock_detector(&self) -> &DeadlockDetector {
524 &self.state.deadlock_detector
525 }
526}
527
528impl Default for Inspector {
529 fn default() -> Self {
530 Self::new()
531 }
532}
533
/// Point-in-time summary produced by `Inspector::stats`.
#[derive(Debug, Clone)]
pub struct InspectorStats {
    /// Number of registered tasks, regardless of state.
    pub total_tasks: usize,
    /// Number of tasks in the `Pending` state.
    pub pending_tasks: usize,
    /// Number of tasks in the `Running` state.
    pub running_tasks: usize,
    /// Number of tasks in the `Blocked` state, whatever their await point.
    pub blocked_tasks: usize,
    /// Number of tasks in the `Completed` state.
    pub completed_tasks: usize,
    /// Number of tasks in the `Failed` state.
    pub failed_tasks: usize,
    /// Number of events in the timeline.
    pub total_events: usize,
    /// Duration reported by the timeline.
    pub timeline_duration: Duration,
}
554
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created inspector starts out enabled.
    #[test]
    fn test_inspector_creation() {
        let inspector = Inspector::new();
        assert!(inspector.is_enabled());
    }

    /// A registered task can be looked up again by the returned id.
    #[test]
    fn test_register_task() {
        let inspector = Inspector::new();
        let task_id = inspector.register_task("test_task".to_string());
        let task = inspector.get_task(task_id).unwrap();
        assert_eq!(task.name, "test_task");
    }

    /// One poll followed by completion leaves the task `Completed` with a
    /// poll count of one.
    #[test]
    fn test_task_lifecycle() {
        let inspector = Inspector::new();
        let task_id = inspector.register_task("test".to_string());

        inspector.poll_started(task_id);
        inspector.poll_ended(task_id, Duration::from_millis(10));
        inspector.task_completed(task_id);

        let task = inspector.get_task(task_id).unwrap();
        assert_eq!(task.state, TaskState::Completed);
        assert_eq!(task.poll_count, 1);
    }

    /// `stats` counts every registered task.
    #[test]
    fn test_stats() {
        let inspector = Inspector::new();
        // `register_task` is `#[must_use]`; the ids are not needed here, so
        // discard them explicitly to avoid an `unused_must_use` warning.
        let _ = inspector.register_task("task1".to_string());
        let _ = inspector.register_task("task2".to_string());

        let stats = inspector.stats();
        assert_eq!(stats.total_tasks, 2);
    }
}