memscope_rs/core/optimized_locks.rs

//! Optimized lock implementations using parking_lot
//!
//! This module provides drop-in replacements for standard library locks
//! using parking_lot for better performance and features.
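//!
//! A minimal usage sketch; the import path is an assumption and depends on how
//! the crate re-exports this module:
//!
//! ```ignore
//! use memscope_rs::core::optimized_locks::OptimizedMutex;
//!
//! let mutex = OptimizedMutex::new(0u64);
//! {
//!     let mut guard = mutex.lock(); // records wait time, and contention if it blocks
//!     *guard += 1;
//! }
//! let stats = mutex.performance_stats(); // snapshot of this lock's counters
//! ```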

use crate::core::atomic_stats::AtomicPerformanceCounters;
use parking_lot::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::time::{Duration, Instant};

/// Optimized mutex with performance monitoring
#[derive(Debug)]
pub struct OptimizedMutex<T> {
    inner: Mutex<T>,
    counters: AtomicPerformanceCounters,
}

impl<T> OptimizedMutex<T> {
    /// Create a new optimized mutex
    pub fn new(data: T) -> Self {
        Self {
            inner: Mutex::new(data),
            counters: AtomicPerformanceCounters::new(),
        }
    }

    /// Lock the mutex with performance monitoring.
    ///
    /// The lock is first attempted without blocking; if that fails, the
    /// contention counter is incremented before falling back to a blocking
    /// acquisition, and the total wait time is recorded in both cases.
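    ///
    /// A small usage sketch (marked `ignore` because the full import path
    /// depends on the crate layout):
    ///
    /// ```ignore
    /// let mutex = OptimizedMutex::new(vec![1, 2, 3]);
    /// let mut guard = mutex.lock();
    /// guard.push(4); // the guard derefs to the inner Vec
    /// ```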
    pub fn lock(&self) -> OptimizedMutexGuard<'_, T> {
        let start = Instant::now();

        // Try to acquire the lock without blocking first
        if let Some(guard) = self.inner.try_lock() {
            let wait_time = start.elapsed();
            self.counters.record_lock_acquisition(wait_time);
            return OptimizedMutexGuard {
                guard,
                _counters: &self.counters,
            };
        }

        // If we couldn't acquire immediately, record contention
        self.counters.record_lock_contention();

        // Now block until we can acquire the lock
        let guard = self.inner.lock();
        let wait_time = start.elapsed();
        self.counters.record_lock_acquisition(wait_time);

        OptimizedMutexGuard {
            guard,
            _counters: &self.counters,
        }
    }

    /// Try to lock the mutex without blocking
    pub fn try_lock(&self) -> Option<OptimizedMutexGuard<'_, T>> {
        let start = Instant::now();

        if let Some(guard) = self.inner.try_lock() {
            let wait_time = start.elapsed();
            self.counters.record_lock_acquisition(wait_time);
            Some(OptimizedMutexGuard {
                guard,
                _counters: &self.counters,
            })
        } else {
            self.counters.record_lock_contention();
            None
        }
    }

    /// Try to lock with a timeout
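    ///
    /// A sketch of the timeout path (marked `ignore` because the full import
    /// path depends on the crate layout):
    ///
    /// ```ignore
    /// use std::time::Duration;
    ///
    /// let mutex = OptimizedMutex::new(0);
    /// // None is returned if the lock cannot be acquired within 10ms.
    /// if let Some(mut guard) = mutex.try_lock_for(Duration::from_millis(10)) {
    ///     *guard += 1;
    /// }
    /// ```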
    pub fn try_lock_for(&self, timeout: Duration) -> Option<OptimizedMutexGuard<'_, T>> {
        let start = Instant::now();

        if let Some(guard) = self.inner.try_lock_for(timeout) {
            let wait_time = start.elapsed();
            self.counters.record_lock_acquisition(wait_time);
            Some(OptimizedMutexGuard {
                guard,
                _counters: &self.counters,
            })
        } else {
            self.counters.record_lock_contention();
            None
        }
    }

    /// Get performance statistics for this mutex
    pub fn performance_stats(&self) -> crate::core::atomic_stats::PerformanceSnapshot {
        self.counters.snapshot()
    }
}

/// Guard for optimized mutex
pub struct OptimizedMutexGuard<'a, T> {
    guard: MutexGuard<'a, T>,
    _counters: &'a AtomicPerformanceCounters,
}

impl<'a, T> std::ops::Deref for OptimizedMutexGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}

impl<'a, T> std::ops::DerefMut for OptimizedMutexGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.guard
    }
}

/// Optimized RwLock with performance monitoring
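///
/// A read/write usage sketch (marked `ignore` because the full import path
/// depends on the crate layout):
///
/// ```ignore
/// let rwlock = OptimizedRwLock::new(5);
/// {
///     let r = rwlock.read(); // many readers may hold the lock at once
///     assert_eq!(*r, 5);
/// }
/// {
///     let mut w = rwlock.write(); // a writer gets exclusive access
///     *w += 1;
/// }
/// assert_eq!(*rwlock.read(), 6);
/// ```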
#[derive(Debug)]
pub struct OptimizedRwLock<T> {
    inner: RwLock<T>,
    counters: AtomicPerformanceCounters,
}

impl<T> OptimizedRwLock<T> {
    /// Create a new optimized RwLock
    pub fn new(data: T) -> Self {
        Self {
            inner: RwLock::new(data),
            counters: AtomicPerformanceCounters::new(),
        }
    }

    /// Acquire a read lock with performance monitoring
    pub fn read(&self) -> OptimizedRwLockReadGuard<'_, T> {
        let start = Instant::now();

        // Try to acquire the read lock without blocking first
        if let Some(guard) = self.inner.try_read() {
            let wait_time = start.elapsed();
            self.counters.record_lock_acquisition(wait_time);
            return OptimizedRwLockReadGuard {
                guard,
                _counters: &self.counters,
            };
        }

        // If we couldn't acquire immediately, record contention
        self.counters.record_lock_contention();

        // Now block until we can acquire the lock
        let guard = self.inner.read();
        let wait_time = start.elapsed();
        self.counters.record_lock_acquisition(wait_time);

        OptimizedRwLockReadGuard {
            guard,
            _counters: &self.counters,
        }
    }

    /// Acquire a write lock with performance monitoring
    pub fn write(&self) -> OptimizedRwLockWriteGuard<'_, T> {
        let start = Instant::now();

        // Try to acquire the write lock without blocking first
        if let Some(guard) = self.inner.try_write() {
            let wait_time = start.elapsed();
            self.counters.record_lock_acquisition(wait_time);
            return OptimizedRwLockWriteGuard {
                guard,
                _counters: &self.counters,
            };
        }

        // If we couldn't acquire immediately, record contention
        self.counters.record_lock_contention();

        // Now block until we can acquire the lock
        let guard = self.inner.write();
        let wait_time = start.elapsed();
        self.counters.record_lock_acquisition(wait_time);

        OptimizedRwLockWriteGuard {
            guard,
            _counters: &self.counters,
        }
    }

    /// Try to acquire a read lock without blocking
    pub fn try_read(&self) -> Option<OptimizedRwLockReadGuard<'_, T>> {
        let start = Instant::now();

        if let Some(guard) = self.inner.try_read() {
            let wait_time = start.elapsed();
            self.counters.record_lock_acquisition(wait_time);
            Some(OptimizedRwLockReadGuard {
                guard,
                _counters: &self.counters,
            })
        } else {
            self.counters.record_lock_contention();
            None
        }
    }

    /// Try to acquire a write lock without blocking
    pub fn try_write(&self) -> Option<OptimizedRwLockWriteGuard<'_, T>> {
        let start = Instant::now();

        if let Some(guard) = self.inner.try_write() {
            let wait_time = start.elapsed();
            self.counters.record_lock_acquisition(wait_time);
            Some(OptimizedRwLockWriteGuard {
                guard,
                _counters: &self.counters,
            })
        } else {
            self.counters.record_lock_contention();
            None
        }
    }

    /// Get performance statistics for this RwLock
    pub fn performance_stats(&self) -> crate::core::atomic_stats::PerformanceSnapshot {
        self.counters.snapshot()
    }
}

/// Read guard for optimized RwLock
pub struct OptimizedRwLockReadGuard<'a, T> {
    guard: RwLockReadGuard<'a, T>,
    _counters: &'a AtomicPerformanceCounters,
}

impl<'a, T> std::ops::Deref for OptimizedRwLockReadGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}

/// Write guard for optimized RwLock
pub struct OptimizedRwLockWriteGuard<'a, T> {
    guard: RwLockWriteGuard<'a, T>,
    _counters: &'a AtomicPerformanceCounters,
}

impl<'a, T> std::ops::Deref for OptimizedRwLockWriteGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.guard
    }
}

impl<'a, T> std::ops::DerefMut for OptimizedRwLockWriteGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.guard
    }
}

/// Lock-free counter for simple atomic operations (all operations use
/// `Relaxed` ordering)
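///
/// A minimal concurrency sketch (marked `ignore` because the full import path
/// depends on the crate layout):
///
/// ```ignore
/// use std::sync::Arc;
/// use std::thread;
///
/// let counter = Arc::new(LockFreeCounter::new(0));
/// let handles: Vec<_> = (0..4)
///     .map(|_| {
///         let c = Arc::clone(&counter);
///         thread::spawn(move || { c.increment(); })
///     })
///     .collect();
/// for h in handles {
///     h.join().unwrap();
/// }
/// assert_eq!(counter.get(), 4);
/// ```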
#[derive(Debug)]
pub struct LockFreeCounter {
    value: std::sync::atomic::AtomicU64,
}

impl LockFreeCounter {
    /// Create a new lock-free counter
    pub fn new(initial_value: u64) -> Self {
        Self {
            value: std::sync::atomic::AtomicU64::new(initial_value),
        }
    }

    /// Increment the counter and return the new value
    pub fn increment(&self) -> u64 {
        self.value
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
            + 1
    }

    /// Decrement the counter and return the new value.
    ///
    /// Decrementing below zero underflows: the arithmetic wraps in release
    /// builds and panics in debug builds.
    pub fn decrement(&self) -> u64 {
        self.value
            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed)
            - 1
    }

    /// Add a value to the counter and return the new value
    pub fn add(&self, value: u64) -> u64 {
        self.value
            .fetch_add(value, std::sync::atomic::Ordering::Relaxed)
            + value
    }

    /// Subtract a value from the counter and return the new value
    pub fn sub(&self, value: u64) -> u64 {
        self.value
            .fetch_sub(value, std::sync::atomic::Ordering::Relaxed)
            - value
    }

    /// Get the current value
    pub fn get(&self) -> u64 {
        self.value.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Set the value
    pub fn set(&self, value: u64) {
        self.value
            .store(value, std::sync::atomic::Ordering::Relaxed);
    }

    /// Compare and swap: atomically replace the value with `new` if it
    /// currently equals `current`. Returns `Ok(previous)` on success or
    /// `Err(actual)` carrying the actual current value on failure.
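    ///
    /// A typical retry-loop sketch built on this method (marked `ignore`
    /// because the full import path depends on the crate layout):
    ///
    /// ```ignore
    /// let counter = LockFreeCounter::new(10);
    /// // Atomically double the value, retrying if another thread races us.
    /// let mut current = counter.get();
    /// while let Err(actual) = counter.compare_and_swap(current, current * 2) {
    ///     current = actual; // lost the race; retry with the fresh value
    /// }
    /// ```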
    pub fn compare_and_swap(&self, current: u64, new: u64) -> Result<u64, u64> {
        self.value.compare_exchange(
            current,
            new,
            std::sync::atomic::Ordering::Relaxed,
            std::sync::atomic::Ordering::Relaxed,
        )
    }
}

impl Default for LockFreeCounter {
    fn default() -> Self {
        Self::new(0)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_optimized_mutex_basic_functionality() {
        let mutex = OptimizedMutex::new(42);

        {
            let guard = mutex.lock();
            assert_eq!(*guard, 42);
        }

        // Test that lock is released
        {
            let mut guard = mutex.lock();
            *guard = 100;
        }

        let guard = mutex.lock();
        assert_eq!(*guard, 100);
    }

    #[test]
    fn test_optimized_mutex_try_lock() {
        let mutex = OptimizedMutex::new(42);

        // First try_lock should succeed
        let guard1 = mutex.try_lock();
        assert!(guard1.is_some());
        assert_eq!(*guard1.unwrap(), 42); // unwrap() consumes the guard, releasing the lock

        // try_lock should fail while the lock is held elsewhere
        let _held = mutex.lock(); // hold the lock for the rest of the test
        let guard2 = mutex.try_lock();
        assert!(guard2.is_none());
    }

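    // A sketch of the timeout path for try_lock_for; the sleep durations are
    // assumptions and may need tuning on heavily loaded machines.
    #[test]
    fn test_optimized_mutex_try_lock_for() {
        let mutex = Arc::new(OptimizedMutex::new(0));
        let mutex_clone = Arc::clone(&mutex);

        // Hold the lock in another thread for ~50ms.
        let handle = thread::spawn(move || {
            let _guard = mutex_clone.lock();
            thread::sleep(Duration::from_millis(50));
        });

        // Give the other thread time to acquire the lock.
        thread::sleep(Duration::from_millis(10));

        // A short timeout should expire while the lock is still held.
        assert!(mutex.try_lock_for(Duration::from_millis(5)).is_none());

        handle.join().unwrap();

        // Once released, a timed acquisition should succeed.
        assert!(mutex.try_lock_for(Duration::from_millis(100)).is_some());
    }
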
    #[test]
    fn test_optimized_rwlock_basic_functionality() {
        let rwlock = OptimizedRwLock::new(42);

        // Test read access
        {
            let read_guard = rwlock.read();
            assert_eq!(*read_guard, 42);
        }

        // Test write access
        {
            let mut write_guard = rwlock.write();
            *write_guard = 100;
        }

        // Verify write took effect
        let read_guard = rwlock.read();
        assert_eq!(*read_guard, 100);
    }

    #[test]
    fn test_optimized_rwlock_writer_exclusivity() {
        let rwlock = Arc::new(OptimizedRwLock::new(0));
        let rwlock_clone = Arc::clone(&rwlock);

        // Start a writer thread
        let writer_handle = thread::spawn(move || {
            let mut write_guard = rwlock_clone.write();
            *write_guard = 100;
            thread::sleep(Duration::from_millis(50)); // Hold write lock
            *write_guard = 200;
        });

        // Give writer time to acquire lock
        thread::sleep(Duration::from_millis(10));

        // Try to read - should wait for writer to finish
        let start = std::time::Instant::now();
        let read_guard = rwlock.read();
        let duration = start.elapsed();

        // Should have waited for writer
        assert!(duration >= Duration::from_millis(30));
        assert_eq!(*read_guard, 200);

        writer_handle.join().unwrap();
    }

    #[test]
    fn test_optimized_rwlock_try_operations() {
        let rwlock = OptimizedRwLock::new(42);

        // try_read should succeed when unlocked
        let read_guard = rwlock.try_read();
        assert!(read_guard.is_some());
        if let Some(guard) = read_guard {
            assert_eq!(*guard, 42);
        }

        // try_write should succeed when unlocked (the read guard above was
        // dropped at the end of the `if let` block)
        let write_guard = rwlock.try_write();
        assert!(write_guard.is_some()); // keep `write_guard` alive for the checks below

        // try_read should fail when write locked
        let try_read = rwlock.try_read();
        assert!(try_read.is_none());

        // try_write should fail when already write locked
        let try_write = rwlock.try_write();
        assert!(try_write.is_none());
    }

    #[test]
    fn test_lock_free_counter_basic_operations() {
        let counter = LockFreeCounter::new(0);

        assert_eq!(counter.get(), 0);

        assert_eq!(counter.increment(), 1);
        assert_eq!(counter.get(), 1);

        assert_eq!(counter.add(5), 6);
        assert_eq!(counter.get(), 6);

        assert_eq!(counter.decrement(), 5);
        assert_eq!(counter.get(), 5);

        assert_eq!(counter.sub(3), 2);
        assert_eq!(counter.get(), 2);
    }

    #[test]
    fn test_lock_free_counter_concurrent_increments() {
        let counter = Arc::new(LockFreeCounter::new(0));
        let mut handles = Vec::new();

        // Spawn multiple threads that increment
        for _ in 0..10 {
            let counter_clone = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                for _ in 0..1000 {
                    counter_clone.increment();
                }
            });
            handles.push(handle);
        }

        // Wait for all threads
        for handle in handles {
            handle.join().unwrap();
        }

        // Should have exactly 10,000 increments
        assert_eq!(counter.get(), 10000);
    }

    #[test]
    fn test_lock_free_counter_mixed_operations() {
        // Start high enough that concurrent decrements can never drive the
        // value below zero (decrement() panics on underflow in debug builds).
        let counter = Arc::new(LockFreeCounter::new(2000));
        let mut handles = Vec::new();

        // Incrementing threads
        for _ in 0..5 {
            let counter_clone = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                for _ in 0..1000 {
                    counter_clone.increment();
                }
            });
            handles.push(handle);
        }

        // Decrementing threads
        for _ in 0..3 {
            let counter_clone = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                for _ in 0..500 {
                    counter_clone.decrement();
                }
            });
            handles.push(handle);
        }

        // Adding threads
        for _ in 0..2 {
            let counter_clone = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                for _ in 0..100 {
                    counter_clone.add(10);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        // Expected: 2000 + 5*1000 - 3*500 + 2*100*10 = 2000 + 5000 - 1500 + 2000 = 7500
        assert_eq!(counter.get(), 7500);
    }

    #[test]
    fn test_lock_free_counter_compare_and_swap() {
        let counter = LockFreeCounter::new(42);

        // Successful compare and swap
        let result = counter.compare_and_swap(42, 100);
        assert_eq!(result, Ok(42));
        assert_eq!(counter.get(), 100);

        // Failed compare and swap
        let result = counter.compare_and_swap(42, 200);
        assert_eq!(result, Err(100)); // Returns current value
        assert_eq!(counter.get(), 100); // Value unchanged
    }

    #[test]
    fn test_lock_free_counter_set_operation() {
        let counter = LockFreeCounter::new(0);

        counter.set(12345);
        assert_eq!(counter.get(), 12345);

        // Should work with large values
        counter.set(u64::MAX - 1);
        assert_eq!(counter.get(), u64::MAX - 1);
    }

    #[test]
    fn test_lock_free_counter_default() {
        let counter = LockFreeCounter::default();
        assert_eq!(counter.get(), 0);

        counter.increment();
        assert_eq!(counter.get(), 1);
    }

    #[test]
    fn test_lock_contention_performance() {
        let mutex = Arc::new(OptimizedMutex::new(0));
        let mut handles = Vec::new();

        let start = std::time::Instant::now();

        // Create high contention scenario
        for _ in 0..20 {
            let mutex_clone = Arc::clone(&mutex);
            let handle = thread::spawn(move || {
                for _ in 0..100 {
                    let mut guard = mutex_clone.lock();
                    *guard += 1;
                    // Simulate minimal work while holding the lock
                    std::hint::spin_loop();
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let duration = start.elapsed();

        // Should handle contention correctly and efficiently
        assert_eq!(*mutex.lock(), 2000);
        assert!(duration < Duration::from_secs(1)); // Should complete reasonably fast
    }

    #[test]
    fn test_rwlock_reader_writer_fairness() {
        let rwlock = Arc::new(OptimizedRwLock::new(0));
        let rwlock_clone = Arc::clone(&rwlock);

        // Start multiple readers
        let mut reader_handles = Vec::new();
        for i in 0..5 {
            let rwlock_clone = Arc::clone(&rwlock);
            let handle = thread::spawn(move || {
                thread::sleep(Duration::from_millis(i * 10)); // Stagger start times
                let _guard = rwlock_clone.read();
                thread::sleep(Duration::from_millis(50)); // Hold read lock
            });
            reader_handles.push(handle);
        }

        // Start a writer after readers have started
        thread::sleep(Duration::from_millis(20));
        let writer_handle = thread::spawn(move || {
            let mut guard = rwlock_clone.write();
            *guard = 42;
        });

        // Wait for all to complete
        for handle in reader_handles {
            handle.join().unwrap();
        }
        writer_handle.join().unwrap();

        // Writer should have eventually acquired the lock
        let guard = rwlock.read();
        assert_eq!(*guard, 42);
    }
}