dtypes/redis/rwlock/lock.rs
1use super::RwLockReadGuard;
2use super::RwLockWriteGuard;
3use crate::redis::rwlock::constants::{READER_LOCK, UUID_SCRIPT, WRITER_LOCK};
4use crate::redis::{sync::LockError, types::Generic};
5use redis::Connection;
6use serde::de::DeserializeOwned;
7use serde::Serialize;
8use std::ops::{Deref, DerefMut};
9
10/// A Read-Write Lock.
11///
12/// This lock is similar to the [std::sync::RwLock](https://doc.rust-lang.org/std/sync/struct.RwLock.html).
13/// But it is distributed over multiple instances of the same service.
14///
15/// # Threads
16///
17/// If you try to get a writer lock in a thread, which already has a reader lock, you will end up in a deadlock.
18/// To use the RwLock in threads, you need a scoped thread.
19///
20/// # Examples
21///
22/// ## Linear usage
23/// ```
24/// use dtypes::redis::sync::RwLock;
25/// use dtypes::redis::types::Di32;
26/// use std::thread;
27///
28/// let client = redis::Client::open("redis://localhost:6379").unwrap();
29/// let client2 = client.clone();
30/// let mut i32 = Di32::with_value(1, "test_rwlock_example1", client.clone());
31/// let mut lock = RwLock::new(i32);
32///
33/// // many reader locks can be held at once
34/// {
35/// let read1 = lock.read().unwrap();
36/// let read2 = lock.read().unwrap();
37/// assert_eq!(*read1, 1);
38/// } // read locks are dropped at this point
39///
40/// // only one writer lock can be held, however
41/// {
42/// let mut write1 = lock.write().unwrap();
43/// write1.store(2).unwrap();
44/// assert_eq!(*write1, 2);
45/// // the next line is not allowed, because it deadlocks.
46/// //let mut _ = lock.write().unwrap();
47/// } // write lock is dropped here
48///
49/// // look, you can read it again
50/// {
51/// let read1 = lock.read().unwrap();
52/// assert_eq!(*read1, 2);
53/// }
54/// ```
55/// ## Threaded usage
56/// ```
57/// use dtypes::redis::sync::RwLock;
58/// use dtypes::redis::types::Di32;
59/// use std::thread;
60///
61/// let client = redis::Client::open("redis://localhost:6379").unwrap();
62/// let i32 = Di32::with_value(1, "test_rwlock_example2", client.clone());
63/// let mut lock = RwLock::new(i32);
64/// // the reader lock is dropped immediately
65/// assert_eq!(*lock.read().unwrap(), 1);
66/// // Scoped threads are needed, otherwise the lifetime is unclear.
67/// thread::scope(|s| {
68/// s.spawn(|| {
69/// let mut write = lock.write().unwrap();
70/// write.store(2);
71/// assert_eq!(*write, 2);
72/// }).join().unwrap();
73/// });
74/// assert_eq!(*lock.read().unwrap(), 2);
75/// ```
76pub struct RwLock<T> {
77 pub(crate) data: Generic<T>,
78 pub(crate) conn: Option<redis::Connection>,
79}
80
81impl<T> RwLock<T>
82where
83 T: Serialize + DeserializeOwned,
84{
85 pub fn new(data: Generic<T>) -> Self {
86 Self { data, conn: None }
87 }
88
89 /// Creates a new RwLock Reader.
90 ///
91 /// This function blocks until the lock is acquired.
92 /// If there is a writer lock, this function blocks until the writer lock is dropped.
93 /// Also if there is a writer locks waiting to be acquired, this function blocks until the writer lock is acquired and dropped.
94 pub fn read(&self) -> Result<RwLockReadGuard<T>, LockError> {
95 let mut conn = self.client.clone().get_connection().unwrap();
96 let uuid = self.acquire_via_script(READER_LOCK, &mut conn);
97 Ok(RwLockReadGuard::new(self, uuid, conn))
98 }
99
100 /// Creates a new RwLock Writer.
101 ///
102 /// This function blocks until the lock is acquired.
103 /// If there is a reader lock, this function blocks until the reader lock is dropped.
104 /// The acquiring writer lock has priority over any waiting reader lock.
105 pub fn write(&mut self) -> Result<RwLockWriteGuard<T>, LockError> {
106 let mut conn = self.client.clone().get_connection().unwrap();
107 let uuid = self.acquire_via_script(WRITER_LOCK, &mut conn);
108 Ok(RwLockWriteGuard::new(self, uuid, conn))
109 }
110
111 fn acquire_via_script(&self, script: &str, conn: &mut Connection) -> usize {
112 let uuid = self.generate_uuid(conn);
113 let mut res = false;
114
115 while !res {
116 res = redis::Script::new(script)
117 .arg(&self.data.key)
118 .arg(uuid)
119 .arg(2)
120 .invoke(conn)
121 .unwrap();
122 }
123 uuid
124 }
125
126 pub(crate) fn generate_uuid(&self, conn: &mut Connection) -> usize {
127 redis::Script::new(UUID_SCRIPT)
128 .arg(&self.data.key)
129 .invoke(conn)
130 .unwrap()
131 }
132}
133
134impl<T> Deref for RwLock<T> {
135 type Target = Generic<T>;
136
137 fn deref(&self) -> &Self::Target {
138 &self.data
139 }
140}
141
142impl<T> DerefMut for RwLock<T> {
143 fn deref_mut(&mut self) -> &mut Self::Target {
144 &mut self.data
145 }
146}
147
// These tests talk to a real Redis server at `redis://localhost:6379`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::redis::types::*;
    use std::mem::ManuallyDrop;

    /// Readers can overlap, a writer can store a new value, and the new value
    /// is visible to a later reader.
    #[test]
    fn test_rwlock() {
        let client = redis::Client::open("redis://localhost:6379").unwrap();
        let value = Di32::with_value(1, "test_rwlock", client.clone());
        let mut lock = RwLock::new(value);
        {
            // multiple reader locks can be held at once
            let read = lock.read().unwrap();
            assert_eq!(*read, 1);
            let read2 = lock.read().unwrap();
            assert_eq!(*read2, 1);
        }
        {
            // only one writer lock can be held, however
            let mut write = lock.write().unwrap();
            write.store(2).unwrap();
            assert_eq!(*write, 2);
        }
        // look, you can read it again
        let read = lock.read().unwrap();
        assert_eq!(*read, 2);
    }

    /// A guard whose destructor never runs must not block later acquisitions forever.
    ///
    /// `ManuallyDrop` keeps the guard's `Drop` from running, so the lock is never
    /// released by the client.
    /// NOTE(review): presumably the lock entry expires on the Redis side — confirm
    /// against the lock scripts.
    #[test]
    fn test_rwlock_deadlock() {
        let client = redis::Client::open("redis://localhost:6379").unwrap();
        let value = Di32::with_value(1, "test_rwlock_deadlock", client.clone());
        let mut lock = RwLock::new(value);
        {
            let _ = ManuallyDrop::new(lock.read().unwrap());
        }
        // This must not deadlock forever
        {
            let _ = lock.write().unwrap();
        }

        {
            let _ = ManuallyDrop::new(lock.write().unwrap());
        }
        // This must not deadlock forever
        {
            let _ = lock.read().unwrap();
        }
    }
}