async_inspect/sync/
mod.rs1mod mutex;
31mod rwlock;
32mod semaphore;
33
34pub use mutex::{Mutex, MutexGuard};
35pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
36pub use semaphore::{AcquireError, Semaphore, SemaphorePermit, TryAcquireError};
37
38use std::sync::atomic::{AtomicU64, Ordering};
39use std::time::{Duration, Instant};
40
41#[derive(Debug, Clone, Default)]
43pub struct LockMetrics {
44 pub acquisitions: u64,
46 pub contentions: u64,
48 pub total_wait_time: Duration,
50 pub max_wait_time: Duration,
52 pub avg_wait_time: Duration,
54}
55
56impl LockMetrics {
57 #[must_use]
59 pub fn contention_rate(&self) -> f64 {
60 if self.acquisitions == 0 {
61 0.0
62 } else {
63 self.contentions as f64 / self.acquisitions as f64
64 }
65 }
66}
67
68#[derive(Debug)]
70pub(crate) struct MetricsTracker {
71 acquisitions: AtomicU64,
72 contentions: AtomicU64,
73 total_wait_nanos: AtomicU64,
74 max_wait_nanos: AtomicU64,
75}
76
77impl MetricsTracker {
78 pub fn new() -> Self {
79 Self {
80 acquisitions: AtomicU64::new(0),
81 contentions: AtomicU64::new(0),
82 total_wait_nanos: AtomicU64::new(0),
83 max_wait_nanos: AtomicU64::new(0),
84 }
85 }
86
87 pub fn record_acquisition(&self, wait_time: Option<Duration>) {
88 self.acquisitions.fetch_add(1, Ordering::Relaxed);
89
90 if let Some(wait) = wait_time {
91 self.contentions.fetch_add(1, Ordering::Relaxed);
92 let nanos = wait.as_nanos() as u64;
93 self.total_wait_nanos.fetch_add(nanos, Ordering::Relaxed);
94
95 let mut current_max = self.max_wait_nanos.load(Ordering::Relaxed);
97 while nanos > current_max {
98 match self.max_wait_nanos.compare_exchange_weak(
99 current_max,
100 nanos,
101 Ordering::Relaxed,
102 Ordering::Relaxed,
103 ) {
104 Ok(_) => break,
105 Err(actual) => current_max = actual,
106 }
107 }
108 }
109 }
110
111 pub fn get_metrics(&self) -> LockMetrics {
112 let acquisitions = self.acquisitions.load(Ordering::Relaxed);
113 let contentions = self.contentions.load(Ordering::Relaxed);
114 let total_wait_nanos = self.total_wait_nanos.load(Ordering::Relaxed);
115 let max_wait_nanos = self.max_wait_nanos.load(Ordering::Relaxed);
116
117 let total_wait_time = Duration::from_nanos(total_wait_nanos);
118 let max_wait_time = Duration::from_nanos(max_wait_nanos);
119 let avg_wait_time = if contentions > 0 {
120 Duration::from_nanos(total_wait_nanos / contentions)
121 } else {
122 Duration::ZERO
123 };
124
125 LockMetrics {
126 acquisitions,
127 contentions,
128 total_wait_time,
129 max_wait_time,
130 avg_wait_time,
131 }
132 }
133
134 pub fn reset(&self) {
135 self.acquisitions.store(0, Ordering::Relaxed);
136 self.contentions.store(0, Ordering::Relaxed);
137 self.total_wait_nanos.store(0, Ordering::Relaxed);
138 self.max_wait_nanos.store(0, Ordering::Relaxed);
139 }
140}
141
142impl Default for MetricsTracker {
143 fn default() -> Self {
144 Self::new()
145 }
146}
147
148pub(crate) struct WaitTimer {
150 start: Instant,
151 threshold: Duration,
153}
154
155impl WaitTimer {
156 pub fn start() -> Self {
157 Self {
158 start: Instant::now(),
159 threshold: Duration::from_micros(10), }
161 }
162
163 pub fn elapsed_if_contended(&self) -> Option<Duration> {
165 let elapsed = self.start.elapsed();
166 if elapsed > self.threshold {
167 Some(elapsed)
168 } else {
169 None
170 }
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use super::*;
177
178 #[test]
179 fn test_metrics_tracker() {
180 let tracker = MetricsTracker::new();
181
182 tracker.record_acquisition(None); tracker.record_acquisition(Some(Duration::from_millis(10)));
185 tracker.record_acquisition(Some(Duration::from_millis(20)));
186
187 let metrics = tracker.get_metrics();
188 assert_eq!(metrics.acquisitions, 3);
189 assert_eq!(metrics.contentions, 2);
190 assert_eq!(metrics.max_wait_time, Duration::from_millis(20));
191 }
192
193 #[test]
194 fn test_contention_rate() {
195 let metrics = LockMetrics {
196 acquisitions: 100,
197 contentions: 25,
198 ..Default::default()
199 };
200 assert!((metrics.contention_rate() - 0.25).abs() < 0.001);
201 }
202}