async_inspect/inspector/
mod.rs

1//! Core inspection functionality
2//!
3//! This module provides the main `Inspector` type that manages task tracking
4//! and event collection.
5
6use crate::deadlock::DeadlockDetector;
7use crate::task::{sort_tasks, SortDirection, TaskFilter, TaskId, TaskInfo, TaskSortBy, TaskState};
8use crate::timeline::{Event, EventKind, Timeline};
9use parking_lot::RwLock;
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15/// Global inspector instance
16static GLOBAL_INSPECTOR: once_cell::sync::Lazy<Inspector> =
17    once_cell::sync::Lazy::new(Inspector::new);
18
/// Main inspector for tracking async execution
///
/// The only field is an `Arc`, so the derived `Clone` produces a handle that
/// refers to the same tasks, timeline and enabled flag as the original.
#[derive(Clone)]
pub struct Inspector {
    /// Shared state; reference-counted so every clone sees the same data
    state: Arc<InspectorState>,
}
25
/// State shared by every clone of an [`Inspector`].
///
/// The task table, the timeline and the enabled flag each sit behind their
/// own `RwLock`, so they are locked independently of one another.
struct InspectorState {
    /// All tracked tasks, keyed by task ID
    tasks: RwLock<HashMap<TaskId, TaskInfo>>,

    /// Timeline of events
    timeline: RwLock<Timeline>,

    /// Deadlock detector
    deadlock_detector: DeadlockDetector,

    /// Event counter for unique IDs; advanced by one for every recorded event
    event_counter: AtomicU64,

    /// Whether the inspector is enabled
    enabled: RwLock<bool>,
}
42
43impl Inspector {
44    /// Create a new inspector
45    #[must_use]
46    pub fn new() -> Self {
47        Self {
48            state: Arc::new(InspectorState {
49                tasks: RwLock::new(HashMap::new()),
50                timeline: RwLock::new(Timeline::new()),
51                deadlock_detector: DeadlockDetector::new(),
52                event_counter: AtomicU64::new(1),
53                enabled: RwLock::new(true),
54            }),
55        }
56    }
57
58    /// Get the global inspector instance
59    #[must_use]
60    pub fn global() -> &'static Self {
61        &GLOBAL_INSPECTOR
62    }
63
64    /// Check if the inspector is enabled
65    #[must_use]
66    pub fn is_enabled(&self) -> bool {
67        *self.state.enabled.read()
68    }
69
70    /// Enable the inspector
71    pub fn enable(&self) {
72        *self.state.enabled.write() = true;
73    }
74
75    /// Disable the inspector
76    pub fn disable(&self) {
77        *self.state.enabled.write() = false;
78    }
79
80    /// Register a new task
81    #[must_use]
82    pub fn register_task(&self, name: String) -> TaskId {
83        if !self.is_enabled() {
84            return TaskId::new();
85        }
86
87        let task = TaskInfo::new(name.clone());
88        let task_id = task.id;
89
90        // Add event
91        self.add_event(
92            task_id,
93            EventKind::TaskSpawned {
94                name,
95                parent: None,
96                location: None,
97            },
98        );
99
100        // Store task
101        self.state.tasks.write().insert(task_id, task);
102
103        task_id
104    }
105
106    /// Register a child task with a parent
107    #[must_use]
108    pub fn register_child_task(&self, name: String, parent_id: TaskId) -> TaskId {
109        if !self.is_enabled() {
110            return TaskId::new();
111        }
112
113        let mut task = TaskInfo::new(name.clone());
114        task.parent = Some(parent_id);
115        let task_id = task.id;
116
117        // Add event
118        self.add_event(
119            task_id,
120            EventKind::TaskSpawned {
121                name,
122                parent: Some(parent_id),
123                location: None,
124            },
125        );
126
127        // Store task
128        self.state.tasks.write().insert(task_id, task);
129
130        task_id
131    }
132
133    /// Register a task with additional metadata
134    #[must_use]
135    pub fn register_task_with_info(&self, task: TaskInfo) -> TaskId {
136        if !self.is_enabled() {
137            return task.id;
138        }
139
140        let task_id = task.id;
141
142        // Add event
143        self.add_event(
144            task_id,
145            EventKind::TaskSpawned {
146                name: task.name.clone(),
147                parent: task.parent,
148                location: task.location.clone(),
149            },
150        );
151
152        // Store task
153        self.state.tasks.write().insert(task_id, task);
154
155        task_id
156    }
157
158    /// Update task state
159    pub fn update_task_state(&self, task_id: TaskId, new_state: TaskState) {
160        if !self.is_enabled() {
161            return;
162        }
163
164        if let Some(task) = self.state.tasks.write().get_mut(&task_id) {
165            let old_state = task.state.clone();
166            task.update_state(new_state.clone());
167
168            // Add event
169            self.add_event(
170                task_id,
171                EventKind::StateChanged {
172                    old_state,
173                    new_state,
174                },
175            );
176        }
177    }
178
179    /// Record a poll start
180    pub fn poll_started(&self, task_id: TaskId) {
181        if !self.is_enabled() {
182            return;
183        }
184
185        self.update_task_state(task_id, TaskState::Running);
186        self.add_event(task_id, EventKind::PollStarted);
187    }
188
189    /// Record a poll end
190    pub fn poll_ended(&self, task_id: TaskId, duration: Duration) {
191        if !self.is_enabled() {
192            return;
193        }
194
195        if let Some(task) = self.state.tasks.write().get_mut(&task_id) {
196            task.record_poll(duration);
197        }
198
199        self.add_event(task_id, EventKind::PollEnded { duration });
200    }
201
202    /// Record an await start
203    pub fn await_started(&self, task_id: TaskId, await_point: String, location: Option<String>) {
204        if !self.is_enabled() {
205            return;
206        }
207
208        self.update_task_state(
209            task_id,
210            TaskState::Blocked {
211                await_point: await_point.clone(),
212            },
213        );
214
215        self.add_event(
216            task_id,
217            EventKind::AwaitStarted {
218                await_point,
219                location,
220            },
221        );
222    }
223
224    /// Record an await end
225    pub fn await_ended(&self, task_id: TaskId, await_point: String, duration: Duration) {
226        if !self.is_enabled() {
227            return;
228        }
229
230        self.add_event(
231            task_id,
232            EventKind::AwaitEnded {
233                await_point,
234                duration,
235            },
236        );
237    }
238
239    /// Mark task as completed
240    pub fn task_completed(&self, task_id: TaskId) {
241        if !self.is_enabled() {
242            return;
243        }
244
245        // Get duration while holding read lock, then release it
246        let duration = {
247            self.state
248                .tasks
249                .read()
250                .get(&task_id)
251                .map(super::task::TaskInfo::age)
252        };
253
254        if let Some(duration) = duration {
255            self.update_task_state(task_id, TaskState::Completed);
256            self.add_event(task_id, EventKind::TaskCompleted { duration });
257        }
258    }
259
260    /// Mark task as failed
261    pub fn task_failed(&self, task_id: TaskId, error: Option<String>) {
262        if !self.is_enabled() {
263            return;
264        }
265
266        self.update_task_state(task_id, TaskState::Failed);
267        self.add_event(task_id, EventKind::TaskFailed { error });
268    }
269
270    /// Record an inspection point
271    pub fn inspection_point(&self, task_id: TaskId, label: String, message: Option<String>) {
272        if !self.is_enabled() {
273            return;
274        }
275
276        self.add_event(task_id, EventKind::InspectionPoint { label, message });
277    }
278
279    /// Add an event to the timeline
280    pub fn add_event(&self, task_id: TaskId, kind: EventKind) {
281        let event_id = self.state.event_counter.fetch_add(1, Ordering::Relaxed);
282        let event = Event::new(event_id, task_id, kind);
283        self.state.timeline.write().add_event(event);
284    }
285
286    /// Get a task by ID
287    #[must_use]
288    pub fn get_task(&self, task_id: TaskId) -> Option<TaskInfo> {
289        self.state.tasks.read().get(&task_id).cloned()
290    }
291
292    /// Get all tasks
293    #[must_use]
294    pub fn get_all_tasks(&self) -> Vec<TaskInfo> {
295        self.state.tasks.read().values().cloned().collect()
296    }
297
298    /// Get tasks matching a filter
299    ///
300    /// # Example
301    ///
302    /// ```rust
303    /// use async_inspect::{Inspector, task::{TaskFilter, TaskState}};
304    /// use std::time::Duration;
305    ///
306    /// let inspector = Inspector::new();
307    ///
308    /// // Find all running tasks with "fetch" in the name
309    /// let filter = TaskFilter::new()
310    ///     .with_state(TaskState::Running)
311    ///     .with_name_pattern("fetch");
312    ///
313    /// let tasks = inspector.get_tasks_filtered(&filter);
314    /// ```
315    #[must_use]
316    pub fn get_tasks_filtered(&self, filter: &TaskFilter) -> Vec<TaskInfo> {
317        self.state
318            .tasks
319            .read()
320            .values()
321            .filter(|t| filter.matches(t))
322            .cloned()
323            .collect()
324    }
325
326    /// Get tasks matching a filter, sorted by the given criteria
327    ///
328    /// # Example
329    ///
330    /// ```rust
331    /// use async_inspect::{Inspector, task::{TaskFilter, TaskState, TaskSortBy, SortDirection}};
332    ///
333    /// let inspector = Inspector::new();
334    ///
335    /// // Get blocked tasks sorted by age (oldest first)
336    /// let filter = TaskFilter::new().with_state(TaskState::Blocked { await_point: String::new() });
337    /// let tasks = inspector.get_tasks_sorted(&filter, TaskSortBy::Age, SortDirection::Descending);
338    /// ```
339    #[must_use]
340    pub fn get_tasks_sorted(
341        &self,
342        filter: &TaskFilter,
343        sort_by: TaskSortBy,
344        direction: SortDirection,
345    ) -> Vec<TaskInfo> {
346        let mut tasks = self.get_tasks_filtered(filter);
347        sort_tasks(&mut tasks, sort_by, direction);
348        tasks
349    }
350
351    /// Get running tasks
352    #[must_use]
353    pub fn get_running_tasks(&self) -> Vec<TaskInfo> {
354        self.get_tasks_filtered(&TaskFilter::new().with_state(TaskState::Running))
355    }
356
357    /// Get blocked tasks
358    #[must_use]
359    pub fn get_blocked_tasks(&self) -> Vec<TaskInfo> {
360        self.get_tasks_filtered(&TaskFilter::new().with_state(TaskState::Blocked {
361            await_point: String::new(),
362        }))
363    }
364
365    /// Get long-running tasks (older than the specified duration)
366    #[must_use]
367    pub fn get_long_running_tasks(&self, min_duration: Duration) -> Vec<TaskInfo> {
368        self.get_tasks_filtered(&TaskFilter::new().with_min_duration(min_duration))
369    }
370
371    /// Get root tasks (tasks without a parent)
372    #[must_use]
373    pub fn get_root_tasks(&self) -> Vec<TaskInfo> {
374        self.get_tasks_filtered(&TaskFilter::new().root_only())
375    }
376
377    /// Get child tasks of a specific parent
378    #[must_use]
379    pub fn get_child_tasks(&self, parent_id: TaskId) -> Vec<TaskInfo> {
380        self.get_tasks_filtered(&TaskFilter::new().with_parent(parent_id))
381    }
382
383    /// Get all events
384    #[must_use]
385    pub fn get_events(&self) -> Vec<Event> {
386        self.state.timeline.read().events().to_vec()
387    }
388
389    /// Get events for a specific task
390    #[must_use]
391    pub fn get_task_events(&self, task_id: TaskId) -> Vec<Event> {
392        self.state
393            .timeline
394            .read()
395            .events_for_task(task_id)
396            .into_iter()
397            .cloned()
398            .collect()
399    }
400
401    /// Build a performance profiler from collected data
402    #[must_use]
403    pub fn build_profiler(&self) -> crate::profile::Profiler {
404        use crate::profile::{Profiler, TaskMetrics};
405        use crate::timeline::EventKind;
406
407        let mut profiler = Profiler::new();
408        let tasks = self.state.tasks.read();
409        let timeline = self.state.timeline.read();
410
411        for task in tasks.values() {
412            let mut metrics = TaskMetrics::new(task.id, task.name.clone());
413
414            // Calculate durations
415            metrics.total_duration = task.age();
416            metrics.running_time = task.total_run_time;
417            metrics.blocked_time = if metrics.total_duration > task.total_run_time {
418                metrics.total_duration - task.total_run_time
419            } else {
420                Duration::ZERO
421            };
422
423            // Set poll count
424            metrics.poll_count = task.poll_count;
425
426            // Calculate average poll duration
427            if task.poll_count > 0 {
428                metrics.avg_poll_duration = task.total_run_time / task.poll_count as u32;
429            }
430
431            // Check if completed
432            metrics.completed = matches!(task.state, TaskState::Completed);
433
434            // Collect await durations from events
435            let task_events: Vec<&Event> = timeline
436                .events()
437                .iter()
438                .filter(|e| e.task_id == task.id)
439                .collect();
440
441            let mut await_start_times: HashMap<String, std::time::Instant> = HashMap::new();
442
443            for event in task_events {
444                match &event.kind {
445                    EventKind::AwaitStarted { await_point, .. } => {
446                        await_start_times.insert(await_point.clone(), event.timestamp);
447                    }
448                    EventKind::AwaitEnded { await_point, .. } => {
449                        if let Some(start_time) = await_start_times.remove(&await_point.clone()) {
450                            let duration = event.timestamp.duration_since(start_time);
451                            metrics.await_durations.push(duration);
452                            metrics.await_count += 1;
453                        }
454                    }
455                    _ => {}
456                }
457            }
458
459            profiler.record_task(metrics);
460        }
461
462        profiler
463    }
464
465    /// Get statistics
466    #[must_use]
467    pub fn stats(&self) -> InspectorStats {
468        let tasks = self.state.tasks.read();
469        let timeline = self.state.timeline.read();
470
471        let total = tasks.len();
472        let pending = tasks
473            .values()
474            .filter(|t| matches!(t.state, TaskState::Pending))
475            .count();
476        let running = tasks
477            .values()
478            .filter(|t| matches!(t.state, TaskState::Running))
479            .count();
480        let blocked = tasks
481            .values()
482            .filter(|t| matches!(t.state, TaskState::Blocked { .. }))
483            .count();
484        let completed = tasks
485            .values()
486            .filter(|t| matches!(t.state, TaskState::Completed))
487            .count();
488        let failed = tasks
489            .values()
490            .filter(|t| matches!(t.state, TaskState::Failed))
491            .count();
492
493        InspectorStats {
494            total_tasks: total,
495            pending_tasks: pending,
496            running_tasks: running,
497            blocked_tasks: blocked,
498            completed_tasks: completed,
499            failed_tasks: failed,
500            total_events: timeline.len(),
501            timeline_duration: timeline.duration(),
502        }
503    }
504
505    /// Clear all data
506    pub fn clear(&self) {
507        self.state.tasks.write().clear();
508        self.state.timeline.write().clear();
509        self.state.event_counter.store(1, Ordering::Relaxed);
510    }
511
    /// Reset the inspector
    ///
    /// Clears all recorded data (see `clear`) and then switches tracking back
    /// on, regardless of whether it was enabled before.
    pub fn reset(&self) {
        self.clear();
        self.enable();
    }
517
518    /// Get the deadlock detector
519    ///
520    /// Returns a reference to the integrated deadlock detector for resource
521    /// tracking and deadlock analysis.
522    #[must_use]
523    pub fn deadlock_detector(&self) -> &DeadlockDetector {
524        &self.state.deadlock_detector
525    }
526}
527
528impl Default for Inspector {
529    fn default() -> Self {
530        Self::new()
531    }
532}
533
/// Point-in-time summary of an inspector's contents.
///
/// All fields are plain values; the struct holds no reference back to the
/// inspector it was computed from.
#[derive(Clone, Debug)]
pub struct InspectorStats {
    /// Number of tasks in the task table, in any state
    pub total_tasks: usize,
    /// Number of tasks whose state is pending
    pub pending_tasks: usize,
    /// Number of tasks whose state is running
    pub running_tasks: usize,
    /// Number of tasks whose state is blocked
    pub blocked_tasks: usize,
    /// Number of tasks whose state is completed
    pub completed_tasks: usize,
    /// Number of tasks whose state is failed
    pub failed_tasks: usize,
    /// Number of events held by the timeline
    pub total_events: usize,
    /// Duration reported by the timeline
    pub timeline_duration: Duration,
}
554
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built inspector starts out enabled.
    #[test]
    fn test_inspector_creation() {
        let inspector = Inspector::new();
        assert!(inspector.is_enabled());
    }

    /// A registered task can be looked up again under the returned ID.
    #[test]
    fn test_register_task() {
        let inspector = Inspector::new();
        let task_id = inspector.register_task("test_task".to_string());
        let task = inspector.get_task(task_id).unwrap();
        assert_eq!(task.name, "test_task");
    }

    /// One poll followed by completion leaves the task completed with a
    /// poll count of one.
    #[test]
    fn test_task_lifecycle() {
        let inspector = Inspector::new();
        let task_id = inspector.register_task("test".to_string());

        inspector.poll_started(task_id);
        inspector.poll_ended(task_id, Duration::from_millis(10));
        inspector.task_completed(task_id);

        let task = inspector.get_task(task_id).unwrap();
        assert_eq!(task.state, TaskState::Completed);
        assert_eq!(task.poll_count, 1);
    }

    /// The statistics count every registered task.
    #[test]
    fn test_stats() {
        let inspector = Inspector::new();
        // `register_task` is `#[must_use]`; bind the IDs so the results are
        // not silently discarded.
        let _first = inspector.register_task("task1".to_string());
        let _second = inspector.register_task("task2".to_string());

        let stats = inspector.stats();
        assert_eq!(stats.total_tasks, 2);
    }

    /// While disabled, registration hands back an ID but stores nothing.
    #[test]
    fn test_disabled_inspector_records_nothing() {
        let inspector = Inspector::new();
        inspector.disable();
        assert!(!inspector.is_enabled());

        let task_id = inspector.register_task("ignored".to_string());
        assert!(inspector.get_task(task_id).is_none());
        assert_eq!(inspector.stats().total_tasks, 0);
    }

    /// Clearing removes previously registered tasks.
    #[test]
    fn test_clear() {
        let inspector = Inspector::new();
        let task_id = inspector.register_task("temp".to_string());

        inspector.clear();

        assert!(inspector.get_task(task_id).is_none());
        assert!(inspector.get_all_tasks().is_empty());
    }
}