use core::hash::Hash;
use interruptible_polling::{PollingTask, StillActiveChecker};
use std::num::TryFromIntError;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use crate::source_of_record::SourceOfRecord;
use crate::store::CacheStoreStrategy;
use super::{CacheHydrationStrategy, CacheLookupSuccess, StoreResult};
/// Cache hydration strategy that keeps the keys held in its store refreshed
/// from a source of record by means of a polling task.
///
/// The store and the source of record live in an `InnerState` that is shared
/// (through an `Arc`) between this handle and the closure given to the
/// polling task.
pub struct PollingHydrationStrategy<Key, Value> {
/// State shared with the polling closure.
shared_inner_state: Arc<InnerState<Key, Value>>,
/// Handle to the polling task. It is never read, only kept alive alongside
/// this value. NOTE(review): presumably dropping it stops the polling —
/// confirm against the `interruptible_polling` documentation.
_polling_thread: PollingTask,
}
/// State shared between a `PollingHydrationStrategy` handle and its polling
/// closure.
struct InnerState<Key, Value> {
/// Source of record consulted on cache misses, for validity checks, and on
/// every poll. Accessed without any lock.
data_source: Box<dyn SourceOfRecord<Key, Value> + Send + Sync>,
/// Cache store guarded by an `RwLock`, since both the caller-facing methods
/// and the polling closure access it.
store: RwLock<Box<dyn CacheStoreStrategy<Key, Value> + Send + Sync>>,
}
/// Errors related to poll-based hydration.
///
/// Implements `Debug`, `Display` and `std::error::Error` so it can be used
/// with `unwrap`/`expect`, `?` into boxed errors, and in test assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PollHydrationError {
    /// The polling interval is out of range (meaning inferred from the variant
    /// name — confirm against callers). The payload is a human-readable
    /// description.
    IntervalRangeError(String),
}

impl std::fmt::Display for PollHydrationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::IntervalRangeError(details) => {
                write!(f, "polling interval out of range: {}", details)
            }
        }
    }
}

impl std::error::Error for PollHydrationError {}
impl<Key, Value> PollingHydrationStrategy<Key, Value>
where
    Key: Eq + Hash + Clone + 'static,
    Value: Clone + 'static,
{
    /// Creates the strategy and its polling task.
    ///
    /// * `data_source` - source of record consulted on misses, for validity
    ///   checks, and on every poll.
    /// * `store` - cache store holding the tracked keys.
    /// * `polling_interval` - interval handed to
    ///   `PollingTask::new_with_checker`.
    ///
    /// # Errors
    ///
    /// Propagates the `TryFromIntError` returned by
    /// `PollingTask::new_with_checker` (presumably raised when the interval
    /// cannot be represented by the polling task — confirm against the
    /// `interruptible_polling` documentation).
    pub fn new(
        data_source: Box<dyn SourceOfRecord<Key, Value> + Send + Sync>,
        store: Box<dyn CacheStoreStrategy<Key, Value> + Send + Sync>,
        polling_interval: Duration,
    ) -> Result<Self, TryFromIntError> {
        let shared_state = Arc::new(InnerState {
            data_source,
            store: RwLock::from(store),
        });
        // The polling closure owns its own handle to the shared state.
        let polling_state = Arc::clone(&shared_state);
        let polling_thread = PollingTask::new_with_checker(
            polling_interval,
            Box::new(move |checker| {
                Self::poll(&polling_state, checker);
            }),
        )?;
        Ok(Self {
            shared_inner_state: shared_state,
            _polling_thread: polling_thread,
        })
    }

    /// Performs one polling pass: refreshes every key currently in the store
    /// from the source of record.
    ///
    /// The key set is snapshotted first, so keys added during the pass are not
    /// visited. `checker` is consulted before each key; as soon as it returns
    /// `false` the pass stops.
    ///
    /// # Panics
    ///
    /// Panics if the store lock is poisoned.
    fn poll(shared_inner_state: &Arc<InnerState<Key, Value>>, checker: &StillActiveChecker) {
        let keys: Vec<Key> = shared_inner_state
            .store
            .read()
            .unwrap()
            .get_keys()
            .collect();
        for key in keys {
            if !checker() {
                break;
            }
            // The read guard is a temporary dropped at the end of this statement,
            // so the source lookup below runs without holding the store lock.
            let peeked_value = shared_inner_state.store.read().unwrap().peek(&key);
            if let Some(value) = peeked_value {
                let canonical_value = shared_inner_state
                    .data_source
                    .retrieve_with_hint(&key, &value);
                // Take ownership of the fresh value so it can be moved into the
                // store without an extra clone.
                if let Some(fresh) = canonical_value {
                    let mut store_handle = shared_inner_state.store.write().unwrap();
                    // Only overwrite keys that are still present, so a key removed
                    // while the source lookup was running is not re-inserted.
                    if store_handle.contains(&key) {
                        store_handle.put(&key, fresh);
                    }
                }
            }
        }
    }
}
impl<Key, Value> CacheHydrationStrategy<Key, Value> for PollingHydrationStrategy<Key, Value>
where
    Key: Eq + Hash + Clone + 'static,
    Value: Clone + 'static,
{
    /// Looks `key` up in the store, falling back to the source of record.
    ///
    /// * Present in the store: the value is returned together with
    ///   `StoreResult::Valid` or `StoreResult::Invalid`, depending on what the
    ///   source of record's `is_valid` reports.
    /// * Absent from the store: the value is retrieved from the source of
    ///   record, written to the store, and returned with
    ///   `StoreResult::NotFound`.
    /// * Unknown to the source of record as well: `None`.
    ///
    /// Panics if the store lock is poisoned.
    fn get(&mut self, key: &Key) -> Option<CacheLookupSuccess<Value>> {
        let state = &self.shared_inner_state;
        // The write guard is released at the end of this statement, before the
        // source of record is consulted.
        let cached = state.store.write().unwrap().get(key);
        match cached {
            Some(found) => {
                let status = if state.data_source.is_valid(key, &found) {
                    StoreResult::Valid
                } else {
                    StoreResult::Invalid
                };
                Some(CacheLookupSuccess::new(status, false, found))
            }
            None => {
                let fetched = state.data_source.retrieve(key)?;
                state.store.write().unwrap().put(key, fetched.clone());
                Some(CacheLookupSuccess::new(
                    StoreResult::NotFound,
                    true,
                    fetched,
                ))
            }
        }
    }

    /// Flushes the underlying store while holding its write lock.
    fn flush(&mut self) {
        let state = &self.shared_inner_state;
        state.store.write().unwrap().flush()
    }

    /// Deletes `key` from the underlying store. `poll` only visits keys that
    /// are present in the store, so the key is no longer refreshed.
    fn stop_tracking(&mut self, key: &Key) {
        let state = &self.shared_inner_state;
        state.store.write().unwrap().delete(key);
    }
}
// Unit tests for `PollingHydrationStrategy`, driven by three small in-memory
// `SourceOfRecord` implementations defined below.
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU64, Ordering};
use std::{collections::HashMap, sync::Mutex, thread};
use crate::store::memory::MemoryStore;
use super::*;
use uuid::Uuid;
/// Source of record backed by a plain `HashMap`.
struct BasicSource<K, V> {
inner: HashMap<K, V>,
}
impl<K, V> BasicSource<K, V>
where
K: Eq + PartialEq + Hash,
V: Eq,
{
/// Creates an empty source.
fn new() -> Self {
Self {
inner: HashMap::new(),
}
}
/// Adds or replaces the value recorded for `key`.
fn insert(&mut self, key: K, value: V) {
self.inner.insert(key, value);
}
}
impl<K, V> SourceOfRecord<K, V> for BasicSource<K, V>
where
K: Eq + PartialEq + Hash,
V: Clone + Eq,
{
fn retrieve(&self, key: &K) -> Option<V> {
self.inner.get(key).cloned()
}
// The hint is ignored; this behaves exactly like `retrieve`.
fn retrieve_with_hint(&self, key: &K, _current: &V) -> Option<V> {
self.inner.get(key).cloned()
}
// A value is valid only when it equals the entry currently recorded for the key.
fn is_valid(&self, key: &K, value: &V) -> bool {
self.retrieve(key) == Some(value.clone())
}
}
/// Source of record whose entries can each be retrieved only once: a
/// retrieval hands out the recorded value and leaves `None` in its place.
struct ForgetfulSource {
inner: HashMap<Uuid, Mutex<Option<u64>>>,
}
impl ForgetfulSource {
/// Creates an empty source.
fn new() -> Self {
Self {
inner: HashMap::new(),
}
}
/// Records `value` for `key`, making it retrievable once.
fn insert(&mut self, key: Uuid, value: u64) {
self.inner.insert(key, Mutex::new(Some(value)));
}
}
impl SourceOfRecord<Uuid, u64> for ForgetfulSource {
fn retrieve(&self, key: &Uuid) -> Option<u64> {
self.inner.get(key).and_then(|x| {
// Hand out the current value and forget it.
let mut v = x.lock().unwrap();
let previous = *v;
*v = None;
previous
})
}
fn retrieve_with_hint(&self, key: &Uuid, _value: &u64) -> Option<u64> {
self.retrieve(key)
}
// Compares against the slot's current content, so a value that has already
// been retrieved (and therefore forgotten) is reported as invalid.
fn is_valid(&self, key: &Uuid, value: &u64) -> bool {
self.inner
.get(key)
.map_or(false, |v| *v.lock().unwrap() == Some(*value))
}
}
/// Source of record that ignores the key and returns a counter which is
/// incremented on every retrieval; no value is ever reported as valid.
struct IncrementingSource {
value: AtomicU64,
}
impl IncrementingSource {
/// Creates a source whose counter starts at 0.
fn new() -> Self {
Self {
value: AtomicU64::new(0),
}
}
}
impl SourceOfRecord<String, u64> for IncrementingSource {
// `fetch_add` returns the value held before the increment.
fn retrieve(&self, _key: &String) -> Option<u64> {
Some(self.value.fetch_add(1, Ordering::Relaxed))
}
fn retrieve_with_hint(&self, key: &String, _v: &u64) -> Option<u64> {
self.retrieve(key)
}
fn is_valid(&self, _key: &String, _value: &u64) -> bool {
false
}
}
/// A key known to the source is expected to be a `Miss` on the first lookup
/// (which writes it to the store) and a `Hit` on the second.
#[test]
fn poll_hydration_valid_key() {
let mut source = BasicSource::new();
let key_1 = Uuid::new_v4();
let key_2 = Uuid::new_v4();
source.insert(key_1, String::from("AAAA"));
source.insert(key_2, String::from("AAAAAAAA"));
let mut cache = PollingHydrationStrategy::new(
Box::new(source),
Box::new(MemoryStore::new()),
Duration::from_secs(1),
)
.unwrap();
assert_eq!(
cache.get(&key_1),
Some(CacheLookupSuccess::Miss(String::from("AAAA")))
);
assert_eq!(
cache.get(&key_1),
Some(CacheLookupSuccess::Hit(String::from("AAAA")))
);
}
/// A key unknown to the source yields `None`, on repeated lookups as well.
#[test]
fn poll_hydration_no_key() {
let mut source = BasicSource::new();
let key_1 = Uuid::new_v4();
let key_2 = Uuid::new_v4();
let key_3 = Uuid::new_v4();
source.insert(key_1, String::from("AAAA"));
source.insert(key_2, String::from("AAAAAAAA"));
let mut cache = PollingHydrationStrategy::new(
Box::new(source),
Box::new(MemoryStore::new()),
Duration::from_secs(1),
)
.unwrap();
// `key_3` was never inserted into the source.
assert_eq!(cache.get(&key_3), None);
assert_eq!(cache.get(&key_3), None);
}
/// Once the source has forgotten a value, the copy kept in the store is
/// still returned but is expected to be reported as `Stale`.
#[test]
fn poll_hydration_lost_key() {
let mut source = ForgetfulSource::new();
let key_1 = Uuid::new_v4();
let key_2 = Uuid::new_v4();
source.insert(key_1, 0);
source.insert(key_2, 1);
let mut cache = PollingHydrationStrategy::new(
Box::new(source),
Box::new(MemoryStore::new()),
Duration::from_secs(1),
)
.unwrap();
// The first lookup consumes the source's only copy of the value.
assert_eq!(cache.get(&key_1), Some(CacheLookupSuccess::Miss(0)));
assert_eq!(cache.get(&key_1), Some(CacheLookupSuccess::Stale(0)));
}
/// Checks that values in the store are replaced by the polling task.
///
/// Lookups that find the key in the store do not call `retrieve`, so the
/// counter only advances on the initial miss and on polls. The expected
/// values therefore imply exactly one poll between the first and second
/// lookup and exactly one more during the final sleep.
/// NOTE(review): this relies on wall-clock sleeps and on the scheduling of
/// `PollingTask`, which is not visible in this file.
#[test]
fn poll_hydration_update_after_poll() {
let source = IncrementingSource::new();
let mut cache = PollingHydrationStrategy::new(
Box::new(source),
Box::new(MemoryStore::new()),
Duration::from_secs(2),
)
.unwrap();
let key = String::from("asdf");
assert_eq!(cache.get(&key), Some(CacheLookupSuccess::Miss(0)),);
thread::sleep(Duration::from_secs(1));
assert_eq!(cache.get(&key), Some(CacheLookupSuccess::Stale(1)));
// No poll is expected between these two lookups, so the value is unchanged.
assert_eq!(cache.get(&key), Some(CacheLookupSuccess::Stale(1)),);
thread::sleep(Duration::from_secs(2));
assert_eq!(cache.get(&key), Some(CacheLookupSuccess::Stale(2)),);
}
/// After `stop_tracking`, the key is gone from the store, so the next lookup
/// is expected to be a `Miss` again.
#[test]
fn poll_hydration_stop_tracking_miss() {
let mut source = BasicSource::new();
let key_1 = Uuid::new_v4();
let key_2 = Uuid::new_v4();
source.insert(key_1, String::from("AAAA"));
source.insert(key_2, String::from("AAAAAAAA"));
let mut cache = PollingHydrationStrategy::new(
Box::new(source),
Box::new(MemoryStore::new()),
Duration::from_secs(1),
)
.unwrap();
assert_eq!(
cache.get(&key_1),
Some(CacheLookupSuccess::Miss(String::from("AAAA")))
);
assert_eq!(
cache.get(&key_1),
Some(CacheLookupSuccess::Hit(String::from("AAAA")))
);
cache.stop_tracking(&key_1);
assert_eq!(
cache.get(&key_1),
Some(CacheLookupSuccess::Miss(String::from("AAAA")))
);
}
}