use std::any::Any;
use std::borrow::Borrow;
use std::cmp::Eq;
use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use std::ops::DerefMut;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;
use crate::queues::CacheQueues;
use crate::queues::ExpiryOption;
use crate::CacheClock;
/// Outcome of a successful retrieval, produced by the future passed to
/// `Cache::get_or_insert_with`.
///
/// `V` is the type of the retrieved value. `D` is the duration type used for
/// the expiries; the cache expects its clock's `Duration` type here and adds
/// each duration to the clock's current time when the value is stored.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetrievalResult<V, D> {
    /// The retrieved value. The cache stores a clone of it and hands this one
    /// back to the caller.
    pub value: V,
    /// Amount added to the cache's size counter while the value is stored.
    pub footprint: usize,
    /// Duration after which the value is considered stale; `None` means it
    /// never becomes stale.
    pub stale_expiry: Option<D>,
    /// Duration after which the value expires altogether; `None` means it
    /// never expires.
    pub ultimate_expiry: Option<D>,
}
/// One slot of the cache map; shared between callers through an `Arc`.
struct EntryItem<C: CacheClock> {
    /// The current value, or `None` when nothing is stored. The mutex is held
    /// for the whole duration of a retrieval, so concurrent callers for the
    /// same key either wait on it or fall back to `stale`.
    pub value: Mutex<Option<EntryItemValue<C>>>,
    /// The previous value, moved here once it went stale. It is cleared again
    /// when a fresh value is stored.
    pub stale: RwLock<Option<StaleItem<C>>>,
}
/// A value that has passed its stale expiry but is kept around so it can be
/// served while a replacement is being retrieved.
struct StaleItem<C: CacheClock> {
    /// The type-erased stale value.
    pub value: ItemBox,
    /// Instant from which the stale value is no longer served (`<= now`);
    /// `None` means it can always be served.
    pub absolute_ultimate_expiry: Option<C::Instant>,
}
/// The current value of a cache slot together with its bookkeeping data.
struct EntryItemValue<C: CacheClock> {
    /// The type-erased value.
    pub item: ItemBox,
    /// Amount this value contributes to the cache's size counter.
    pub footprint: usize,
    /// Instant after which the value is treated as stale (`< now`); `None`
    /// means it never becomes stale.
    pub absolute_stale_expiry: Option<C::Instant>,
    /// Instant at which the value expires; copied to the stale slot when the
    /// value is demoted. `None` means it never expires.
    pub absolute_ultimate_expiry: Option<C::Instant>,
}
/// Type-erased cached value; the concrete type is recovered with `downcast_ref`.
type ItemBox = Box<dyn Any + Send + Sync>;
/// Shared handle to a single cache slot.
type Entry<C> = Arc<EntryItem<C>>;
/// Map from key to cache slot.
type Map<K, C> = HashMap<K, Entry<C>>;
/// Asynchronous cache keyed by `K`, using the clock `C` for expiry decisions.
///
/// Values are stored type-erased, so a single cache can hold values of
/// different types under different keys. Instances are created through
/// `Cache::new`, which returns the cache wrapped in an `Arc`.
pub struct Cache<K, C: CacheClock>
where
    K: Clone + Hash + Eq + Send + Sync + 'static,
{
    /// Key-to-slot map. Slots are added on first access of a key.
    cache: RwLock<Map<K, C>>,
    /// Queue bookkeeping used to pick the keys to pop, either as least
    /// recently used or as expired.
    queues: Mutex<CacheQueues<K, C::Instant>>,
    /// Sum of the footprints of the values currently stored.
    cache_size: AtomicUsize,
    /// Size above which values are popped from the least-recently-used queue.
    max_cache_size: usize,
    /// Source of the current time.
    clock: C,
}
/// Panic message used when a key that is expected to be in the map is not found.
const CACHE_ENTRY_MISSING: &str = "cache value unexpectedly missing";
/// Selects which queue operation `pop_from_queue_and_remove_from_cache` uses.
enum PopType {
/// Pop through `pop_least_recently_used`; used while the cache is over its size limit.
LeastRecentlyUsed,
/// Pop through `pop_expired`, passing the clock's current time; used when purging.
Expired,
}
/// Follow-up work the caller of `internal_get_or_insert_with` has to perform once it returns.
#[derive(Debug, PartialEq)]
enum PostTask {
/// Nothing to do.
None,
/// The size counter went past `max_cache_size`; a task running `maintain_cache_size` should be spawned.
SpawnMaintainCacheSize,
}
impl<K, C: CacheClock> Cache<K, C>
where
K: Clone + Hash + Eq + Send + Sync + 'static,
{
/// Creates an empty cache that reads the time from `clock` and starts
/// evicting once the summed footprints exceed `max_cache_size`.
///
/// The cache is returned inside an `Arc` because lookups may spawn a
/// background task that needs its own handle to the cache.
pub fn new(clock: C, max_cache_size: usize) -> Arc<Self> {
    let empty_cache = Self {
        cache: RwLock::new(HashMap::new()),
        queues: Mutex::new(CacheQueues::new()),
        cache_size: AtomicUsize::new(0),
        max_cache_size,
        clock,
    };
    Arc::new(empty_cache)
}
/// Returns the value cached under `key`, awaiting `future` to produce and
/// store it when no usable value of type `V` is cached.
///
/// `future` is only awaited when a retrieval is actually needed. If storing
/// the new value pushes the cache over its size limit, a background task is
/// spawned on the Tokio runtime to bring the size back down.
///
/// # Errors
///
/// Returns the error produced by `future`, unchanged.
pub async fn get_or_insert_with<Q, V, E>(
    self: &Arc<Self>,
    key: &Q,
    future: impl Future<Output = Result<RetrievalResult<V, C::Duration>, E>>,
) -> Result<V, E>
where
    for<'a> K: Borrow<Q> + From<&'a Q>,
    Q: Hash + Eq + ?Sized,
    V: Clone + Send + Sync + 'static,
{
    // The two no-op futures are hooks that only the tests make use of.
    let (value, post_task) = self
        .internal_get_or_insert_with(key, future, async {}, async {})
        .await?;

    match post_task {
        PostTask::SpawnMaintainCacheSize => {
            let cache = Arc::clone(self);
            tokio::spawn(async move { cache.maintain_cache_size().await });
        }
        PostTask::None => {}
    }

    Ok(value)
}
/// Core lookup/insert routine behind `get_or_insert_with`.
///
/// Steps, in order:
/// 1. pop expired items from the queues;
/// 2. fetch the map entry for `key`, creating it when missing;
/// 3. clone the stale value, if one of type `V` exists and has not reached its ultimate expiry;
/// 4. lock the entry's value: when a stale value is at hand the lock is only *tried* and the
///    stale value is returned if the lock is taken; otherwise the lock is awaited;
/// 5. under the lock, move the value to the stale slot if its stale expiry has passed, then
///    either return the cached value or await `future` and store its result;
/// 6. update the queues.
///
/// `midflight_future` is awaited while the value lock is held and `queues_lock_future` while
/// the queues lock is held. The public wrapper passes `async {}` for both; the tests use them
/// to suspend execution at these two points.
///
/// Returns the value together with the follow-up task the caller has to perform. An error from
/// `future` is returned as is; in that case the queues are not updated.
async fn internal_get_or_insert_with<Q, V, E>(
self: &Arc<Self>,
key: &Q,
future: impl Future<Output = Result<RetrievalResult<V, C::Duration>, E>>,
midflight_future: impl Future<Output = ()>,
queues_lock_future: impl Future<Output = ()>,
) -> Result<(V, PostTask), E>
where
for<'a> K: Borrow<Q> + From<&'a Q>,
Q: Hash + Eq + ?Sized,
V: Clone + Send + Sync + 'static,
{
// Clear the values of all keys the queues report as expired before looking anything up.
self.purge_expired_items().await;
let (key, entry) = self.get_cache_entry(key).await;
// A stale value is usable only if it has not reached its ultimate expiry (`<=`) and
// actually is a `V`; in every other case this is `None`.
let stale_value = entry
.stale
.read()
.await
.as_ref()
.map(|x| {
let is_expired = x
.absolute_ultimate_expiry
.as_ref()
.is_some_and(|x| *x <= self.clock.now());
(!is_expired)
.then(|| x.value.downcast_ref::<V>().cloned())
.flatten()
})
.flatten();
let (item_exists, result, absolute_expiry, post_task) = {
// With a stale fallback available never wait for the lock: if it is taken (e.g. by a
// caller awaiting its retrieval future) the stale value is served instead.
let guard_result = if let Some(stale_value) = stale_value {
entry.value.try_lock().map_err(|_| stale_value)
} else {
Ok(entry.value.lock().await)
};
match guard_result {
Err(stale_value) => {
// Served from the stale slot; nothing was stored.
(true, stale_value, ExpiryOption::Ignore, PostTask::None)
}
Ok(mut value_guard) => {
// Note the strict `<` here, as opposed to the `<=` used for the ultimate expiry above.
let is_stale = value_guard.as_ref().is_some_and(|value| {
value
.absolute_stale_expiry
.as_ref()
.is_some_and(|x| *x < self.clock.now())
});
if is_stale {
// Move the value into the stale slot (which also subtracts its footprint from the
// size counter) so other callers can be served from there during the retrieval.
let absolute_ultimate_expiry = value_guard
.as_ref()
.and_then(|x| x.absolute_ultimate_expiry.clone());
let newly_stale_value = self.take_entry_item_value(value_guard.deref_mut());
let newly_stale_item = newly_stale_value.map(|value| StaleItem {
value,
absolute_ultimate_expiry,
});
*entry.stale.write().await = newly_stale_item;
}
// Test hook: runs with the value lock held.
midflight_future.await;
// `None` if the slot is empty (including a value that was just moved to the stale slot)
// or if it holds a value of a type other than `V`.
let value = value_guard
.as_ref()
.and_then(|x| x.item.downcast_ref::<V>())
.cloned();
if let Some(value) = value {
(true, value, ExpiryOption::Ignore, PostTask::None)
} else {
// On error `?` returns right away: the lock is released and the queues stay untouched.
let retrieval_result = future.await?;
let absolute_stale_expiry =
retrieval_result.stale_expiry.map(|x| self.clock.now() + x);
let absolute_ultimate_expiry = retrieval_result
.ultimate_expiry
.map(|x| self.clock.now() + x);
// A value can still be present here if it was of a different type; drop it and
// subtract its footprint before storing the new one.
self.take_entry_item_value(value_guard.deref_mut());
*value_guard = Some(EntryItemValue {
item: Box::new(retrieval_result.value.clone()),
footprint: retrieval_result.footprint,
absolute_stale_expiry,
absolute_ultimate_expiry: absolute_ultimate_expiry.clone(),
});
// The fresh value supersedes whatever was in the stale slot.
*entry.stale.write().await = None;
let absolute_ultimate_expiry = match absolute_ultimate_expiry {
None => ExpiryOption::Never,
Some(x) => ExpiryOption::Expires(x),
};
// `fetch_add` returns the previous value, hence the extra addition.
let new_cache_size = self
.cache_size
.fetch_add(retrieval_result.footprint, Ordering::Relaxed)
+ retrieval_result.footprint;
let post_task = if new_cache_size > self.max_cache_size {
PostTask::SpawnMaintainCacheSize
} else {
PostTask::None
};
// `is_stale` fills the `item_exists` position: it is true only when this insert
// replaced a value that had just gone stale.
(
is_stale,
retrieval_result.value,
absolute_ultimate_expiry,
post_task,
)
}
}
}
};
// The value lock has been released at this point; only the queues lock is held below.
let mut queues = self.queues.lock().await;
// Test hook: runs with the queues lock held.
queues_lock_future.await;
// The push is skipped only when the item already existed but its key is no longer queued
// (presumably because it was popped in the meantime - verify against `CacheQueues`).
if !item_exists || queues.exists(&key) {
queues.push(key, absolute_expiry);
}
Ok((result, post_task))
}
async fn get_cache_entry<Q>(&self, key: &Q) -> (K, Entry<C>)
where
for<'a> K: Borrow<Q> + From<&'a Q>,
Q: Hash + Eq + ?Sized,
{
let reader = self.insert_key(key).await;
let (key, cache_value) = reader.get_key_value(key).expect(CACHE_ENTRY_MISSING);
(key.clone(), Arc::clone(cache_value))
}
/// Makes sure the map has an entry for `key` and returns a read guard on the
/// map through which that entry can be looked up.
///
/// The common case (key already present) only ever takes the read lock.
/// When the key is missing, the write lock is taken, an empty entry is
/// inserted unless another caller got there first, and the write guard is
/// then downgraded to a read guard. Downgrading is atomic, so no writer can
/// get in between the insertion and the returned read guard, and no second
/// lock acquisition is needed.
async fn insert_key<Q>(&self, key: &Q) -> RwLockReadGuard<'_, Map<K, C>>
where
    for<'a> K: Borrow<Q> + From<&'a Q>,
    Q: Hash + Eq + ?Sized,
{
    let reader = self.cache.read().await;
    if reader.contains_key(key) {
        return reader;
    }
    // The read guard has to go before the write lock can be acquired.
    drop(reader);

    let mut writer = self.cache.write().await;
    // Another caller may have inserted the key while no lock was held;
    // `or_insert_with` keeps the existing entry in that case and only then
    // skips building a new one.
    writer.entry(key.into()).or_insert_with(|| {
        Arc::new(EntryItem {
            value: Mutex::new(None),
            stale: RwLock::new(None),
        })
    });
    writer.downgrade()
}
/// Keeps running eviction rounds until a round reports that the size
/// counter no longer exceeds `max_cache_size`.
///
/// NOTE(review): a round reports `true` whenever the limit is exceeded,
/// whether or not it managed to pop an item, so this keeps looping for as
/// long as the cache stays over its limit.
async fn maintain_cache_size(&self) {
    loop {
        let still_over_limit = self.maintain_cache_size_once(async {}).await;
        if !still_over_limit {
            break;
        }
    }
}
/// Runs a single eviction round.
///
/// If the size counter exceeds `max_cache_size`, one attempt is made to pop
/// a least-recently-used item and clear its value. `queue_pop_future` is a
/// hook that is passed on to the pop routine; it is dropped unused when the
/// cache is within its limit.
///
/// Returns whether the limit was exceeded when the round started. This is
/// independent of whether an item was actually popped.
async fn maintain_cache_size_once(&self, queue_pop_future: impl Future<Output = ()>) -> bool {
    let current_size = self.cache_size.load(Ordering::Relaxed);
    if current_size <= self.max_cache_size {
        return false;
    }
    self.pop_from_queue_and_remove_from_cache(PopType::LeastRecentlyUsed, queue_pop_future)
        .await;
    true
}
/// Pops expired items one at a time, clearing the value of each, until the
/// pop routine reports that nothing was popped.
async fn purge_expired_items(&self) {
    loop {
        let popped = self
            .pop_from_queue_and_remove_from_cache(PopType::Expired, async {})
            .await;
        if !popped {
            break;
        }
    }
}
/// Pops one item from the queues (least recently used or expired, depending on `pop_type`) and
/// clears the current value of the corresponding map entry.
///
/// Only the value is taken (and its footprint subtracted from the size counter); the key stays
/// in the map and the entry's stale slot is left untouched.
///
/// `queue_pop_future` is awaited only when an item was popped, after the queues lock has been
/// released and before the value is cleared; it is a hook used by the tests.
///
/// Returns `true` if an item was popped.
///
/// # Panics
///
/// Panics with `CACHE_ENTRY_MISSING` if the queues hand the callback a key that is not in the map.
async fn pop_from_queue_and_remove_from_cache(
&self,
pop_type: PopType,
queue_pop_future: impl Future<Output = ()>,
) -> bool {
// The map's read lock is held for the whole call.
let cache = self.cache.read().await;
// Tries to lock an entry's value without waiting. What the queues do with a key whose lock
// could not be taken is decided by `CacheQueues` and cannot be seen from here.
let try_lock_callback = |key: &K| {
cache
.get(key.borrow())
.expect(CACHE_ENTRY_MISSING)
.value
.try_lock()
.ok()
};
let mut popped_item = {
// The queues lock is scoped to this block.
let mut queues = self.queues.lock().await;
match pop_type {
PopType::LeastRecentlyUsed => queues.pop_least_recently_used(try_lock_callback),
PopType::Expired => queues.pop_expired(&self.clock.now(), try_lock_callback),
}
};
if let Some(ref mut value_guard) = popped_item.as_mut() {
queue_pop_future.await;
self.take_entry_item_value(value_guard);
}
popped_item.is_some()
}
fn take_entry_item_value(&self, value: &mut Option<EntryItemValue<C>>) -> Option<ItemBox> {
value.take().map(|entry| {
self.cache_size
.fetch_sub(entry.footprint, Ordering::Relaxed);
entry.item
})
}
}
#[cfg(test)]
mod tests {
use std::pin::pin;
use std::result::Result;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::time::Duration;
use futures::future::join_all;
use futures::Future;
use rand::rngs::StdRng;
use rand::Rng;
use rand::SeedableRng;
use std::task::Poll;
use tokio::task::spawn;
use tokio::task::yield_now;
use super::Cache;
use super::PostTask;
use super::RetrievalResult;
use crate::fake_clock::FakeClock;
use crate::fake_clock::FakeClockDuration;
/// Retrieval future yielding `value` with a footprint of zero and no expiries.
async fn rr<T>(value: T) -> Result<RetrievalResult<T, FakeClockDuration>, ()> {
    let retrieval = RetrievalResult {
        stale_expiry: None,
        ultimate_expiry: None,
        footprint: 0,
        value,
    };
    Ok(retrieval)
}
/// Retrieval future yielding `value`, using the value itself as the footprint;
/// no expiries are set.
async fn rr_fp(value: usize) -> Result<RetrievalResult<usize, FakeClockDuration>, ()> {
    let retrieval = RetrievalResult {
        stale_expiry: None,
        ultimate_expiry: None,
        footprint: value,
        value,
    };
    Ok(retrieval)
}
/// Retrieval future yielding `value` with a footprint of zero, the given
/// ultimate expiry and no stale expiry.
async fn rr_expiry<T>(
    value: T,
    ultimate_expiry: FakeClockDuration,
) -> Result<RetrievalResult<T, FakeClockDuration>, ()> {
    let retrieval = RetrievalResult {
        stale_expiry: None,
        ultimate_expiry: Some(ultimate_expiry),
        footprint: 0,
        value,
    };
    Ok(retrieval)
}
/// Retrieval future for lookups that have to be answered from the cache: it panics as soon as
/// it is awaited.
async fn rr_unexpected<T>() -> Result<RetrievalResult<T, FakeClockDuration>, ()> {
panic!("Should not get here")
}
/// A cache keyed by `Arc<str>` can be queried with a plain `&str`.
#[tokio::test]
async fn key_str() {
    let clock = FakeClock::new();
    let cache = Cache::<Arc<str>, _>::new(clock.clone(), 100);

    let actual = cache.get_or_insert_with("a", rr_fp(100)).await;

    assert_eq!(Ok(100), actual);
}
/// A cache keyed by `Arc<[i32]>` can be queried with an `&[i32]` slice.
#[tokio::test]
async fn key_i32array() {
    let cache = Cache::<Arc<[i32]>, _>::new(FakeClock::new(), 100);
    let key: &[i32] = &[123, 456, 789];

    assert_eq!(Ok(100), cache.get_or_insert_with(key, rr_fp(100)).await);
}
/// Two concurrent lookups of the same key trigger a single retrieval: the
/// second lookup's future panics if awaited, yet both lookups return the
/// value produced by the first.
#[tokio::test]
async fn double_loading() {
    let cache = Cache::<Arc<str>, _>::new(FakeClock::new(), 100);
    let slow_retrieval = async {
        tokio::time::sleep(Duration::from_secs(1)).await;
        rr_fp(1).await
    };

    let first = cache.get_or_insert_with("a", slow_retrieval);
    let second = cache.get_or_insert_with("a", rr_unexpected::<usize>());
    let (first, second) = futures::join!(first, second);

    assert_eq!((first.unwrap(), second.unwrap()), (1, 1));
}
/// Values of different types can live in the same cache. Asking for a key
/// with a type other than the stored one triggers a retrieval that replaces
/// the stored value.
#[tokio::test]
async fn heterogeneous() {
    let cache = Cache::<Arc<str>, _>::new(FakeClock::new(), 100);

    let int_under_a = cache.get_or_insert_with("a", rr(42)).await;
    assert_eq!(Ok(42), int_under_a);

    let char_under_b = cache.get_or_insert_with("b", rr('X')).await;
    assert_eq!(Ok('X'), char_under_b);

    // "a" holds an integer, so asking for a `char` retrieves a new value.
    let char_under_a = cache.get_or_insert_with("a", rr('Y')).await;
    assert_eq!(Ok('Y'), char_under_a);

    // Now "a" holds a `char`; the cached 'Y' is returned and 'Z' is ignored.
    let cached_under_a = cache.get_or_insert_with("a", rr('Z')).await;
    assert_eq!(Ok('Y'), cached_under_a);
}
/// Going over the size limit evicts a value: with a limit of 250 and three values of footprint
/// 100, "b" - the key that was accessed least recently - has to be retrieved again afterwards
/// while "a" is still served from the cache.
#[tokio::test]
async fn eviction() {
let fake_clock = FakeClock::new();
let cache = Cache::<Arc<str>, _>::new(fake_clock, 250);
assert_eq!(Ok(100), cache.get_or_insert_with("a", rr_fp(100)).await,);
assert_eq!(Ok(100), cache.get_or_insert_with("b", rr_fp(100)).await,);
// Access "a" again so that "b" becomes the key accessed longest ago.
assert_eq!(
Ok(100),
cache
.get_or_insert_with("a", rr_unexpected::<usize>())
.await,
);
// Brings the summed footprints to 300, which is over the limit.
assert_eq!(Ok(100), cache.get_or_insert_with("c", rr_fp(100)).await,);
// Gives the spawned maintenance task time to run.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
assert_eq!(
Ok(100),
cache
.get_or_insert_with("a", rr_unexpected::<usize>())
.await,
);
// "b" is retrieved anew, so the new value 42 comes back instead of the old 100.
assert_eq!(
Ok(42), cache.get_or_insert_with("b", rr_fp(42)).await,
);
}
/// A value is retrieved once; the second lookup is answered from the cache
/// (its retrieval future panics if awaited).
#[tokio::test]
async fn once() {
    let cache = Cache::<Arc<str>, _>::new(FakeClock::new(), 10000);

    let first = cache.get_or_insert_with("a", rr('X')).await;
    assert_eq!(first.unwrap(), 'X');

    let second = cache.get_or_insert_with("a", rr_unexpected::<char>()).await;
    assert_eq!(Ok('X'), second);
}
/// Values are served from the cache until their ultimate expiry has passed: at time 15 "b"
/// (expiry 10) has to be retrieved again, while "a" (no expiry) and "c" (expiry 20) are not.
#[tokio::test]
async fn ultimate_expiry() {
let fake_clock = FakeClock::new();
fake_clock.set_time(0);
let cache = Cache::<Arc<str>, _>::new(fake_clock.clone(), 10000);
assert_eq!(Ok('A'), cache.get_or_insert_with("a", rr('A')).await);
assert_eq!(
Ok('B'),
cache.get_or_insert_with("b", rr_expiry('B', 10)).await
);
assert_eq!(
Ok('C'),
cache.get_or_insert_with("c", rr_expiry('C', 20)).await
);
// Still at time 0: everything is served from the cache.
assert_eq!(
Ok('A'),
cache.get_or_insert_with("a", rr_unexpected::<char>()).await,
);
assert_eq!(
Ok('B'),
cache.get_or_insert_with("b", rr_unexpected::<char>()).await,
);
assert_eq!(
Ok('C'),
cache.get_or_insert_with("c", rr_unexpected::<char>()).await
);
fake_clock.set_time(15);
assert_eq!(
Ok('A'),
cache.get_or_insert_with("a", rr_unexpected::<char>()).await,
);
// "b" has expired, so its retrieval future runs and yields the new value.
assert_eq!(
Ok('X'),
cache.get_or_insert_with("b", rr_expiry('X', 10)).await
);
assert_eq!(
Ok('C'),
cache.get_or_insert_with("c", rr_unexpected::<char>()).await,
);
}
/// Stale handling with concurrent lookups: while one lookup refreshes a stale value, the others
/// are served the stale value; once the ultimate expiry has passed as well, all of them wait for
/// the refreshed value.
#[tokio::test]
pub async fn stale() {
let fake_clock = FakeClock::new();
let cache = Cache::<Arc<str>, _>::new(fake_clock.clone(), 10000);
let return_value = Arc::new(AtomicI32::new(100));
// Each retrieval that actually runs yields the next number (100, 101, ...). Futures that are
// never awaited do not advance the counter.
let my_retrieve = || async {
tokio::time::sleep(Duration::from_millis(1)).await;
let value = return_value.fetch_add(1, Ordering::Relaxed);
let result: Result<_, ()> = Ok(RetrievalResult {
value,
footprint: 100,
stale_expiry: Some(10),
ultimate_expiry: Some(20),
});
result
};
// One retrieval, then two cache hits.
let actual = cache.get_or_insert_with("foo", my_retrieve()).await;
assert_eq!(Ok(100), actual);
let actual = cache.get_or_insert_with("foo", my_retrieve()).await;
assert_eq!(Ok(100), actual);
let actual = cache.get_or_insert_with("foo", my_retrieve()).await;
assert_eq!(Ok(100), actual);
// Past the stale expiry (10) but before the ultimate expiry (20): the first lookup refreshes
// and gets 101, the other two are served the stale 100.
fake_clock.set_time(15);
let results = join_all([
cache.get_or_insert_with("foo", my_retrieve()),
cache.get_or_insert_with("foo", my_retrieve()),
cache.get_or_insert_with("foo", my_retrieve()),
])
.await;
assert_eq!([Ok(101), Ok(100), Ok(100)], results.as_ref());
// Past every expiry: a single retrieval runs and all three lookups get its result.
fake_clock.set_time(50);
let results = join_all([
cache.get_or_insert_with("foo", my_retrieve()),
cache.get_or_insert_with("foo", my_retrieve()),
cache.get_or_insert_with("foo", my_retrieve()),
])
.await;
assert_eq!([Ok(102), Ok(102), Ok(102)], results.as_ref());
}
/// Stress test: for each of `TICK_COUNT` clock ticks, `HITS_PER_TICK` lookups with pseudo-random
/// keys, footprints and expiries are spawned as tasks. Key and value are derived from the same
/// random number, so every lookup has to return the value that matches its key, whether it is
/// served from the cache or retrieved. The random generator is seeded with a constant.
#[tokio::test]
pub async fn lots_of_activity() {
const TICK_COUNT: i32 = 2048;
const HITS_PER_TICK: i32 = 32;
const RANDOM_SEED: u64 = 31337;
const CACHE_SIZE: usize = 100000;
const ITEM_MAX_SIZE: usize = 40000;
let fake_clock = FakeClock::new();
let cache = Cache::<Arc<str>, _>::new(fake_clock.clone(), CACHE_SIZE);
let mut random = StdRng::seed_from_u64(RANDOM_SEED);
for tick in 0..TICK_COUNT {
fake_clock.set_time(tick);
let tasks = (0..HITS_PER_TICK).map(|_| {
let random_value = random.gen::<usize>() % ITEM_MAX_SIZE;
let key = format!("key{random_value}");
let value = format!("value{random_value}");
// Bit 0x100 decides whether the item has expiries at all.
let stale_expiry =
(random_value & 0x100 != 0).then_some((random_value % 100) as i32);
let ultimate_expiry = stale_expiry.map(|x| x + 10);
let footprint = random_value % 1000 + 10;
// Bit 0x200 decides whether the retrieval yields to the scheduler once.
let do_yield = random_value & 0x200 != 0;
let retrieval_result = RetrievalResult {
value: value.clone(),
footprint,
ultimate_expiry,
stale_expiry,
};
let cache = cache.clone();
let task = async move {
let result: Result<String, ()> = cache
.get_or_insert_with(key.as_str(), async {
if do_yield {
yield_now().await;
}
Ok(retrieval_result)
})
.await;
(value, result)
};
spawn(task)
});
// Wait for all lookups of this tick before moving the clock on.
for spawn_result in join_all(tasks).await {
let Ok((value, result)) = spawn_result else {
panic!("Spawn failed: {spawn_result:?}");
};
assert_eq!(Ok(value), result);
}
}
}
/// Timing scenario: an eviction round runs while a lookup holds the value lock of the only
/// item. The round must not pop the item, and after the lookup has finished a later round must
/// still be able to evict it, leaving a size of 0 and empty queues.
#[tokio::test]
async fn snowflake_timing_issue1() {
let fake_clock = FakeClock::new();
let cache = Cache::<Arc<str>, _>::new(fake_clock.clone(), 100);
let cx = &mut Context::from_waker(&futures::task::noop_waker_ref());
// A footprint of 200 against a limit of 100 requests a maintenance run right away.
let result = cache
.internal_get_or_insert_with("foo", rr_fp(200), async {}, async {})
.await;
assert_eq!(Ok((200, PostTask::SpawnMaintainCacheSize)), result);
// This lookup is suspended in the mid-flight hook, i.e. while it holds the value lock.
let mut access_future =
pin!(cache.internal_get_or_insert_with("foo", rr_fp(200), fake_clock.wait_until(10), async {}));
assert_eq!(Poll::Pending, access_future.as_mut().poll(cx));
// The pop hook is only awaited when an item was popped, so `unreachable!()` asserts that
// nothing is popped here. The round still reports that the limit is exceeded.
let mut maintain_cache_size_future =
pin!(cache.maintain_cache_size_once(async { unreachable!() }));
assert_eq!(
Poll::Ready(true),
maintain_cache_size_future.as_mut().poll(cx)
);
fake_clock.set_time(10);
// The suspended lookup is a plain cache hit.
assert_eq!(
Poll::Ready(Ok((200, PostTask::None))),
access_future.as_mut().poll(cx)
);
fake_clock.set_time(50);
// With the lock free again the eviction goes through.
assert!(cache.maintain_cache_size_once(async {}).await);
let cache_size = cache.cache_size.load(Ordering::Relaxed);
let cache_queues_are_empty = cache.queues.lock().await.is_empty();
assert_eq!((0, true), (cache_size, cache_queues_are_empty));
}
/// Timing scenario: the refresh of a stale item is still in flight when the item's original
/// ultimate expiry passes and a lookup of another key purges expired items. Both lookups have
/// to complete with their own values.
#[tokio::test]
async fn snowflake_timing_issue2() {
let fake_clock = FakeClock::new();
let cache = Cache::<Arc<str>, _>::new(fake_clock.clone(), usize::MAX);
let cx = &mut Context::from_waker(&futures::task::noop_waker_ref());
let retrieval_result = RetrievalResult {
value: 1234,
footprint: 100,
stale_expiry: Some(100),
ultimate_expiry: Some(200),
};
let result: Result<_, ()> = cache
.internal_get_or_insert_with("foo", async { Ok(retrieval_result) }, async {}, async {})
.await;
assert_eq!(Ok((1234, PostTask::None)), result);
// Time 190: "foo" is stale (100) but has not reached its ultimate expiry (200).
fake_clock.set_time(190);
let retrieval_result = RetrievalResult {
value: 2345,
footprint: 100,
stale_expiry: Some(100),
ultimate_expiry: Some(200),
};
// The refresh is suspended inside its retrieval future, holding the value lock of "foo".
let mut access_future = pin!(cache.internal_get_or_insert_with(
"foo",
async {
fake_clock.wait_until(220).await;
let result: Result<_, ()> = Ok(retrieval_result);
result
},
async {},
async {}
));
assert_eq!(Poll::Pending, access_future.as_mut().poll(cx));
// Time 210: past the ultimate expiry of the original "foo" value. The lookup of a different
// key runs the purge while the refresh is still in flight.
fake_clock.set_time(210);
let retrieval_result = RetrievalResult {
value: 3456,
footprint: 100,
stale_expiry: Some(100),
ultimate_expiry: Some(200),
};
let result: Result<_, ()> = cache
.internal_get_or_insert_with("not_foo", async { Ok(retrieval_result) }, async {}, async {})
.await;
assert_eq!(Ok((3456, PostTask::None)), result);
assert_eq!(Poll::Pending, access_future.as_mut().poll(cx));
fake_clock.set_time(220);
// The refresh completes with the newly retrieved value.
assert_eq!(
Poll::Ready(Ok((2345, PostTask::None))),
access_future.as_mut().poll(cx)
);
}
/// Timing scenario: while a refresh is in flight, a second lookup is served the stale value as
/// long as that value has not reached its ultimate expiry; a lookup started after the ultimate
/// expiry has to wait for the refresh and gets the new value.
#[tokio::test]
async fn snowflake_timing_issue3() {
let fake_clock = FakeClock::new();
let cache = Cache::<Arc<str>, _>::new(fake_clock.clone(), usize::MAX);
let cx = &mut Context::from_waker(&futures::task::noop_waker_ref());
let retrieval_result = RetrievalResult {
value: 1234,
footprint: 100,
stale_expiry: Some(100),
ultimate_expiry: Some(200),
};
let result: Result<_, ()> = cache
.internal_get_or_insert_with("foo", async { Ok(retrieval_result) }, async {}, async {})
.await;
assert_eq!(Ok((1234, PostTask::None)), result);
// Time 150: stale, but before the ultimate expiry.
fake_clock.set_time(150);
let retrieval_result = RetrievalResult {
value: 2345,
footprint: 100,
stale_expiry: Some(100),
ultimate_expiry: Some(200),
};
// The refresh is suspended in its retrieval future until time 300.
let mut access_future1 = pin!(cache.internal_get_or_insert_with(
"foo",
async {
fake_clock.wait_until(300).await;
let result: Result<_, ()> = Ok(retrieval_result);
result
},
async {},
async {}
));
assert_eq!(Poll::Pending, access_future1.as_mut().poll(cx));
// A concurrent lookup gets the stale value without running its own retrieval.
let result = cache
.internal_get_or_insert_with("foo", rr_unexpected(), async {}, async {})
.await;
assert_eq!(Ok((1234, PostTask::None)), result);
assert_eq!(Poll::Pending, access_future1.as_mut().poll(cx));
// Time 250: the stale value is past its ultimate expiry, so this lookup cannot fall back to
// it and stays pending until the refresh is done.
fake_clock.set_time(250);
let mut access_future2 =
pin!(cache.internal_get_or_insert_with("foo", rr_unexpected(), async {}, async {}));
assert_eq!(Poll::Pending, access_future1.as_mut().poll(cx));
assert_eq!(Poll::Pending, access_future2.as_mut().poll(cx));
fake_clock.set_time(300);
// Both lookups end up with the refreshed value.
assert_eq!(
Poll::Ready(Ok((2345, PostTask::None))),
access_future1.as_mut().poll(cx)
);
assert_eq!(
Poll::Ready(Ok((2345, PostTask::None))),
access_future2.as_mut().poll(cx)
);
}
/// Timing scenario with three overlapping lookups: a refresh of "foo" (future 1), a lookup that
/// is suspended in the queues-lock hook, i.e. while holding the queues lock (future 2), and a
/// third lookup started after the ultimate expiry of the original "foo" value (future 3). All of
/// them have to complete, and a later, unrelated lookup has to work as well.
#[tokio::test]
async fn snowflake_timing_issue4() {
let fake_clock = FakeClock::new();
let cache = Cache::<Arc<str>, _>::new(fake_clock.clone(), usize::MAX);
let cx = &mut Context::from_waker(&futures::task::noop_waker_ref());
let retrieval_result = RetrievalResult {
value: 1234,
footprint: 100,
stale_expiry: Some(100),
ultimate_expiry: Some(200),
};
let result: Result<_, ()> = cache
.internal_get_or_insert_with("foo", async { Ok(retrieval_result) }, async {}, async {})
.await;
assert_eq!(Ok((1234, PostTask::None)), result);
// Time 150: "foo" is stale. Future 1 starts the refresh and is suspended in its retrieval
// future until time 250.
fake_clock.set_time(150);
let retrieval_result = RetrievalResult {
value: 2345,
footprint: 100,
stale_expiry: Some(100),
ultimate_expiry: Some(200),
};
let mut access_future1 = pin!(cache.internal_get_or_insert_with(
"foo",
async {
fake_clock.wait_until(250).await;
let result: Result<_, ()> = Ok(retrieval_result);
result
},
async {},
async {},
));
assert_eq!(Poll::Pending, access_future1.as_mut().poll(cx));
// Time 175: future 2 stores its value and is then suspended in the queues-lock hook until
// time 300, keeping the queues lock for that long.
fake_clock.set_time(175);
let retrieval_result = RetrievalResult {
value: 2345,
footprint: 100,
stale_expiry: Some(10000),
ultimate_expiry: Some(20000),
};
let mut access_future2 = pin!(cache.internal_get_or_insert_with(
"not foo",
async {
let result: Result<_, ()> = Ok(retrieval_result);
result
},
async {},
async {
fake_clock.wait_until(300).await;
},
));
assert_eq!(Poll::Pending, access_future2.as_mut().poll(cx));
// Time 210: past the ultimate expiry of the original "foo" value. Future 3 is pending although
// its own futures never wait - presumably it is blocked on the queues lock held by future 2.
fake_clock.set_time(210);
let retrieval_result = RetrievalResult {
value: 2345,
footprint: 100,
stale_expiry: Some(10000),
ultimate_expiry: Some(20000),
};
let mut access_future3 = pin!(cache.internal_get_or_insert_with(
"not foo again",
async {
let result: Result<_, ()> = Ok(retrieval_result);
result
},
async {},
async {},
));
assert_eq!(Poll::Pending, access_future3.as_mut().poll(cx));
// Time 310: all hooks and retrieval futures can finish. The order of polls and the expected
// intermediate `Pending` results below are what this scenario pins down; they presumably
// reflect the order in which the queues lock is handed from one future to the next.
fake_clock.set_time(310);
assert_eq!(Poll::Pending, access_future1.as_mut().poll(cx));
assert_eq!(Poll::Ready(Ok((2345, PostTask::None))), access_future2.as_mut().poll(cx));
assert_eq!(Poll::Pending, access_future3.as_mut().poll(cx));
assert_eq!(Poll::Ready(Ok((2345, PostTask::None))), access_future1.as_mut().poll(cx));
assert_eq!(Poll::Ready(Ok((2345, PostTask::None))), access_future3.as_mut().poll(cx));
// Time 800: a fresh lookup of an unrelated key completes on its first poll.
fake_clock.set_time(800);
let retrieval_result = RetrievalResult {
value: 2345,
footprint: 100,
stale_expiry: Some(10000),
ultimate_expiry: Some(20000),
};
let mut access_future4 = pin!(cache.internal_get_or_insert_with(
"not foo again 2",
async {
let result: Result<_, ()> = Ok(retrieval_result);
result
},
async {},
async {},
));
assert_eq!(Poll::Ready(Ok((2345, PostTask::None))), access_future4.as_mut().poll(cx));
}
/// Scenario: the refresh of a stale item fails with an error. The error has to reach the caller,
/// and a later lookup of another key - made after the ultimate expiry of the failed item - has
/// to work normally.
#[tokio::test]
async fn snowflake_timing_issue5() {
let fake_clock = FakeClock::new();
let cache = Cache::<Arc<str>, _>::new(fake_clock.clone(), usize::MAX);
let retrieval_result = RetrievalResult {
value: 1234,
footprint: 100,
stale_expiry: Some(100),
ultimate_expiry: Some(200),
};
let result: Result<_, ()> = cache
.internal_get_or_insert_with("foo", async { Ok(retrieval_result) }, async {}, async {})
.await;
assert_eq!(Ok((1234, PostTask::None)), result);
// Time 150: "foo" is stale, so the lookup runs the retrieval future, which fails.
fake_clock.set_time(150);
let result: Result<((), PostTask), &'static str> = cache
.internal_get_or_insert_with("foo", async { Err("Some error occured") }, async {}, async {})
.await;
assert_eq!(Err("Some error occured"), result);
// Time 250: past the ultimate expiry of "foo"; this lookup runs the purge first.
fake_clock.set_time(250);
let retrieval_result = RetrievalResult {
value: 1234,
footprint: 100,
stale_expiry: Some(100),
ultimate_expiry: Some(200),
};
let result: Result<_, ()> = cache
.internal_get_or_insert_with("not foo", async { Ok(retrieval_result) }, async {}, async {})
.await;
assert_eq!(Ok((1234, PostTask::None)), result);
}
}