node_replication/rwlock.rs
// Copyright © 2019-2020 VMware, Inc. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! The distributed readers-writer lock used by the replica.
//!
//! This module is only public since it needs to be exposed to the benchmarking
//! code. For clients there is no need to rely on this directly, as the RwLock
//! is embedded inside the Replica.
//!
//! # Testing with loom
//!
//! We're not using loom in this module because we use UnsafeCell and loom's
//! UnsafeCell exposes a different API. Luckily, loom provides its own RwLock
//! implementation which (with some modifications, see `loom_rwlock.rs`) we can
//! use in the replica code.
17use core::cell::UnsafeCell;
18use core::default::Default;
19use core::hint::spin_loop;
20use core::ops::{Deref, DerefMut};
21use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22
23use crossbeam_utils::CachePadded;
24
/// Maximum number of reader threads that this lock supports.
/// The `rlock` array in [`RwLock`] has one slot per potential reader thread.
const MAX_READER_THREADS: usize = 192;
const_assert!(MAX_READER_THREADS > 0);

// A fresh (unlocked) per-reader lock slot, used only to initialize the `rlock`
// array. The interior-mutability lint is silenced because this const is never
// used as a shared atomic itself; each array element is a distinct copy.
#[allow(clippy::declare_interior_mutable_const)]
const RLOCK_DEFAULT: CachePadded<AtomicUsize> = CachePadded::new(AtomicUsize::new(0));
31
/// A scalable reader-writer lock.
///
/// This lock favours reader performance over writers. Each reader thread gets
/// its own "lock" while writers share a single lock.
///
/// `T` represents the underlying type protected by the lock.
/// Calling `read()` returns a read-guard that can be used to safely read `T`.
/// Calling `write()` returns a write-guard that can be used to safely mutate `T`.
pub struct RwLock<T>
where
    T: Sized + Sync,
{
    /// The writer lock. There can be at most one writer at any given point of time.
    wlock: CachePadded<AtomicBool>,

    /// Each reader uses an individual lock (indexed by its thread id) to access
    /// the underlying data-structure. The slots are cache-padded so concurrent
    /// readers do not contend on the same cache line.
    rlock: [CachePadded<AtomicUsize>; MAX_READER_THREADS],

    /// The underlying data-structure. `UnsafeCell` is needed because the guards
    /// hand out shared and mutable references to it; the locking protocol above
    /// is what makes that sound.
    data: UnsafeCell<T>,
}
53
/// A read-guard that can be used to read the underlying data structure. Writes on
/// the data structure will be blocked as long as one of these is lying around.
pub struct ReadGuard<'a, T: Sized + Sync + 'a> {
    /// Id of the thread that acquired this guard. Required at drop time so that
    /// we can release the appropriate read lock (the slot in `rlock[tid]`).
    tid: usize,

    /// A reference to the Rwlock wrapping the data-structure.
    lock: &'a RwLock<T>,
}
64
/// A write-guard that can be used to write to the underlying data structure. All
/// reads will be blocked until this is dropped, at which point the write lock is
/// released via `Drop`.
pub struct WriteGuard<'a, T: Sized + Sync + 'a> {
    /// A reference to the Rwlock wrapping the data-structure.
    lock: &'a RwLock<T>,
}
71
72impl<T> Default for RwLock<T>
73where
74 T: Sized + Default + Sync,
75{
76 /// Returns a new instance of a RwLock. Default constructs the
77 /// underlying data structure.
78 fn default() -> RwLock<T> {
79 RwLock {
80 wlock: CachePadded::new(AtomicBool::new(false)),
81 rlock: [RLOCK_DEFAULT; MAX_READER_THREADS],
82 data: UnsafeCell::new(T::default()),
83 }
84 }
85}
86
87impl<T> RwLock<T>
88where
89 T: Sized + Sync,
90{
91 /// Returns a new instance of a RwLock. Default constructs the
92 /// underlying data structure.
93 pub fn new(t: T) -> Self {
94 Self {
95 wlock: CachePadded::new(AtomicBool::new(false)),
96 rlock: [RLOCK_DEFAULT; MAX_READER_THREADS],
97 data: UnsafeCell::new(t),
98 }
99 }
100
101 /// Locks the underlying data-structure for writes. The caller can retrieve
102 /// a mutable reference from the returned `WriteGuard`.
103 ///
104 /// `n` is the number of active readers currently using this reader-writer lock.
105 ///
106 /// # Example
107 ///
108 /// ```
109 /// use node_replication::rwlock::RwLock;
110 ///
111 /// // Create the lock.
112 /// let lock = RwLock::<usize>::default();
113 ///
114 /// // Acquire the write lock. This returns a guard that can be used
115 /// // to perform writes against the protected data. We need to know
116 /// // the number of concurrent reader threads upfront.
117 /// const N_CONCURRENT_READERS: usize = 32;
118 /// let mut w_guard = lock.write(N_CONCURRENT_READERS);
119 /// *w_guard = 777;
120 /// ```
121 pub fn write(&self, n: usize) -> WriteGuard<T> {
122 // First, wait until we can acquire the writer lock.
123 loop {
124 match self.wlock.compare_exchange_weak(
125 false,
126 true,
127 Ordering::Acquire,
128 Ordering::Acquire,
129 ) {
130 Ok(_) => break,
131 Err(_) => continue,
132 }
133 }
134
135 // Next, wait until all readers have released their locks. This condition
136 // evaluates to true if each reader lock is free (i.e equal to zero).
137 while !self
138 .rlock
139 .iter()
140 .take(n)
141 .all(|item| item.load(Ordering::Relaxed) == 0)
142 {
143 spin_loop();
144 }
145
146 unsafe { WriteGuard::new(self) }
147 }
148
149 /// Locks the underlying data-structure for reads. Allows multiple readers to acquire the lock.
150 /// Blocks until there aren't any active writers.
151 ///
152 /// # Example
153 ///
154 /// ```
155 /// use node_replication::rwlock::RwLock;
156 ///
157 /// // Create the lock.
158 /// let lock = RwLock::<usize>::default();
159 ///
160 /// // Acquire the read lock. This returns a guard that can be used
161 /// // to perform reads against the protected data. We need
162 /// // a thread identifier to acquire this lock.
163 /// const MY_THREAD_ID: usize = 16;
164 /// let r_guard = lock.read(MY_THREAD_ID);
165 /// assert_eq!(0, *r_guard);
166 pub fn read(&self, tid: usize) -> ReadGuard<T> {
167 // We perform a small optimization. Before attempting to acquire a read lock, we issue
168 // naked reads to the write lock and wait until it is free. For that, we retrieve a
169 // raw pointer to the write lock over here.
170 let ptr = unsafe {
171 &*(&self.wlock as *const crossbeam_utils::CachePadded<core::sync::atomic::AtomicBool>
172 as *const bool)
173 };
174
175 loop {
176 // First, wait until the write lock is free. This is the small
177 // optimization spoken of earlier.
178 unsafe {
179 while core::ptr::read_volatile(ptr) {
180 spin_loop();
181 }
182 }
183
184 // Next, acquire this thread's read lock and actually check if the write lock
185 // is free. If it is, then we're good to go because any new writers will now
186 // see this acquired read lock and block. If it isn't free, then we got unlucky;
187 // release the read lock and retry.
188 self.rlock[tid].fetch_add(1, Ordering::Acquire);
189 if !self.wlock.load(Ordering::Relaxed) {
190 break;
191 }
192
193 self.rlock[tid].fetch_sub(1, Ordering::Release);
194 }
195
196 unsafe { ReadGuard::new(self, tid) }
197 }
198
199 /// Unlocks the write lock; invoked by the drop() method.
200 pub(in crate::rwlock) unsafe fn write_unlock(&self) {
201 match self
202 .wlock
203 .compare_exchange_weak(true, false, Ordering::Acquire, Ordering::Acquire)
204 {
205 Ok(_) => (),
206 Err(_) => panic!("write_unlock() called without acquiring the write lock"),
207 }
208 }
209
210 /// Unlocks the read lock; called by the drop() method.
211 pub(in crate::rwlock) unsafe fn read_unlock(&self, tid: usize) {
212 if self.rlock[tid].fetch_sub(1, Ordering::Release) == 0 {
213 panic!("read_unlock() called without acquiring the read lock");
214 }
215 }
216}
217
impl<'rwlock, T: Sized + Sync> ReadGuard<'rwlock, T> {
    /// Returns a read guard over a passed in reader-writer lock.
    ///
    /// # Safety
    ///
    /// The caller must already hold the read lock for slot `tid` on `lock`;
    /// the guard releases exactly that slot when dropped.
    unsafe fn new(lock: &'rwlock RwLock<T>, tid: usize) -> ReadGuard<'rwlock, T> {
        ReadGuard { tid, lock }
    }
}
224
impl<'rwlock, T: Sized + Sync> WriteGuard<'rwlock, T> {
    /// Returns a write guard over a passed in reader-writer lock.
    ///
    /// # Safety
    ///
    /// The caller must already hold the write lock on `lock` and have observed
    /// all reader slots free; the guard releases the write lock when dropped.
    unsafe fn new(lock: &'rwlock RwLock<T>) -> WriteGuard<'rwlock, T> {
        WriteGuard { lock }
    }
}
231
/// `Sync` trait allows `RwLock` to be shared between threads. The `read()` and
/// `write()` logic ensures that we will never have threads writing to and
/// reading from the underlying data structure simultaneously.
///
/// NOTE(review): a `T: Send` bound would usually also be expected here, since a
/// `WriteGuard` lets an arbitrary thread obtain `&mut T` — confirm callers only
/// instantiate this lock with `Send` types before relying on this impl.
unsafe impl<T: Sized + Sync> Sync for RwLock<T> {}
236
/// This `Deref` trait allows a thread to use T from a ReadGuard.
/// ReadGuard can only be dereferenced into an immutable reference.
impl<T: Sized + Sync> Deref for ReadGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        // SAFETY: the guard proves a read lock is held, so no writer can hold
        // a mutable reference to `data` while this shared borrow is alive.
        unsafe { &*self.lock.data.get() }
    }
}
246
/// This `Deref` trait allows a thread to use T from a WriteGuard.
/// It hands out an immutable reference; use `DerefMut` for mutation.
impl<T: Sized + Sync> Deref for WriteGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        // SAFETY: the guard proves the write lock is held, so no other thread
        // holds any reference to `data` while this borrow is alive.
        unsafe { &*self.lock.data.get() }
    }
}
256
/// This `DerefMut` trait allows a thread to use T from a WriteGuard.
/// This allows us to obtain a mutable reference to the protected data.
impl<T: Sized + Sync> DerefMut for WriteGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: the guard proves the write lock is held exclusively, so this
        // is the only live reference to `data`.
        unsafe { &mut *self.lock.data.get() }
    }
}
264
265/// This `Drop` trait implements the unlock logic for a reader lock. Once the `ReadGuard`
266/// goes out of scope, the corresponding read lock is marked as released.
267impl<T: Sized + Sync> Drop for ReadGuard<'_, T> {
268 fn drop(&mut self) {
269 unsafe {
270 let tid = self.tid;
271 self.lock.read_unlock(tid);
272 }
273 }
274}
275
276/// This `Drop` trait implements the unlock logic for a writer lock. Once the `WriteGuard`
277/// goes out of scope, the corresponding write lock is marked as released.
278impl<T: Sized + Sync> Drop for WriteGuard<'_, T> {
279 fn drop(&mut self) {
280 unsafe {
281 self.lock.write_unlock();
282 }
283 }
284}
285
#[cfg(test)]
mod tests {
    use super::{RwLock, MAX_READER_THREADS};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::sync::Arc as _DocArc; // (no-op alias removed) -- see note below
    use std::thread;
    use std::vec::Vec;

    // Tests if we can successfully default-construct a reader-writer lock.
    #[test]
    fn test_rwlock_default() {
        let lock = RwLock::<usize>::default();

        assert_eq!(lock.wlock.load(Ordering::Relaxed), false);
        for idx in 0..MAX_READER_THREADS {
            assert_eq!(lock.rlock[idx].load(Ordering::Relaxed), 0);
        }
        assert_eq!(unsafe { *lock.data.get() }, usize::default());
    }

    // Tests if the mutable reference returned on acquiring a write lock
    // can be used to write to the underlying data structure.
    #[test]
    fn test_writer_lock() {
        let lock = RwLock::<usize>::default();
        let val = 10;

        let mut guard = lock.write(1);
        *guard = val;

        // The write lock must still be held while the guard is alive.
        assert_eq!(lock.wlock.load(Ordering::Relaxed), true);
        assert_eq!(lock.rlock[0].load(Ordering::Relaxed), 0);
        assert_eq!(unsafe { *lock.data.get() }, val);
    }

    // Tests if the write lock is released once a WriteGuard goes out of scope.
    #[test]
    fn test_writer_unlock() {
        let lock = RwLock::<usize>::default();

        {
            let mut _guard = lock.write(1);
            assert_eq!(lock.wlock.load(Ordering::Relaxed), true);
        }

        assert_eq!(lock.wlock.load(Ordering::Relaxed), false);
    }

    // Tests if the immutable reference returned on acquiring a read lock
    // can be used to read from the underlying data structure.
    #[test]
    fn test_reader_lock() {
        let lock = RwLock::<usize>::default();
        let val = 10;

        unsafe {
            *lock.data.get() = val;
        }
        let guard = lock.read(0);

        // Reader slot 0 is held; the write lock stays free.
        assert_eq!(lock.wlock.load(Ordering::Relaxed), false);
        assert_eq!(lock.rlock[0].load(Ordering::Relaxed), 1);
        assert_eq!(*guard, val);
    }

    // Tests if a reader lock is released once a ReadGuard goes out of scope.
    #[test]
    fn test_reader_unlock() {
        let lock = RwLock::<usize>::default();

        {
            let mut _guard = lock.read(0);
            assert_eq!(lock.rlock[0].load(Ordering::Relaxed), 1);
        }

        assert_eq!(lock.rlock[0].load(Ordering::Relaxed), 0);
    }

    // Tests that multiple readers can simultaneously acquire a readers lock
    #[test]
    fn test_multiple_readers() {
        let lock = RwLock::<usize>::default();
        let val = 10;

        unsafe {
            *lock.data.get() = val;
        }

        let f = lock.read(0);
        let s = lock.read(1);
        let t = lock.read(2);

        assert_eq!(lock.rlock[0].load(Ordering::Relaxed), 1);
        assert_eq!(lock.rlock[1].load(Ordering::Relaxed), 1);
        assert_eq!(lock.rlock[2].load(Ordering::Relaxed), 1);
        assert_eq!(*f, val);
        assert_eq!(*s, val);
        assert_eq!(*t, val);
    }

    // Tests that multiple writers and readers whose scopes don't interfere can
    // acquire the lock.
    #[test]
    fn test_lock_combinations() {
        let l = RwLock::<usize>::default();

        {
            let _g = l.write(2);
        }

        {
            let _g = l.write(2);
        }

        {
            let _f = l.read(0);
            let _s = l.read(1);
        }

        {
            let _g = l.write(2);
        }
    }

    // Tests that writes to the underlying data structure are atomic:
    // t threads each increment the counter once under the write lock.
    #[test]
    fn test_atomic_writes() {
        let lock = Arc::new(RwLock::<usize>::default());
        let t = 100;

        let mut threads = Vec::new();
        for _i in 0..t {
            let l = lock.clone();
            let child = thread::spawn(move || {
                let mut ele = l.write(t);
                *ele += 1;
            });
            threads.push(child);
        }

        for _i in 0..threads.len() {
            let _retval = threads
                .pop()
                .unwrap()
                .join()
                .expect("Thread didn't finish successfully.");
        }

        // If increments were lost the final count would be < t.
        assert_eq!(unsafe { *lock.data.get() }, t);
    }

    // Tests that the multiple readers can read from the lock in parallel.
    #[test]
    fn test_parallel_readers() {
        let lock = Arc::new(RwLock::<usize>::default());
        let t = 100;

        unsafe {
            *lock.data.get() = t;
        }

        let mut threads = Vec::new();
        for i in 0..t {
            let l = lock.clone();
            let child = thread::spawn(move || {
                let ele = l.read(i);
                assert_eq!(*ele, t);
            });
            threads.push(child);
        }

        for _i in 0..threads.len() {
            let _retval = threads
                .pop()
                .unwrap()
                .join()
                .expect("Reading didn't finish successfully.");
        }
    }

    // Tests that write_unlock() panics if called without acquiring a write lock.
    #[test]
    #[should_panic]
    fn test_writer_unlock_without_lock() {
        let lock = RwLock::<usize>::default();
        unsafe { lock.write_unlock() };
    }

    // Tests that read_unlock() panics if called without acquiring a read lock.
    #[test]
    #[should_panic]
    fn test_reader_unlock_without_lock() {
        let lock = RwLock::<usize>::default();
        unsafe { lock.read_unlock(1) };
    }

    // Tests that a read lock cannot be held along with a write lock.
    //
    // The second lock operation in this test should block indefinitely, and
    // the main thread should panic after waking up because the atomic wasn't
    // written to.
    //
    // If the main thread doesn't panic, then it means that we've got a bug
    // that allows readers to acquire the lock despite a writer already having
    // done so. (The blocked spawned thread is intentionally leaked; the final
    // join() is only reached on the failure path.)
    #[test]
    #[should_panic(expected = "This test should always panic")]
    fn test_reader_after_writer() {
        let lock = RwLock::<usize>::default();
        let shared = Arc::new(AtomicUsize::new(0));

        let s = shared.clone();
        let lock_thread = thread::spawn(move || {
            let _w = lock.write(1);
            let _r = lock.read(0);
            s.store(1, Ordering::SeqCst);
        });

        thread::sleep(std::time::Duration::from_secs(2));
        if shared.load(Ordering::SeqCst) == 0 {
            panic!("This test should always panic");
        }
        lock_thread.join().unwrap();
    }

    // Tests that a write lock cannot be held along with a read lock.
    //
    // The second lock operation in this test should block indefinitely, and
    // the main thread should panic after waking up because the atomic wasn't
    // written to.
    //
    // If the main thread doesn't panic, then it means that we've got a bug
    // that allows writers to acquire the lock despite a reader already having
    // done so.
    #[test]
    #[should_panic(expected = "This test should always panic")]
    fn test_writer_after_reader() {
        let lock = RwLock::<usize>::default();
        let shared = Arc::new(AtomicUsize::new(0));

        let s = shared.clone();
        let lock_thread = thread::spawn(move || {
            let _r = lock.read(0);
            let _w = lock.write(1);
            s.store(1, Ordering::SeqCst);
        });

        thread::sleep(std::time::Duration::from_secs(2));
        if shared.load(Ordering::SeqCst) == 0 {
            panic!("This test should always panic");
        }
        lock_thread.join().unwrap();
    }

    // Tests that a write lock cannot be held along with another write lock.
    //
    // The second lock operation in this test should block indefinitely, and
    // the main thread should panic after waking up because the atomic wasn't
    // written to.
    //
    // If the main thread doesn't panic, then it means that we've got a bug
    // that allows writers to acquire the lock despite a writer already having
    // done so.
    #[test]
    #[should_panic(expected = "This test should always panic")]
    fn test_writer_after_writer() {
        let lock = RwLock::<usize>::default();
        let shared = Arc::new(AtomicUsize::new(0));

        let s = shared.clone();
        let lock_thread = thread::spawn(move || {
            let _f = lock.write(1);
            let _s = lock.write(1);
            s.store(1, Ordering::SeqCst);
        });

        thread::sleep(std::time::Duration::from_secs(2));
        if shared.load(Ordering::SeqCst) == 0 {
            panic!("This test should always panic");
        }
        lock_thread.join().unwrap();
    }
}
568}