async_inspect/deadlock/
mod.rs

1//! Deadlock detection and analysis
2//!
3//! This module provides automatic detection of deadlocks caused by circular
4//! dependencies between tasks waiting on resources (mutexes, channels, etc.).
5
6use crate::task::TaskId;
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet};
10use std::fmt;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13
14/// Unique identifier for a resource (lock, channel, etc.)
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
16pub struct ResourceId(u64);
17
18impl ResourceId {
19    /// Create a new unique resource ID
20    pub fn new() -> Self {
21        static COUNTER: AtomicU64 = AtomicU64::new(1);
22        Self(COUNTER.fetch_add(1, Ordering::Relaxed))
23    }
24
25    /// Get the raw ID value
26    #[must_use]
27    pub fn as_u64(&self) -> u64 {
28        self.0
29    }
30}
31
32impl Default for ResourceId {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38impl fmt::Display for ResourceId {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        write!(f, "Resource#{}", self.0)
41    }
42}
43
44/// Type of resource that can be waited on
45#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
46pub enum ResourceKind {
47    /// Mutex lock
48    Mutex,
49    /// `RwLock` (read or write)
50    RwLock,
51    /// Semaphore
52    Semaphore,
53    /// Channel (send or receive)
54    Channel,
55    /// Other resource type
56    Other(String),
57}
58
59impl fmt::Display for ResourceKind {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        match self {
62            Self::Mutex => write!(f, "Mutex"),
63            Self::RwLock => write!(f, "RwLock"),
64            Self::Semaphore => write!(f, "Semaphore"),
65            Self::Channel => write!(f, "Channel"),
66            Self::Other(name) => write!(f, "{name}"),
67        }
68    }
69}
70
71/// Information about a resource
72#[derive(Debug, Clone)]
73pub struct ResourceInfo {
74    /// Unique resource identifier
75    pub id: ResourceId,
76
77    /// Type of resource
78    pub kind: ResourceKind,
79
80    /// Name or description
81    pub name: String,
82
83    /// Task currently holding this resource (if any)
84    pub holder: Option<TaskId>,
85
86    /// Tasks waiting for this resource
87    pub waiters: Vec<TaskId>,
88
89    /// Memory address (for debugging)
90    pub address: Option<usize>,
91}
92
93impl ResourceInfo {
94    /// Create a new resource info
95    #[must_use]
96    pub fn new(kind: ResourceKind, name: String) -> Self {
97        Self {
98            id: ResourceId::new(),
99            kind,
100            name,
101            holder: None,
102            waiters: Vec::new(),
103            address: None,
104        }
105    }
106
107    /// Set the memory address
108    #[must_use]
109    pub fn with_address(mut self, address: usize) -> Self {
110        self.address = Some(address);
111        self
112    }
113
114    /// Check if resource is held
115    #[must_use]
116    pub fn is_held(&self) -> bool {
117        self.holder.is_some()
118    }
119
120    /// Check if resource has waiters
121    #[must_use]
122    pub fn has_waiters(&self) -> bool {
123        !self.waiters.is_empty()
124    }
125}
126
127impl fmt::Display for ResourceInfo {
128    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129        write!(f, "{} '{}' ({})", self.kind, self.name, self.id)?;
130        if let Some(addr) = self.address {
131            write!(f, " @ 0x{addr:x}")?;
132        }
133        Ok(())
134    }
135}
136
137/// A cycle in the wait-for graph (deadlock)
138#[derive(Debug, Clone)]
139pub struct DeadlockCycle {
140    /// Tasks involved in the cycle
141    pub tasks: Vec<TaskId>,
142
143    /// Resources involved in the cycle
144    pub resources: Vec<ResourceId>,
145
146    /// Detailed chain: Task -> Resource -> Task -> ...
147    pub chain: Vec<WaitEdge>,
148}
149
150/// An edge in the wait-for graph
151#[derive(Debug, Clone)]
152pub struct WaitEdge {
153    /// Task waiting
154    pub task: TaskId,
155
156    /// Resource being waited for
157    pub resource: ResourceId,
158
159    /// Task holding the resource
160    pub holder: TaskId,
161}
162
163impl DeadlockCycle {
164    /// Get a human-readable description of the cycle
165    #[must_use]
166    pub fn describe(&self) -> String {
167        let mut desc = String::from("Deadlock cycle detected:\n");
168
169        for (i, edge) in self.chain.iter().enumerate() {
170            desc.push_str(&format!(
171                "  {} Task {} → {} → Task {}\n",
172                if i == 0 { "→" } else { " " },
173                edge.task,
174                edge.resource,
175                edge.holder
176            ));
177        }
178
179        desc.push_str(&format!(
180            "\n{} tasks and {} resources involved",
181            self.tasks.len(),
182            self.resources.len()
183        ));
184
185        desc
186    }
187}
188
189/// Deadlock detector
190#[derive(Clone)]
191pub struct DeadlockDetector {
192    /// Shared state
193    state: Arc<RwLock<DetectorState>>,
194}
195
196struct DetectorState {
197    /// All tracked resources
198    resources: HashMap<ResourceId, ResourceInfo>,
199
200    /// Mapping from task to resources it's waiting for
201    task_waiting: HashMap<TaskId, ResourceId>,
202
203    /// Whether detection is enabled
204    enabled: bool,
205}
206
207impl DeadlockDetector {
208    /// Create a new deadlock detector
209    #[must_use]
210    pub fn new() -> Self {
211        Self {
212            state: Arc::new(RwLock::new(DetectorState {
213                resources: HashMap::new(),
214                task_waiting: HashMap::new(),
215                enabled: true,
216            })),
217        }
218    }
219
220    /// Enable deadlock detection
221    pub fn enable(&self) {
222        self.state.write().enabled = true;
223    }
224
225    /// Disable deadlock detection
226    pub fn disable(&self) {
227        self.state.write().enabled = false;
228    }
229
230    /// Check if detection is enabled
231    #[must_use]
232    pub fn is_enabled(&self) -> bool {
233        self.state.read().enabled
234    }
235
236    /// Register a new resource
237    #[must_use]
238    pub fn register_resource(&self, info: ResourceInfo) -> ResourceId {
239        if !self.is_enabled() {
240            return info.id;
241        }
242
243        let resource_id = info.id;
244        self.state.write().resources.insert(resource_id, info);
245        resource_id
246    }
247
248    /// Record a task acquiring a resource
249    pub fn acquire(&self, task_id: TaskId, resource_id: ResourceId) {
250        if !self.is_enabled() {
251            return;
252        }
253
254        let mut state = self.state.write();
255
256        // Remove from waiting
257        state.task_waiting.remove(&task_id);
258
259        // Set as holder
260        if let Some(resource) = state.resources.get_mut(&resource_id) {
261            resource.holder = Some(task_id);
262            resource.waiters.retain(|&t| t != task_id);
263        }
264    }
265
266    /// Record a task releasing a resource
267    pub fn release(&self, task_id: TaskId, resource_id: ResourceId) {
268        if !self.is_enabled() {
269            return;
270        }
271
272        let mut state = self.state.write();
273
274        if let Some(resource) = state.resources.get_mut(&resource_id) {
275            if resource.holder == Some(task_id) {
276                resource.holder = None;
277            }
278        }
279    }
280
281    /// Record a task waiting for a resource
282    pub fn wait_for(&self, task_id: TaskId, resource_id: ResourceId) {
283        if !self.is_enabled() {
284            return;
285        }
286
287        let mut state = self.state.write();
288
289        // Record waiting
290        state.task_waiting.insert(task_id, resource_id);
291
292        // Add to waiters list
293        if let Some(resource) = state.resources.get_mut(&resource_id) {
294            if !resource.waiters.contains(&task_id) {
295                resource.waiters.push(task_id);
296            }
297        }
298    }
299
300    /// Detect deadlocks using cycle detection
301    #[must_use]
302    pub fn detect_deadlocks(&self) -> Vec<DeadlockCycle> {
303        let state = self.state.read();
304
305        // Build wait-for graph: Task -> Task via Resource
306        let mut graph: HashMap<TaskId, Vec<TaskId>> = HashMap::new();
307        let mut task_to_resource: HashMap<TaskId, ResourceId> = HashMap::new();
308
309        for (&waiting_task, &resource_id) in &state.task_waiting {
310            if let Some(resource) = state.resources.get(&resource_id) {
311                if let Some(holder_task) = resource.holder {
312                    graph.entry(waiting_task).or_default().push(holder_task);
313                    task_to_resource.insert(waiting_task, resource_id);
314                }
315            }
316        }
317
318        // Find cycles using DFS
319        let mut cycles = Vec::new();
320        let mut visited = HashSet::new();
321        let mut rec_stack = HashSet::new();
322
323        for &task in graph.keys() {
324            if !visited.contains(&task) {
325                if let Some(cycle) = self.find_cycle_dfs(
326                    task,
327                    &graph,
328                    &task_to_resource,
329                    &mut visited,
330                    &mut rec_stack,
331                    &mut Vec::new(),
332                ) {
333                    cycles.push(cycle);
334                }
335            }
336        }
337
338        cycles
339    }
340
341    /// DFS-based cycle detection
342    fn find_cycle_dfs(
343        &self,
344        task: TaskId,
345        graph: &HashMap<TaskId, Vec<TaskId>>,
346        task_to_resource: &HashMap<TaskId, ResourceId>,
347        visited: &mut HashSet<TaskId>,
348        rec_stack: &mut HashSet<TaskId>,
349        path: &mut Vec<TaskId>,
350    ) -> Option<DeadlockCycle> {
351        visited.insert(task);
352        rec_stack.insert(task);
353        path.push(task);
354
355        if let Some(neighbors) = graph.get(&task) {
356            for &neighbor in neighbors {
357                if !visited.contains(&neighbor) {
358                    if let Some(cycle) = self.find_cycle_dfs(
359                        neighbor,
360                        graph,
361                        task_to_resource,
362                        visited,
363                        rec_stack,
364                        path,
365                    ) {
366                        return Some(cycle);
367                    }
368                } else if rec_stack.contains(&neighbor) {
369                    // Found a cycle!
370                    return Some(self.build_cycle(neighbor, path, task_to_resource));
371                }
372            }
373        }
374
375        rec_stack.remove(&task);
376        path.pop();
377        None
378    }
379
380    /// Build a deadlock cycle from the path
381    fn build_cycle(
382        &self,
383        start_task: TaskId,
384        path: &[TaskId],
385        task_to_resource: &HashMap<TaskId, ResourceId>,
386    ) -> DeadlockCycle {
387        // Find where the cycle starts
388        let cycle_start = path.iter().position(|&t| t == start_task).unwrap_or(0);
389        let cycle_tasks: Vec<TaskId> = path[cycle_start..].to_vec();
390
391        let mut resources = Vec::new();
392        let mut chain = Vec::new();
393
394        for i in 0..cycle_tasks.len() {
395            let waiting_task = cycle_tasks[i];
396            let holder_task = cycle_tasks[(i + 1) % cycle_tasks.len()];
397
398            if let Some(&resource_id) = task_to_resource.get(&waiting_task) {
399                resources.push(resource_id);
400                chain.push(WaitEdge {
401                    task: waiting_task,
402                    resource: resource_id,
403                    holder: holder_task,
404                });
405            }
406        }
407
408        DeadlockCycle {
409            tasks: cycle_tasks,
410            resources,
411            chain,
412        }
413    }
414
415    /// Get all resources
416    #[must_use]
417    pub fn get_resources(&self) -> Vec<ResourceInfo> {
418        self.state.read().resources.values().cloned().collect()
419    }
420
421    /// Get a specific resource
422    #[must_use]
423    pub fn get_resource(&self, id: ResourceId) -> Option<ResourceInfo> {
424        self.state.read().resources.get(&id).cloned()
425    }
426
427    /// Clear all tracking data
428    pub fn clear(&self) {
429        let mut state = self.state.write();
430        state.resources.clear();
431        state.task_waiting.clear();
432    }
433}
434
435impl Default for DeadlockDetector {
436    fn default() -> Self {
437        Self::new()
438    }
439}
440
#[cfg(test)]
mod tests {
    use super::*;

    /// Register a mutex-kind resource named `name` and return its id.
    fn register_mutex(detector: &DeadlockDetector, name: &str) -> ResourceId {
        detector.register_resource(ResourceInfo::new(ResourceKind::Mutex, name.to_string()))
    }

    #[test]
    fn test_resource_creation() {
        let resource = ResourceInfo::new(ResourceKind::Mutex, "test_mutex".to_string());
        assert_eq!(resource.kind, ResourceKind::Mutex);
        assert_eq!(resource.name, "test_mutex");
        assert!(!resource.is_held());
        assert!(!resource.has_waiters());
    }

    #[test]
    fn test_detector_registration() {
        let detector = DeadlockDetector::new();
        let resource_id =
            detector.register_resource(ResourceInfo::new(ResourceKind::Mutex, "test".to_string()));

        let retrieved = detector.get_resource(resource_id).unwrap();
        assert_eq!(retrieved.id, resource_id);
        assert_eq!(retrieved.name, "test");
    }

    #[test]
    fn test_simple_deadlock_detection() {
        let detector = DeadlockDetector::new();

        // Create two resources
        let res1_id = register_mutex(&detector, "mutex_a");
        let res2_id = register_mutex(&detector, "mutex_b");

        // Create two tasks
        let task1 = TaskId::new();
        let task2 = TaskId::new();

        // Task1 holds res1, waits for res2
        detector.acquire(task1, res1_id);
        detector.wait_for(task1, res2_id);

        // Task2 holds res2, waits for res1
        detector.acquire(task2, res2_id);
        detector.wait_for(task2, res1_id);

        // Detect deadlock
        let deadlocks = detector.detect_deadlocks();
        assert_eq!(deadlocks.len(), 1);

        let cycle = &deadlocks[0];
        assert_eq!(cycle.tasks.len(), 2);
        assert!(cycle.tasks.contains(&task1));
        assert!(cycle.tasks.contains(&task2));
        assert_eq!(cycle.chain.len(), 2);
        assert!(cycle
            .describe()
            .contains("2 tasks and 2 resources involved"));
    }

    #[test]
    fn test_self_deadlock() {
        let detector = DeadlockDetector::new();
        let res_id = register_mutex(&detector, "non_reentrant");
        let task = TaskId::new();

        // Task holds the lock and then tries to take it again.
        detector.acquire(task, res_id);
        detector.wait_for(task, res_id);

        let deadlocks = detector.detect_deadlocks();
        assert_eq!(deadlocks.len(), 1);
        assert_eq!(deadlocks[0].tasks, vec![task]);
        assert_eq!(deadlocks[0].chain.len(), 1);
        assert_eq!(deadlocks[0].chain[0].holder, task);
    }

    #[test]
    fn test_no_deadlock() {
        let detector = DeadlockDetector::new();
        let res_id = register_mutex(&detector, "mutex");

        let task1 = TaskId::new();
        let task2 = TaskId::new();

        // Task1 acquires and releases
        detector.acquire(task1, res_id);
        detector.release(task1, res_id);

        // Task2 acquires
        detector.acquire(task2, res_id);

        // No deadlock
        let deadlocks = detector.detect_deadlocks();
        assert_eq!(deadlocks.len(), 0);
    }

    #[test]
    fn test_waiters_follow_wait_and_acquire() {
        let detector = DeadlockDetector::new();
        let res_id = register_mutex(&detector, "shared");
        let owner = TaskId::new();
        let waiter = TaskId::new();

        detector.acquire(owner, res_id);
        detector.wait_for(waiter, res_id);
        let info = detector.get_resource(res_id).unwrap();
        assert_eq!(info.holder, Some(owner));
        assert_eq!(info.waiters, vec![waiter]);
        assert!(detector.detect_deadlocks().is_empty());

        detector.release(owner, res_id);
        detector.acquire(waiter, res_id);
        let info = detector.get_resource(res_id).unwrap();
        assert_eq!(info.holder, Some(waiter));
        assert!(!info.has_waiters());
    }

    #[test]
    fn test_release_by_non_holder_is_ignored() {
        let detector = DeadlockDetector::new();
        let res_id = register_mutex(&detector, "mutex");
        let holder = TaskId::new();
        let other = TaskId::new();

        detector.acquire(holder, res_id);
        detector.release(other, res_id);

        assert_eq!(detector.get_resource(res_id).unwrap().holder, Some(holder));
    }

    #[test]
    fn test_disabled_detector_records_nothing() {
        let detector = DeadlockDetector::new();
        detector.disable();
        assert!(!detector.is_enabled());

        let res_id = register_mutex(&detector, "ignored");
        assert!(detector.get_resource(res_id).is_none());
        assert!(detector.get_resources().is_empty());

        detector.enable();
        assert!(detector.is_enabled());
    }

    #[test]
    fn test_clear_removes_everything() {
        let detector = DeadlockDetector::new();
        let res_id = register_mutex(&detector, "mutex");
        detector.wait_for(TaskId::new(), res_id);

        detector.clear();

        assert!(detector.get_resources().is_empty());
        assert!(detector.detect_deadlocks().is_empty());
    }
}