use crate::redis::Generic;
use serde_json::from_str;
use std::ops::{Deref, DerefMut};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ClockOrderedError {
#[error("Ordering number is not greater than current number stored in redis.")]
OrderError,
}
const SET_LOAD_SCRIPT: &str = r#"
local key = ARGV[1]
local order = ARGV[2]
local current_order = redis.call("GET", key .. ":order")
if current_order == false or current_order < order then
redis.call("SET", key .. ":order", order)
redis.call("SET", key, ARGV[3])
current_order = order
end
return {redis.call("GET", key), current_order}
"#;
const LOAD_SCRIPT: &str = r#"
local key = ARGV[1]
return {redis.call("GET", key), redis.call("GET", key .. ":order")}
"#;
#[derive(Debug)]
pub struct ClockOrdered<T> {
data: Generic<T>,
counter: usize,
}
impl<T> ClockOrdered<T>
where
T: serde::Serialize + serde::de::DeserializeOwned,
{
pub fn new(data: Generic<T>) -> Self {
let mut s = Self { data, counter: 0 };
s.load();
s
}
pub fn store(&mut self, val: T) -> Result<(), ClockOrderedError> {
self.counter += 1;
let val_json = serde_json::to_string(&val).unwrap();
let (v, order) = self.store_redis(&val_json);
if let Some(v) = v {
if self.counter >= order && v == val_json {
self.data.cache = Some(val);
return Ok(());
}
}
Err(ClockOrderedError::OrderError)
}
pub fn store_blocking(&mut self, val: T) -> Result<(), ClockOrderedError> {
let val_json = serde_json::to_string(&val).unwrap();
let mut res = self.store_redis(&val_json);
while self.counter < res.1 || res.0.is_none() || res.0.unwrap() != val_json {
self.counter = res.1 + 1;
res = self.store_redis(&val_json);
}
self.data.cache = Some(val);
Ok(())
}
fn store_redis(&self, val: &str) -> (Option<String>, usize) {
let mut conn = self.data.client.get_connection().unwrap();
redis::Script::new(SET_LOAD_SCRIPT)
.arg(&self.data.key)
.arg(self.counter)
.arg(val)
.invoke(&mut conn)
.expect("Could not execute script")
}
pub fn load(&mut self) {
let mut conn = self.data.client.get_connection().unwrap();
let res: (Option<String>, Option<usize>) = redis::Script::new(LOAD_SCRIPT)
.arg(&self.data.key)
.invoke(&mut conn)
.expect("Could not execute script");
match res {
(Some(v), Some(order)) => {
self.data.cache = Some(from_str(&v).unwrap());
self.counter = order;
}
(Some(v), None) => {
self.data.cache = Some(from_str(&v).unwrap());
self.counter = 0;
}
(None, Some(c)) => {
self.data.cache = None;
self.counter = c;
}
_ => {
self.data.cache = None;
self.counter = 0;
}
}
}
}
impl<T> Deref for ClockOrdered<T> {
type Target = Generic<T>;
fn deref(&self) -> &Self::Target {
&self.data
}
}
impl<T> DerefMut for ClockOrdered<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_set_load() {
use crate::redis::ClockOrdered;
use crate::redis::Generic;
let client = redis::Client::open("redis://localhost:6379").unwrap();
let i32: Generic<i32> = Generic::new("test_add_clock_ordered", client.clone());
let mut clock_ordered = ClockOrdered::new(i32);
clock_ordered.store(2).unwrap();
assert_eq!(*clock_ordered, 2);
}
}