dtypes/redis/
clock.rs

1use crate::redis::types::Generic;
2use serde_json::from_str;
3use std::ops::{Deref, DerefMut};
4use thiserror::Error;
5
/// Errors returned by the storing operations of [`ClockOrdered`].
#[derive(Debug, Error)]
pub enum ClockOrderedError {
    /// The store was rejected: the result reported by redis shows that the value
    /// was not written with the order number that was sent along with it.
    #[error("Ordering number is not greater than current number stored in redis.")]
    OrderError,
}
11
/// This is the set_load script.
/// It sets the value only if no order number is stored yet or if the given order
/// number is numerically greater than the stored one.
/// Returns the current value and the current ordering number.
///
/// Both order numbers reach Lua as strings (one from `GET`, one from `ARGV`), so
/// they are converted with `tonumber` before the comparison. Comparing the raw
/// strings would be lexicographic and would treat "10" as smaller than "9".
///
/// It takes 3 arguments:
/// 1. The key of value to set
/// 2. The order_number of the setting operation
/// 3. The value itself to set
const SET_LOAD_SCRIPT: &str = r#"
local key = ARGV[1]
local order = ARGV[2]
local current_order = redis.call("GET", key .. ":order")
if current_order == false or tonumber(current_order) < tonumber(order) then
    redis.call("SET", key .. ":order", order)
    redis.call("SET", key, ARGV[3])
    current_order = order
end
return {redis.call("GET", key), current_order}
"#;
31
/// This is the load script.
/// It reads the value stored under the key and the order number stored under
/// `<key>:order`, without modifying either of them.
/// Returns the current value and the current ordering number.
///
/// It takes 1 argument:
/// 1. The key of value to load
const LOAD_SCRIPT: &str = r#"
local key = ARGV[1]
return {redis.call("GET", key), redis.call("GET", key .. ":order")}
"#;
42
/// The ClockOrdered type.
///
/// It is used to store a value in redis and load it in sync.
/// It automatically tracks an ordering number: a value is only stored if its
/// order is greater than the order currently held in redis, which is mostly
/// written by other instances.
///
/// This helps to synchronize the value between multiple instances without any locking mechanism.
/// It can result in more network traffic, in exchange for less time spent waiting on locks.
/// It is mostly used in situations where the value changes rarely but is read often.
/// Another use case is when it is acceptable that the value may not be the latest one,
/// or that a derived value is computed multiple times.
#[derive(Debug)]
pub struct ClockOrdered<T> {
    // The wrapped redis value; its key, client and cache are used by this type.
    data: Generic<T>,
    // The order number sent along with every store; `load` resets it from redis.
    counter: usize,
}
59
60impl<T> ClockOrdered<T>
61where
62    T: serde::Serialize + serde::de::DeserializeOwned,
63{
64    /// Creates a new ClockOrdered.
65    /// The value is loaded from redis directly.
66    pub fn new(data: Generic<T>) -> Self {
67        let mut s = Self { data, counter: 0 };
68        s.load();
69        s
70    }
71
72    /// Stores the value in the redis server.
73    /// The value is only stored if the ordering_number is greater than the current number.
74    /// The order is incremented by one before each store.
75    ///
76    /// # Example
77    /// ```
78    /// use dtypes::redis::types::Generic;
79    /// use dtypes::redis::sync::ClockOrdered;
80    ///
81    /// let client = redis::Client::open("redis://localhost:6379").unwrap();
82    /// let mut i32 = Generic::with_value(1, "test_add_clock_ordered_example1", client.clone());
83    /// let mut clock_ordered = ClockOrdered::new(i32);
84    /// clock_ordered.store(2).unwrap();
85    /// assert_eq!(*clock_ordered, 2);
86    /// ```
87    ///
88    /// The store can fail if the order is not greater than the current order.
89    /// This happens, if the value was set from another instance before.
90    ///
91    /// # Example
92    /// ```
93    /// use std::thread;
94    /// use dtypes::redis::types::Generic;
95    /// use dtypes::redis::sync::ClockOrdered;
96    ///
97    /// let client = redis::Client::open("redis://localhost:6379").unwrap();
98    /// let client2 = client.clone();
99    ///
100    /// thread::scope(|s| {
101    ///     let t1 = s.spawn(|| {
102    ///         let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example2", client2);
103    ///         let mut clock_ordered = ClockOrdered::new(i32);
104    ///         while let Err(_) = clock_ordered.store(2) {}
105    ///         assert_eq!(*clock_ordered, 2);
106    ///     });
107    ///     let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example2", client);
108    ///     let mut clock_ordered = ClockOrdered::new(i32);
109    ///     while let Err(_) = clock_ordered.store(3) {}
110    ///     assert_eq!(*clock_ordered, 3);
111    ///     t1.join().unwrap();
112    /// });
113    /// ```
114    pub fn store(&mut self, val: T) -> Result<(), ClockOrderedError> {
115        self.counter += 1;
116        let val_json = serde_json::to_string(&val).unwrap();
117        let (v, order) = self.store_redis(&val_json);
118
119        if let Some(v) = v {
120            if self.counter >= order && v == val_json {
121                self.data.cache = Some(val);
122                return Ok(());
123            }
124        }
125        Err(ClockOrderedError::OrderError)
126    }
127
128    /// Stores the value in the redis server and blocks until succeeds.
129    /// Everything else is equal to [ClockOrdered::store].
130    ///
131    /// # Example
132    /// ```
133    /// use std::thread;
134    /// use dtypes::redis::types::Generic;
135    /// use dtypes::redis::sync::ClockOrdered;
136    ///
137    /// let client = redis::Client::open("redis://localhost:6379").unwrap();
138    /// let client2 = client.clone();
139    ///
140    /// thread::scope(|s| {
141    ///     let t1 = s.spawn(|| {
142    ///         let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example3", client2);
143    ///         let mut clock_ordered = ClockOrdered::new(i32);
144    ///         clock_ordered.store_blocking(2).unwrap();
145    ///         assert_eq!(*clock_ordered, 2);
146    ///     });
147    ///     let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example3", client);
148    ///     let mut clock_ordered = ClockOrdered::new(i32);
149    ///     clock_ordered.store_blocking(3).unwrap();
150    ///     assert_eq!(*clock_ordered, 3);
151    ///     t1.join().unwrap();
152    /// });
153    /// ```
154    pub fn store_blocking(&mut self, val: T) -> Result<(), ClockOrderedError> {
155        let val_json = serde_json::to_string(&val).unwrap();
156        let mut res = self.store_redis(&val_json);
157
158        while self.counter < res.1 || res.0.is_none() || res.0.unwrap() != val_json {
159            self.counter = res.1 + 1;
160            res = self.store_redis(&val_json);
161        }
162
163        self.data.cache = Some(val);
164        Ok(())
165    }
166
167    fn store_redis(&self, val: &str) -> (Option<String>, usize) {
168        let mut conn = self.data.client.get_connection().unwrap();
169        redis::Script::new(SET_LOAD_SCRIPT)
170            .arg(&self.data.key)
171            .arg(self.counter)
172            .arg(val)
173            .invoke(&mut conn)
174            .expect("Could not execute script")
175    }
176
177    /// Loads the value from the redis server.
178    /// This is done automatically on creation.
179    /// Mostly used for synchronization. Reset the counter to order from redis or 0.
180    pub fn load(&mut self) {
181        let mut conn = self.data.client.get_connection().unwrap();
182        let res: (Option<String>, Option<usize>) = redis::Script::new(LOAD_SCRIPT)
183            .arg(&self.data.key)
184            .invoke(&mut conn)
185            .expect("Could not execute script");
186
187        match res {
188            (Some(v), Some(order)) => {
189                self.data.cache = Some(from_str(&v).unwrap());
190                self.counter = order;
191            }
192            (Some(v), None) => {
193                self.data.cache = Some(from_str(&v).unwrap());
194                self.counter = 0;
195            }
196            (None, Some(c)) => {
197                self.data.cache = None;
198                self.counter = c;
199            }
200            _ => {
201                self.data.cache = None;
202                self.counter = 0;
203            }
204        }
205    }
206}
207
208impl<T> Deref for ClockOrdered<T> {
209    type Target = Generic<T>;
210
211    fn deref(&self) -> &Self::Target {
212        &self.data
213    }
214}
215
216impl<T> DerefMut for ClockOrdered<T> {
217    fn deref_mut(&mut self) -> &mut Self::Target {
218        &mut self.data
219    }
220}
221
#[cfg(test)]
mod tests {
    use crate::redis::sync::ClockOrdered;
    use crate::redis::types::Generic;

    /// A value stored through a new ClockOrdered can be read back via deref.
    /// Connects to a redis server at localhost:6379.
    #[test]
    fn test_set_load() {
        let client = redis::Client::open("redis://localhost:6379").unwrap();
        let mut clock_ordered =
            ClockOrdered::new(Generic::<i32>::new("test_add_clock_ordered", client.clone()));

        clock_ordered.store(2).unwrap();
        assert_eq!(*clock_ordered, 2);
    }
}