memscope_rs/core/
optimized_locks.rs

1//! Optimized lock implementations using parking_lot
2//!
3//! This module provides drop-in replacements for standard library locks
4//! using parking_lot for better performance and features.
5
6use crate::core::atomic_stats::AtomicPerformanceCounters;
7use parking_lot::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
8use std::time::{Duration, Instant};
9
/// Optimized mutex with performance monitoring
///
/// Wraps a `parking_lot::Mutex` and records, in `counters`, the wait time of
/// every acquisition plus a contention event whenever the lock could not be
/// taken immediately (see `lock`/`try_lock`/`try_lock_for`).
#[derive(Debug)]
pub struct OptimizedMutex<T> {
    /// Underlying parking_lot mutex that actually protects the data.
    inner: Mutex<T>,
    /// Per-lock atomic counters (acquisition wait times, contention events).
    counters: AtomicPerformanceCounters,
}
16
17impl<T> OptimizedMutex<T> {
18    /// Create a new optimized mutex
19    pub fn new(data: T) -> Self {
20        Self {
21            inner: Mutex::new(data),
22            counters: AtomicPerformanceCounters::new(),
23        }
24    }
25
26    /// Lock the mutex with performance monitoring
27    pub fn lock(&self) -> OptimizedMutexGuard<'_, T> {
28        let start = Instant::now();
29
30        // Try to acquire the lock without blocking first
31        if let Some(guard) = self.inner.try_lock() {
32            let wait_time = start.elapsed();
33            self.counters.record_lock_acquisition(wait_time);
34            return OptimizedMutexGuard {
35                guard,
36                _counters: &self.counters,
37            };
38        }
39
40        // If we couldn't acquire immediately, record contention
41        self.counters.record_lock_contention();
42
43        // Now block until we can acquire the lock
44        let guard = self.inner.lock();
45        let wait_time = start.elapsed();
46        self.counters.record_lock_acquisition(wait_time);
47
48        OptimizedMutexGuard {
49            guard,
50            _counters: &self.counters,
51        }
52    }
53
54    /// Try to lock the mutex without blocking
55    pub fn try_lock(&self) -> Option<OptimizedMutexGuard<'_, T>> {
56        let start = Instant::now();
57
58        if let Some(guard) = self.inner.try_lock() {
59            let wait_time = start.elapsed();
60            self.counters.record_lock_acquisition(wait_time);
61            Some(OptimizedMutexGuard {
62                guard,
63                _counters: &self.counters,
64            })
65        } else {
66            self.counters.record_lock_contention();
67            None
68        }
69    }
70
71    /// Try to lock with a timeout
72    pub fn try_lock_for(&self, timeout: Duration) -> Option<OptimizedMutexGuard<'_, T>> {
73        let start = Instant::now();
74
75        if let Some(guard) = self.inner.try_lock_for(timeout) {
76            let wait_time = start.elapsed();
77            self.counters.record_lock_acquisition(wait_time);
78            Some(OptimizedMutexGuard {
79                guard,
80                _counters: &self.counters,
81            })
82        } else {
83            self.counters.record_lock_contention();
84            None
85        }
86    }
87
88    /// Get performance statistics for this mutex
89    pub fn performance_stats(&self) -> crate::core::atomic_stats::PerformanceSnapshot {
90        self.counters.snapshot()
91    }
92}
93
/// Guard for optimized mutex
///
/// Grants exclusive access to the protected value via `Deref`/`DerefMut`.
/// Releasing happens when the inner parking_lot guard is dropped.
pub struct OptimizedMutexGuard<'a, T> {
    /// Underlying parking_lot guard holding the lock.
    guard: MutexGuard<'a, T>,
    /// Borrow of the owning lock's counters; not read by any code in this
    /// file (no `Drop` impl here records hold time) — kept for lifetime tie.
    _counters: &'a AtomicPerformanceCounters,
}
99
100impl<'a, T> std::ops::Deref for OptimizedMutexGuard<'a, T> {
101    type Target = T;
102
103    fn deref(&self) -> &Self::Target {
104        &self.guard
105    }
106}
107
108impl<'a, T> std::ops::DerefMut for OptimizedMutexGuard<'a, T> {
109    fn deref_mut(&mut self) -> &mut Self::Target {
110        &mut self.guard
111    }
112}
113
/// Optimized RwLock with performance monitoring
///
/// Wraps a `parking_lot::RwLock` and records, in `counters`, the wait time
/// of every read/write acquisition plus a contention event whenever a lock
/// could not be taken immediately (see `read`/`write`/`try_read`/`try_write`).
#[derive(Debug)]
pub struct OptimizedRwLock<T> {
    /// Underlying parking_lot reader-writer lock.
    inner: RwLock<T>,
    /// Per-lock atomic counters (acquisition wait times, contention events).
    counters: AtomicPerformanceCounters,
}
120
121impl<T> OptimizedRwLock<T> {
122    /// Create a new optimized RwLock
123    pub fn new(data: T) -> Self {
124        Self {
125            inner: RwLock::new(data),
126            counters: AtomicPerformanceCounters::new(),
127        }
128    }
129
130    /// Acquire a read lock with performance monitoring
131    pub fn read(&self) -> OptimizedRwLockReadGuard<'_, T> {
132        let start = Instant::now();
133
134        // Try to acquire the read lock without blocking first
135        if let Some(guard) = self.inner.try_read() {
136            let wait_time = start.elapsed();
137            self.counters.record_lock_acquisition(wait_time);
138            return OptimizedRwLockReadGuard {
139                guard,
140                _counters: &self.counters,
141            };
142        }
143
144        // If we couldn't acquire immediately, record contention
145        self.counters.record_lock_contention();
146
147        // Now block until we can acquire the lock
148        let guard = self.inner.read();
149        let wait_time = start.elapsed();
150        self.counters.record_lock_acquisition(wait_time);
151
152        OptimizedRwLockReadGuard {
153            guard,
154            _counters: &self.counters,
155        }
156    }
157
158    /// Acquire a write lock with performance monitoring
159    pub fn write(&self) -> OptimizedRwLockWriteGuard<'_, T> {
160        let start = Instant::now();
161
162        // Try to acquire the write lock without blocking first
163        if let Some(guard) = self.inner.try_write() {
164            let wait_time = start.elapsed();
165            self.counters.record_lock_acquisition(wait_time);
166            return OptimizedRwLockWriteGuard {
167                guard,
168                _counters: &self.counters,
169            };
170        }
171
172        // If we couldn't acquire immediately, record contention
173        self.counters.record_lock_contention();
174
175        // Now block until we can acquire the lock
176        let guard = self.inner.write();
177        let wait_time = start.elapsed();
178        self.counters.record_lock_acquisition(wait_time);
179
180        OptimizedRwLockWriteGuard {
181            guard,
182            _counters: &self.counters,
183        }
184    }
185
186    /// Try to acquire a read lock without blocking
187    pub fn try_read(&self) -> Option<OptimizedRwLockReadGuard<'_, T>> {
188        let start = Instant::now();
189
190        if let Some(guard) = self.inner.try_read() {
191            let wait_time = start.elapsed();
192            self.counters.record_lock_acquisition(wait_time);
193            Some(OptimizedRwLockReadGuard {
194                guard,
195                _counters: &self.counters,
196            })
197        } else {
198            self.counters.record_lock_contention();
199            None
200        }
201    }
202
203    /// Try to acquire a write lock without blocking
204    pub fn try_write(&self) -> Option<OptimizedRwLockWriteGuard<'_, T>> {
205        let start = Instant::now();
206
207        if let Some(guard) = self.inner.try_write() {
208            let wait_time = start.elapsed();
209            self.counters.record_lock_acquisition(wait_time);
210            Some(OptimizedRwLockWriteGuard {
211                guard,
212                _counters: &self.counters,
213            })
214        } else {
215            self.counters.record_lock_contention();
216            None
217        }
218    }
219
220    /// Get performance statistics for this RwLock
221    pub fn performance_stats(&self) -> crate::core::atomic_stats::PerformanceSnapshot {
222        self.counters.snapshot()
223    }
224}
225
/// Read guard for optimized RwLock
///
/// Grants shared (read-only) access to the protected value via `Deref`.
pub struct OptimizedRwLockReadGuard<'a, T> {
    /// Underlying parking_lot read guard holding the shared lock.
    guard: RwLockReadGuard<'a, T>,
    /// Borrow of the owning lock's counters; not read by any code in this
    /// file (no `Drop` impl here records hold time) — kept for lifetime tie.
    _counters: &'a AtomicPerformanceCounters,
}
231
232impl<'a, T> std::ops::Deref for OptimizedRwLockReadGuard<'a, T> {
233    type Target = T;
234
235    fn deref(&self) -> &Self::Target {
236        &self.guard
237    }
238}
239
/// Write guard for optimized RwLock
///
/// Grants exclusive access to the protected value via `Deref`/`DerefMut`.
pub struct OptimizedRwLockWriteGuard<'a, T> {
    /// Underlying parking_lot write guard holding the exclusive lock.
    guard: RwLockWriteGuard<'a, T>,
    /// Borrow of the owning lock's counters; not read by any code in this
    /// file (no `Drop` impl here records hold time) — kept for lifetime tie.
    _counters: &'a AtomicPerformanceCounters,
}
245
246impl<'a, T> std::ops::Deref for OptimizedRwLockWriteGuard<'a, T> {
247    type Target = T;
248
249    fn deref(&self) -> &Self::Target {
250        &self.guard
251    }
252}
253
254impl<'a, T> std::ops::DerefMut for OptimizedRwLockWriteGuard<'a, T> {
255    fn deref_mut(&mut self) -> &mut Self::Target {
256        &mut self.guard
257    }
258}
259
/// Lock-free counter for simple atomic operations
///
/// A thin wrapper around a single `AtomicU64`. All operations use relaxed
/// ordering: the counter's own value is the only thing synchronized, and
/// arithmetic wraps on overflow/underflow.
#[derive(Debug)]
pub struct LockFreeCounter {
    /// The counter's sole state: one atomic 64-bit word.
    value: std::sync::atomic::AtomicU64,
}

impl LockFreeCounter {
    /// Create a new lock-free counter starting at `initial_value`.
    pub fn new(initial_value: u64) -> Self {
        Self {
            value: std::sync::atomic::AtomicU64::new(initial_value),
        }
    }

    /// Increment the counter and return the new value.
    pub fn increment(&self) -> u64 {
        self.add(1)
    }

    /// Decrement the counter and return the new value (wraps below zero).
    pub fn decrement(&self) -> u64 {
        self.sub(1)
    }

    /// Add `value` to the counter and return the new value (wrapping).
    pub fn add(&self, value: u64) -> u64 {
        use std::sync::atomic::Ordering::Relaxed;
        // fetch_add returns the previous value; re-apply the delta to get
        // the post-update value this thread produced.
        self.value.fetch_add(value, Relaxed).wrapping_add(value)
    }

    /// Subtract `value` from the counter and return the new value (wrapping).
    pub fn sub(&self, value: u64) -> u64 {
        use std::sync::atomic::Ordering::Relaxed;
        self.value.fetch_sub(value, Relaxed).wrapping_sub(value)
    }

    /// Get the current value.
    pub fn get(&self) -> u64 {
        self.value.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Set the value unconditionally.
    pub fn set(&self, value: u64) {
        self.value
            .store(value, std::sync::atomic::Ordering::Relaxed);
    }

    /// Compare and swap: `Ok(previous)` when `current` matched and the swap
    /// happened, `Err(actual)` with the observed value otherwise.
    pub fn compare_and_swap(&self, current: u64, new: u64) -> Result<u64, u64> {
        use std::sync::atomic::Ordering::Relaxed;
        self.value.compare_exchange(current, new, Relaxed, Relaxed)
    }
}

impl Default for LockFreeCounter {
    /// A counter starting at zero.
    fn default() -> Self {
        Self::new(0)
    }
}
333
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    /// Lock, read, mutate, and re-read through `OptimizedMutex` guards.
    #[test]
    fn test_optimized_mutex_basic_functionality() {
        let mutex = OptimizedMutex::new(42);

        {
            let guard = mutex.lock();
            assert_eq!(*guard, 42);
        }

        // Previous guard was dropped, so the lock must be re-acquirable.
        {
            let mut guard = mutex.lock();
            *guard = 100;
        }

        let guard = mutex.lock();
        assert_eq!(*guard, 100);
    }

    /// `try_lock` succeeds on a free mutex and fails while it is held.
    #[test]
    fn test_optimized_mutex_try_lock() {
        let mutex = OptimizedMutex::new(42);

        // Succeeds while the mutex is free. The guard produced by `unwrap`
        // is a temporary and is released at the end of that statement.
        let guard1 = mutex.try_lock();
        assert!(guard1.is_some());
        assert_eq!(*guard1.unwrap(), 42);

        // Hold the lock for the rest of the test; try_lock must now fail.
        let _held = mutex.lock();
        let guard2 = mutex.try_lock();
        assert!(guard2.is_none());
    }

    /// Read access, write access, and visibility of the write.
    #[test]
    fn test_optimized_rwlock_basic_functionality() {
        let rwlock = OptimizedRwLock::new(42);

        // Test read access
        {
            let read_guard = rwlock.read();
            assert_eq!(*read_guard, 42);
        }

        // Test write access
        {
            let mut write_guard = rwlock.write();
            *write_guard = 100;
        }

        // Verify the write took effect
        let read_guard = rwlock.read();
        assert_eq!(*read_guard, 100);
    }

    /// A reader must block until a concurrent writer releases the lock.
    ///
    /// Fixed a race: the original slept 10ms and *hoped* the writer thread
    /// had acquired the write lock by then; if scheduling delayed the
    /// writer, the main thread could read 0 and fail spuriously. We now
    /// wait on a flag the writer sets only after it holds the write lock.
    #[test]
    fn test_optimized_rwlock_writer_exclusivity() {
        let rwlock = Arc::new(OptimizedRwLock::new(0));
        let writer_has_lock = Arc::new(AtomicBool::new(false));

        let rwlock_clone = Arc::clone(&rwlock);
        let flag_clone = Arc::clone(&writer_has_lock);
        let writer_handle = thread::spawn(move || {
            let mut write_guard = rwlock_clone.write();
            *write_guard = 100;
            // Publish "lock held" only after the write lock is acquired.
            flag_clone.store(true, Ordering::Release);
            thread::sleep(Duration::from_millis(50)); // Hold write lock
            *write_guard = 200;
        });

        // Deterministically wait until the writer owns the lock (or has
        // already finished, in which case the final value is in place).
        while !writer_has_lock.load(Ordering::Acquire) {
            thread::yield_now();
        }

        // This read must wait for the writer to release, so it observes
        // the final value written before the guard was dropped.
        let read_guard = rwlock.read();
        assert_eq!(*read_guard, 200);
        drop(read_guard);

        writer_handle.join().unwrap();
    }

    /// Non-blocking try_read/try_write in free and write-locked states.
    #[test]
    fn test_optimized_rwlock_try_operations() {
        let rwlock = OptimizedRwLock::new(42);

        // try_read should succeed when unlocked; the guard is dropped at
        // the end of the `if let` block.
        let read_guard = rwlock.try_read();
        assert!(read_guard.is_some());
        if let Some(guard) = read_guard {
            assert_eq!(*guard, 42);
        }

        // try_write should succeed when unlocked; keep the guard alive
        // so the following attempts see a write-locked state.
        let write_guard = rwlock.try_write();
        assert!(write_guard.is_some());

        // try_read should fail while write locked
        let try_read = rwlock.try_read();
        assert!(try_read.is_none());

        // try_write should fail when already write locked
        let try_write = rwlock.try_write();
        assert!(try_write.is_none());
    }

    /// Sequential increment/decrement/add/sub return the updated value.
    #[test]
    fn test_lock_free_counter_basic_operations() {
        let counter = LockFreeCounter::new(0);

        assert_eq!(counter.get(), 0);

        assert_eq!(counter.increment(), 1);
        assert_eq!(counter.get(), 1);

        assert_eq!(counter.add(5), 6);
        assert_eq!(counter.get(), 6);

        assert_eq!(counter.decrement(), 5);
        assert_eq!(counter.get(), 5);

        assert_eq!(counter.sub(3), 2);
        assert_eq!(counter.get(), 2);
    }

    /// No increments are lost under concurrent access.
    #[test]
    fn test_lock_free_counter_concurrent_increments() {
        let counter = Arc::new(LockFreeCounter::new(0));
        let mut handles = Vec::new();

        // Spawn multiple threads that increment
        for _ in 0..10 {
            let counter_clone = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                for _ in 0..1000 {
                    counter_clone.increment();
                }
            });
            handles.push(handle);
        }

        // Wait for all threads
        for handle in handles {
            handle.join().unwrap();
        }

        // Should have exactly 10,000 increments
        assert_eq!(counter.get(), 10000);
    }

    /// Mixed concurrent increments, decrements, and adds net out exactly.
    #[test]
    fn test_lock_free_counter_mixed_operations() {
        let counter = Arc::new(LockFreeCounter::new(0));
        let mut handles = Vec::new();

        // Incrementing threads
        for _ in 0..5 {
            let counter_clone = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                for _ in 0..1000 {
                    counter_clone.increment();
                }
            });
            handles.push(handle);
        }

        // Decrementing threads
        for _ in 0..3 {
            let counter_clone = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                for _ in 0..500 {
                    counter_clone.decrement();
                }
            });
            handles.push(handle);
        }

        // Adding threads
        for _ in 0..2 {
            let counter_clone = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                for _ in 0..100 {
                    counter_clone.add(10);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        // Expected: 5*1000 - 3*500 + 2*100*10 = 5000 - 1500 + 2000 = 5500
        assert_eq!(counter.get(), 5500);
    }

    /// compare_and_swap returns Ok(previous) on match, Err(actual) otherwise.
    #[test]
    fn test_lock_free_counter_compare_and_swap() {
        let counter = LockFreeCounter::new(42);

        // Successful compare and swap
        let result = counter.compare_and_swap(42, 100);
        assert_eq!(result, Ok(42));
        assert_eq!(counter.get(), 100);

        // Failed compare and swap
        let result = counter.compare_and_swap(42, 200);
        assert_eq!(result, Err(100)); // Returns current value
        assert_eq!(counter.get(), 100); // Value unchanged
    }

    /// `set` overwrites unconditionally, including near-max values.
    #[test]
    fn test_lock_free_counter_set_operation() {
        let counter = LockFreeCounter::new(0);

        counter.set(12345);
        assert_eq!(counter.get(), 12345);

        // Should work with large values
        counter.set(u64::MAX - 1);
        assert_eq!(counter.get(), u64::MAX - 1);
    }

    /// `Default` yields a zeroed, fully functional counter.
    #[test]
    fn test_lock_free_counter_default() {
        let counter = LockFreeCounter::default();
        assert_eq!(counter.get(), 0);

        counter.increment();
        assert_eq!(counter.get(), 1);
    }

    /// High-contention sanity check: no lost updates, reasonable runtime.
    #[test]
    fn test_lock_contention_performance() {
        let mutex = Arc::new(OptimizedMutex::new(0));
        let mut handles = Vec::new();

        let start = std::time::Instant::now();

        // Create a high-contention scenario: 20 threads x 100 increments.
        for _ in 0..20 {
            let mutex_clone = Arc::clone(&mutex);
            let handle = thread::spawn(move || {
                for _ in 0..100 {
                    let mut guard = mutex_clone.lock();
                    *guard += 1;
                    // Simulate minimal work while holding the lock
                    std::hint::spin_loop();
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let duration = start.elapsed();

        // Should handle contention correctly and efficiently
        assert_eq!(*mutex.lock(), 2000);
        assert!(duration < Duration::from_secs(1)); // Should complete reasonably fast
    }

    /// A writer arriving amid staggered readers eventually gets the lock.
    #[test]
    fn test_rwlock_reader_writer_fairness() {
        let rwlock = Arc::new(OptimizedRwLock::new(0));
        let rwlock_clone = Arc::clone(&rwlock);

        // Start multiple readers
        let mut reader_handles = Vec::new();
        for i in 0..5 {
            let rwlock_clone = Arc::clone(&rwlock);
            let handle = thread::spawn(move || {
                thread::sleep(Duration::from_millis(i * 10)); // Stagger start times
                let _guard = rwlock_clone.read();
                thread::sleep(Duration::from_millis(50)); // Hold read lock
            });
            reader_handles.push(handle);
        }

        // Start a writer after readers have started
        thread::sleep(Duration::from_millis(20));
        let writer_handle = thread::spawn(move || {
            let mut guard = rwlock_clone.write();
            *guard = 42;
        });

        // Wait for all to complete
        for handle in reader_handles {
            handle.join().unwrap();
        }
        writer_handle.join().unwrap();

        // Writer should have eventually acquired the lock
        let guard = rwlock.read();
        assert_eq!(*guard, 42);
    }
}