1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
use crate::redis::Generic;
use serde_json::from_str;
use std::ops::{Deref, DerefMut};
use thiserror::Error;
/// Errors returned by the store operations of [`ClockOrdered`].
#[derive(Debug, Error)]
pub enum ClockOrderedError {
/// The store was not accepted: the ordering number sent was not greater than the
/// one held in redis, or the value read back differs from the value that was sent.
#[error("Ordering number is not greater than current number stored in redis.")]
OrderError,
}
/// Lua script that conditionally sets a value and returns the resulting state.
///
/// The value is written only if no order is stored yet, or if the stored order is
/// numerically lower than the supplied order. In both cases the script returns the
/// value currently stored under the key together with the current ordering number.
///
/// Both the stored order (a redis string) and the supplied order (an `ARGV` string)
/// are converted with `tonumber` before comparing. Comparing the raw strings would be
/// lexicographic, so e.g. a stored `"9"` would never be considered lower than `"10"`.
///
/// It takes 3 arguments:
/// 1. The key of the value to set (the order is kept under `<key>:order`)
/// 2. The order number of the setting operation
/// 3. The value itself to set
const SET_LOAD_SCRIPT: &str = r#"
local key = ARGV[1]
local order = ARGV[2]
local current_order = redis.call("GET", key .. ":order")
if current_order == false or tonumber(current_order) < tonumber(order) then
redis.call("SET", key .. ":order", order)
redis.call("SET", key, ARGV[3])
current_order = order
end
return {redis.call("GET", key), current_order}
"#;
/// Lua script that loads a value together with its ordering number.
///
/// Returns the value stored under the key and the order stored under `<key>:order`.
/// Either entry is returned as nil when the corresponding redis key does not exist.
///
/// It takes 1 argument:
/// 1. The key of the value to load
const LOAD_SCRIPT: &str = r#"
local key = ARGV[1]
return {redis.call("GET", key), redis.call("GET", key .. ":order")}
"#;
/// The ClockOrdered type.
///
/// It is used to store a value in redis and load it in sync.
/// It automatically tracks an ordering number: a value is only stored if its order is
/// greater than the order currently held in redis, which was mostly written by other instances.
///
/// This helps to synchronize the value between multiple instances without any locking mechanism.
/// The trade-off is potentially more network traffic in exchange for not waiting on locks.
/// It is mostly used in situations where the value changes rarely but is read often.
/// Another use case is when it is acceptable that the value may not be the latest, or that
/// a derived value is computed multiple times.
#[derive(Debug)]
pub struct ClockOrdered<T> {
/// The wrapped redis-backed value; its `cache`, `client` and `key` are used by the methods
/// of this type, and it is the target of the `Deref`/`DerefMut` implementations.
data: Generic<T>,
/// The ordering number this instance last loaded from redis or last used for a store.
counter: usize,
}
impl<T> ClockOrdered<T>
where
    T: serde::Serialize + serde::de::DeserializeOwned,
{
    /// Creates a new ClockOrdered.
    ///
    /// The value and the ordering number are loaded from redis directly
    /// by calling [`ClockOrdered::load`].
    ///
    /// # Panics
    /// Panics under the same conditions as [`ClockOrdered::load`].
    pub fn new(data: Generic<T>) -> Self {
        let mut s = Self { data, counter: 0 };
        s.load();
        s
    }

    /// Stores the value in the redis server.
    /// The value is only stored if the ordering_number is greater than the current number.
    /// The order is incremented by one before each store.
    ///
    /// # Example
    /// ```
    /// use dtypes::redis::Generic;
    /// use dtypes::redis::ClockOrdered;
    ///
    /// let client = redis::Client::open("redis://localhost:6379").unwrap();
    /// let mut i32 = Generic::with_value(1, "test_add_clock_ordered_example1", client.clone());
    /// let mut clock_ordered = ClockOrdered::new(i32);
    /// clock_ordered.store(2).unwrap();
    /// assert_eq!(*clock_ordered, 2);
    /// ```
    ///
    /// The store can fail if the order is not greater than the current order.
    /// This happens if the value was set from another instance before.
    ///
    /// # Example
    /// ```
    /// use std::thread;
    /// use dtypes::redis::Generic;
    /// use dtypes::redis::ClockOrdered;
    ///
    /// let client = redis::Client::open("redis://localhost:6379").unwrap();
    /// let client2 = client.clone();
    ///
    /// thread::scope(|s| {
    ///     let t1 = s.spawn(|| {
    ///         let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example2", client2);
    ///         let mut clock_ordered = ClockOrdered::new(i32);
    ///         while let Err(_) = clock_ordered.store(2) {}
    ///         assert_eq!(*clock_ordered, 2);
    ///     });
    ///     let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example2", client);
    ///     let mut clock_ordered = ClockOrdered::new(i32);
    ///     while let Err(_) = clock_ordered.store(3) {}
    ///     assert_eq!(*clock_ordered, 3);
    ///     t1.join().unwrap();
    /// });
    /// ```
    ///
    /// # Errors
    /// Returns [`ClockOrderedError::OrderError`] if redis reports an order greater than this
    /// instance's counter, or if the value read back is missing or differs from `val`.
    /// The local counter stays incremented and the cached value is left untouched in that case.
    ///
    /// # Panics
    /// Panics if `val` cannot be serialized to JSON, if no redis connection can be obtained,
    /// or if the script fails to execute.
    pub fn store(&mut self, val: T) -> Result<(), ClockOrderedError> {
        self.counter += 1;
        let val_json = serde_json::to_string(&val).expect("Could not serialize value to JSON");
        let (stored, order) = self.store_redis(&val_json);
        // Accepted only if redis is not ahead of us and holds exactly what we sent.
        if self.counter >= order && stored.as_deref() == Some(val_json.as_str()) {
            self.data.cache = Some(val);
            Ok(())
        } else {
            Err(ClockOrderedError::OrderError)
        }
    }

    /// Stores the value in the redis server and blocks until it succeeds.
    ///
    /// The first attempt uses the current counter without incrementing it. After every
    /// rejected attempt the counter is set to one above the order reported by redis and
    /// the store is retried. Everything else is equal to [ClockOrdered::store].
    ///
    /// # Example
    /// ```
    /// use std::thread;
    /// use dtypes::redis::Generic;
    /// use dtypes::redis::ClockOrdered;
    ///
    /// let client = redis::Client::open("redis://localhost:6379").unwrap();
    /// let client2 = client.clone();
    ///
    /// thread::scope(|s| {
    ///     let t1 = s.spawn(|| {
    ///         let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example3", client2);
    ///         let mut clock_ordered = ClockOrdered::new(i32);
    ///         clock_ordered.store_blocking(2).unwrap();
    ///         assert_eq!(*clock_ordered, 2);
    ///     });
    ///     let mut i32: Generic<i32> = Generic::new("test_add_clock_ordered_example3", client);
    ///     let mut clock_ordered = ClockOrdered::new(i32);
    ///     clock_ordered.store_blocking(3).unwrap();
    ///     assert_eq!(*clock_ordered, 3);
    ///     t1.join().unwrap();
    /// });
    /// ```
    ///
    /// # Errors
    /// This method currently always returns `Ok(())`; it retries instead of reporting
    /// [`ClockOrderedError::OrderError`].
    ///
    /// # Panics
    /// Panics if `val` cannot be serialized to JSON, if no redis connection can be obtained,
    /// or if the script fails to execute.
    pub fn store_blocking(&mut self, val: T) -> Result<(), ClockOrderedError> {
        let val_json = serde_json::to_string(&val).expect("Could not serialize value to JSON");
        loop {
            let (stored, order) = self.store_redis(&val_json);
            // Done once redis is not ahead of us and holds exactly what we sent.
            if self.counter >= order && stored.as_deref() == Some(val_json.as_str()) {
                break;
            }
            // Jump past the order redis reported so the next attempt can win.
            self.counter = order + 1;
        }
        self.data.cache = Some(val);
        Ok(())
    }

    /// Runs [`SET_LOAD_SCRIPT`] with this instance's key, the current counter and `val`.
    ///
    /// Returns the value and the ordering number reported by the script, i.e. the state
    /// of redis after the conditional write.
    ///
    /// # Panics
    /// Panics if no redis connection can be obtained or the script fails to execute.
    fn store_redis(&self, val: &str) -> (Option<String>, usize) {
        let mut conn = self
            .data
            .client
            .get_connection()
            .expect("Could not get redis connection");
        redis::Script::new(SET_LOAD_SCRIPT)
            .arg(&self.data.key)
            .arg(self.counter)
            .arg(val)
            .invoke(&mut conn)
            .expect("Could not execute script")
    }

    /// Loads the value from the redis server.
    /// This is done automatically on creation.
    /// Mostly used for synchronization: the cached value is replaced by the value from redis
    /// (or cleared if there is none) and the counter is reset to the order from redis, or 0
    /// if no order is stored.
    ///
    /// # Panics
    /// Panics if no redis connection can be obtained, if the script fails to execute,
    /// or if the stored value cannot be deserialized into `T`.
    pub fn load(&mut self) {
        let mut conn = self
            .data
            .client
            .get_connection()
            .expect("Could not get redis connection");
        let (value, order): (Option<String>, Option<usize>) = redis::Script::new(LOAD_SCRIPT)
            .arg(&self.data.key)
            .invoke(&mut conn)
            .expect("Could not execute script");
        // Value and order are independent: either may be missing in redis.
        self.data.cache =
            value.map(|v| from_str(&v).expect("Could not deserialize value loaded from redis"));
        self.counter = order.unwrap_or(0);
    }
}
// Lets a `ClockOrdered<T>` be read as the `Generic<T>` it wraps, e.g. `*clock_ordered`.
impl<T> Deref for ClockOrdered<T> {
type Target = Generic<T>;
// Returns a shared reference to the wrapped `data` field.
fn deref(&self) -> &Self::Target {
&self.data
}
}
// Gives mutable access to the wrapped `Generic<T>`.
// NOTE(review): changes made through this reference do not go through `store`/`store_blocking`,
// so they presumably bypass the ordering check — confirm against `Generic`'s API.
impl<T> DerefMut for ClockOrdered<T> {
// Returns an exclusive reference to the wrapped `data` field.
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}
#[cfg(test)]
mod tests {
    use crate::redis::ClockOrdered;
    use crate::redis::Generic;

    /// Stores a value through `ClockOrdered` and checks that it is visible afterwards.
    /// Requires a redis server listening on `localhost:6379`.
    #[test]
    fn test_set_load() {
        let client = redis::Client::open("redis://localhost:6379").unwrap();
        let generic: Generic<i32> = Generic::new("test_add_clock_ordered", client.clone());
        let mut ordered = ClockOrdered::new(generic);
        ordered.store(2).unwrap();
        assert_eq!(*ordered, 2);
    }
}