1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
use super::RwLockReadGuard;
use super::RwLockWriteGuard;
use crate::redis::rwlock::constants::{READER_LOCK, UUID_SCRIPT, WRITER_LOCK};
use crate::redis::{Generic, LockError};
use redis::Connection;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::ops::{Deref, DerefMut};

/// A distributed read-write lock.
///
/// This lock is similar to [std::sync::RwLock](https://doc.rust-lang.org/std/sync/struct.RwLock.html),
/// but it is distributed over multiple instances of the same service.
///
/// The lock wraps a [`Generic`] value and dereferences to it, so the wrapped
/// value's own methods stay reachable through the lock.
///
/// # Threads
///
/// If a thread that already holds a reader lock tries to acquire a writer lock, it ends up in a deadlock.
/// To use the RwLock across threads, you need scoped threads.
///
/// # Examples
///
/// ## Linear usage
/// ```
/// use dtypes::redis::RwLock;
/// use dtypes::redis::Di32;
/// use std::thread;
///
/// let client = redis::Client::open("redis://localhost:6379").unwrap();
/// let client2 = client.clone();
/// let mut i32 = Di32::with_value(1, "test_rwlock_example1", client.clone());
/// let mut lock = RwLock::new(i32);
///
/// // many reader locks can be held at once
/// {
///     let read1 = lock.read().unwrap();
///     let read2 = lock.read().unwrap();
///     assert_eq!(*read1, 1);
/// } // read locks are dropped at this point
///
/// // only one writer lock can be held, however
/// {
///     let mut write1 = lock.write().unwrap();
///     write1.store(2).unwrap();
///     assert_eq!(*write1, 2);
///     // the next line is not allowed, because it deadlocks.
///     //let mut _ = lock.write().unwrap();
/// } // write lock is dropped here
///
/// // look, you can read it again
/// {
///    let read1 = lock.read().unwrap();
///    assert_eq!(*read1, 2);
/// }
/// ```
/// ## Threaded usage
/// ```
/// use dtypes::redis::RwLock;
/// use dtypes::redis::Di32;
/// use std::thread;
///
/// let client = redis::Client::open("redis://localhost:6379").unwrap();
/// let i32 = Di32::with_value(1, "test_rwlock_example2", client.clone());
/// let mut lock = RwLock::new(i32);
/// // the reader lock is dropped immediately
/// assert_eq!(*lock.read().unwrap(), 1);
/// // Scoped threads are needed, otherwise the lifetime is unclear.
/// thread::scope(|s| {
///        s.spawn(|| {
///            let mut write = lock.write().unwrap();
///            write.store(2);
///            assert_eq!(*write, 2);
///        }).join().unwrap();
/// });
/// assert_eq!(*lock.read().unwrap(), 2);
/// ```
pub struct RwLock<T> {
    /// The wrapped distributed value. Its `key` is the first argument passed
    /// to every lock script, and `Deref`/`DerefMut` on the lock resolve to it.
    pub(crate) data: Generic<T>,
    /// Optional Redis connection slot; `new` initializes it to `None`.
    // NOTE(review): nothing in this file reads or writes this field after
    // construction — presumably it is used by the guards or is a leftover; confirm.
    pub(crate) conn: Option<redis::Connection>,
}

impl<T> RwLock<T>
where
    T: Serialize + DeserializeOwned,
{
    /// Wraps `data` in a new lock.
    ///
    /// Nothing is sent to Redis here; the internal connection slot starts out as `None`.
    pub fn new(data: Generic<T>) -> Self {
        Self { data, conn: None }
    }

    /// Creates a new RwLock Reader.
    ///
    /// This function blocks until the lock is acquired.
    /// If there is a writer lock, this function blocks until the writer lock is dropped.
    /// Also, if there is a writer lock waiting to be acquired, this function blocks until
    /// that writer lock is acquired and dropped.
    ///
    /// The returned guard owns a dedicated Redis connection that is opened for this call.
    ///
    /// # Errors
    ///
    /// The signature allows a [`LockError`], but this implementation currently always
    /// returns `Ok`; failures surface as panics instead (see below).
    ///
    /// # Panics
    ///
    /// Panics if the Redis connection cannot be opened or if invoking one of the lock
    /// scripts fails.
    pub fn read(&self) -> Result<RwLockReadGuard<T>, LockError> {
        // The client is only borrowed to open a connection, so it is not cloned first.
        let mut conn = self.client.get_connection().unwrap();
        let uuid = self.acquire_via_script(READER_LOCK, &mut conn);
        Ok(RwLockReadGuard::new(self, uuid, conn))
    }

    /// Creates a new RwLock Writer.
    ///
    /// This function blocks until the lock is acquired.
    /// If there is a reader lock, this function blocks until the reader lock is dropped.
    /// The acquiring writer lock has priority over any waiting reader lock.
    ///
    /// The returned guard owns a dedicated Redis connection that is opened for this call.
    ///
    /// # Errors
    ///
    /// The signature allows a [`LockError`], but this implementation currently always
    /// returns `Ok`; failures surface as panics instead (see below).
    ///
    /// # Panics
    ///
    /// Panics if the Redis connection cannot be opened or if invoking one of the lock
    /// scripts fails.
    pub fn write(&mut self) -> Result<RwLockWriteGuard<T>, LockError> {
        // The client is only borrowed to open a connection, so it is not cloned first.
        let mut conn = self.client.get_connection().unwrap();
        let uuid = self.acquire_via_script(WRITER_LOCK, &mut conn);
        Ok(RwLockWriteGuard::new(self, uuid, conn))
    }

    /// Repeatedly invokes the lock `script` until it reports success, then returns the
    /// identifier under which the lock was taken.
    ///
    /// The identifier is obtained once via [`Self::generate_uuid`] and reused for every
    /// attempt. Each attempt passes the data key, the identifier and the constant `2`
    /// to the script.
    ///
    /// # Panics
    ///
    /// Panics if a script invocation fails or its reply cannot be read as a `bool`.
    fn acquire_via_script(&self, script: &str, conn: &mut Connection) -> usize {
        let uuid = self.generate_uuid(conn);
        // The script object does not change between attempts, so build it once instead
        // of once per retry.
        let script = redis::Script::new(script);

        loop {
            let acquired: bool = script
                .arg(&self.data.key)
                .arg(uuid)
                // NOTE(review): presumably a timeout/expiry for the lock entry — confirm
                // against the Lua scripts in `constants`.
                .arg(2)
                .invoke(conn)
                .unwrap();
            if acquired {
                return uuid;
            }
        }
    }

    /// Runs `UUID_SCRIPT` with the data key as its only argument and returns the
    /// integer the script replies with.
    ///
    /// NOTE(review): the result is used as the per-guard lock identifier; presumably
    /// the script hands out a fresh value per call — confirm against `constants`.
    ///
    /// # Panics
    ///
    /// Panics if the script invocation fails or its reply cannot be read as a `usize`.
    pub(crate) fn generate_uuid(&self, conn: &mut Connection) -> usize {
        redis::Script::new(UUID_SCRIPT)
            .arg(&self.data.key)
            .invoke(conn)
            .unwrap()
    }
}

/// Read-only access to the wrapped [`Generic`] value through the lock itself.
impl<T> Deref for RwLock<T> {
    type Target = Generic<T>;

    /// Borrows the wrapped value immutably.
    fn deref(&self) -> &Generic<T> {
        let Self { data, .. } = self;
        data
    }
}

impl<T> DerefMut for RwLock<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data
    }
}

// Integration-style tests: every test opens a client for the Redis URL
// `redis://localhost:6379`, so a reachable server at that address is needed
// for the lock calls to succeed.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::redis::*;
    use std::mem::ManuallyDrop;

    /// Walks through the basic life cycle: two concurrent readers, then one
    /// writer that stores a new value, then a reader that observes it.
    #[test]
    fn test_rwlock() {
        let client = redis::Client::open("redis://localhost:6379").unwrap();
        let i32 = Di32::with_value(1, "test_rwlock", client.clone());
        let mut lock = RwLock::new(i32);
        {
            // multiple reader locks can be held at once
            let read = lock.read().unwrap();
            assert_eq!(*read, 1);
            let read2 = lock.read().unwrap();
            assert_eq!(*read2, 1);
        }
        {
            // only one writer lock can be held, however
            let mut write = lock.write().unwrap();
            write.store(2).unwrap();
            assert_eq!(*write, 2);
        }
        // look, you can read it again
        let read = lock.read().unwrap();
        assert_eq!(*read, 2);
    }

    /// Checks that a guard whose destructor never runs does not block later
    /// acquisitions forever.
    ///
    /// `ManuallyDrop` suppresses the guard's destructor, so the guard is leaked
    /// without being released by the client.
    // NOTE(review): the test can only terminate if the lock entry goes away on
    // its own (presumably an expiry inside the Lua scripts) — confirm.
    #[test]
    fn test_rwlock_deadlock() {
        let client = redis::Client::open("redis://localhost:6379").unwrap();
        let i32 = Di32::with_value(1, "test_rwlock_deadlock", client.clone());
        let mut lock = RwLock::new(i32);
        {
            // Leak a reader guard: its destructor is never run.
            let _ = ManuallyDrop::new(lock.read().unwrap());
        }
        eprintln!("1");
        // This should not deadlock forever, even though a reader was leaked.
        {
            let _ = lock.write().unwrap();
        }

        {
            // Leak a writer guard: its destructor is never run.
            let _ = ManuallyDrop::new(lock.write().unwrap());
        }
        // This should not deadlock forever, even though a writer was leaked.
        {
            let _ = lock.read().unwrap();
        }
    }
}