dtypes/redis/clock.rs
1use crate::redis::types::Generic;
2use serde_json::from_str;
3use std::ops::{Deref, DerefMut};
4use thiserror::Error;
5
6#[derive(Debug, Error)]
7pub enum ClockOrderedError {
8 #[error("Ordering number is not greater than current number stored in redis.")]
9 OrderError,
10}
11
/// This is the set_load script.
/// It is used to set the value if order is greater than the current order.
/// Returns the current value and the current ordering number.
///
/// Both the stored order (a `GET` result) and the supplied order (`ARGV[2]`)
/// are strings inside Lua, so they are converted with `tonumber` before being
/// compared. A plain `<` on the strings would compare them lexicographically
/// (`"9" < "10"` is false) and reject valid writes once the order number gains
/// a digit.
///
/// The order is stored under `<key>:order`; the value under `<key>`.
///
/// It takes 3 arguments:
/// 1. The key of value to set
/// 2. The order_number of the setting operation
/// 3. The value itself to set
const SET_LOAD_SCRIPT: &str = r#"
local key = ARGV[1]
local order = ARGV[2]
local current_order = redis.call("GET", key .. ":order")
if current_order == false or tonumber(current_order) < tonumber(order) then
    redis.call("SET", key .. ":order", order)
    redis.call("SET", key, ARGV[3])
    current_order = order
end
return {redis.call("GET", key), current_order}
"#;
31
/// Lua script that reads a value together with its order number.
///
/// It fetches `<key>` and `<key>:order` in a single script call and returns
/// them as a two-element array: first the value, then the order number.
/// Either element is nil when the corresponding redis key does not exist.
///
/// It takes 1 argument:
/// 1. The key of value to load
const LOAD_SCRIPT: &str = r#"
local key = ARGV[1]
return {redis.call("GET", key), redis.call("GET", key .. ":order")}
"#;
42
43/// The ClockOrdered type.
44///
45/// It is used to store a value in redis and load it in sync.
46/// It tracks automatically an ordering number to ensure that the value is only stored if the order is greater than the current order, mostly from other instances.
47/// The value is only stored if the order is greater than the current order.
48///
49/// This helps to synchronize the value between multiple instances without any locking mechanism.
50/// But can results in more network traffic in benefit of less wait time because of locks.
51/// Mostly used in situations, where your value changes rarely but read often.
52/// Another use case is, when it is okay for you, that the value could be not the latest or
53/// computing a derived value multiple times is acceptable.
54#[derive(Debug)]
55pub struct ClockOrdered<T> {
56 data: Generic<T>,
57 counter: usize,
58}
59
60impl<T> ClockOrdered<T>
61where
62 T: serde::Serialize + serde::de::DeserializeOwned,
63{
64 /// Creates a new ClockOrdered.
65 /// The value is loaded from redis directly.
66 pub fn new(data: Generic<T>) -> Self {
67 let mut s = Self { data, counter: 0 };
68 s.load();
69 s
70 }
71
72 /// Stores the value in the redis server.
73 /// The value is only stored if the ordering_number is greater than the current number.
74 /// The order is incremented by one before each store.
75 ///
76 /// # Example
77 /// ```
78 /// use dtypes::redis::types::Generic;
79 /// use dtypes::redis::sync::ClockOrdered;
80 ///
81 /// let client = redis::Client::open("redis://localhost:6379").unwrap();
82 /// let mut i32 = Generic::with_value(1, "test_add_clock_ordered_example1", client.clone());
83 /// let mut clock_ordered = ClockOrdered::new(i32);
84 /// clock_ordered.store(2).unwrap();
85 /// assert_eq!(*clock_ordered, 2);
86 /// ```
87 ///
88 /// The store can fail if the order is not greater than the current order.
89 /// This happens, if the value was set from another instance before.
90 ///
91 /// # Example
92 /// ```
93 /// use std::thread;
94 /// use dtypes::redis::types::Generic;
95 /// use dtypes::redis::sync::ClockOrdered;
96 ///
97 /// let client = redis::Client::open("redis://localhost:6379").unwrap();
98 /// let client2 = client.clone();
99 ///
100 /// thread::scope(|s| {
101 /// let t1 = s.spawn(|| {
102 /// let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example2", client2);
103 /// let mut clock_ordered = ClockOrdered::new(i32);
104 /// while let Err(_) = clock_ordered.store(2) {}
105 /// assert_eq!(*clock_ordered, 2);
106 /// });
107 /// let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example2", client);
108 /// let mut clock_ordered = ClockOrdered::new(i32);
109 /// while let Err(_) = clock_ordered.store(3) {}
110 /// assert_eq!(*clock_ordered, 3);
111 /// t1.join().unwrap();
112 /// });
113 /// ```
114 pub fn store(&mut self, val: T) -> Result<(), ClockOrderedError> {
115 self.counter += 1;
116 let val_json = serde_json::to_string(&val).unwrap();
117 let (v, order) = self.store_redis(&val_json);
118
119 if let Some(v) = v {
120 if self.counter >= order && v == val_json {
121 self.data.cache = Some(val);
122 return Ok(());
123 }
124 }
125 Err(ClockOrderedError::OrderError)
126 }
127
128 /// Stores the value in the redis server and blocks until succeeds.
129 /// Everything else is equal to [ClockOrdered::store].
130 ///
131 /// # Example
132 /// ```
133 /// use std::thread;
134 /// use dtypes::redis::types::Generic;
135 /// use dtypes::redis::sync::ClockOrdered;
136 ///
137 /// let client = redis::Client::open("redis://localhost:6379").unwrap();
138 /// let client2 = client.clone();
139 ///
140 /// thread::scope(|s| {
141 /// let t1 = s.spawn(|| {
142 /// let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example3", client2);
143 /// let mut clock_ordered = ClockOrdered::new(i32);
144 /// clock_ordered.store_blocking(2).unwrap();
145 /// assert_eq!(*clock_ordered, 2);
146 /// });
147 /// let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example3", client);
148 /// let mut clock_ordered = ClockOrdered::new(i32);
149 /// clock_ordered.store_blocking(3).unwrap();
150 /// assert_eq!(*clock_ordered, 3);
151 /// t1.join().unwrap();
152 /// });
153 /// ```
154 pub fn store_blocking(&mut self, val: T) -> Result<(), ClockOrderedError> {
155 let val_json = serde_json::to_string(&val).unwrap();
156 let mut res = self.store_redis(&val_json);
157
158 while self.counter < res.1 || res.0.is_none() || res.0.unwrap() != val_json {
159 self.counter = res.1 + 1;
160 res = self.store_redis(&val_json);
161 }
162
163 self.data.cache = Some(val);
164 Ok(())
165 }
166
167 fn store_redis(&self, val: &str) -> (Option<String>, usize) {
168 let mut conn = self.data.client.get_connection().unwrap();
169 redis::Script::new(SET_LOAD_SCRIPT)
170 .arg(&self.data.key)
171 .arg(self.counter)
172 .arg(val)
173 .invoke(&mut conn)
174 .expect("Could not execute script")
175 }
176
177 /// Loads the value from the redis server.
178 /// This is done automatically on creation.
179 /// Mostly used for synchronization. Reset the counter to order from redis or 0.
180 pub fn load(&mut self) {
181 let mut conn = self.data.client.get_connection().unwrap();
182 let res: (Option<String>, Option<usize>) = redis::Script::new(LOAD_SCRIPT)
183 .arg(&self.data.key)
184 .invoke(&mut conn)
185 .expect("Could not execute script");
186
187 match res {
188 (Some(v), Some(order)) => {
189 self.data.cache = Some(from_str(&v).unwrap());
190 self.counter = order;
191 }
192 (Some(v), None) => {
193 self.data.cache = Some(from_str(&v).unwrap());
194 self.counter = 0;
195 }
196 (None, Some(c)) => {
197 self.data.cache = None;
198 self.counter = c;
199 }
200 _ => {
201 self.data.cache = None;
202 self.counter = 0;
203 }
204 }
205 }
206}
207
208impl<T> Deref for ClockOrdered<T> {
209 type Target = Generic<T>;
210
211 fn deref(&self) -> &Self::Target {
212 &self.data
213 }
214}
215
216impl<T> DerefMut for ClockOrdered<T> {
217 fn deref_mut(&mut self) -> &mut Self::Target {
218 &mut self.data
219 }
220}
221
#[cfg(test)]
mod tests {
    use crate::redis::sync::ClockOrdered;
    use crate::redis::types::Generic;

    /// Stores a value through `ClockOrdered` and reads it back from the cache.
    ///
    /// Needs a redis server listening on `localhost:6379`.
    #[test]
    fn test_set_load() {
        let client = redis::Client::open("redis://localhost:6379").unwrap();
        // The client is not needed afterwards, so it is moved instead of cloned.
        let value: Generic<i32> = Generic::new("test_add_clock_ordered", client);
        let mut clock_ordered = ClockOrdered::new(value);
        clock_ordered.store(2).unwrap();
        assert_eq!(*clock_ordered, 2);
    }
}
235}