use std::{
collections::HashSet,
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
time::Duration,
};
use dashmap::{DashMap, DashSet};
use uuid::Uuid;
use file_backed::backing_store::{BackingStore, BackingStoreT, Strategy};
use file_backed::{FBPool, Fb, convenience::blocking_save};
/// Small payload type that the tests in this file insert into the pool.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TestData {
    /// Numeric identifier chosen by each test.
    pub id: u32,
    /// Free-form text chosen by each test.
    pub content: String,
}
/// In-memory mock store used by the tests in this file.
///
/// All state lives behind `Arc`s, so cloning a `TestStore` yields a handle to
/// the same maps and counters.
#[derive(Clone, Debug)]
pub struct TestStore {
    /// Data written by `store` (or seeded by `_add_persisted`), keyed by UUID.
    temp_data: Arc<DashMap<Uuid, TestData>>,
    /// For every persist path, the set of keys recorded as persisted there.
    persisted_data: Arc<DashMap<PathBuf, DashSet<Uuid>>>,
    /// Per-method invocation counters that the tests assert on.
    pub call_counts: Arc<CallCounts>,
    /// How long `store` sleeps before recording data; zero disables the sleep.
    store_sleep_duration: Duration,
}
/// Invocation counters for the mock store, one per operation.
///
/// Each counter is bumped by the `TestStore` method named in its comment.
#[derive(Default, Debug)]
pub struct CallCounts {
    /// Completed `store` calls.
    pub store: AtomicUsize,
    /// `load` calls.
    pub load: AtomicUsize,
    /// `delete` calls (temp data removal).
    pub delete: AtomicUsize,
    /// `delete_persisted` calls.
    pub delete_persisted: AtomicUsize,
    /// `register` calls.
    pub register: AtomicUsize,
    /// `persist` calls.
    pub persist: AtomicUsize,
    /// `sanitize_path` calls (which list every key persisted at a path).
    pub all_persisted_keys: AtomicUsize,
    /// `sync_persisted` calls.
    pub sync_persisted: AtomicUsize,
}
impl TestStore {
pub fn new(store_sleep_duration: Duration) -> Self {
Self {
temp_data: Arc::new(DashMap::new()),
persisted_data: Arc::new(DashMap::new()),
call_counts: Arc::new(Default::default()),
store_sleep_duration,
}
}
pub fn new_no_sleep() -> Self {
Self::new(Duration::from_millis(0))
}
pub fn _add_persisted(&self, path: &Path, key: Uuid, data: TestData) {
self.temp_data.insert(key, data); self.persisted_data
.entry(path.to_path_buf())
.or_default()
.insert(key);
println!("TestStore: Pre-added persisted key {} at {:?}", key, path);
}
pub fn get_temp_keys(&self) -> HashSet<Uuid> {
self.temp_data.iter().map(|entry| *entry.key()).collect()
}
pub fn get_persisted_keys(&self, path: &Path) -> HashSet<Uuid> {
self.persisted_data
.get(path)
.map_or_else(HashSet::new, |set| set.iter().map(|k| *k).collect())
}
}
impl BackingStoreT for TestStore {
    type PersistPath = PathBuf;

    /// Removes `key` from the temp map and counts the call.
    ///
    /// # Panics
    /// Panics if `key` is not present in the temp map.
    fn delete(&self, key: Uuid) {
        self.call_counts.delete.fetch_add(1, Ordering::SeqCst);
        println!("TestStore: Deleting temp key {}", key);
        if self.temp_data.remove(&key).is_none() {
            panic!("Attempted to delete non-existent temp key: {}", key);
        }
    }

    /// Removes `key` from the set persisted at `path` and counts the call.
    ///
    /// # Panics
    /// Panics if `path` has no entry, or if `key` is not recorded under it.
    fn delete_persisted(&self, path: &Self::PersistPath, key: Uuid) {
        self.call_counts
            .delete_persisted
            .fetch_add(1, Ordering::SeqCst);
        println!("TestStore: Deleting persisted key {} from {:?}", key, path);
        let Some(keys_at_path) = self.persisted_data.get_mut(path) else {
            panic!(
                "Attempted to delete persisted key {} from non-tracked path {:?}",
                key, path
            )
        };
        if keys_at_path.remove(&key).is_none() {
            panic!(
                "Attempted to delete non-existent persisted key {} from path {:?}",
                key, path
            );
        }
    }

    /// Counts the call and checks that `key` can be registered from `src_path`.
    ///
    /// Nothing is copied: the mock expects the data to have been seeded into
    /// the temp map beforehand (see `_add_persisted`).
    ///
    /// # Panics
    /// Panics if `key` is not persisted at `src_path` or is missing from the
    /// temp map.
    fn register(&self, src_path: &Self::PersistPath, key: Uuid) {
        self.call_counts.register.fetch_add(1, Ordering::SeqCst);
        println!("TestStore: Registering key {} from {:?}", key, src_path);
        let persisted_at_src = match self.persisted_data.get(src_path) {
            Some(keys_at_path) => keys_at_path.contains(&key),
            None => false,
        };
        assert!(
            persisted_at_src,
            "Attempted to register key {} which is not persisted at {:?}",
            key, src_path
        );
        let seeded_in_temp = self.temp_data.contains_key(&key);
        assert!(
            seeded_in_temp,
            "Registered key {} not found in temp data (should have been pre-added for mock)",
            key
        );
    }

    /// Records `key` as persisted at `dest_path` and counts the call.
    ///
    /// # Panics
    /// Panics if `key` is not present in the temp map.
    fn persist(&self, dest_path: &Self::PersistPath, key: Uuid) {
        self.call_counts.persist.fetch_add(1, Ordering::SeqCst);
        println!("TestStore: Persisting key {} to {:?}", key, dest_path);
        if !self.temp_data.contains_key(&key) {
            panic!("Attempted to persist non-existent temp key: {}", key);
        }
        self.persisted_data
            .entry(dest_path.to_owned())
            .or_default()
            .insert(key);
    }

    /// Returns the keys recorded as persisted at `path` (empty if none).
    ///
    /// The call is counted under `all_persisted_keys`.
    fn sanitize_path(&self, path: &Self::PersistPath) -> impl IntoIterator<Item = Uuid> {
        self.call_counts
            .all_persisted_keys
            .fetch_add(1, Ordering::SeqCst);
        println!("TestStore: Getting all persisted keys for {:?}", path);
        self.persisted_data
            .get(path)
            .map(|keys_at_path| keys_at_path.iter().map(|key| *key).collect::<Vec<_>>())
            .unwrap_or_default()
    }

    /// Counts the call; the mock has nothing to flush.
    fn sync_persisted(&self, path: &Self::PersistPath) {
        self.call_counts
            .sync_persisted
            .fetch_add(1, Ordering::SeqCst);
        println!("TestStore: Syncing persisted path {:?}", path);
    }
}
impl Strategy<TestData> for TestStore {
fn store(&self, key: Uuid, data: &TestData) {
println!("TestStore: Storing key {} data {:?} - STARTING", key, data);
if !self.store_sleep_duration.is_zero() {
println!("TestStore: Sleeping for {:?}", self.store_sleep_duration);
std::thread::sleep(self.store_sleep_duration);
}
self.temp_data.insert(key, data.clone());
self.call_counts.store.fetch_add(1, Ordering::SeqCst);
println!("TestStore: Storing key {} data {:?} - COMPLETED", key, data);
}
fn load(&self, key: Uuid) -> TestData {
self.call_counts.load.fetch_add(1, Ordering::SeqCst);
println!("TestStore: Loading key {}", key);
let entry = self
.temp_data
.get(&key)
.unwrap_or_else(|| panic!("Attempted to load non-existent temp key: {}", key));
entry.value().clone()
}
}
/// Everything a test needs: the runtime handle, the mock store, the backing
/// store built on it, the pool, and a shortcut to the mock's call counters.
pub struct TestSetup {
    /// Handle of the tokio runtime that was current when the setup was built.
    pub runtime: tokio::runtime::Handle,
    /// The mock store, kept so tests can inspect its maps.
    pub store_impl: Arc<TestStore>,
    /// Backing store wrapping `store_impl`.
    pub backing_store: Arc<BackingStore<Arc<TestStore>>>,
    /// Pool under test, built on `backing_store`.
    pub pool: Arc<FBPool<TestData, Arc<TestStore>>>,
    /// Same counters as `store_impl.call_counts`.
    pub calls: Arc<CallCounts>,
}
/// Builds a [`TestSetup`] whose mock store never sleeps in `store`.
///
/// `mem_size` is forwarded unchanged to `setup_with_store_sleep`.
pub fn setup(mem_size: usize) -> TestSetup {
    let no_sleep = Duration::ZERO;
    setup_with_store_sleep(mem_size, no_sleep)
}
/// Builds a [`TestSetup`] whose mock store sleeps for `store_sleep` in `store`.
///
/// `mem_size` is passed to `FBPool::new`. Must be called from within a tokio
/// runtime, since it captures `Handle::current()`.
pub fn setup_with_store_sleep(mem_size: usize, store_sleep: Duration) -> TestSetup {
    let runtime = tokio::runtime::Handle::current();
    let store_impl = Arc::new(TestStore::new(store_sleep));
    let backing_store = Arc::new(BackingStore::new(
        Arc::clone(&store_impl),
        runtime.clone(),
    ));
    let pool = Arc::new(FBPool::new(Arc::clone(&backing_store), mem_size));
    // Expose the mock's counters directly so tests can write `setup.calls`.
    let calls = Arc::clone(&store_impl.call_counts);
    TestSetup {
        runtime,
        store_impl,
        backing_store,
        pool,
        calls,
    }
}
/// Awaits `backing_store.finished()`.
///
/// The tests call this before asserting on call counters.
/// NOTE(review): presumably `finished()` resolves once all queued background
/// store/delete work has completed — confirm against `BackingStore::finished`.
pub async fn wait_for_store(backing_store: &Arc<BackingStore<Arc<TestStore>>>) {
backing_store.finished().await;
}
/// First of the two fixed persist paths used by the tests.
///
/// The mock store only uses it as a map key.
fn test_path_a() -> PathBuf {
    "/test/persist/a".into()
}
/// Second of the two fixed persist paths used by the tests.
///
/// The mock store only uses it as a map key.
fn test_path_b() -> PathBuf {
    "/test/persist/b".into()
}
#[tokio::test]
async fn test_insert_and_load_async() {
let setup = setup(10);
let data1 = TestData {
id: 1,
content: "hello".to_string(),
};
let arc1 = Arc::new(setup.pool.insert(data1.clone()));
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
assert_eq!(setup.pool.size(), 1);
assert_eq!(Arc::strong_count(&arc1), 1);
let guard1 = arc1.load().await;
assert_eq!(*guard1, data1);
drop(guard1);
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
let arc2 = arc1.clone();
drop(arc1);
drop(arc2);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
assert_eq!(setup.pool.size(), 0);
assert!(setup.store_impl.get_temp_keys().is_empty());
}
#[tokio::test]
async fn test_eviction_triggers_store_reload_triggers_load() {
let setup = setup(1); let data1 = TestData {
id: 1,
content: "data1".to_string(),
};
let data2 = TestData {
id: 2,
content: "data2".to_string(),
};
let arc1 = setup.pool.insert(data1.clone());
let key1 = arc1.key();
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
let arc2 = setup.pool.insert(data2.clone());
let key2 = arc2.key();
wait_for_store(&setup.backing_store).await; assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
assert!(setup.store_impl.get_temp_keys().contains(&key1));
assert!(!setup.store_impl.get_temp_keys().contains(&key2));
let guard1 = arc1.load().await; assert_eq!(*guard1, data1);
drop(guard1);
wait_for_store(&setup.backing_store).await; assert_eq!(setup.calls.store.load(Ordering::SeqCst), 2);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 1);
assert!(setup.store_impl.get_temp_keys().contains(&key1));
assert!(setup.store_impl.get_temp_keys().contains(&key2));
let guard2 = arc2.load().await; assert_eq!(*guard2, data2);
drop(guard2);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 2); assert_eq!(setup.calls.load.load(Ordering::SeqCst), 2);
drop(arc1);
drop(arc2);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 2);
assert_eq!(setup.pool.size(), 0);
assert!(setup.store_impl.get_temp_keys().is_empty());
}
#[tokio::test]
async fn test_write_now_triggers_store_drop_triggers_delete() {
let setup = setup(10);
let data1 = TestData {
id: 10,
content: "write_now_delete".to_string(),
};
let arc1 = setup.pool.insert(data1.clone());
let key1 = arc1.key();
assert!(!setup.store_impl.get_temp_keys().contains(&key1));
let write_handle = arc1.spawn_write_now().await; write_handle.await.unwrap();
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
assert!(setup.store_impl.get_temp_keys().contains(&key1));
let write_handle2 = arc1.spawn_write_now().await;
write_handle2.await.unwrap();
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
drop(arc1);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
assert!(!setup.store_impl.get_temp_keys().contains(&key1));
assert_eq!(setup.pool.size(), 0);
}
/// Registering a pre-persisted key is expected to call `register` only; the
/// first `load` afterwards reads from the store and the second is served
/// without another `load` call.
#[tokio::test]
async fn test_register_does_not_load_or_store_first_load_does() {
    let setup = setup(10);
    let reg_key = Uuid::new_v4();
    let reg_path = test_path_a();
    let reg_data = TestData {
        id: 99,
        content: "registered".to_string(),
    };

    // Seed the mock as if the key had been persisted earlier.
    // (The `&reg_…` references had been corrupted into `®_…`; restored here.)
    setup
        .store_impl
        ._add_persisted(&reg_path, reg_key, reg_data.clone());
    assert!(setup.store_impl.get_temp_keys().contains(&reg_key));
    setup.calls.store.store(0, Ordering::SeqCst);

    // Tracking the path lists its keys exactly once.
    let tracked_path = setup
        .backing_store
        .track_path(reg_path.clone())
        .await
        .unwrap();
    let tracked_path = Arc::new(tracked_path);
    assert!(tracked_path.all_keys().contains(&reg_key));
    assert_eq!(setup.calls.all_persisted_keys.load(Ordering::SeqCst), 1);

    // Register: one `register` call, no store and no load.
    let maybe_arc = setup.pool.register(&tracked_path, reg_key).await;
    let arc = maybe_arc.unwrap();
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.register.load(Ordering::SeqCst), 1);
    assert_eq!(setup.pool.size(), 1);
    assert_eq!(arc.key(), reg_key);

    // First load hits the store.
    let guard = arc.load().await;
    assert_eq!(*guard, reg_data);
    drop(guard);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 1);

    // Second load must not hit the store again.
    let guard2 = arc.load().await;
    assert_eq!(*guard2, reg_data);
    drop(guard2);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 1);

    // Dropping the handle deletes the temp copy.
    drop(arc);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
    assert!(!setup.store_impl.get_temp_keys().contains(&reg_key));
    assert_eq!(setup.pool.size(), 0);
}
#[tokio::test]
async fn test_persist_triggers_store_if_not_written_noop_if_done() {
let setup = setup(10);
let data = TestData {
id: 5,
content: "persist_store".to_string(),
};
let persist_path = test_path_b();
let arc = setup.pool.insert(data.clone());
let key = arc.key();
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.persist.load(Ordering::SeqCst), 0);
assert!(!setup.store_impl.get_temp_keys().contains(&key));
let tracked_path = setup
.backing_store
.track_path(persist_path.clone())
.await
.unwrap();
let tracked_path = Arc::new(tracked_path);
assert!(!tracked_path.all_keys().contains(&key));
let persist_handle = arc.spawn_persist(&tracked_path).await;
persist_handle.await.unwrap();
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.persist.load(Ordering::SeqCst), 1);
assert!(setup.store_impl.get_temp_keys().contains(&key));
assert!(
setup
.store_impl
.get_persisted_keys(&persist_path)
.contains(&key)
);
let persist_handle2 = arc.spawn_persist(&tracked_path).await; persist_handle2.await.unwrap();
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.persist.load(Ordering::SeqCst), 1);
drop(arc);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
assert!(!setup.store_impl.get_temp_keys().contains(&key));
assert!(
setup
.store_impl
.get_persisted_keys(&persist_path)
.contains(&key)
);
let delete_persisted_handle = tokio::task::spawn_blocking({
let store = setup.backing_store.clone();
let tracked_clone = tracked_path.clone();
move || store.blocking_delete_persisted(&tracked_clone, key)
});
delete_persisted_handle.await.unwrap();
assert!(
!setup
.store_impl
.get_persisted_keys(&persist_path)
.contains(&key)
);
}
#[tokio::test]
async fn test_persist_is_noop_if_already_persisted() {
let setup = setup(10);
let key = Uuid::new_v4();
let path = test_path_a();
let data = TestData {
id: 100,
content: "already_persisted".to_string(),
};
setup.store_impl._add_persisted(&path, key, data.clone());
assert!(setup.store_impl.get_persisted_keys(&path).contains(&key));
assert!(setup.store_impl.get_temp_keys().contains(&key)); setup.calls.store.store(0, Ordering::SeqCst);
let tracked_path = setup.backing_store.track_path(path.clone()).await.unwrap();
let tracked_path = Arc::new(tracked_path);
assert!(tracked_path.all_keys().contains(&key)); assert_eq!(setup.calls.all_persisted_keys.load(Ordering::SeqCst), 1);
let maybe_arc = setup.pool.register(&tracked_path, key).await;
let arc = maybe_arc.unwrap();
setup.calls.store.store(0, Ordering::SeqCst);
setup.calls.persist.store(0, Ordering::SeqCst);
setup.calls.load.store(0, Ordering::SeqCst);
setup.calls.register.store(0, Ordering::SeqCst);
let persist_handle = arc.spawn_persist(&tracked_path).await;
persist_handle.await.unwrap();
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.persist.load(Ordering::SeqCst), 0);
let guard = arc.load().await;
assert_eq!(*guard, data);
drop(guard);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 1);
drop(arc);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
assert!(!setup.store_impl.get_temp_keys().contains(&key)); assert!(setup.store_impl.get_persisted_keys(&path).contains(&key));
let delete_persisted_handle = tokio::task::spawn_blocking({
let store = setup.backing_store.clone();
let tracked_clone = tracked_path.clone();
move || store.blocking_delete_persisted(&tracked_clone, key)
});
delete_persisted_handle.await.unwrap();
assert!(!setup.store_impl.get_persisted_keys(&path).contains(&key));
}
#[tokio::test]
async fn test_try_load_mut_unique_deletes_original_temp() {
let setup = setup(10);
let data = TestData {
id: 20,
content: "mutate_unique_del".to_string(),
};
let mut item = setup.pool.insert(data.clone());
let original_key = item.key();
let write_handle = item.spawn_write_now().await;
write_handle.await.unwrap();
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert!(setup.store_impl.get_temp_keys().contains(&original_key));
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
let mut guard = item.load_mut().await; guard.content = "mutated_content".to_string();
drop(guard); let new_key = item.key();
assert_ne!(original_key, new_key);
wait_for_store(&setup.backing_store).await; assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
assert!(!setup.store_impl.get_temp_keys().contains(&original_key)); assert!(!setup.store_impl.get_temp_keys().contains(&new_key));
drop(item);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
assert!(!setup.store_impl.get_temp_keys().contains(&new_key));
assert_eq!(setup.pool.size(), 0);
}
#[tokio::test]
async fn test_try_load_mut_unique_original_not_written() {
let setup = setup(10);
let data = TestData {
id: 23,
content: "mutate_unique_not_written".to_string(),
};
let mut item = setup.pool.insert(data.clone());
let original_key = item.key();
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
assert!(!setup.store_impl.get_temp_keys().contains(&original_key));
let mut guard = item.load_mut().await;
guard.content = "mutated_content".to_string();
drop(guard);
let new_key = item.key();
assert_ne!(original_key, new_key);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0); assert!(!setup.store_impl.get_temp_keys().contains(&original_key));
assert!(!setup.store_impl.get_temp_keys().contains(&new_key));
drop(item);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
assert_eq!(setup.pool.size(), 0);
}
#[tokio::test]
async fn test_make_mut_shared_clones_no_delete_during_call() {
let setup = setup(10);
let data = TestData {
id: 31,
content: "make_mut_shared_clone".to_string(),
};
let mut arc1 = Arc::new(setup.pool.insert(data.clone()));
let arc2 = arc1.clone(); let original_key = arc1.key();
let write_handle = arc1.spawn_write_now().await;
write_handle.await.unwrap();
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
assert!(setup.store_impl.get_temp_keys().contains(&original_key));
let mut guard = arc1.make_mut().await; guard.id = 32;
guard.content = "mutated_via_clone".to_string();
drop(guard);
let new_key = arc1.key();
assert_ne!(original_key, new_key);
assert_eq!(arc2.key(), original_key); assert!(!Arc::ptr_eq(&arc1, &arc2));
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0); assert!(setup.store_impl.get_temp_keys().contains(&original_key)); assert!(!setup.store_impl.get_temp_keys().contains(&new_key));
drop(arc1);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
assert!(!setup.store_impl.get_temp_keys().contains(&new_key));
drop(arc2);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
assert!(!setup.store_impl.get_temp_keys().contains(&original_key));
assert_eq!(setup.pool.size(), 0);
}
#[tokio::test]
async fn test_blocking_save_convenience() {
let setup = setup(10);
let path = test_path_a();
let data1 = TestData {
id: 101,
content: "save1".to_string(),
};
let data2 = TestData {
id: 102,
content: "save2".to_string(),
};
let existing_key = Uuid::new_v4();
let existing_data = TestData {
id: 100,
content: "existing".to_string(),
};
setup
.store_impl
._add_persisted(&path, existing_key, existing_data.clone());
let arc1 = Arc::new(setup.pool.insert(data1));
let arc2 = Arc::new(setup.pool.insert(data2));
let key1 = arc1.key();
let key2 = arc2.key();
let tracked = Arc::new(setup.backing_store.blocking_track_path(path.clone())); assert!(tracked.all_keys().contains(&existing_key));
let change_key_called = Arc::new(AtomicBool::new(false));
let save_handle = tokio::task::spawn_blocking({
let store = setup.backing_store.clone();
let arcs_to_save = vec![arc1.clone(), arc2.clone()];
let tracked_clone = tracked.clone();
let flag = change_key_called.clone();
move || {
let change_key_closure = move || -> Result<i32, String> {
println!("Change Key Closure Called!");
flag.store(true, Ordering::SeqCst);
Ok(42)
};
blocking_save(
&store,
arcs_to_save, &tracked_clone,
4,
change_key_closure,
)
}
});
let result = save_handle.await.unwrap(); assert!(result.is_ok());
assert_eq!(result.unwrap(), 42);
setup.backing_store.finished().await;
assert!(change_key_called.load(Ordering::SeqCst));
assert_eq!(setup.calls.persist.load(Ordering::SeqCst), 2);
assert_eq!(setup.calls.sync_persisted.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.delete_persisted.load(Ordering::SeqCst), 1);
let persisted_keys = setup.store_impl.get_persisted_keys(&path);
assert!(persisted_keys.contains(&key1));
assert!(persisted_keys.contains(&key2));
assert!(!persisted_keys.contains(&existing_key));
drop(arc1);
drop(arc2);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_guard_held_item_evicted_then_dumped_on_guard_drop() {
let setup = setup(2); let data_a = TestData {
id: 1,
content: "Item A".to_string(),
};
let data_b = TestData {
id: 2,
content: "Item B".to_string(),
};
let data_c = TestData {
id: 3,
content: "Item C".to_string(),
};
let arc_a = setup.pool.insert(data_a.clone()); let key_a = arc_a.key();
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
let guard_a = arc_a.load().await; assert_eq!(*guard_a, data_a);
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
let arc_b = setup.pool.insert(data_b.clone()); let key_b = arc_b.key(); assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
let arc_c = setup.pool.insert(data_c.clone()); let key_c = arc_c.key(); wait_for_store(&setup.backing_store).await; assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
assert!(!setup.store_impl.get_temp_keys().contains(&key_a));
drop(guard_a);
wait_for_store(&setup.backing_store).await; assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert!(setup.store_impl.get_temp_keys().contains(&key_a));
let data_d = TestData {
id: 4,
content: "Item D".to_string(),
};
let _arc_d = setup.pool.insert(data_d.clone()); wait_for_store(&setup.backing_store).await; assert_eq!(setup.calls.store.load(Ordering::SeqCst), 2);
assert!(setup.store_impl.get_temp_keys().contains(&key_b));
drop(arc_a); drop(arc_b); drop(arc_c); wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 2);
assert!(!setup.store_impl.get_temp_keys().contains(&key_a));
assert!(!setup.store_impl.get_temp_keys().contains(&key_b));
assert!(!setup.store_impl.get_temp_keys().contains(&key_c)); }
#[tokio::test]
async fn test_lru_strict_half_behavior_on_access() {
{
println!("--- LRU Strict Half Test: Scenario 1 (Oldest A Evicted) ---");
let setup = setup(4); let items = (0..5)
.map(|i| TestData {
id: i,
content: format!("Item {}", i),
})
.collect::<Vec<_>>();
let arcs = items[0..4]
.iter()
.map(|d| setup.pool.insert(d.clone()))
.collect::<Vec<_>>();
let keys: Vec<Uuid> = arcs.iter().map(|a| a.key()).collect(); assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
let _guard_c = arcs[2].load().await;
drop(_guard_c);
let _guard_b = arcs[1].load().await;
drop(_guard_b);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
let _arc_e = setup.pool.insert(items[4].clone()); wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
assert!(setup.store_impl.get_temp_keys().contains(&keys[0])); assert!(!setup.store_impl.get_temp_keys().contains(&keys[1])); assert!(!setup.store_impl.get_temp_keys().contains(&keys[2])); assert!(!setup.store_impl.get_temp_keys().contains(&keys[3])); }
{
println!("--- LRU Strict Half Test: Scenario 2 (Oldest C Evicted) ---");
let setup = setup(4); let items = (0..5)
.map(|i| TestData {
id: i,
content: format!("Item {}", i),
})
.collect::<Vec<_>>();
let arcs = items[0..4]
.iter()
.map(|d| setup.pool.insert(d.clone()))
.collect::<Vec<_>>();
let keys: Vec<Uuid> = arcs.iter().map(|a| a.key()).collect(); assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
let _guard_c = arcs[2].load().await;
drop(_guard_c);
let _guard_b = arcs[1].load().await;
drop(_guard_b);
let _guard_a = arcs[0].load().await;
drop(_guard_a);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
let _arc_e = setup.pool.insert(items[4].clone()); wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
assert!(!setup.store_impl.get_temp_keys().contains(&keys[0])); assert!(!setup.store_impl.get_temp_keys().contains(&keys[1])); assert!(setup.store_impl.get_temp_keys().contains(&keys[2])); assert!(!setup.store_impl.get_temp_keys().contains(&keys[3])); } }
#[tokio::test]
async fn test_register_fails_if_key_not_in_tracked_path() {
let setup = setup(10);
let path = test_path_a();
let missing_key = Uuid::new_v4();
let tracked_path = setup.backing_store.track_path(path.clone()).await.unwrap();
let tracked_path = Arc::new(tracked_path);
assert!(!tracked_path.all_keys().contains(&missing_key)); let initial_register_calls = setup.calls.register.load(Ordering::SeqCst);
let initial_pool_size = setup.pool.size();
let result_arc = setup.pool.register(&tracked_path, missing_key).await;
assert!(result_arc.is_none()); assert_eq!(setup.pool.size(), initial_pool_size); assert_eq!(
setup.calls.register.load(Ordering::SeqCst),
initial_register_calls
);
let register_handle = tokio::task::spawn_blocking({
let pool = setup.pool.clone();
let tracked = tracked_path.clone();
move || pool.blocking_register(&tracked, missing_key)
});
let result_blocking = register_handle.await.unwrap();
assert!(result_blocking.is_none()); assert_eq!(setup.pool.size(), initial_pool_size);
assert_eq!(
setup.calls.register.load(Ordering::SeqCst),
initial_register_calls
);
}
#[tokio::test]
async fn test_try_load_mut_triggers_load_if_not_cached() {
let setup = setup(1); let data_a = TestData {
id: 50,
content: "Load For Mut A".to_string(),
};
let data_b = TestData {
id: 51,
content: "Load For Mut B".to_string(),
};
let mut item_a = setup.pool.insert(data_a.clone());
let original_key_a = item_a.key();
let write_handle = item_a.spawn_write_now().await;
write_handle.await.unwrap(); assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
assert!(setup.store_impl.get_temp_keys().contains(&original_key_a));
let _arc_b = setup.pool.insert(data_b.clone());
wait_for_store(&setup.backing_store).await; let stores_after_b_insert = setup.calls.store.load(Ordering::SeqCst);
let mut guard = item_a.load_mut().await;
assert_eq!(setup.calls.load.load(Ordering::SeqCst), 1);
wait_for_store(&setup.backing_store).await; assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
assert!(!setup.store_impl.get_temp_keys().contains(&original_key_a));
guard.content = "Mutated A after load".to_string();
drop(guard);
let new_key_a = item_a.key(); assert_ne!(original_key_a, new_key_a);
let final_guard = item_a.load().await;
assert_eq!(final_guard.content, "Mutated A after load");
drop(final_guard);
assert!(setup.calls.store.load(Ordering::SeqCst) <= stores_after_b_insert + 1);
drop(item_a); wait_for_store(&setup.backing_store).await;
assert!(setup.calls.delete.load(Ordering::SeqCst) >= 1);
}
#[tokio::test]
async fn test_blocking_save_skips_already_persisted() {
let setup = setup(10);
let path = test_path_b(); let data_a = TestData {
id: 70,
content: "Save A".to_string(),
};
let data_b = TestData {
id: 71,
content: "Save B".to_string(),
};
let arc_a = Arc::new(setup.pool.insert(data_a.clone()));
let arc_b = Arc::new(setup.pool.insert(data_b.clone()));
let key_a = arc_a.key();
let key_b = arc_b.key();
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.persist.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.sync_persisted.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.delete_persisted.load(Ordering::SeqCst), 0);
let tracked_path_1 = setup.backing_store.track_path(path.clone()).await.unwrap();
let tracked_path_1 = Arc::new(tracked_path_1);
assert!(tracked_path_1.all_keys().is_empty());
let save_handle_1 = tokio::task::spawn_blocking({
let store_clone = setup.backing_store.clone();
let arcs_clone = vec![arc_a.clone()]; let tracked_clone = tracked_path_1.clone();
move || -> Result<(), String> {
blocking_save(&store_clone, arcs_clone, &tracked_clone, 1, || Ok(()))
}
});
let result1 = save_handle_1.await.unwrap();
assert!(result1.is_ok());
setup.backing_store.finished().await;
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.persist.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.sync_persisted.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.delete_persisted.load(Ordering::SeqCst), 0);
assert!(setup.store_impl.get_persisted_keys(&path).contains(&key_a)); assert!(!setup.store_impl.get_persisted_keys(&path).contains(&key_b)); assert!(setup.store_impl.get_temp_keys().contains(&key_a));
let tracked_path_2 = setup.backing_store.track_path(path.clone()).await.unwrap();
let tracked_path_2 = Arc::new(tracked_path_2);
assert!(tracked_path_2.all_keys().contains(&key_a)); assert!(!tracked_path_2.all_keys().contains(&key_b));
let save_handle_2 = tokio::task::spawn_blocking({
let store_clone = setup.backing_store.clone();
let arcs_clone = vec![arc_a.clone(), arc_b.clone()]; let tracked_clone = tracked_path_2.clone();
move || -> Result<(), String> {
blocking_save(&store_clone, arcs_clone, &tracked_clone, 2, || Ok(()))
}
});
let result2 = save_handle_2.await.unwrap();
assert!(result2.is_ok());
setup.backing_store.finished().await;
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 2);
assert_eq!(setup.calls.persist.load(Ordering::SeqCst), 2);
assert_eq!(setup.calls.sync_persisted.load(Ordering::SeqCst), 2);
assert_eq!(setup.calls.delete_persisted.load(Ordering::SeqCst), 0);
let final_persisted_keys = setup.store_impl.get_persisted_keys(&path);
assert_eq!(final_persisted_keys.len(), 2);
assert!(final_persisted_keys.contains(&key_a)); assert!(final_persisted_keys.contains(&key_b)); assert!(setup.store_impl.get_temp_keys().contains(&key_a)); assert!(setup.store_impl.get_temp_keys().contains(&key_b));
drop(arc_a);
drop(arc_b);
wait_for_store(&setup.backing_store).await;
assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn test_blocking_save_more_items_than_max_simultaneous() {
let setup = setup(10); let path = test_path_a();
let num_items = 5;
let max_simultaneous = 2;
let items: Vec<TestData> = (0..num_items)
.map(|i| TestData {
id: 100 + i,
content: format!("Item {}", i),
})
.collect();
let arcs: Vec<Arc<Fb<TestData, Arc<TestStore>>>> = items
.iter()
.map(|d| Arc::new(setup.pool.insert(d.clone())))
.collect();
let keys: Vec<Uuid> = arcs.iter().map(|a| a.key()).collect();
assert_eq!(setup.calls.persist.load(Ordering::SeqCst), 0);
assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
let tracked_path = setup.backing_store.track_path(path.clone()).await.unwrap();
let tracked_path = Arc::new(tracked_path);
assert!(tracked_path.all_keys().is_empty());
let save_handle = tokio::task::spawn_blocking({
let store_clone = setup.backing_store.clone();
let arcs_clone = arcs.clone();
let tracked_clone = tracked_path.clone();
move || -> Result<usize, String> {
let change_key_closure = || -> Result<usize, String> { Ok(num_items as usize) };
blocking_save(
&store_clone,
arcs_clone,
&tracked_clone,
max_simultaneous, change_key_closure,
)
}
});
let result = save_handle.await.unwrap(); assert!(result.is_ok());
assert_eq!(result.unwrap(), num_items as usize);
setup.backing_store.finished().await;
assert_eq!(
setup.calls.persist.load(Ordering::SeqCst),
num_items as usize
);
assert_eq!(setup.calls.store.load(Ordering::SeqCst), num_items as usize);
assert_eq!(setup.calls.sync_persisted.load(Ordering::SeqCst), 1);
assert_eq!(setup.calls.delete_persisted.load(Ordering::SeqCst), 0);
let persisted_keys_set = setup.store_impl.get_persisted_keys(&path);
assert_eq!(persisted_keys_set.len(), num_items as usize);
for key in &keys {
assert!(persisted_keys_set.contains(key));
}
drop(arcs); wait_for_store(&setup.backing_store).await;
assert_eq!(
setup.calls.delete.load(Ordering::SeqCst),
num_items as usize
);
let cleanup_handle = tokio::task::spawn_blocking({
let store = setup.backing_store.clone();
let tracked_clone = tracked_path.clone();
let keys_clone = keys;
move || {
for key in keys_clone {
store.blocking_delete_persisted(&tracked_clone, key);
}
}
});
cleanup_handle.await.unwrap();
}
/// `blocking_register` on a key that the tracked path contains is expected
/// to return a handle with that key, call the store's `register` once, and
/// not load anything.
#[tokio::test]
async fn test_blocking_register_success() {
    let setup = setup(10);
    let reg_key = Uuid::new_v4();
    let reg_path = test_path_a();
    let reg_data = TestData {
        id: 200,
        content: "blocking_register_data".to_string(),
    };

    // Seed the mock as if the key had been persisted earlier.
    // (The `&reg_…` references had been corrupted into `®_…`; restored here.)
    setup
        .store_impl
        ._add_persisted(&reg_path, reg_key, reg_data.clone());
    setup.calls.store.store(0, Ordering::SeqCst);

    // Track the path from a blocking thread.
    let track_handle = tokio::task::spawn_blocking({
        let store = setup.backing_store.clone();
        let path = reg_path.clone();
        move || store.blocking_track_path(path)
    });
    let tracked_path = Arc::new(track_handle.await.unwrap());
    assert!(tracked_path.all_keys().contains(&reg_key));

    // Register from a blocking thread.
    let initial_pool_size = setup.pool.size();
    let register_handle = tokio::task::spawn_blocking({
        let pool = setup.pool.clone();
        let tracked = tracked_path.clone();
        move || pool.blocking_register(&tracked, reg_key)
    });
    let result_arc = register_handle.await.unwrap();
    assert!(result_arc.is_some());
    let arc = result_arc.unwrap();
    assert_eq!(arc.key(), reg_key);
    assert_eq!(setup.pool.size(), initial_pool_size + 1);
    assert_eq!(setup.calls.register.load(Ordering::SeqCst), 1);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);

    // Dropping the handle deletes the temp copy.
    drop(arc);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
}
/// Checks that `blocking_load` hands back the original value and costs exactly one
/// store `load` call once item A has been written out and item B has been inserted.
///
/// NOTE(review): `setup(1)` presumably sizes the cache so that inserting B pushes A out
/// of memory — confirm against the `setup` helper.
#[tokio::test]
async fn test_blocking_load_success() {
    let setup = setup(1);
    let data_a = TestData {
        id: 210,
        content: "blocking_load_A".to_string(),
    };
    let data_b = TestData {
        id: 211,
        content: "blocking_load_B".to_string(),
    };

    let first = Arc::new(setup.pool.insert(data_a.clone()));
    let first_key = first.key();

    // Force A out to the temp store before anything else happens.
    let writer = first.clone();
    tokio::task::spawn_blocking(move || writer.blocking_write_now())
        .await
        .unwrap();
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
    assert!(setup.store_impl.get_temp_keys().contains(&first_key));

    // Keep B alive for the rest of the test.
    let _second = setup.pool.insert(data_b.clone());
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);

    let reader = first.clone();
    let read_task = tokio::task::spawn_blocking(move || {
        let guard = reader.blocking_load();
        guard.clone()
    });
    let round_tripped = read_task.await.unwrap();
    assert_eq!(round_tripped, data_a);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 1);

    drop(first);
    wait_for_store(&setup.backing_store).await;
}
/// Checks that `blocking_write_now` stores the item exactly once: the first call
/// produces one `store` call and a temp key, a second call adds no further `store`
/// call, and dropping the handle yields exactly one `delete` call.
#[tokio::test]
async fn test_blocking_write_now_success() {
    let setup = setup(10);
    let data = TestData {
        id: 220,
        content: "blocking_write_now".to_string(),
    };
    let item = Arc::new(setup.pool.insert(data.clone()));
    let item_key = item.key();

    // Nothing has been written yet.
    assert!(!setup.store_impl.get_temp_keys().contains(&item_key));
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);

    let first_writer = item.clone();
    let first_write = tokio::task::spawn_blocking(move || first_writer.blocking_write_now());
    first_write.await.unwrap();
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
    assert!(setup.store_impl.get_temp_keys().contains(&item_key));

    // A repeated write of unchanged data is expected to be a no-op for the store.
    let second_writer = item.clone();
    let second_write = tokio::task::spawn_blocking(move || second_writer.blocking_write_now());
    second_write.await.unwrap();
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);

    drop(item);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
}
/// Checks that `blocking_persist` writes the item to the temp store and records it
/// under the tracked path.
///
/// Expectations pinned by the assertions: one `store` call and one `persist` call, the
/// key visible both among temp keys and among the path's persisted keys, and exactly
/// one `delete` call once the handle is dropped. The persisted entry is removed at the
/// end via `blocking_delete_persisted`.
#[tokio::test]
async fn test_blocking_persist_success() {
    let setup = setup(10);
    let data = TestData {
        id: 230,
        content: "blocking_persist".to_string(),
    };
    let path = test_path_a();
    let arc = Arc::new(setup.pool.insert(data));
    let key = arc.key();

    assert!(!setup.store_impl.get_temp_keys().contains(&key));
    assert!(!setup.store_impl.get_persisted_keys(&path).contains(&key));
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.persist.load(Ordering::SeqCst), 0);

    // Run the blocking call on the blocking pool rather than on the async test thread,
    // matching how the other blocking APIs are driven in this file.
    let track_handle = tokio::task::spawn_blocking({
        let store = setup.backing_store.clone();
        let path = path.clone();
        move || store.blocking_track_path(path)
    });
    let tracked_path = Arc::new(track_handle.await.unwrap());

    let persist_handle = tokio::task::spawn_blocking({
        let arc_clone = arc.clone();
        let tracked_clone = tracked_path.clone();
        move || arc_clone.blocking_persist(&tracked_clone)
    });
    persist_handle.await.unwrap();

    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
    assert_eq!(setup.calls.persist.load(Ordering::SeqCst), 1);
    assert!(setup.store_impl.get_temp_keys().contains(&key));
    assert!(setup.store_impl.get_persisted_keys(&path).contains(&key));

    drop(arc);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);

    // Remove the persisted entry so the path is left clean.
    let cleanup_handle = tokio::task::spawn_blocking({
        let store = setup.backing_store.clone();
        let tracked_clone = tracked_path.clone();
        move || store.blocking_delete_persisted(&tracked_clone, key)
    });
    cleanup_handle.await.unwrap();
}
/// Mutates a uniquely-owned, already-written item via `Arc::get_mut` +
/// `Fb::blocking_load_mut` and checks the aftermath.
///
/// The assertions expect: no `load` call (the value is still available in memory), one
/// `delete` call for the old temp copy, a changed key, the old key gone from the temp
/// store, and the mutated content readable afterwards. Dropping the mutated handle is
/// expected not to add another `delete` call.
#[tokio::test]
async fn test_blocking_try_load_mut_success() {
    let setup = setup(10);
    let data = TestData {
        id: 240,
        content: "blocking_try_mut".to_string(),
    };
    let mut arc = Arc::new(setup.pool.insert(data.clone()));
    let key_before = arc.key();

    // Write the item out first so there is a temp copy to invalidate.
    let writer = arc.clone();
    tokio::task::spawn_blocking(move || writer.blocking_write_now())
        .await
        .unwrap();
    assert!(setup.store_impl.get_temp_keys().contains(&key_before));
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);

    // The Arc is moved into the task and handed back once the mutation is done.
    let mutation = tokio::task::spawn_blocking(move || {
        let exclusive = Arc::get_mut(&mut arc).map(Fb::blocking_load_mut);
        assert!(exclusive.is_some());
        let mut guard = exclusive.unwrap();
        guard.content = "mutated_blocking".to_string();
        drop(guard);
        arc
    });
    let arc_mutated = mutation.await.unwrap();

    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
    let key_after = arc_mutated.key();
    assert_ne!(key_before, key_after);
    assert!(!setup.store_impl.get_temp_keys().contains(&key_before));

    let reader = arc_mutated.clone();
    let read_back = tokio::task::spawn_blocking(move || reader.blocking_load().content.clone());
    assert_eq!(read_back.await.unwrap(), "mutated_blocking");

    drop(arc_mutated);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
}
/// Mutates a uniquely-owned, already-written item via `blocking_make_mut`.
///
/// The assertions expect one `delete` call for the old temp copy, a changed key, the
/// old key gone from the temp store, the new content readable afterwards, and no
/// further `delete` call when the mutated handle is dropped.
#[tokio::test]
async fn test_blocking_make_mut_success_unique() {
    let setup = setup(10);
    let data = TestData {
        id: 250,
        content: "blocking_make_mut".to_string(),
    };
    let mut arc = Arc::new(setup.pool.insert(data.clone()));
    let key_before = arc.key();

    // Ensure a temp copy exists before mutating.
    let writer = arc.clone();
    tokio::task::spawn_blocking(move || writer.blocking_write_now())
        .await
        .unwrap();
    assert!(setup.store_impl.get_temp_keys().contains(&key_before));
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);

    // The Arc travels into the blocking task and is returned after the edit.
    let mutation = tokio::task::spawn_blocking(move || {
        let mut guard = arc.blocking_make_mut();
        guard.content = "mutated_make_mut_blocking".to_string();
        drop(guard);
        arc
    });
    let arc_mutated = mutation.await.unwrap();

    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
    let key_after = arc_mutated.key();
    assert_ne!(key_before, key_after);
    assert!(!setup.store_impl.get_temp_keys().contains(&key_before));

    let reader = arc_mutated.clone();
    let read_back = tokio::task::spawn_blocking(move || reader.blocking_load().content.clone());
    assert_eq!(read_back.await.unwrap(), "mutated_make_mut_blocking");

    drop(arc_mutated);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
}
/// Confirms that `Arc::get_mut` refuses exclusive access while a second `Arc` to the
/// same item is alive, both on the async thread and inside a blocking task, and that
/// neither attempt changes the key or triggers a `delete` call.
#[tokio::test]
async fn test_try_load_mut_fails_if_not_unique() {
    let setup = setup(10);
    let data = TestData {
        id: 300,
        content: "try_mut_fail".to_string(),
    };
    let mut arc1 = Arc::new(setup.pool.insert(data.clone()));
    let arc2 = arc1.clone();
    let original_key = arc1.key();
    let deletes_at_start = setup.calls.delete.load(Ordering::SeqCst);
    assert_eq!(Arc::strong_count(&arc1), 2);

    // Attempt on the async thread.
    let exclusive = Arc::get_mut(&mut arc1);
    assert!(exclusive.is_none());
    assert_eq!(arc1.key(), original_key);
    assert_eq!(
        setup.calls.delete.load(Ordering::SeqCst),
        deletes_at_start
    );

    // Attempt from a blocking task; `arc1` is moved (not cloned) into the task.
    let blocking_attempt = tokio::task::spawn_blocking({
        let mut moved_arc = arc1;
        move || Arc::get_mut(&mut moved_arc).is_some()
    });
    let got_exclusive = blocking_attempt.await.unwrap();
    assert!(!got_exclusive);
    assert_eq!(
        setup.calls.delete.load(Ordering::SeqCst),
        deletes_at_start
    );

    drop(arc2);
}
/// Saves five items to a path, then saves two different items to the same path with
/// `max_simultaneous = 2`, so the second save has more stale entries to delete than the
/// simultaneity limit.
///
/// After the first save the assertions expect 5 `persist` calls, no `delete_persisted`
/// calls, and the path holding exactly the initial keys. After the second save they
/// expect 7 `persist` calls in total, 5 `delete_persisted` calls, 2 `sync_persisted`
/// calls, 7 `store` calls, and the path holding exactly the new keys. Dropping every
/// handle is expected to yield 7 `delete` calls.
#[tokio::test]
async fn test_blocking_save_many_deletions_exceeding_limit() {
    let setup = setup(20);
    let path = test_path_a();
    let num_initial = 5;
    let num_new = 2;
    let max_simultaneous = 2;

    // ---- first generation -------------------------------------------------------
    let initial_items: Vec<TestData> = (0..num_initial)
        .map(|i| TestData {
            id: 400 + i,
            content: format!("Initial {}", i),
        })
        .collect();
    let initial_arcs: Vec<_> = initial_items
        .iter()
        .map(|item| Arc::new(setup.pool.insert(item.clone())))
        .collect();
    let initial_keys: HashSet<Uuid> = initial_arcs.iter().map(|arc| arc.key()).collect();

    let tracked_path_1 = Arc::new(setup.backing_store.track_path(path.clone()).await.unwrap());
    assert!(tracked_path_1.all_keys().is_empty());

    let first_save = tokio::task::spawn_blocking({
        let store = setup.backing_store.clone();
        let batch = initial_arcs.clone();
        let tracked = tracked_path_1.clone();
        move || -> Result<(), String> {
            blocking_save(&store, batch, &tracked, max_simultaneous, || Ok(()))
        }
    });
    assert!(first_save.await.unwrap().is_ok());
    setup.backing_store.finished().await;

    assert_eq!(
        setup.calls.persist.load(Ordering::SeqCst),
        num_initial as usize
    );
    assert_eq!(setup.calls.delete_persisted.load(Ordering::SeqCst), 0);
    assert_eq!(setup.store_impl.get_persisted_keys(&path), initial_keys);

    // ---- second generation ------------------------------------------------------
    let new_items: Vec<TestData> = (0..num_new)
        .map(|i| TestData {
            id: 500 + i,
            content: format!("New {}", i),
        })
        .collect();
    let new_arcs: Vec<_> = new_items
        .iter()
        .map(|item| Arc::new(setup.pool.insert(item.clone())))
        .collect();
    let new_keys: HashSet<Uuid> = new_arcs.iter().map(|arc| arc.key()).collect();

    // Re-tracking the path must report what the first save left behind.
    let tracked_path_2 = Arc::new(setup.backing_store.track_path(path.clone()).await.unwrap());
    assert_eq!(HashSet::from_iter(tracked_path_2.all_keys()), initial_keys);

    let second_save = tokio::task::spawn_blocking({
        let store = setup.backing_store.clone();
        let batch = new_arcs.clone();
        let tracked = tracked_path_2.clone();
        move || -> Result<(), String> {
            blocking_save(&store, batch, &tracked, max_simultaneous, || Ok(()))
        }
    });
    assert!(second_save.await.unwrap().is_ok());
    setup.backing_store.finished().await;

    assert_eq!(
        setup.calls.persist.load(Ordering::SeqCst),
        num_initial as usize + num_new as usize
    );
    // Every key from the first generation is absent from the second save.
    assert_eq!(
        setup.calls.delete_persisted.load(Ordering::SeqCst),
        num_initial as usize
    );
    assert_eq!(setup.calls.sync_persisted.load(Ordering::SeqCst), 2);
    assert_eq!(
        setup.calls.store.load(Ordering::SeqCst),
        num_initial as usize + num_new as usize
    );
    let final_persisted_keys = setup.store_impl.get_persisted_keys(&path);
    assert_eq!(final_persisted_keys, new_keys);

    // ---- teardown ---------------------------------------------------------------
    drop(initial_arcs);
    drop(new_arcs);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(
        setup.calls.delete.load(Ordering::SeqCst),
        num_initial as usize + num_new as usize
    );
}
/// Exercises `blocking_make_mut` on an `Arc` that is shared with a second handle.
///
/// The assertions expect the mutated handle to split off from the shared one (distinct
/// pointers, each with strong count 1, new key on the mutated side, original key on the
/// other), no `delete` call caused by the mutation, both values readable with their
/// respective contents, no `delete` call when the mutated handle is dropped, and exactly
/// one `delete` call (plus an empty pool) once the original handle is dropped.
#[tokio::test]
async fn test_blocking_make_mut_shared_clones() {
    let setup = setup(10);
    let data = TestData {
        id: 31,
        content: "make_mut_shared_blocking".to_string(),
    };
    let mut arc1 = Arc::new(setup.pool.insert(data.clone()));
    let arc2 = arc1.clone();
    let original_key = arc1.key();

    // Put the shared value into the temp store first.
    let writer = arc1.clone();
    let write_task = tokio::task::spawn_blocking(move || writer.blocking_write_now());
    write_task.await.unwrap();
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
    assert!(Arc::ptr_eq(&arc1, &arc2));
    assert_eq!(Arc::strong_count(&arc1), 2);
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);

    // Mutate through one handle while the other stays alive on this thread.
    let mutation = tokio::task::spawn_blocking(move || {
        let mut guard = arc1.blocking_make_mut();
        guard.id = 32;
        guard.content = "mutated_via_clone_blocking".to_string();
        drop(guard);
        arc1
    });
    let arc1_mutated = mutation.await.unwrap();

    assert_eq!(Arc::strong_count(&arc1_mutated), 1);
    assert_eq!(Arc::strong_count(&arc2), 1);
    assert!(!Arc::ptr_eq(&arc1_mutated, &arc2));
    let new_key = arc1_mutated.key();
    assert_ne!(original_key, new_key);
    assert_eq!(arc2.key(), original_key);

    wait_for_store(&setup.backing_store).await;
    // The original copy is still referenced by `arc2`, so it must survive.
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
    assert!(setup.store_impl.get_temp_keys().contains(&original_key));

    let mutated_reader = arc1_mutated.clone();
    let mutated_read = tokio::task::spawn_blocking(move || mutated_reader.blocking_load().clone());
    let mutated_value = mutated_read.await.unwrap();
    assert_eq!(mutated_value.id, 32);
    assert_eq!(mutated_value.content, "mutated_via_clone_blocking");

    let original_reader = arc2.clone();
    let original_read =
        tokio::task::spawn_blocking(move || original_reader.blocking_load().clone());
    let original_value = original_read.await.unwrap();
    assert_eq!(original_value, data);

    drop(arc1_mutated);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
    assert!(!setup.store_impl.get_temp_keys().contains(&new_key));

    drop(arc2);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
    assert!(!setup.store_impl.get_temp_keys().contains(&original_key));
    assert_eq!(setup.pool.size(), 0);
}
/// Takes mutable access (`Arc::get_mut` + `blocking_load_mut`) to item A after it has
/// been written out and item B has been inserted.
///
/// The assertions expect exactly one `load` call and one `delete` call from the
/// mutation, a changed key, the old key gone from the temp store, at most one extra
/// `store` call compared with the count taken after inserting B, and the mutated content
/// readable afterwards.
///
/// NOTE(review): `setup(1)` presumably makes the insertion of B evict A from memory —
/// confirm against the `setup` helper.
#[tokio::test]
async fn test_blocking_try_load_mut_triggers_load_if_not_cached() {
    let setup = setup(1);
    let data_a = TestData {
        id: 55,
        content: "Blocking Load Mut A".to_string(),
    };
    let data_b = TestData {
        id: 56,
        content: "Blocking Load Mut B".to_string(),
    };
    let mut arc_a = Arc::new(setup.pool.insert(data_a.clone()));
    let key_before = arc_a.key();

    let writer = arc_a.clone();
    let write_task = tokio::task::spawn_blocking(move || writer.blocking_write_now());
    write_task.await.unwrap();
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
    assert!(setup.store_impl.get_temp_keys().contains(&key_before));
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);

    // B stays alive until the end of the test.
    let _evictor = setup.pool.insert(data_b.clone());
    wait_for_store(&setup.backing_store).await;
    let stores_after_b_insert = setup.calls.store.load(Ordering::SeqCst);

    // The writer clone is gone by now, so exclusive access must be possible.
    assert_eq!(Arc::strong_count(&arc_a), 1);
    let mutation = tokio::task::spawn_blocking(move || {
        let entry = Arc::get_mut(&mut arc_a);
        assert!(entry.is_some());
        let mut guard = entry.unwrap().blocking_load_mut();
        guard.content = "Mutated A after blocking load".to_string();
        drop(guard);
        arc_a
    });
    let arc_a_mutated = mutation.await.unwrap();

    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 1);
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
    let key_after = arc_a_mutated.key();
    assert_ne!(key_before, key_after);
    assert!(!setup.store_impl.get_temp_keys().contains(&key_before));
    assert!(setup.calls.store.load(Ordering::SeqCst) <= stores_after_b_insert + 1);

    let reader = arc_a_mutated.clone();
    let read_back = tokio::task::spawn_blocking(move || reader.blocking_load().content.clone());
    let final_content = read_back.await.unwrap();
    assert_eq!(final_content, "Mutated A after blocking load");

    drop(arc_a_mutated);
    wait_for_store(&setup.backing_store).await;
    assert!(setup.calls.delete.load(Ordering::SeqCst) >= 1);
}
/// Checks that `load_in_place` returns the original value and costs exactly one store
/// `load` call after item A has been written out and item B has been inserted.
///
/// Runs on the multi-threaded runtime flavor.
///
/// NOTE(review): `setup(1)` presumably makes the insertion of B evict A from memory —
/// confirm against the `setup` helper.
#[tokio::test(flavor = "multi_thread")]
async fn test_load_triggers_blocking_load_if_not_cached() {
    let setup = setup(1);
    let data_a = TestData {
        id: 600,
        content: "Load A".to_string(),
    };
    let data_b = TestData {
        id: 601,
        content: "Load B".to_string(),
    };
    let item_a = setup.pool.insert(data_a.clone());
    let key_a = item_a.key();

    // Write A out and wait for the spawned write to complete.
    let pending_write = item_a.spawn_write_now().await;
    pending_write.await.unwrap();
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
    assert!(setup.store_impl.get_temp_keys().contains(&key_a));
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);

    // B stays alive until the end of the test.
    let _evictor = setup.pool.insert(data_b.clone());
    wait_for_store(&setup.backing_store).await;

    let guard_a = item_a.load_in_place();
    assert_eq!(*guard_a, data_a);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 1);

    drop(guard_a);
    drop(item_a);
    wait_for_store(&setup.backing_store).await;
    assert!(setup.calls.delete.load(Ordering::SeqCst) >= 1);
}
/// Checks that `load_in_place` on a freshly inserted item touches the store not at
/// all: no `store`, no `load`, and — after the item is dropped — no `delete` call.
///
/// Runs on the multi-threaded runtime flavor.
#[tokio::test(flavor = "multi_thread")]
async fn test_load_returns_from_cache_if_present() {
    let setup = setup(10);
    let data_a = TestData {
        id: 610,
        content: "Load Cached A".to_string(),
    };
    let item_a = setup.pool.insert(data_a.clone());
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);

    let guard_a = item_a.load_in_place();
    assert_eq!(*guard_a, data_a);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);

    drop(guard_a);
    drop(item_a);
    wait_for_store(&setup.backing_store).await;
    // Nothing was ever written, so there is nothing to delete.
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
}
/// Checks that `try_load` on a freshly inserted item yields a guard over the original
/// value without any `load` call, and that dropping the never-written item produces no
/// `delete` call.
#[tokio::test]
async fn test_try_load_success_when_cached() {
    let setup = setup(10);
    let data_a = TestData {
        id: 700,
        content: "Try Load Cached A".to_string(),
    };
    let item_a = setup.pool.insert(data_a.clone());
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);

    let attempt = item_a.try_load();
    assert!(attempt.is_some());
    let guard = attempt.unwrap();
    assert_eq!(*guard, data_a);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);

    drop(guard);
    drop(item_a);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
}
/// Checks that `try_load` returns `None` — and issues no `load` call — for item A after
/// it has been written out and item B has been inserted.
///
/// NOTE(review): `setup(1)` presumably makes the insertion of B evict A from memory —
/// confirm against the `setup` helper.
#[tokio::test]
async fn test_try_load_fails_when_not_cached() {
    let setup = setup(1);
    let data_a = TestData {
        id: 710,
        content: "Try Load Evicted A".to_string(),
    };
    let data_b = TestData {
        id: 711,
        content: "Try Load Evictor B".to_string(),
    };
    let item_a = setup.pool.insert(data_a.clone());

    let pending_write = item_a.spawn_write_now().await;
    pending_write.await.unwrap();
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);

    // B stays alive until the end of the test.
    let _evictor = setup.pool.insert(data_b.clone());
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);

    let attempt = item_a.try_load();
    assert!(attempt.is_none());
    // A failed non-blocking attempt must not fall back to reading from the store.
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);

    drop(attempt);
    drop(item_a);
    wait_for_store(&setup.backing_store).await;
    assert!(setup.calls.delete.load(Ordering::SeqCst) >= 1);
}
/// Checks the synchronous `try_load_mut` on an item that has been written out but is
/// still available in memory.
///
/// The assertions expect the call to succeed without any `load` call, to cause exactly
/// one `delete` call and remove the old key from the temp store, to leave the item with
/// a new key after the edit, to expose the mutated content via `load`, and to add no
/// further `delete` call when the item is dropped.
#[tokio::test]
async fn test_sync_try_load_mut_success_when_cached() {
    let setup = setup(10);
    let data_a = TestData {
        id: 720,
        content: "Sync Try Mut A".to_string(),
    };
    let mut item_a = setup.pool.insert(data_a.clone());
    let key_before = item_a.key();

    let pending_write = item_a.spawn_write_now().await;
    pending_write.await.unwrap();
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
    assert!(setup.store_impl.get_temp_keys().contains(&key_before));
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);

    // The guard is held while the store side effects are checked.
    let attempt = item_a.try_load_mut();
    assert!(attempt.is_some());
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
    assert!(!setup.store_impl.get_temp_keys().contains(&key_before));

    let mut guard = attempt.unwrap();
    guard.content = "Sync Mutated A".to_string();
    drop(guard);

    let key_after = item_a.key();
    assert_ne!(key_before, key_after);

    let final_guard = item_a.load().await;
    assert_eq!(final_guard.content, "Sync Mutated A");
    drop(final_guard);

    drop(item_a);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
}
/// Checks that the synchronous `try_load_mut` returns `None` for item A after it has
/// been written out and item B has been inserted, and that the failed attempt leaves
/// the key, the `load` count and the `delete` count untouched. Dropping A afterwards is
/// expected to raise the `delete` count.
///
/// NOTE(review): `setup(1)` presumably makes the insertion of B evict A from memory —
/// confirm against the `setup` helper.
#[tokio::test]
async fn test_sync_try_load_mut_fails_when_not_cached() {
    let setup = setup(1);
    let data_a = TestData {
        id: 730,
        content: "Sync Try Mut Evicted A".to_string(),
    };
    let data_b = TestData {
        id: 731,
        content: "Sync Try Mut Evictor B".to_string(),
    };
    let mut item_a = setup.pool.insert(data_a.clone());
    let key_before = item_a.key();

    let pending_write = item_a.spawn_write_now().await;
    pending_write.await.unwrap();
    // B stays alive until the end of the test.
    let _evictor = setup.pool.insert(data_b.clone());
    wait_for_store(&setup.backing_store).await;

    let deletes_before_attempt = setup.calls.delete.load(Ordering::SeqCst);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);

    let attempt = item_a.try_load_mut();
    assert!(attempt.is_none());
    drop(attempt);

    assert_eq!(item_a.key(), key_before);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
    assert_eq!(
        setup.calls.delete.load(Ordering::SeqCst),
        deletes_before_attempt
    );

    drop(item_a);
    wait_for_store(&setup.backing_store).await;
    assert!(setup.calls.delete.load(Ordering::SeqCst) > deletes_before_attempt);
}
/// Registers the same persisted key twice through the async `register` API.
///
/// The assertions expect both registrations to succeed with the same key while only the
/// first one reaches the store's `register`, no `load` call until data is actually
/// read, one `load` call per subsequent `load().await`, no `delete` call while either
/// handle is alive, and exactly one `delete` call once both are gone. The persisted
/// entry is removed at the end via `blocking_delete_persisted`.
#[tokio::test]
async fn test_register_same_persisted_key_twice() {
    let setup = setup(10);
    let path = test_path_a();
    let key = Uuid::new_v4();
    let data = TestData {
        id: 800,
        content: "Register Twice".to_string(),
    };

    // Seed the mock store directly, then reset the counters the seeding may have touched.
    setup.store_impl._add_persisted(&path, key, data.clone());
    assert!(setup.store_impl.get_persisted_keys(&path).contains(&key));
    assert!(setup.store_impl.get_temp_keys().contains(&key));
    setup.calls.store.store(0, Ordering::SeqCst);

    let tracked_path = Arc::new(setup.backing_store.track_path(path.clone()).await.unwrap());
    assert!(tracked_path.all_keys().contains(&key));
    setup.calls.register.store(0, Ordering::SeqCst);

    // First registration: goes through to the store.
    let first_attempt = setup.pool.register(&tracked_path, key).await;
    assert!(first_attempt.is_some());
    let item1 = first_attempt.unwrap();
    assert_eq!(item1.key(), key);
    assert_eq!(setup.calls.register.load(Ordering::SeqCst), 1);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);

    // Second registration: succeeds without another store-level `register`.
    let second_attempt = setup.pool.register(&tracked_path, key).await;
    assert!(second_attempt.is_some());
    let item2 = second_attempt.unwrap();
    assert_eq!(item2.key(), key);
    assert_eq!(setup.calls.register.load(Ordering::SeqCst), 1);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
    assert_eq!(item1.key(), item2.key());

    // Each handle loads independently.
    let guard1 = item1.load().await;
    assert_eq!(*guard1, data);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 1);
    drop(guard1);

    let guard2 = item2.load().await;
    assert_eq!(*guard2, data);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 2);
    drop(guard2);

    // The temp copy must outlive the first handle and go away with the second.
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
    drop(item1);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);
    drop(item2);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
    assert!(!setup.store_impl.get_temp_keys().contains(&key));

    let cleanup = tokio::task::spawn_blocking({
        let store = setup.backing_store.clone();
        let tracked = tracked_path.clone();
        move || store.blocking_delete_persisted(&tracked, key)
    });
    cleanup.await.unwrap();
}
/// Registers the same persisted key twice through `blocking_register`.
///
/// The assertions expect both registrations to succeed with the same key while only the
/// first one reaches the store's `register`, one `load` call per `blocking_load`, and
/// exactly one `delete` call after both handles have been consumed by their load tasks.
/// The persisted entry is removed at the end via `blocking_delete_persisted`.
#[tokio::test]
async fn test_blocking_register_same_persisted_key_twice() {
    let setup = setup(10);
    let path = test_path_b();
    let key = Uuid::new_v4();
    let data = TestData {
        id: 801,
        content: "Blocking Register Twice".to_string(),
    };
    // Seed the mock store directly so the key exists before the pool hears about it.
    setup.store_impl._add_persisted(&path, key, data.clone());
    setup.calls.store.store(0, Ordering::SeqCst);

    // Run the blocking call on the blocking pool rather than on the async test thread,
    // matching how the other blocking APIs are driven in this file.
    let track_handle = tokio::task::spawn_blocking({
        let store = setup.backing_store.clone();
        let path = path.clone();
        move || store.blocking_track_path(path)
    });
    let tracked_path = Arc::new(track_handle.await.unwrap());
    assert!(tracked_path.all_keys().contains(&key));
    setup.calls.register.store(0, Ordering::SeqCst);

    let register1_handle = tokio::task::spawn_blocking({
        let p = setup.pool.clone();
        let t = tracked_path.clone();
        move || p.blocking_register(&t, key)
    });
    let item1 = register1_handle.await.unwrap().unwrap();
    assert_eq!(setup.calls.register.load(Ordering::SeqCst), 1);

    // The second registration must not reach the store's `register` again.
    let register2_handle = tokio::task::spawn_blocking({
        let p = setup.pool.clone();
        let t = tracked_path.clone();
        move || p.blocking_register(&t, key)
    });
    let item2 = register2_handle.await.unwrap().unwrap();
    assert_eq!(setup.calls.register.load(Ordering::SeqCst), 1);
    assert_eq!(item1.key(), item2.key());

    // Each handle is moved into its load task and therefore dropped when the task ends.
    let load1_handle = tokio::task::spawn_blocking(move || item1.blocking_load().clone());
    assert_eq!(load1_handle.await.unwrap(), data);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 1);
    let load2_handle = tokio::task::spawn_blocking(move || item2.blocking_load().clone());
    assert_eq!(load2_handle.await.unwrap(), data);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 2);

    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);

    let cleanup_handle = tokio::task::spawn_blocking({
        let store = setup.backing_store.clone();
        let t = tracked_path.clone();
        move || store.blocking_delete_persisted(&t, key)
    });
    cleanup_handle.await.unwrap();
}
/// Checks the async `make_mut` on a uniquely-owned item that has been written out.
///
/// The assertions expect no `load` call, exactly one `delete` call and the old key gone
/// from the temp store once the guard has been obtained, a new key after the edit, the
/// modified content visible via `load`, and no further `delete` call when the handle is
/// dropped.
#[tokio::test]
async fn test_async_make_mut_unique_no_clone() {
    let setup = setup(10);
    let data_a = TestData {
        id: 900,
        content: "Async Make Mut Unique".to_string(),
    };
    let mut arc_a = Arc::new(setup.pool.insert(data_a.clone()));
    let key_before = arc_a.key();

    let pending_write = arc_a.spawn_write_now().await;
    pending_write.await.unwrap();
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
    assert!(setup.store_impl.get_temp_keys().contains(&key_before));
    assert_eq!(Arc::strong_count(&arc_a), 1);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);

    // The guard is held while the store side effects are checked.
    let mut guard = arc_a.make_mut().await;
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
    assert!(!setup.store_impl.get_temp_keys().contains(&key_before));

    guard.content = "Async Make Mut Unique - Modified".to_string();
    drop(guard);

    let key_after = arc_a.key();
    assert_ne!(key_before, key_after);

    let final_guard = arc_a.load().await;
    assert_eq!(final_guard.content, "Async Make Mut Unique - Modified");
    drop(final_guard);

    drop(arc_a);
    wait_for_store(&setup.backing_store).await;
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
}
/// Checks that `try_load` on item A returns `None` shortly after item B has been
/// inserted into a setup whose mock store sleeps for 50 ms.
///
/// The attempt is made 10 ms after the insertions, i.e. within the store's sleep window.
/// Afterwards exactly one `store` call is expected, and dropping A must lead to at
/// least one `delete` call.
///
/// NOTE(review): presumably inserting B starts an eviction write of A that is still
/// sleeping when `try_load` runs — confirm against `setup_with_store_sleep`.
#[tokio::test]
async fn test_try_load_fails_while_store_is_running_for_eviction() {
    let store_delay = Duration::from_millis(50);
    println!(
        "NOTE: This test uses mock TestStore::store with sleep: {:?}",
        store_delay
    );
    let setup = setup_with_store_sleep(1, store_delay);
    let data_a = TestData {
        id: 960,
        content: "Eviction Target A Sleep".to_string(),
    };
    let data_b = TestData {
        id: 961,
        content: "Evictor B Sleep".to_string(),
    };
    let item_a = setup.pool.insert(data_a.clone());
    // B stays alive until the end of the test.
    let _evictor = setup.pool.insert(data_b.clone());

    // Give the background work a moment to start, but not enough to finish.
    tokio::time::sleep(Duration::from_millis(10)).await;

    let attempt = item_a.try_load();
    assert!(
        attempt.is_none(),
        "try_load should fail if item is being evicted (store running)"
    );
    drop(attempt);

    wait_for_store(&setup.backing_store).await;
    assert_eq!(
        setup.calls.store.load(Ordering::SeqCst),
        1,
        "Store for A should have completed"
    );

    drop(item_a);
    wait_for_store(&setup.backing_store).await;
    assert!(
        setup.calls.delete.load(Ordering::SeqCst) >= 1,
        "Delete for A should have occurred"
    );
}
/// Checks that the synchronous `try_load_mut` on item A returns `None` shortly after
/// item B has been inserted into a setup whose mock store sleeps for 50 ms, and that
/// the failed attempt causes neither a `load` nor a `delete` call.
///
/// The attempt is made 10 ms after the insertions, i.e. within the store's sleep window.
/// Afterwards exactly one `store` call is expected, and dropping A must lead to at
/// least one `delete` call.
///
/// NOTE(review): presumably inserting B starts an eviction write of A that is still
/// sleeping when `try_load_mut` runs — confirm against `setup_with_store_sleep`.
#[tokio::test]
async fn test_sync_try_load_mut_fails_while_store_is_running_for_eviction() {
    let store_delay = Duration::from_millis(50);
    println!(
        "NOTE: This test uses mock TestStore::store with sleep: {:?}",
        store_delay
    );
    let setup = setup_with_store_sleep(1, store_delay);
    let data_a = TestData {
        id: 970,
        content: "Eviction Target Mut A Sleep".to_string(),
    };
    let data_b = TestData {
        id: 971,
        content: "Evictor Mut B Sleep".to_string(),
    };
    let mut item_a = setup.pool.insert(data_a.clone());
    // B stays alive until the end of the test.
    let _evictor = setup.pool.insert(data_b.clone());

    // Give the background work a moment to start, but not enough to finish.
    tokio::time::sleep(Duration::from_millis(10)).await;

    let attempt = item_a.try_load_mut();
    assert!(
        attempt.is_none(),
        "sync try_load_mut should fail if item is being evicted (store running)"
    );
    drop(attempt);
    assert_eq!(setup.calls.load.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);

    wait_for_store(&setup.backing_store).await;
    assert_eq!(
        setup.calls.store.load(Ordering::SeqCst),
        1,
        "Store for A should have completed"
    );

    drop(item_a);
    wait_for_store(&setup.backing_store).await;
    assert!(
        setup.calls.delete.load(Ordering::SeqCst) >= 1,
        "Delete for A should have occurred"
    );
}
/// Drops item A 20 ms after item B was inserted into a setup whose mock store sleeps
/// for 100 ms, i.e. while no `store` call has been counted yet.
///
/// Once the background work has settled, the assertions expect exactly one `store`
/// call, exactly one `delete` call, and A's key absent from the temp store.
///
/// NOTE(review): presumably the `store` counter is only bumped after the mock's sleep,
/// which is why it still reads 0 at the 20 ms mark — confirm against `TestStore::store`.
#[tokio::test]
async fn test_item_dropped_during_eviction_write_triggers_delete() {
    let store_delay = Duration::from_millis(100);
    println!(
        "NOTE: This test uses mock TestStore::store with sleep: {:?}",
        store_delay
    );
    let setup = setup_with_store_sleep(1, store_delay);
    let data_a = TestData {
        id: 980,
        content: "Drop During Store A".to_string(),
    };
    let data_b = TestData {
        id: 981,
        content: "Drop During Store B".to_string(),
    };
    let item_a = setup.pool.insert(data_a.clone());
    let key_a = item_a.key();
    // B stays alive until the end of the test.
    let _evictor = setup.pool.insert(data_b.clone());

    tokio::time::sleep(Duration::from_millis(20)).await;
    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 0);
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 0);

    // Drop A while the write is still outstanding.
    drop(item_a);
    wait_for_store(&setup.backing_store).await;

    assert_eq!(setup.calls.store.load(Ordering::SeqCst), 1);
    assert_eq!(setup.calls.delete.load(Ordering::SeqCst), 1);
    assert!(
        !setup.store_impl.get_temp_keys().contains(&key_a),
        "Item A should have been deleted from temp store immediately after being stored"
    );
}
/// Upper bound on how long `stress_saturated_blocking_pool` waits for its worker tasks
/// before treating the run as a probable deadlock.
const DEADLOCK_TIMEOUT: Duration = Duration::from_secs(30);
/// Stress test: runs eight concurrent `make_mut` + `spawn_write_now` sequences on a
/// runtime restricted to two worker threads and a single blocking thread, with a mock
/// store that sleeps 200 ms, and fails if they do not all finish within
/// `DEADLOCK_TIMEOUT`.
///
/// Every worker's join result is checked, so a panic inside a worker (for example a
/// failed write) fails the test instead of being silently discarded.
#[test]
fn stress_saturated_blocking_pool() {
    // A deliberately starved runtime: only one thread is available for blocking work.
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .max_blocking_threads(1)
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let setup = setup_with_store_sleep(1, Duration::from_millis(200));
        let pool = setup.pool.clone();
        let shared = Arc::new(pool.insert(TestData {
            id: 42,
            content: "deadlock".into(),
        }));

        const N: usize = 8;
        let mut handles = Vec::with_capacity(N);
        for _ in 0..N {
            // Each task starts from its own clone of the shared handle.
            let mut arc = shared.clone();
            handles.push(tokio::spawn(async move {
                let mut g = arc.make_mut().await;
                g.content.push('X');
                drop(g);
                arc.spawn_write_now().await.await.unwrap();
            }));
        }

        let results = tokio::time::timeout(DEADLOCK_TIMEOUT, futures::future::join_all(handles))
            .await
            .expect("probable deadlock – the operations never completed");
        // Surface panics from the workers; `join_all` only collects their results.
        for (index, joined) in results.into_iter().enumerate() {
            if let Err(err) = joined {
                panic!("worker task {} failed: {}", index, err);
            }
        }

        setup.backing_store.finished().await;
    });
}