dtypes/redis/rwlock/
lock.rs

1use super::RwLockReadGuard;
2use super::RwLockWriteGuard;
3use crate::redis::rwlock::constants::{READER_LOCK, UUID_SCRIPT, WRITER_LOCK};
4use crate::redis::{sync::LockError, types::Generic};
5use redis::Connection;
6use serde::de::DeserializeOwned;
7use serde::Serialize;
8use std::ops::{Deref, DerefMut};
9
10/// A Read-Write Lock.
11///
12/// This lock is similar to the [std::sync::RwLock](https://doc.rust-lang.org/std/sync/struct.RwLock.html).
13/// But it is distributed over multiple instances of the same service.
14///
15/// # Threads
16///
17/// If you try to get a writer lock in a thread, which already has a reader lock, you will end up in a deadlock.
18/// To use the RwLock in threads, you need a scoped thread.
19///  
20/// # Examples
21///
22/// ## Linear usage
23/// ```
24/// use dtypes::redis::sync::RwLock;
25/// use dtypes::redis::types::Di32;
26/// use std::thread;
27///
28/// let client = redis::Client::open("redis://localhost:6379").unwrap();
29/// let client2 = client.clone();
30/// let mut i32 = Di32::with_value(1, "test_rwlock_example1", client.clone());
31/// let mut lock = RwLock::new(i32);
32///
33/// // many reader locks can be held at once
34/// {
35///     let read1 = lock.read().unwrap();
36///     let read2 = lock.read().unwrap();
37///     assert_eq!(*read1, 1);
38/// } // read locks are dropped at this point
39///
40/// // only one writer lock can be held, however
41/// {
42///     let mut write1 = lock.write().unwrap();
43///     write1.store(2).unwrap();
44///     assert_eq!(*write1, 2);
45///     // the next line is not allowed, because it deadlocks.
46///     //let mut _ = lock.write().unwrap();
47/// } // write lock is dropped here
48///
49/// // look, you can read it again
50/// {
51///    let read1 = lock.read().unwrap();
52///    assert_eq!(*read1, 2);
53/// }
54/// ```
55/// ## Threaded usage
56/// ```
57/// use dtypes::redis::sync::RwLock;
58/// use dtypes::redis::types::Di32;
59/// use std::thread;
60///
61/// let client = redis::Client::open("redis://localhost:6379").unwrap();
62/// let i32 = Di32::with_value(1, "test_rwlock_example2", client.clone());
63/// let mut lock = RwLock::new(i32);
64/// // the reader lock is dropped immediately
65/// assert_eq!(*lock.read().unwrap(), 1);
66/// // Scoped threads are needed, otherwise the lifetime is unclear.
67/// thread::scope(|s| {
68///        s.spawn(|| {
69///            let mut write = lock.write().unwrap();
70///            write.store(2);
71///            assert_eq!(*write, 2);
72///        }).join().unwrap();
73/// });
74/// assert_eq!(*lock.read().unwrap(), 2);
75/// ```
76pub struct RwLock<T> {
77    pub(crate) data: Generic<T>,
78    pub(crate) conn: Option<redis::Connection>,
79}
80
81impl<T> RwLock<T>
82where
83    T: Serialize + DeserializeOwned,
84{
85    pub fn new(data: Generic<T>) -> Self {
86        Self { data, conn: None }
87    }
88
89    /// Creates a new RwLock Reader.
90    ///
91    /// This function blocks until the lock is acquired.
92    /// If there is a writer lock, this function blocks until the writer lock is dropped.
93    /// Also if there is a writer locks waiting to be acquired, this function blocks until the writer lock is acquired and dropped.
94    pub fn read(&self) -> Result<RwLockReadGuard<T>, LockError> {
95        let mut conn = self.client.clone().get_connection().unwrap();
96        let uuid = self.acquire_via_script(READER_LOCK, &mut conn);
97        Ok(RwLockReadGuard::new(self, uuid, conn))
98    }
99
100    /// Creates a new RwLock Writer.
101    ///
102    /// This function blocks until the lock is acquired.
103    /// If there is a reader lock, this function blocks until the reader lock is dropped.
104    /// The acquiring writer lock has priority over any waiting reader lock.
105    pub fn write(&mut self) -> Result<RwLockWriteGuard<T>, LockError> {
106        let mut conn = self.client.clone().get_connection().unwrap();
107        let uuid = self.acquire_via_script(WRITER_LOCK, &mut conn);
108        Ok(RwLockWriteGuard::new(self, uuid, conn))
109    }
110
111    fn acquire_via_script(&self, script: &str, conn: &mut Connection) -> usize {
112        let uuid = self.generate_uuid(conn);
113        let mut res = false;
114
115        while !res {
116            res = redis::Script::new(script)
117                .arg(&self.data.key)
118                .arg(uuid)
119                .arg(2)
120                .invoke(conn)
121                .unwrap();
122        }
123        uuid
124    }
125
126    pub(crate) fn generate_uuid(&self, conn: &mut Connection) -> usize {
127        redis::Script::new(UUID_SCRIPT)
128            .arg(&self.data.key)
129            .invoke(conn)
130            .unwrap()
131    }
132}
133
134impl<T> Deref for RwLock<T> {
135    type Target = Generic<T>;
136
137    fn deref(&self) -> &Self::Target {
138        &self.data
139    }
140}
141
142impl<T> DerefMut for RwLock<T> {
143    fn deref_mut(&mut self) -> &mut Self::Target {
144        &mut self.data
145    }
146}
147
148#[cfg(test)]
149mod tests {
150    use super::*;
151    use crate::redis::types::*;
152    use std::mem::ManuallyDrop;
153
154    #[test]
155    fn test_rwlock() {
156        let client = redis::Client::open("redis://localhost:6379").unwrap();
157        let i32 = Di32::with_value(1, "test_rwlock", client.clone());
158        let mut lock = RwLock::new(i32);
159        {
160            // multiple reader locks can be held at once
161            let read = lock.read().unwrap();
162            assert_eq!(*read, 1);
163            let read2 = lock.read().unwrap();
164            assert_eq!(*read2, 1);
165        }
166        {
167            // only one writer lock can be held, however
168            let mut write = lock.write().unwrap();
169            write.store(2).unwrap();
170            assert_eq!(*write, 2);
171        }
172        // look, you can read it again
173        let read = lock.read().unwrap();
174        assert_eq!(*read, 2);
175    }
176
177    #[test]
178    fn test_rwlock_deadlock() {
179        let client = redis::Client::open("redis://localhost:6379").unwrap();
180        let i32 = Di32::with_value(1, "test_rwlock_deadlock", client.clone());
181        let mut lock = RwLock::new(i32);
182        {
183            let _ = ManuallyDrop::new(lock.read().unwrap());
184        }
185        // This should not deadlocked forever
186        {
187            let _ = lock.write().unwrap();
188        }
189
190        {
191            let _ = ManuallyDrop::new(lock.write().unwrap());
192        }
193        // This should not deadlocked forever
194        {
195            let _ = lock.read().unwrap();
196        }
197    }
198}