// node_replication/rwlock.rs

1// Copyright © 2019-2020 VMware, Inc. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! The distributed readers-writer lock used by the replica.
5//!
6//! This module is only public since it needs to be exposed to the benchmarking
7//! code. For clients there is no need to rely on this directly, as the RwLock
8//! is embedded inside the Replica.
9//!
10//! # Testing with loom
11//!
12//! We're not using loom in this module because we use UnsafeCell and loom's
//! UnsafeCell exposes a different API. Luckily, loom provides its own RwLock
14//! implementation which (with some modifications, see `loom_rwlock.rs`) we can
15//! use in the replica code.
16
17use core::cell::UnsafeCell;
18use core::default::Default;
19use core::hint::spin_loop;
20use core::ops::{Deref, DerefMut};
21use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22
23use crossbeam_utils::CachePadded;
24
/// Maximum number of reader threads that this lock supports. Thread ids passed
/// to `read()` / `ReadGuard` must be strictly below this value.
const MAX_READER_THREADS: usize = 192;
const_assert!(MAX_READER_THREADS > 0);

// An unlocked per-reader slot, used only as the initializer for the `rlock`
// array. The lint is allowed because the const is never shared as a value;
// each array element is an independent copy.
#[allow(clippy::declare_interior_mutable_const)]
const RLOCK_DEFAULT: CachePadded<AtomicUsize> = CachePadded::new(AtomicUsize::new(0));
31
/// A scalable reader-writer lock.
///
/// This lock favours reader performance over writers. Each reader thread gets
/// its own "lock" while writers share a single lock.
///
/// `T` represents the underlying type protected by the lock.
/// Calling `read()` returns a read-guard that can be used to safely read `T`.
/// Calling `write()` returns a write-guard that can be used to safely mutate `T`.
pub struct RwLock<T>
where
    T: Sized + Sync,
{
    /// The writer lock. There can be at most one writer at any given point of time.
    wlock: CachePadded<AtomicBool>,

    /// Each reader uses an individual lock to access the underlying data-structure.
    /// The slot is indexed by the caller-supplied thread id and counts the read
    /// guards that thread currently holds (0 == unlocked). Cache-padded so that
    /// readers on different threads never contend on the same cache line.
    rlock: [CachePadded<AtomicUsize>; MAX_READER_THREADS],

    /// The underlying data-structure. `UnsafeCell` because the guards hand out
    /// `&T` / `&mut T` under the protection of the locks above.
    data: UnsafeCell<T>,
}
53
/// A read-guard that can be used to read the underlying data structure. Writes on
/// the data structure will be blocked as long as one of these is lying around.
/// Dropping the guard releases the per-thread read lock it was acquired with.
pub struct ReadGuard<'a, T: Sized + Sync + 'a> {
    /// Id of the thread that acquired this guard. Required at drop time so that
    /// we can release the appropriate read lock (`rlock[tid]`).
    tid: usize,

    /// A reference to the Rwlock wrapping the data-structure.
    lock: &'a RwLock<T>,
}
64
/// A write-guard that can be used to write to the underlying data structure. All
/// reads will be blocked until this is dropped; dropping it releases the single
/// writer lock.
pub struct WriteGuard<'a, T: Sized + Sync + 'a> {
    /// A reference to the Rwlock wrapping the data-structure.
    lock: &'a RwLock<T>,
}
71
72impl<T> Default for RwLock<T>
73where
74    T: Sized + Default + Sync,
75{
76    /// Returns a new instance of a RwLock. Default constructs the
77    /// underlying data structure.
78    fn default() -> RwLock<T> {
79        RwLock {
80            wlock: CachePadded::new(AtomicBool::new(false)),
81            rlock: [RLOCK_DEFAULT; MAX_READER_THREADS],
82            data: UnsafeCell::new(T::default()),
83        }
84    }
85}
86
87impl<T> RwLock<T>
88where
89    T: Sized + Sync,
90{
91    /// Returns a new instance of a RwLock. Default constructs the
92    /// underlying data structure.
93    pub fn new(t: T) -> Self {
94        Self {
95            wlock: CachePadded::new(AtomicBool::new(false)),
96            rlock: [RLOCK_DEFAULT; MAX_READER_THREADS],
97            data: UnsafeCell::new(t),
98        }
99    }
100
101    /// Locks the underlying data-structure for writes. The caller can retrieve
102    /// a mutable reference from the returned `WriteGuard`.
103    ///
104    /// `n` is the number of active readers currently using this reader-writer lock.
105    ///
106    /// # Example
107    ///
108    /// ```
109    ///     use node_replication::rwlock::RwLock;
110    ///
111    ///     // Create the lock.
112    ///     let lock = RwLock::<usize>::default();
113    ///
114    ///     // Acquire the write lock. This returns a guard that can be used
115    ///     // to perform writes against the protected data. We need to know
116    ///     // the number of concurrent reader threads upfront.
117    ///     const N_CONCURRENT_READERS: usize = 32;
118    ///     let mut w_guard = lock.write(N_CONCURRENT_READERS);
119    ///     *w_guard = 777;
120    /// ```
121    pub fn write(&self, n: usize) -> WriteGuard<T> {
122        // First, wait until we can acquire the writer lock.
123        loop {
124            match self.wlock.compare_exchange_weak(
125                false,
126                true,
127                Ordering::Acquire,
128                Ordering::Acquire,
129            ) {
130                Ok(_) => break,
131                Err(_) => continue,
132            }
133        }
134
135        // Next, wait until all readers have released their locks. This condition
136        // evaluates to true if each reader lock is free (i.e equal to zero).
137        while !self
138            .rlock
139            .iter()
140            .take(n)
141            .all(|item| item.load(Ordering::Relaxed) == 0)
142        {
143            spin_loop();
144        }
145
146        unsafe { WriteGuard::new(self) }
147    }
148
149    /// Locks the underlying data-structure for reads. Allows multiple readers to acquire the lock.
150    /// Blocks until there aren't any active writers.
151    ///
152    /// # Example
153    ///
154    /// ```
155    ///     use node_replication::rwlock::RwLock;
156    ///
157    ///     // Create the lock.
158    ///     let lock = RwLock::<usize>::default();
159    ///
160    ///     // Acquire the read lock. This returns a guard that can be used
161    ///     // to perform reads against the protected data. We need
162    ///     // a thread identifier to acquire this lock.
163    ///     const MY_THREAD_ID: usize = 16;
164    ///     let r_guard = lock.read(MY_THREAD_ID);
165    ///     assert_eq!(0, *r_guard);
166    pub fn read(&self, tid: usize) -> ReadGuard<T> {
167        // We perform a small optimization. Before attempting to acquire a read lock, we issue
168        // naked reads to the write lock and wait until it is free. For that, we retrieve a
169        // raw pointer to the write lock over here.
170        let ptr = unsafe {
171            &*(&self.wlock as *const crossbeam_utils::CachePadded<core::sync::atomic::AtomicBool>
172                as *const bool)
173        };
174
175        loop {
176            // First, wait until the write lock is free. This is the small
177            // optimization spoken of earlier.
178            unsafe {
179                while core::ptr::read_volatile(ptr) {
180                    spin_loop();
181                }
182            }
183
184            // Next, acquire this thread's read lock and actually check if the write lock
185            // is free. If it is, then we're good to go because any new writers will now
186            // see this acquired read lock and block. If it isn't free, then we got unlucky;
187            // release the read lock and retry.
188            self.rlock[tid].fetch_add(1, Ordering::Acquire);
189            if !self.wlock.load(Ordering::Relaxed) {
190                break;
191            }
192
193            self.rlock[tid].fetch_sub(1, Ordering::Release);
194        }
195
196        unsafe { ReadGuard::new(self, tid) }
197    }
198
199    /// Unlocks the write lock; invoked by the drop() method.
200    pub(in crate::rwlock) unsafe fn write_unlock(&self) {
201        match self
202            .wlock
203            .compare_exchange_weak(true, false, Ordering::Acquire, Ordering::Acquire)
204        {
205            Ok(_) => (),
206            Err(_) => panic!("write_unlock() called without acquiring the write lock"),
207        }
208    }
209
210    /// Unlocks the read lock; called by the drop() method.
211    pub(in crate::rwlock) unsafe fn read_unlock(&self, tid: usize) {
212        if self.rlock[tid].fetch_sub(1, Ordering::Release) == 0 {
213            panic!("read_unlock() called without acquiring the read lock");
214        }
215    }
216}
217
218impl<'rwlock, T: Sized + Sync> ReadGuard<'rwlock, T> {
219    /// Returns a read guard over a passed in reader-writer lock.
220    unsafe fn new(lock: &'rwlock RwLock<T>, tid: usize) -> ReadGuard<'rwlock, T> {
221        ReadGuard { tid, lock }
222    }
223}
224
225impl<'rwlock, T: Sized + Sync> WriteGuard<'rwlock, T> {
226    /// Returns a write guard over a passed in reader-writer lock.
227    unsafe fn new(lock: &'rwlock RwLock<T>) -> WriteGuard<'rwlock, T> {
228        WriteGuard { lock }
229    }
230}
231
/// `Sync` trait allows `RwLock` to be shared between threads. The `read()` and
/// `write()` logic ensures that we will never have threads writing to and
/// reading from the underlying data structure simultaneously.
///
/// NOTE(review): std's `RwLock<T>` is `Sync` only when `T: Send + Sync`, while
/// this impl requires only `T: Sync`. Since `WriteGuard` hands out `&mut T` on
/// whichever thread holds the lock, confirm whether `T: Send` should also be
/// required here.
unsafe impl<T: Sized + Sync> Sync for RwLock<T> {}
236
237/// This `Deref` trait allows a thread to use T from a ReadGuard.
238/// ReadGuard can only be dereferenced into an immutable reference.
239impl<T: Sized + Sync> Deref for ReadGuard<'_, T> {
240    type Target = T;
241
242    fn deref(&self) -> &T {
243        unsafe { &*self.lock.data.get() }
244    }
245}
246
247/// This `Deref` trait allows a thread to use T from a WriteGuard.
248/// This allows us to dereference an immutable reference.
249impl<T: Sized + Sync> Deref for WriteGuard<'_, T> {
250    type Target = T;
251
252    fn deref(&self) -> &T {
253        unsafe { &*self.lock.data.get() }
254    }
255}
256
257/// This `DerefMut` trait allow a thread to use T from a WriteGuard.
258/// This allows us to dereference a mutable reference.
259impl<T: Sized + Sync> DerefMut for WriteGuard<'_, T> {
260    fn deref_mut(&mut self) -> &mut T {
261        unsafe { &mut *self.lock.data.get() }
262    }
263}
264
265/// This `Drop` trait implements the unlock logic for a reader lock. Once the `ReadGuard`
266/// goes out of scope, the corresponding read lock is marked as released.
267impl<T: Sized + Sync> Drop for ReadGuard<'_, T> {
268    fn drop(&mut self) {
269        unsafe {
270            let tid = self.tid;
271            self.lock.read_unlock(tid);
272        }
273    }
274}
275
276/// This `Drop` trait implements the unlock logic for a writer lock. Once the `WriteGuard`
277/// goes out of scope, the corresponding write lock is marked as released.
278impl<T: Sized + Sync> Drop for WriteGuard<'_, T> {
279    fn drop(&mut self) {
280        unsafe {
281            self.lock.write_unlock();
282        }
283    }
284}
285
#[cfg(test)]
mod tests {
    use super::{RwLock, MAX_READER_THREADS};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::vec::Vec;

    // Tests if we can successfully default-construct a reader-writer lock.
    #[test]
    fn test_rwlock_default() {
        let lock = RwLock::<usize>::default();

        assert!(!lock.wlock.load(Ordering::Relaxed));
        for idx in 0..MAX_READER_THREADS {
            assert_eq!(lock.rlock[idx].load(Ordering::Relaxed), 0);
        }
        assert_eq!(unsafe { *lock.data.get() }, usize::default());
    }

    // Tests if the mutable reference returned on acquiring a write lock
    // can be used to write to the underlying data structure.
    #[test]
    fn test_writer_lock() {
        let lock = RwLock::<usize>::default();
        let val = 10;

        let mut guard = lock.write(1);
        *guard = val;

        assert!(lock.wlock.load(Ordering::Relaxed));
        assert_eq!(lock.rlock[0].load(Ordering::Relaxed), 0);
        assert_eq!(unsafe { *lock.data.get() }, val);
    }

    // Tests if the write lock is released once a WriteGuard goes out of scope.
    #[test]
    fn test_writer_unlock() {
        let lock = RwLock::<usize>::default();

        {
            let _guard = lock.write(1);
            assert!(lock.wlock.load(Ordering::Relaxed));
        }

        assert!(!lock.wlock.load(Ordering::Relaxed));
    }

    // Tests if the immutable reference returned on acquiring a read lock
    // can be used to read from the underlying data structure.
    #[test]
    fn test_reader_lock() {
        let lock = RwLock::<usize>::default();
        let val = 10;

        unsafe {
            *lock.data.get() = val;
        }
        let guard = lock.read(0);

        assert!(!lock.wlock.load(Ordering::Relaxed));
        assert_eq!(lock.rlock[0].load(Ordering::Relaxed), 1);
        assert_eq!(*guard, val);
    }

    // Tests if a reader lock is released once a ReadGuard goes out of scope.
    #[test]
    fn test_reader_unlock() {
        let lock = RwLock::<usize>::default();

        {
            let _guard = lock.read(0);
            assert_eq!(lock.rlock[0].load(Ordering::Relaxed), 1);
        }

        assert_eq!(lock.rlock[0].load(Ordering::Relaxed), 0);
    }

    // Tests that multiple readers can simultaneously acquire a readers lock.
    #[test]
    fn test_multiple_readers() {
        let lock = RwLock::<usize>::default();
        let val = 10;

        unsafe {
            *lock.data.get() = val;
        }

        let f = lock.read(0);
        let s = lock.read(1);
        let t = lock.read(2);

        assert_eq!(lock.rlock[0].load(Ordering::Relaxed), 1);
        assert_eq!(lock.rlock[1].load(Ordering::Relaxed), 1);
        assert_eq!(lock.rlock[2].load(Ordering::Relaxed), 1);
        assert_eq!(*f, val);
        assert_eq!(*s, val);
        assert_eq!(*t, val);
    }

    // Tests that multiple writers and readers whose scopes don't interfere can
    // acquire the lock.
    #[test]
    fn test_lock_combinations() {
        let l = RwLock::<usize>::default();

        {
            let _g = l.write(2);
        }

        {
            let _g = l.write(2);
        }

        {
            let _f = l.read(0);
            let _s = l.read(1);
        }

        {
            let _g = l.write(2);
        }
    }

    // Tests that writes to the underlying data structure are atomic.
    #[test]
    fn test_atomic_writes() {
        let lock = Arc::new(RwLock::<usize>::default());
        let t = 100;

        let mut threads = Vec::new();
        for _i in 0..t {
            let l = lock.clone();
            let child = thread::spawn(move || {
                let mut ele = l.write(t);
                *ele += 1;
            });
            threads.push(child);
        }

        for _i in 0..threads.len() {
            let _retval = threads
                .pop()
                .unwrap()
                .join()
                .expect("Thread didn't finish successfully.");
        }

        assert_eq!(unsafe { *lock.data.get() }, t);
    }

    // Tests that the multiple readers can read from the lock in parallel.
    #[test]
    fn test_parallel_readers() {
        let lock = Arc::new(RwLock::<usize>::default());
        let t = 100;

        unsafe {
            *lock.data.get() = t;
        }

        let mut threads = Vec::new();
        for i in 0..t {
            let l = lock.clone();
            let child = thread::spawn(move || {
                let ele = l.read(i);
                assert_eq!(*ele, t);
            });
            threads.push(child);
        }

        for _i in 0..threads.len() {
            let _retval = threads
                .pop()
                .unwrap()
                .join()
                .expect("Reading didn't finish successfully.");
        }
    }

    // Tests that write_unlock() panics if called without acquiring a write lock.
    #[test]
    #[should_panic]
    fn test_writer_unlock_without_lock() {
        let lock = RwLock::<usize>::default();
        unsafe { lock.write_unlock() };
    }

    // Tests that read_unlock() panics if called without acquiring a read lock.
    #[test]
    #[should_panic]
    fn test_reader_unlock_without_lock() {
        let lock = RwLock::<usize>::default();
        unsafe { lock.read_unlock(1) };
    }

    // Tests that a read lock cannot be held along with a write lock.
    //
    // The second lock operation in this test should block indefinitely, and
    // the main thread should panic after waking up because the atomic wasn't
    // written to.
    //
    // If the main thread doesn't panic, then it means that we've got a bug
    // that allows readers to acquire the lock despite a writer already having
    // done so.
    #[test]
    #[should_panic(expected = "This test should always panic")]
    fn test_reader_after_writer() {
        let lock = RwLock::<usize>::default();
        let shared = Arc::new(AtomicUsize::new(0));

        let s = shared.clone();
        let lock_thread = thread::spawn(move || {
            let _w = lock.write(1);
            let _r = lock.read(0);
            s.store(1, Ordering::SeqCst);
        });

        thread::sleep(std::time::Duration::from_secs(2));
        if shared.load(Ordering::SeqCst) == 0 {
            panic!("This test should always panic");
        }
        lock_thread.join().unwrap();
    }

    // Tests that a write lock cannot be held along with a read lock.
    //
    // The second lock operation in this test should block indefinitely, and
    // the main thread should panic after waking up because the atomic wasn't
    // written to.
    //
    // If the main thread doesn't panic, then it means that we've got a bug
    // that allows writers to acquire the lock despite a reader already having
    // done so.
    #[test]
    #[should_panic(expected = "This test should always panic")]
    fn test_writer_after_reader() {
        let lock = RwLock::<usize>::default();
        let shared = Arc::new(AtomicUsize::new(0));

        let s = shared.clone();
        let lock_thread = thread::spawn(move || {
            let _r = lock.read(0);
            let _w = lock.write(1);
            s.store(1, Ordering::SeqCst);
        });

        thread::sleep(std::time::Duration::from_secs(2));
        if shared.load(Ordering::SeqCst) == 0 {
            panic!("This test should always panic");
        }
        lock_thread.join().unwrap();
    }

    // Tests that a write lock cannot be held along with another write lock.
    //
    // The second lock operation in this test should block indefinitely, and
    // the main thread should panic after waking up because the atomic wasn't
    // written to.
    //
    // If the main thread doesn't panic, then it means that we've got a bug
    // that allows writers to acquire the lock despite a writer already having
    // done so.
    #[test]
    #[should_panic(expected = "This test should always panic")]
    fn test_writer_after_writer() {
        let lock = RwLock::<usize>::default();
        let shared = Arc::new(AtomicUsize::new(0));

        let s = shared.clone();
        let lock_thread = thread::spawn(move || {
            let _f = lock.write(1);
            let _s = lock.write(1);
            s.store(1, Ordering::SeqCst);
        });

        thread::sleep(std::time::Duration::from_secs(2));
        if shared.load(Ordering::SeqCst) == 0 {
            panic!("This test should always panic");
        }
        lock_thread.join().unwrap();
    }
}