1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
use crate::redis::Generic;
use serde_json::from_str;
use std::ops::{Deref, DerefMut};
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ClockOrderedError {
    #[error("Ordering number is not greater than current number stored in redis.")]
    OrderError,
}

/// This is the set_load script.
/// It is used to set the value if order is greater than the current order.
/// Returns the current value and the current_ordering number.
///
/// It takes 3 arguments:
/// 1. The key of value to set
/// 2. The order_number of the setting operation
/// 3. The value itself to set
const SET_LOAD_SCRIPT: &str = r#"
local key = ARGV[1]
local order = ARGV[2]
local current_order = redis.call("GET", key .. ":order")
if current_order == false or current_order < order then
    redis.call("SET", key .. ":order", order)
    redis.call("SET", key, ARGV[3])
    current_order = order
end
return {redis.call("GET", key), current_order}
"#;

/// This is the load script.
/// It is used to load the value and the order number of the value.
/// Returns the current value and the current ordering number.
///
/// It takes 1 argument:
/// 1. The key of value to load
const LOAD_SCRIPT: &str = r#"
local key = ARGV[1]
return {redis.call("GET", key), redis.call("GET", key .. ":order")}
"#;

/// The ClockOrdered type.
///
/// It is used to store a value in redis and load it in sync.
/// It tracks automatically an ordering number to ensure that the value is only stored if the order is greater than the current order, mostly from other instances.
/// The value is only stored if the order is greater than the current order.
///
/// This helps to synchronize the value between multiple instances without any locking mechanism.
/// But can results in more network traffic in benefit of less wait time because of locks.
/// Mostly used in situations, where your value changes rarely but read often.
/// Another use case is, when it is okay for you, that the value could be not the latest or
/// computing a derived value multiple times is acceptable.
#[derive(Debug)]
pub struct ClockOrdered<T> {
    data: Generic<T>,
    counter: usize,
}

impl<T> ClockOrdered<T>
where
    T: serde::Serialize + serde::de::DeserializeOwned,
{
    /// Creates a new ClockOrdered.
    /// The value is loaded from redis directly.
    pub fn new(data: Generic<T>) -> Self {
        let mut s = Self { data, counter: 0 };
        s.load();
        s
    }

    /// Stores the value in the redis server.
    /// The value is only stored if the ordering_number is greater than the current number.
    /// The order is incremented by one before each store.
    ///
    /// # Example
    /// ```
    /// use dtypes::redis::Generic;
    /// use dtypes::redis::ClockOrdered;
    ///
    /// let client = redis::Client::open("redis://localhost:6379").unwrap();
    /// let mut i32 = Generic::with_value(1, "test_add_clock_ordered_example1", client.clone());
    /// let mut clock_ordered = ClockOrdered::new(i32);
    /// clock_ordered.store(2).unwrap();
    /// assert_eq!(*clock_ordered, 2);
    /// ```
    ///
    /// The store can fail if the order is not greater than the current order.
    /// This happens, if the value was set from another instance before.
    ///
    /// # Example
    /// ```
    /// use std::thread;
    /// use dtypes::redis::Generic;
    /// use dtypes::redis::ClockOrdered;
    ///
    /// let client = redis::Client::open("redis://localhost:6379").unwrap();
    /// let client2 = client.clone();
    ///
    /// thread::scope(|s| {
    ///     let t1 = s.spawn(|| {
    ///         let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example2", client2);
    ///         let mut clock_ordered = ClockOrdered::new(i32);
    ///         while let Err(_) = clock_ordered.store(2) {}
    ///         assert_eq!(*clock_ordered, 2);
    ///     });
    ///     let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example2", client);
    ///     let mut clock_ordered = ClockOrdered::new(i32);
    ///     while let Err(_) = clock_ordered.store(3) {}
    ///     assert_eq!(*clock_ordered, 3);
    ///     t1.join().unwrap();
    /// });
    /// ```
    pub fn store(&mut self, val: T) -> Result<(), ClockOrderedError> {
        self.counter += 1;
        let val_json = serde_json::to_string(&val).unwrap();
        let (v, order) = self.store_redis(&val_json);

        if let Some(v) = v {
            if self.counter >= order && v == val_json {
                self.data.cache = Some(val);
                return Ok(());
            }
        }
        Err(ClockOrderedError::OrderError)
    }

    /// Stores the value in the redis server and blocks until succeeds.
    /// Everything else is equal to [ClockOrdered::store].
    ///
    /// # Example
    /// ```
    /// use std::thread;
    /// use dtypes::redis::Generic;
    /// use dtypes::redis::ClockOrdered;
    ///
    /// let client = redis::Client::open("redis://localhost:6379").unwrap();
    /// let client2 = client.clone();
    ///
    /// thread::scope(|s| {
    ///     let t1 = s.spawn(|| {
    ///         let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example3", client2);
    ///         let mut clock_ordered = ClockOrdered::new(i32);
    ///         clock_ordered.store_blocking(2).unwrap();
    ///         assert_eq!(*clock_ordered, 2);
    ///     });
    ///     let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example3", client);
    ///     let mut clock_ordered = ClockOrdered::new(i32);
    ///     clock_ordered.store_blocking(3).unwrap();
    ///     assert_eq!(*clock_ordered, 3);
    ///     t1.join().unwrap();
    /// });
    /// ```
    pub fn store_blocking(&mut self, val: T) -> Result<(), ClockOrderedError> {
        let val_json = serde_json::to_string(&val).unwrap();
        let mut res = self.store_redis(&val_json);

        while self.counter < res.1 || res.0.is_none() || res.0.unwrap() != val_json {
            self.counter = res.1 + 1;
            res = self.store_redis(&val_json);
        }

        self.data.cache = Some(val);
        Ok(())
    }

    fn store_redis(&self, val: &str) -> (Option<String>, usize) {
        let mut conn = self.data.client.get_connection().unwrap();
        redis::Script::new(SET_LOAD_SCRIPT)
            .arg(&self.data.key)
            .arg(self.counter)
            .arg(val)
            .invoke(&mut conn)
            .expect("Could not execute script")
    }

    /// Loads the value from the redis server.
    /// This is done automatically on creation.
    /// Mostly used for synchronization. Reset the counter to order from redis or 0.
    pub fn load(&mut self) {
        let mut conn = self.data.client.get_connection().unwrap();
        let res: (Option<String>, Option<usize>) = redis::Script::new(LOAD_SCRIPT)
            .arg(&self.data.key)
            .invoke(&mut conn)
            .expect("Could not execute script");

        match res {
            (Some(v), Some(order)) => {
                self.data.cache = Some(from_str(&v).unwrap());
                self.counter = order;
            }
            (Some(v), None) => {
                self.data.cache = Some(from_str(&v).unwrap());
                self.counter = 0;
            }
            (None, Some(c)) => {
                self.data.cache = None;
                self.counter = c;
            }
            _ => {
                self.data.cache = None;
                self.counter = 0;
            }
        }
    }
}

impl<T> Deref for ClockOrdered<T> {
    type Target = Generic<T>;

    fn deref(&self) -> &Self::Target {
        &self.data
    }
}

impl<T> DerefMut for ClockOrdered<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data
    }
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_set_load() {
        use crate::redis::ClockOrdered;
        use crate::redis::Generic;

        let client = redis::Client::open("redis://localhost:6379").unwrap();
        let i32: Generic<i32> = Generic::new("test_add_clock_ordered", client.clone());
        let mut clock_ordered = ClockOrdered::new(i32);
        clock_ordered.store(2).unwrap();
        assert_eq!(*clock_ordered, 2);
    }
}