use std::{
collections::VecDeque,
hash::Hash,
ops::{Deref, Neg},
};
pub use bitcode::{Decode, DecodeOwned, Encode};
pub use chrono::{DateTime, Duration, Utc};
use scc::HashMap;
use crate::math::{SaturatingAdd, SaturatingSub};
mod math;
pub struct Value<T> {
pub expires_at: DateTime<Utc>,
pub value: T,
}
impl<T> Deref for Value<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.value
}
}
impl<T> Value<T> {
fn is_expired(&self, now: &DateTime<Utc>) -> bool {
self.expires_at <= *now
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Error {
Decoding,
}
pub struct TurboStore<K: Hash + Eq> {
pairs: HashMap<K, Value<Vec<u8>>>,
maps: HashMap<K, HashMap<K, Value<Vec<u8>>>>,
sets: HashMap<K, HashMap<Vec<u8>, Value<()>>>,
deques: HashMap<K, VecDeque<Value<Vec<u8>>>>,
}
impl<K> TurboStore<K>
where
K: Hash + Eq,
{
pub fn new() -> Self {
Self {
pairs: HashMap::new(),
maps: HashMap::new(),
sets: HashMap::new(),
deques: HashMap::new(),
}
}
pub fn with_capacity(capacity: usize) -> Self {
Self {
pairs: HashMap::with_capacity(capacity),
maps: HashMap::with_capacity(capacity),
sets: HashMap::with_capacity(capacity),
deques: HashMap::with_capacity(capacity),
}
}
pub async fn evict(&self) {
let now = Utc::now();
self.pairs.retain_async(|_, v| !v.is_expired(&now)).await;
self.maps
.retain_async(|_, map| {
map.retain(|_, v| !v.is_expired(&now));
!map.is_empty()
})
.await;
self.sets
.retain_async(|_, set| {
set.retain(|_, v| !v.is_expired(&now));
!set.is_empty()
})
.await;
self.deques
.retain_async(|_, deque| {
deque.retain(|i| !i.is_expired(&now));
!deque.is_empty()
})
.await;
}
pub async fn set<V>(&self, key: K, value: &V, ttl: Duration)
where
V: Encode,
{
let expires_at = Utc::now() + ttl;
let encoded: Vec<u8> = bitcode::encode(value);
self.pairs
.upsert_async(
key,
Value {
value: encoded,
expires_at,
},
)
.await;
}
pub async fn incr<V>(&self, key: K, by: V, ttl: Duration) -> Result<V, Error>
where
V: Encode + DecodeOwned + SaturatingAdd<Output = V>,
{
let expires_at = Utc::now() + ttl;
let value = self.pairs.get_async(&key).await;
if let Some(mut value) = value {
let decoded = bitcode::decode::<V>(&value).map_err(|_| Error::Decoding)?;
let new_value = decoded.saturating_add(&by);
*value = Value {
value: bitcode::encode(&new_value),
expires_at,
};
Ok(new_value)
} else {
self.pairs
.upsert_async(
key,
Value {
value: bitcode::encode(&by),
expires_at,
},
)
.await;
Ok(by)
}
}
pub async fn decr<V>(&self, key: K, by: V, ttl: Duration) -> Result<V, Error>
where
V: Encode + DecodeOwned + SaturatingSub<Output = V> + Neg<Output = V>,
{
let expires_at = Utc::now() + ttl;
let value = self.pairs.get_async(&key).await;
if let Some(mut value) = value {
let decoded = bitcode::decode::<V>(&value).map_err(|_| Error::Decoding)?;
let new_value = decoded.saturating_sub(&by);
*value = Value {
value: bitcode::encode(&new_value),
expires_at,
};
Ok(new_value)
} else {
let new_value = -by;
self.pairs
.upsert_async(
key,
Value {
value: bitcode::encode(&new_value),
expires_at,
},
)
.await;
Ok(new_value)
}
}
pub async fn get<V>(&self, key: &K) -> Option<Value<Result<V, Error>>>
where
V: DecodeOwned,
{
let value = self.pairs.get_async(key).await?;
if value.is_expired(&Utc::now()) {
drop(value);
self.pairs.remove_async(key).await;
return None;
}
Some(Value {
value: bitcode::decode(&value).map_err(|_| Error::Decoding),
expires_at: value.expires_at,
})
}
pub async fn rem(&self, key: &K) -> bool {
self.pairs.remove_async(key).await.is_some()
}
pub async fn pop<V>(&self, key: &K) -> Option<Value<Result<V, Error>>>
where
V: DecodeOwned,
{
let (_key, value) = self.pairs.remove_async(key).await?;
if value.is_expired(&Utc::now()) {
return None;
}
Some(Value {
value: bitcode::decode(&value).map_err(|_| Error::Decoding),
expires_at: value.expires_at,
})
}
pub async fn expire(&self, key: K, ttl: Duration) -> bool {
let expires_at = Utc::now() + ttl;
let maybe_new_value = self.pairs.get_async(&key).await.map(|value| Value {
value: value.value.clone(),
expires_at,
});
match maybe_new_value {
None => false,
Some(new_value) => {
self.pairs.upsert_async(key, new_value).await;
true
}
}
}
pub async fn exists(&self, key: &K) -> bool {
let expired = match self.pairs.get_async(key).await {
None => return false,
Some(inner) => inner.is_expired(&Utc::now()),
};
if expired {
self.pairs.remove_async(key).await;
return false;
}
true
}
pub async fn sadd<V>(&self, skey: K, item: V, ttl: Duration, capacity: usize)
where
V: Encode,
{
let expires_at = Utc::now() + ttl;
let set = self.sets.get_async(&skey).await;
let encoded = bitcode::encode(&item);
let value = Value {
value: (),
expires_at,
};
if let Some(set) = set {
set.upsert_async(encoded, value).await;
} else {
let set = HashMap::with_capacity(capacity.max(1));
set.upsert_async(encoded, value).await;
self.sets.upsert_async(skey, set).await;
}
}
pub async fn srem<V>(&self, skey: &K, item: &V)
where
V: Encode,
{
let set = match self.sets.get_async(skey).await {
None => return,
Some(v) => v,
};
let encoded = bitcode::encode(item);
let result = set.remove_async(&encoded).await;
if result.is_some() && set.is_empty() {
drop(set);
self.sets.remove_async(skey).await;
}
}
pub async fn slen(&self, skey: &K) -> usize {
let set = match self.sets.get_async(skey).await {
None => return 0,
Some(v) => v,
};
let now = Utc::now();
set.retain_async(|_k, v| !v.is_expired(&now)).await;
set.len()
}
pub async fn sclear(&self, skey: &K) {
self.sets.remove_async(skey).await;
}
pub async fn sttl<V>(&self, skey: &K, item: &V) -> Option<DateTime<Utc>>
where
V: Encode,
{
let set = self.sets.get_async(skey).await?;
let encoded = bitcode::encode(item);
let item = set.get_async(&encoded).await?;
if item.is_expired(&Utc::now()) {
set.remove_async(&encoded).await;
if set.is_empty() {
drop(item);
drop(set);
self.sets.remove_async(skey).await;
}
return None;
}
Some(item.expires_at)
}
pub async fn sexpire<V>(&self, skey: &K, item: &V, ttl: Duration)
where
V: Encode,
{
let set = match self.sets.get_async(skey).await {
None => return,
Some(v) => v,
};
let encoded = bitcode::encode(item);
let item = match set.get_async(&encoded).await {
None => return,
Some(v) => v,
};
let now = Utc::now();
if item.is_expired(&now) {
set.remove_async(&encoded).await;
if set.is_empty() {
drop(item);
drop(set);
self.sets.remove_async(skey).await;
}
return;
}
let expires_at = now + ttl;
set.upsert_async(
encoded,
Value {
value: (),
expires_at,
},
)
.await;
}
pub async fn scontains<V>(&self, skey: &K, item: &V) -> bool
where
V: Encode,
{
let set = match self.sets.get_async(skey).await {
None => return false,
Some(v) => v,
};
let encoded = bitcode::encode(item);
let value = match set.get_async(&encoded).await {
None => return false,
Some(v) => v,
};
if value.is_expired(&Utc::now()) {
set.remove_async(&encoded).await;
if set.is_empty() {
drop(value);
drop(set);
self.sets.remove_async(skey).await;
}
return false;
}
true
}
pub async fn sall<V>(&self, skey: &K) -> Vec<Value<Result<V, Error>>>
where
V: DecodeOwned,
{
let set = match self.sets.get_async(skey).await {
None => return Vec::default(),
Some(v) => v,
};
let mut values = Vec::with_capacity(set.len());
let now = Utc::now();
set.retain_async(|k, v| {
if v.is_expired(&now) {
return false;
}
let decoded = bitcode::decode::<V>(k).map_err(|_| Error::Decoding);
values.push(Value {
value: decoded,
expires_at: v.expires_at,
});
true
})
.await;
values
}
pub async fn hset<V>(&self, hkey: K, key: K, value: V, ttl: Duration, capacity: usize)
where
V: Encode,
{
let map = self.maps.get_async(&hkey).await;
let expires_at = Utc::now() + ttl;
let encoded = bitcode::encode(&value);
let value = Value {
value: encoded,
expires_at,
};
if let Some(map) = map {
map.upsert_async(key, value).await;
} else {
let map = HashMap::with_capacity(std::cmp::max(capacity, 1));
map.upsert_async(key, value).await;
self.maps.upsert_async(hkey, map).await;
}
}
pub async fn hget<V>(&self, hkey: &K, key: &K) -> Option<Value<Result<V, Error>>>
where
V: DecodeOwned,
{
let map = self.maps.get_async(hkey).await?;
let value = map.get_async(key).await?;
if value.is_expired(&Utc::now()) {
map.remove_async(key).await;
if map.is_empty() {
drop(value);
drop(map);
self.maps.remove_async(hkey).await;
}
return None;
}
Some(Value {
value: bitcode::decode(&value).map_err(|_| Error::Decoding),
expires_at: value.expires_at,
})
}
pub async fn hrem(&self, hkey: &K, key: &K) {
let map = match self.maps.get_async(hkey).await {
None => return,
Some(v) => v,
};
map.remove_async(key).await;
}
pub async fn hpop<V>(&self, hkey: &K, key: &K) -> Option<Value<Result<V, Error>>>
where
V: DecodeOwned,
{
let map = self.maps.get_async(hkey).await?;
let (_key, value) = map.remove_async(key).await?;
if map.is_empty() {
self.maps.remove_async(hkey).await;
}
drop(map);
if value.is_expired(&Utc::now()) {
return None;
}
Some(Value {
value: bitcode::decode(&value).map_err(|_| Error::Decoding),
expires_at: value.expires_at,
})
}
pub async fn hexists(&self, hkey: &K, key: &K) -> bool {
let map = match self.maps.get_async(hkey).await {
None => return false,
Some(v) => v,
};
let value = match map.get_async(key).await {
None => return false,
Some(v) => v,
};
if value.is_expired(&Utc::now()) {
map.remove_async(key).await;
if map.is_empty() {
drop(value);
drop(map);
self.maps.remove_async(hkey).await;
}
return false;
}
true
}
pub async fn hexpire(&self, hkey: &K, key: K, ttl: Duration) {
let map = match self.maps.get_async(hkey).await {
None => return,
Some(v) => v,
};
let value = match map.get_async(&key).await {
None => return,
Some(v) => v,
};
let now = Utc::now();
if value.is_expired(&now) {
map.remove_async(&key).await;
if map.is_empty() {
drop(value);
drop(map);
self.maps.remove_async(hkey).await;
}
return;
}
map.upsert_async(
key,
Value {
value: value.value.clone(),
expires_at: now + ttl,
},
)
.await;
}
pub async fn hlen(&self, hkey: &K) -> usize {
let map = match self.maps.get_async(hkey).await {
None => return 0,
Some(v) => v,
};
let now = Utc::now();
map.retain_async(|_k, v| !v.is_expired(&now)).await;
if map.is_empty() {
drop(map);
self.maps.remove_async(hkey).await;
return 0;
}
map.len()
}
pub async fn hclear(&self, hkey: &K) {
self.maps.remove_async(hkey).await;
}
pub async fn dappend<V>(&self, dkey: K, value: V, ttl: Duration, capacity: usize)
where
V: Encode,
{
let deque = self.deques.get_async(&dkey).await;
let encoded = bitcode::encode(&value);
let value = Value {
value: encoded,
expires_at: Utc::now() + ttl,
};
if let Some(mut deque) = deque {
deque.push_back(value);
} else {
let mut deque = VecDeque::with_capacity(capacity.max(1));
deque.push_back(value);
self.deques.upsert_async(dkey, deque).await;
}
}
pub async fn dprepend<V>(&self, dkey: K, value: V, ttl: Duration, capacity: usize)
where
V: Encode,
{
let deque = self.deques.get_async(&dkey).await;
let encoded = bitcode::encode(&value);
let value = Value {
value: encoded,
expires_at: Utc::now() + ttl,
};
if let Some(mut deque) = deque {
deque.push_front(value);
} else {
let mut deque = VecDeque::with_capacity(capacity.max(1));
deque.push_front(value);
self.deques.upsert_async(dkey, deque).await;
}
}
pub async fn dfpop<V>(&self, dkey: &K) -> Option<Value<Result<V, Error>>>
where
V: DecodeOwned,
{
let mut deque = self.deques.get_async(dkey).await?;
let now = Utc::now();
let value = loop {
let value = deque.pop_front()?;
if value.is_expired(&now) {
continue;
}
break Some(value);
}?;
if deque.is_empty() {
drop(deque);
self.deques.remove_async(dkey).await;
} else {
drop(deque);
}
let decoded = bitcode::decode::<V>(&value).map_err(|_| Error::Decoding);
Some(Value {
value: decoded,
expires_at: value.expires_at,
})
}
pub async fn dbpop<V>(&self, dkey: &K) -> Option<Value<Result<V, Error>>>
where
V: DecodeOwned,
{
let mut deque = self.deques.get_async(dkey).await?;
let now = Utc::now();
let value = loop {
let value = deque.pop_back()?;
if value.is_expired(&now) {
continue;
}
break Some(value);
}?;
if deque.is_empty() {
drop(deque);
self.deques.remove_async(dkey).await;
} else {
drop(deque);
}
let decoded = bitcode::decode::<V>(&value).map_err(|_| Error::Decoding);
Some(Value {
value: decoded,
expires_at: value.expires_at,
})
}
pub async fn dpop<V>(&self, dkey: &K, index: usize) -> Option<Value<Result<V, Error>>>
where
V: DecodeOwned,
{
let mut deque = self.deques.get_async(dkey).await?;
let now = Utc::now();
deque.retain(|i| !i.is_expired(&now));
let value = deque.drain(index..=index).next()?;
if deque.is_empty() {
drop(deque);
self.deques.remove_async(dkey).await;
} else {
drop(deque);
}
let decoded = bitcode::decode::<V>(&value).map_err(|_| Error::Decoding);
Some(Value {
value: decoded,
expires_at: value.expires_at,
})
}
pub async fn dfrem(&self, dkey: &K) {
let mut deque = match self.deques.get_async(dkey).await {
Some(v) => v,
None => return,
};
let now = Utc::now();
loop {
let value = match deque.pop_front() {
Some(v) => v,
None => {
drop(deque);
self.deques.remove_async(dkey).await;
break;
}
};
if !value.is_expired(&now) {
break;
}
}
}
pub async fn dbrem(&self, dkey: &K) {
let mut deque = match self.deques.get_async(dkey).await {
Some(v) => v,
None => return,
};
let now = Utc::now();
loop {
let value = match deque.pop_back() {
Some(v) => v,
None => {
drop(deque);
self.deques.remove_async(dkey).await;
break;
}
};
if !value.is_expired(&now) {
break;
}
}
}
pub async fn drem(&self, dkey: &K, index: usize) {
let mut deque = match self.deques.get_async(dkey).await {
Some(v) => v,
None => return,
};
let now = Utc::now();
deque.retain(|i| !i.is_expired(&now));
deque.drain(index..=index);
if deque.is_empty() {
drop(deque);
self.deques.remove_async(dkey).await;
}
}
pub async fn dfpeek<V>(&self, dkey: &K) -> Option<Value<Result<V, Error>>>
where
V: DecodeOwned,
{
let mut deque = self.deques.get_async(dkey).await?;
let now = Utc::now();
let value = loop {
let value = match deque.front() {
Some(v) => v,
None => {
drop(deque);
self.deques.remove_async(dkey).await;
return None;
}
};
if value.is_expired(&now) {
deque.pop_front();
continue;
}
break Some(value);
}?;
let decoded = bitcode::decode::<V>(value).map_err(|_| Error::Decoding);
Some(Value {
value: decoded,
expires_at: value.expires_at,
})
}
pub async fn dbpeek<V>(&self, dkey: &K) -> Option<Value<Result<V, Error>>>
where
V: DecodeOwned,
{
let mut deque = self.deques.get_async(dkey).await?;
let now = Utc::now();
let value = loop {
let value = match deque.back() {
Some(v) => v,
None => {
drop(deque);
self.deques.remove_async(dkey).await;
return None;
}
};
if value.is_expired(&now) {
deque.pop_back();
continue;
}
break Some(value);
}?;
let decoded = bitcode::decode::<V>(value).map_err(|_| Error::Decoding);
Some(Value {
value: decoded,
expires_at: value.expires_at,
})
}
pub async fn dpeek<V>(&self, dkey: &K, index: usize) -> Option<Value<Result<V, Error>>>
where
V: DecodeOwned,
{
let mut deque = self.deques.get_async(dkey).await?;
let now = Utc::now();
deque.retain(|i| !i.is_expired(&now));
if deque.is_empty() {
drop(deque);
self.deques.remove_async(dkey).await;
return None;
}
let item = (*deque).get(index)?;
let decoded = bitcode::decode::<V>(item).map_err(|_| Error::Decoding);
Some(Value {
value: decoded,
expires_at: item.expires_at,
})
}
pub async fn dlen(&self, dkey: &K) -> usize {
let mut deque = match self.deques.get_async(dkey).await {
Some(v) => v,
None => return 0,
};
let now = Utc::now();
deque.retain(|i| !i.is_expired(&now));
if deque.is_empty() {
drop(deque);
self.deques.remove_async(dkey).await;
return 0;
}
deque.len()
}
pub async fn dexpire(&self, dkey: &K, index: usize, ttl: Duration) {
let mut deque = match self.deques.get_async(dkey).await {
Some(v) => v,
None => return,
};
let now = Utc::now();
deque.retain(|i| !i.is_expired(&now));
if deque.is_empty() {
drop(deque);
self.deques.remove_async(dkey).await;
return;
}
let item = match (*deque).get_mut(index) {
Some(v) => v,
None => return,
};
item.expires_at = now + ttl;
}
pub async fn dclear(&self, dkey: &K) {
self.deques.remove_async(dkey).await;
}
pub async fn dmap<V>(
&self,
dkey: &K,
f: impl Fn(Value<Result<V, Error>>) -> Option<Value<Result<V, Error>>>,
) where
V: Encode + DecodeOwned,
{
let mut deque = match self.deques.get_async(dkey).await {
Some(v) => v,
None => return,
};
let now = Utc::now();
deque.retain_mut(|v| {
if v.is_expired(&now) {
return false;
}
let decoded = bitcode::decode::<V>(v).map_err(|_| Error::Decoding);
let new_value = f(Value {
value: decoded,
expires_at: v.expires_at,
});
if let Some(new_value) = new_value {
if let Ok(inner) = new_value.value {
let encoded = bitcode::encode(&inner);
*v = Value {
value: encoded,
expires_at: new_value.expires_at,
};
} else {
v.expires_at = new_value.expires_at;
}
}
true
});
if deque.is_empty() {
drop(deque);
self.deques.remove_async(dkey).await;
}
}
pub async fn dall<V>(&self, dkey: &K) -> VecDeque<Value<Result<V, Error>>>
where
V: DecodeOwned,
{
let mut deque = match self.deques.get_async(dkey).await {
Some(v) => v,
None => return VecDeque::new(),
};
let now = Utc::now();
deque.retain(|i| !i.is_expired(&now));
if deque.is_empty() {
drop(deque);
self.deques.remove_async(dkey).await;
return VecDeque::new();
}
(*deque)
.iter()
.map(|i| {
let decoded = bitcode::decode::<V>(i).map_err(|_| Error::Decoding);
Value {
value: decoded,
expires_at: i.expires_at,
}
})
.collect()
}
pub async fn dftruncate(&self, dkey: &K, len: usize) {
let mut deque = match self.deques.get_async(dkey).await {
Some(v) => v,
None => return,
};
let now = Utc::now();
deque.retain(|i| !i.is_expired(&now));
if deque.is_empty() {
drop(deque);
self.deques.remove_async(dkey).await;
return;
}
deque.truncate(len);
}
pub async fn dbtruncate(&self, dkey: &K, len: usize) {
let mut deque = match self.deques.get_async(dkey).await {
Some(v) => v,
None => return,
};
let now = Utc::now();
deque.retain(|i| !i.is_expired(&now));
if deque.is_empty() {
drop(deque);
self.deques.remove_async(dkey).await;
return;
}
let idx = (deque.len() - len).max(0);
deque.drain(..idx);
}
}
impl<K> Default for TurboStore<K>
where
K: Hash + Eq,
{
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Debug, Encode, Decode, PartialEq, Eq, Clone)]
struct TestValue {
number: i32,
string: String,
}
#[pollster::test]
async fn test_kv_operations() {
let store: TurboStore<i32> = TurboStore::with_capacity(2);
let value_1 = TestValue {
number: 123,
string: "Hello world!".into(),
};
let value_2 = TestValue {
number: 456,
string: "Bye bye world!".into(),
};
store.set(1, &value_1, Duration::minutes(1)).await;
store.set(2, &value_1, Duration::minutes(1)).await;
store.set(2, &value_2, Duration::minutes(1)).await;
let retrieved_1 = store
.get::<TestValue>(&1)
.await
.expect("There was no KV pair with key 1");
let now = Utc::now();
assert!(
retrieved_1.expires_at > now + Duration::seconds(59)
&& retrieved_1.expires_at <= now + Duration::minutes(1),
"The expiry time for key 1 was not between 1 minute and 59 seconds in the future."
);
assert_eq!(
retrieved_1
.as_ref()
.expect("The retrieved KV 1 failed to be decoded"),
&value_1,
"The retrieved value for key 1 was not the same value than the one set previously."
);
store.expire(1, Duration::seconds(30)).await;
let popped_1 = store
.pop::<TestValue>(&1)
.await
.expect("There was no value to pop for key 1");
let now = Utc::now();
assert!(
popped_1.expires_at > now + Duration::seconds(29)
&& popped_1.expires_at <= now + Duration::seconds(30),
"The expiry time for key 1 was not between 29 and 30 seconds in the future, even after its TTL was updated."
);
assert_eq!(
popped_1
.as_ref()
.expect("The popped KV 1 failed to be decoded"),
&value_1,
"The retrieved value for key 1 was not the same value than the one set previously."
);
let retrieved_after_popped_1 = store.get::<TestValue>(&1).await;
assert!(
retrieved_after_popped_1.is_none(),
"The value for key 1 was there even after popped."
);
assert!(
store.exists(&2).await,
"The key 2 did not exist even after it was added."
);
store.rem(&2).await;
assert!(
!store.exists(&2).await,
"The key 2 existed even after it was removed."
);
let retrieved_after_removed_2 = store.get::<TestValue>(&2).await;
assert!(
retrieved_after_removed_2.is_none(),
"The value for key 2 was there even after removed."
);
store.set(1, &1, Duration::minutes(1)).await;
let incr_1 = store
.get::<i32>(&1)
.await
.expect("There was no value for key 1 after setting to a previously-removed key");
let now = Utc::now();
assert!(
incr_1.expires_at > now + Duration::seconds(59)
&& popped_1.expires_at <= now + Duration::minutes(1),
"The expiry time for key 1 was not between 59 seconds and a minute in the future after setting to a previously-removed key."
);
assert_eq!(
incr_1.as_ref().expect(
"The value could not be decoded into i32 after setting to a previously-removed key"
),
&1,
"The value of key 1 was not 1 after setting to a previously-removed key"
);
store
.incr(1, 2, Duration::minutes(5))
.await
.expect("INCR failed to run because the old value could not be decoded");
let incr_1 = store
.get::<i32>(&1)
.await
.expect("There was no value for key 1 after incrementing");
assert_eq!(
incr_1
.as_ref()
.expect("The value could not be decoded into i32 after incrementing"),
&3,
"The value of key 1 was not properly incremented by INCR"
);
assert!(
incr_1.expires_at > now + Duration::seconds(60 * 4 + 59)
&& popped_1.expires_at <= now + Duration::minutes(5),
"The expiry time for key 1 was not between 4:59 min and 5:00 min in the future after setting to a previously-removed key."
);
store
.decr(1, 1, Duration::minutes(1))
.await
.expect("Could not decode the old value to i32 before decrementing");
let incr_1 = store
.get::<i32>(&1)
.await
.expect("There was no value for key 1 after decrementing after incrementing");
let now = Utc::now();
assert!(
incr_1.expires_at > now + Duration::seconds(59)
&& popped_1.expires_at <= now + Duration::minutes(1),
"The expiry time for key 1 was not between 59 seconds and a minute in the future after decrementing after incrementing."
);
assert_eq!(
incr_1.as_ref().expect(
"The value could not be decoded into i32 after decrementing after incrementing"
),
&2,
"The value of key 1 was not 1 after decrementing after incrementing"
);
store.set(1, &0, Duration::minutes(1)).await;
store
.decr(1, i32::MAX, Duration::minutes(1))
.await
.expect("Could not decode the old value when decrementing by i32::MAX");
store.decr(1, i32::MAX, Duration::minutes(1)).await.expect(
"Could not decode the old value when decrementing by i32::MAX for the second time",
);
let decr_1 = store
.get::<i32>(&1)
.await
.expect("There was no value for key 1 after decrementing after incrementing");
assert_eq!(
decr_1
.as_ref()
.expect("The value could not be decoded into i32 after decrementing by i32::MAX"),
&i32::MIN,
"The value of key 1 was not saturated to i32::MIN after decrementing by i32::MAX"
);
store.set(1, &0, Duration::minutes(1)).await;
store
.incr(1, i32::MAX, Duration::minutes(1))
.await
.expect("Could not decode the old value when incrementing by i32::MAX");
store.incr(1, i32::MAX, Duration::minutes(1)).await.expect(
"Could not decode the old value when incrementing by i32::MAX for the second time",
);
let incr_1 = store
.get::<i32>(&1)
.await
.expect("There was no value for key 1 after incrementing after resetting");
assert_eq!(
incr_1
.as_ref()
.expect("The value could not be decoded into i32 after incrementing by i32::MAX"),
&i32::MAX,
"The value of key 1 was not saturated to i32::MAX after incrementing by i32::MAX"
);
}
}