async_inspect/sync/
mod.rs

1//! Tracked synchronization primitives
2//!
3//! This module provides drop-in replacements for Tokio's synchronization primitives
4//! (`Mutex`, `RwLock`, `Semaphore`) that automatically track contention and integrate
5//! with async-inspect's deadlock detection.
6//!
7//! # Example
8//!
9//! ```rust,no_run
10//! use async_inspect::sync::Mutex;
11//!
12//! #[tokio::main]
13//! async fn main() {
14//!     // Create a tracked mutex (drop-in replacement for tokio::sync::Mutex)
15//!     let mutex = Mutex::new(42, "my_counter");
16//!
17//!     // Use it like normal - tracking is automatic!
18//!     {
19//!         let mut guard = mutex.lock().await;
20//!         *guard += 1;
21//!     }
22//!
23//!     // Check contention metrics
24//!     let metrics = mutex.metrics();
25//!     println!("Acquisitions: {}", metrics.acquisitions);
26//!     println!("Contentions: {}", metrics.contentions);
27//! }
28//! ```
29
30mod mutex;
31mod rwlock;
32mod semaphore;
33
34pub use mutex::{Mutex, MutexGuard};
35pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
36pub use semaphore::{AcquireError, Semaphore, SemaphorePermit, TryAcquireError};
37
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::time::{Duration, Instant};
40
/// Point-in-time snapshot of the contention statistics of one tracked
/// synchronization primitive.
#[derive(Debug, Clone, Default)]
pub struct LockMetrics {
    /// How many times the primitive was successfully acquired.
    pub acquisitions: u64,
    /// How many of those acquisitions had to wait first (contention).
    pub contentions: u64,
    /// Sum of all time spent waiting for the primitive.
    pub total_wait_time: Duration,
    /// Longest single wait that was observed.
    pub max_wait_time: Duration,
    /// Mean wait time per contended acquisition.
    pub avg_wait_time: Duration,
}

impl LockMetrics {
    /// Ratio of contended acquisitions to all acquisitions.
    ///
    /// Yields `0.0` when no acquisition has been recorded, so the division
    /// never sees a zero denominator. For snapshots in which `contentions`
    /// does not exceed `acquisitions` the result lies between `0.0` and `1.0`.
    #[must_use]
    pub fn contention_rate(&self) -> f64 {
        match self.acquisitions {
            0 => 0.0,
            total => self.contentions as f64 / total as f64,
        }
    }
}
67
68/// Internal metrics tracker for locks
69#[derive(Debug)]
70pub(crate) struct MetricsTracker {
71    acquisitions: AtomicU64,
72    contentions: AtomicU64,
73    total_wait_nanos: AtomicU64,
74    max_wait_nanos: AtomicU64,
75}
76
77impl MetricsTracker {
78    pub fn new() -> Self {
79        Self {
80            acquisitions: AtomicU64::new(0),
81            contentions: AtomicU64::new(0),
82            total_wait_nanos: AtomicU64::new(0),
83            max_wait_nanos: AtomicU64::new(0),
84        }
85    }
86
87    pub fn record_acquisition(&self, wait_time: Option<Duration>) {
88        self.acquisitions.fetch_add(1, Ordering::Relaxed);
89
90        if let Some(wait) = wait_time {
91            self.contentions.fetch_add(1, Ordering::Relaxed);
92            let nanos = wait.as_nanos() as u64;
93            self.total_wait_nanos.fetch_add(nanos, Ordering::Relaxed);
94
95            // Update max wait time (compare-and-swap loop)
96            let mut current_max = self.max_wait_nanos.load(Ordering::Relaxed);
97            while nanos > current_max {
98                match self.max_wait_nanos.compare_exchange_weak(
99                    current_max,
100                    nanos,
101                    Ordering::Relaxed,
102                    Ordering::Relaxed,
103                ) {
104                    Ok(_) => break,
105                    Err(actual) => current_max = actual,
106                }
107            }
108        }
109    }
110
111    pub fn get_metrics(&self) -> LockMetrics {
112        let acquisitions = self.acquisitions.load(Ordering::Relaxed);
113        let contentions = self.contentions.load(Ordering::Relaxed);
114        let total_wait_nanos = self.total_wait_nanos.load(Ordering::Relaxed);
115        let max_wait_nanos = self.max_wait_nanos.load(Ordering::Relaxed);
116
117        let total_wait_time = Duration::from_nanos(total_wait_nanos);
118        let max_wait_time = Duration::from_nanos(max_wait_nanos);
119        let avg_wait_time = if contentions > 0 {
120            Duration::from_nanos(total_wait_nanos / contentions)
121        } else {
122            Duration::ZERO
123        };
124
125        LockMetrics {
126            acquisitions,
127            contentions,
128            total_wait_time,
129            max_wait_time,
130            avg_wait_time,
131        }
132    }
133
134    pub fn reset(&self) {
135        self.acquisitions.store(0, Ordering::Relaxed);
136        self.contentions.store(0, Ordering::Relaxed);
137        self.total_wait_nanos.store(0, Ordering::Relaxed);
138        self.max_wait_nanos.store(0, Ordering::Relaxed);
139    }
140}
141
142impl Default for MetricsTracker {
143    fn default() -> Self {
144        Self::new()
145    }
146}
147
/// Stopwatch used to decide whether an acquisition counts as contended.
pub(crate) struct WaitTimer {
    /// Moment at which the timer was started.
    start: Instant,
    /// Waits at or below this duration are treated as immediate acquisitions
    /// rather than contention.
    threshold: Duration,
}

impl WaitTimer {
    /// Starts timing now, with a contention threshold of 10µs.
    pub fn start() -> Self {
        let start = Instant::now();
        let threshold = Duration::from_micros(10);
        Self { start, threshold }
    }

    /// Time elapsed since the timer was started, but only if it exceeds the
    /// threshold.
    ///
    /// Returns `Some(elapsed)` when the wait was strictly longer than the
    /// threshold (actual contention) and `None` otherwise (immediate).
    pub fn elapsed_if_contended(&self) -> Option<Duration> {
        Some(self.start.elapsed()).filter(|waited| *waited > self.threshold)
    }
}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// Counts, total, maximum and average are all derived from the recorded waits.
    #[test]
    fn test_metrics_tracker() {
        let tracker = MetricsTracker::new();

        // One immediate acquisition followed by two contended ones.
        tracker.record_acquisition(None);
        tracker.record_acquisition(Some(Duration::from_millis(10)));
        tracker.record_acquisition(Some(Duration::from_millis(20)));

        let metrics = tracker.get_metrics();
        assert_eq!(metrics.acquisitions, 3);
        assert_eq!(metrics.contentions, 2);
        assert_eq!(metrics.max_wait_time, Duration::from_millis(20));
        assert_eq!(metrics.total_wait_time, Duration::from_millis(30));
        assert_eq!(metrics.avg_wait_time, Duration::from_millis(15));
    }

    /// A shorter wait recorded after a longer one must not lower the maximum.
    #[test]
    fn test_max_wait_is_order_independent() {
        let tracker = MetricsTracker::new();

        tracker.record_acquisition(Some(Duration::from_millis(20)));
        tracker.record_acquisition(Some(Duration::from_millis(10)));

        assert_eq!(
            tracker.get_metrics().max_wait_time,
            Duration::from_millis(20)
        );
    }

    /// Uncontended acquisitions leave every wait statistic at zero.
    #[test]
    fn test_uncontended_acquisitions_have_zero_waits() {
        let tracker = MetricsTracker::default();

        tracker.record_acquisition(None);
        tracker.record_acquisition(None);

        let metrics = tracker.get_metrics();
        assert_eq!(metrics.acquisitions, 2);
        assert_eq!(metrics.contentions, 0);
        assert_eq!(metrics.total_wait_time, Duration::ZERO);
        assert_eq!(metrics.max_wait_time, Duration::ZERO);
        assert_eq!(metrics.avg_wait_time, Duration::ZERO);
    }

    /// `reset` returns the tracker to its freshly constructed state.
    #[test]
    fn test_metrics_tracker_reset() {
        let tracker = MetricsTracker::new();
        tracker.record_acquisition(Some(Duration::from_millis(5)));

        tracker.reset();

        let metrics = tracker.get_metrics();
        assert_eq!(metrics.acquisitions, 0);
        assert_eq!(metrics.contentions, 0);
        assert_eq!(metrics.total_wait_time, Duration::ZERO);
        assert_eq!(metrics.max_wait_time, Duration::ZERO);
        assert_eq!(metrics.avg_wait_time, Duration::ZERO);
    }

    #[test]
    fn test_contention_rate() {
        let metrics = LockMetrics {
            acquisitions: 100,
            contentions: 25,
            ..Default::default()
        };
        assert!((metrics.contention_rate() - 0.25).abs() < 0.001);
    }

    /// With no acquisitions the rate is defined as zero rather than NaN.
    #[test]
    fn test_contention_rate_without_acquisitions() {
        let metrics = LockMetrics::default();
        assert_eq!(metrics.contention_rate(), 0.0);
    }
}