1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
use super::RwLockReadGuard;
use super::RwLockWriteGuard;
use crate::redis::rwlock::constants::{READER_LOCK, UUID_SCRIPT, WRITER_LOCK};
use crate::redis::{Generic, LockError};
use redis::Connection;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::ops::{Deref, DerefMut};
/// A Read-Write Lock.
///
/// This lock is similar to the [std::sync::RwLock](https://doc.rust-lang.org/std/sync/struct.RwLock.html).
/// But it is distributed over multiple instances of the same service.
///
/// # Threads
///
/// If you try to get a writer lock in a thread, which already has a reader lock, you will end up in a deadlock.
/// To use the RwLock in threads, you need a scoped thread.
///
/// # Examples
///
/// ## Linear usage
/// ```
/// use dtypes::redis::RwLock;
/// use dtypes::redis::Di32;
/// use std::thread;
///
/// let client = redis::Client::open("redis://localhost:6379").unwrap();
/// let client2 = client.clone();
/// let mut i32 = Di32::with_value(1, "test_rwlock_example1", client.clone());
/// let mut lock = RwLock::new(i32);
///
/// // many reader locks can be held at once
/// {
/// let read1 = lock.read().unwrap();
/// let read2 = lock.read().unwrap();
/// assert_eq!(*read1, 1);
/// } // read locks are dropped at this point
///
/// // only one writer lock can be held, however
/// {
/// let mut write1 = lock.write().unwrap();
/// write1.store(2).unwrap();
/// assert_eq!(*write1, 2);
/// // the next line is not allowed, because it deadlocks.
/// //let mut _ = lock.write().unwrap();
/// } // write lock is dropped here
///
/// // look, you can read it again
/// {
/// let read1 = lock.read().unwrap();
/// assert_eq!(*read1, 2);
/// }
/// ```
/// ## Threaded usage
/// ```
/// use dtypes::redis::RwLock;
/// use dtypes::redis::Di32;
/// use std::thread;
///
/// let client = redis::Client::open("redis://localhost:6379").unwrap();
/// let i32 = Di32::with_value(1, "test_rwlock_example2", client.clone());
/// let mut lock = RwLock::new(i32);
/// // the reader lock is dropped immediately
/// assert_eq!(*lock.read().unwrap(), 1);
/// // Scoped threads are needed, otherwise the lifetime is unclear.
/// thread::scope(|s| {
/// s.spawn(|| {
/// let mut write = lock.write().unwrap();
/// write.store(2);
/// assert_eq!(*write, 2);
/// }).join().unwrap();
/// });
/// assert_eq!(*lock.read().unwrap(), 2);
/// ```
pub struct RwLock<T> {
pub(crate) data: Generic<T>,
pub(crate) conn: Option<redis::Connection>,
}
impl<T> RwLock<T>
where
T: Serialize + DeserializeOwned,
{
pub fn new(data: Generic<T>) -> Self {
Self { data, conn: None }
}
/// Creates a new RwLock Reader.
///
/// This function blocks until the lock is acquired.
/// If there is a writer lock, this function blocks until the writer lock is dropped.
/// Also if there is a writer locks waiting to be acquired, this function blocks until the writer lock is acquired and dropped.
pub fn read(&self) -> Result<RwLockReadGuard<T>, LockError> {
let mut conn = self.client.clone().get_connection().unwrap();
let uuid = self.acquire_via_script(READER_LOCK, &mut conn);
Ok(RwLockReadGuard::new(self, uuid, conn))
}
/// Creates a new RwLock Writer.
///
/// This function blocks until the lock is acquired.
/// If there is a reader lock, this function blocks until the reader lock is dropped.
/// The acquiring writer lock has priority over any waiting reader lock.
pub fn write(&mut self) -> Result<RwLockWriteGuard<T>, LockError> {
let mut conn = self.client.clone().get_connection().unwrap();
let uuid = self.acquire_via_script(WRITER_LOCK, &mut conn);
Ok(RwLockWriteGuard::new(self, uuid, conn))
}
fn acquire_via_script(&self, script: &str, conn: &mut Connection) -> usize {
let uuid = self.generate_uuid(conn);
let mut res = false;
while !res {
res = redis::Script::new(script)
.arg(&self.data.key)
.arg(uuid)
.arg(2)
.invoke(conn)
.unwrap();
}
uuid
}
pub(crate) fn generate_uuid(&self, conn: &mut Connection) -> usize {
redis::Script::new(UUID_SCRIPT)
.arg(&self.data.key)
.invoke(conn)
.unwrap()
}
}
impl<T> Deref for RwLock<T> {
type Target = Generic<T>;
fn deref(&self) -> &Self::Target {
&self.data
}
}
impl<T> DerefMut for RwLock<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::redis::*;
use std::mem::ManuallyDrop;
#[test]
fn test_rwlock() {
let client = redis::Client::open("redis://localhost:6379").unwrap();
let i32 = Di32::with_value(1, "test_rwlock", client.clone());
let mut lock = RwLock::new(i32);
{
// multiple reader locks can be held at once
let read = lock.read().unwrap();
assert_eq!(*read, 1);
let read2 = lock.read().unwrap();
assert_eq!(*read2, 1);
}
{
// only one writer lock can be held, however
let mut write = lock.write().unwrap();
write.store(2).unwrap();
assert_eq!(*write, 2);
}
// look, you can read it again
let read = lock.read().unwrap();
assert_eq!(*read, 2);
}
#[test]
fn test_rwlock_deadlock() {
let client = redis::Client::open("redis://localhost:6379").unwrap();
let i32 = Di32::with_value(1, "test_rwlock_deadlock", client.clone());
let mut lock = RwLock::new(i32);
{
let _ = ManuallyDrop::new(lock.read().unwrap());
}
eprintln!("1");
// This should not deadlocked forever
{
let _ = lock.write().unwrap();
}
{
let _ = ManuallyDrop::new(lock.write().unwrap());
}
// This should not deadlocked forever
{
let _ = lock.read().unwrap();
}
}
}