use std::{
ops::{Deref, DerefMut},
sync::Arc,
};
use consume_on_drop::{Consume, ConsumeOnDrop};
use cutoff_list::CutoffList;
use parking_lot::RwLock;
use tokio::{
sync::{RwLockMappedWriteGuard, RwLockReadGuard},
task::JoinHandle,
};
use uuid::Uuid;
mod entries;
pub mod backing_store;
pub mod convenience;
#[cfg(feature = "fbstore")]
pub mod fbstore;
use self::backing_store::{Strategy, TrackedPath};
use self::entries::{FullEntry, LimitedEntry};
pub use self::backing_store::{BackingStore, BackingStoreT};
/// Handle to a single value tracked by an `FBPool`.
///
/// Instances are produced by `FBPool::insert`, `FBPool::register` and
/// `FBPool::blocking_register`. Dropping the handle drops `inner`, whose `Drop`
/// impl removes the handle's slot from the pool's entry list.
pub struct Fb<T, B: BackingStoreT> {
// The entry that owns the value. Fields drop in declaration order, so this is
// dropped before `inner` removes the matching slot from the pool.
entry: FullEntry<T, B>,
// Pool-side bookkeeping: position in the pool's list plus a reference to the pool.
inner: FbInner<T, B>,
}
/// Pool-side bookkeeping of an `Fb` handle.
///
/// Kept as a separate struct so that it can carry its own `Drop` impl, which
/// removes `index` from the pool's entry list.
struct FbInner<T, B: BackingStoreT> {
// Position of this handle's limited entry inside `pool.entries`.
index: cutoff_list::Index,
// The pool this handle belongs to; holding an `Arc` keeps the pool alive.
pool: Arc<FBPool<T, B>>,
}
impl<T, B: BackingStoreT> Drop for FbInner<T, B> {
    /// Unregisters this handle's slot from the owning pool.
    ///
    /// # Panics
    ///
    /// Panics if the list no longer contains `self.index`.
    fn drop(&mut self) {
        let slot = self.index;
        // The write lock is held until the end of the function, i.e. also
        // while the removed value is being dropped.
        let mut list = self.pool.entries.write();
        let removed = list.remove(slot);
        removed.unwrap();
    }
}
/// A pool of values of type `T` associated with a `BackingStore<B>`.
///
/// Every live `Fb` handle owns one slot in `entries`.
pub struct FBPool<T, B: BackingStoreT> {
// Ordered list of the limited views of all live entries, guarded by a
// `parking_lot::RwLock`. The list is created with two cutoffs (see `new`).
entries: RwLock<CutoffList<LimitedEntry<T, B>>>,
// The backing store handed to entries when they are loaded, dumped or persisted.
store: Arc<BackingStore<B>>,
}
impl<T, B: BackingStoreT> FBPool<T, B> {
pub fn new(store: Arc<BackingStore<B>>, mem_size: usize) -> Self {
let entries = RwLock::new(CutoffList::new(vec![mem_size / 2, mem_size]));
Self { entries, store }
}
pub fn store(&self) -> &Arc<BackingStore<B>> {
&self.store
}
pub fn insert(self: &Arc<Self>, data: T) -> Fb<T, B>
where
T: Send + Sync + 'static,
B: Strategy<T>,
{
let entry = FullEntry::new(data);
let mut guard = self.entries.write();
let index = guard.insert_first(entry.limited());
let dump_entry = guard.get(guard.index_following_qth_cutoff(1));
if let Some(entry) = dump_entry {
entry.try_dump_to_disk(&self.store);
}
drop(guard);
Fb {
entry,
inner: FbInner {
index,
pool: Arc::clone(self),
},
}
}
pub async fn register(
self: &Arc<Self>,
path: &Arc<TrackedPath<B::PersistPath>>,
key: Uuid,
) -> Option<Fb<T, B>>
where
T: Send + Sync + 'static,
{
let entry = FullEntry::register(key, &self.store, path).await?;
let index = self.entries.write().insert_last(entry.limited());
Some(Fb {
entry,
inner: FbInner {
index,
pool: Arc::clone(self),
},
})
}
pub fn blocking_register(
self: &Arc<Self>,
path: &TrackedPath<B::PersistPath>,
key: Uuid,
) -> Option<Fb<T, B>> {
let entry = FullEntry::blocking_register(key, &self.store, path)?;
let index = self.entries.write().insert_last(entry.limited());
Some(Fb {
entry,
inner: FbInner {
index,
pool: Arc::clone(self),
},
})
}
pub fn size(&self) -> usize {
self.entries.read().len()
}
}
impl<T, B: BackingStoreT> Fb<T, B> {
pub fn key(&self) -> Uuid {
self.entry.key()
}
pub fn pool(&self) -> &Arc<FBPool<T, B>> {
&self.inner.pool
}
}
impl<T: Send + Sync + 'static, B: Strategy<T>> Fb<T, B> {
pub async fn load(&self) -> ReadGuard<'_, T, B> {
shift_forward(&self.inner.pool, self.inner.index);
let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
let data_guard = self.entry.load(&self.inner.pool.store).await;
ReadGuard {
data_guard,
_on_drop: on_drop,
}
}
pub fn try_load(&self) -> Option<ReadGuard<'_, T, B>> {
let guard = self.entry.try_load()?;
shift_forward(&self.inner.pool, self.inner.index);
let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
Some(ReadGuard {
data_guard: guard,
_on_drop: on_drop,
})
}
pub fn blocking_load(&self) -> ReadGuard<'_, T, B> {
shift_forward(&self.inner.pool, self.inner.index);
let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
let data_guard = self.entry.blocking_load(&self.inner.pool.store);
ReadGuard {
data_guard,
_on_drop: on_drop,
}
}
pub fn load_in_place(&self) -> ReadGuard<'_, T, B> {
shift_forward(&self.inner.pool, self.inner.index);
let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
let data_guard = self.entry.load_in_place(&self.inner.pool.store);
ReadGuard {
data_guard,
_on_drop: on_drop,
}
}
pub async fn load_mut(&mut self) -> WriteGuard<'_, T, B> {
shift_forward(&self.inner.pool, self.inner.index);
let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
let data_guard = self.entry.load_mut(&self.inner.pool.store).await;
WriteGuard {
data_guard,
_on_drop: on_drop,
}
}
pub fn try_load_mut(&mut self) -> Option<WriteGuard<'_, T, B>> {
let guard = self.entry.try_load_mut()?;
shift_forward(&self.inner.pool, self.inner.index);
let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
Some(WriteGuard {
data_guard: guard,
_on_drop: on_drop,
})
}
pub fn blocking_load_mut(&mut self) -> WriteGuard<'_, T, B> {
shift_forward(&self.inner.pool, self.inner.index);
let on_drop = GuardDropper::new(&self.inner.pool, self.inner.index);
let data_guard = self.entry.blocking_load_mut(&self.inner.pool.store);
WriteGuard {
data_guard,
_on_drop: on_drop,
}
}
pub async fn spawn_write_now(&self) -> JoinHandle<()> {
self.entry.spawn_write_now(&self.inner.pool.store).await
}
pub fn blocking_write_now(&self) {
self.entry.blocking_write_now(&self.inner.pool.store);
}
pub async fn spawn_persist(&self, path: &Arc<TrackedPath<B::PersistPath>>) -> JoinHandle<()> {
self.entry.spawn_persist(&self.inner.pool.store, path).await
}
pub fn blocking_persist(&self, path: &TrackedPath<B::PersistPath>) {
self.entry.blocking_persist(&self.inner.pool.store, path)
}
}
/// Moves the slot at `index` to the front of `pool`'s entry list.
///
/// Nothing happens when the slot has no cutoff in front of it. When the slot
/// came from behind both cutoffs, the entry that follows cutoff `1` after the
/// move is offered to `try_dump_to_disk`.
///
/// # Panics
///
/// Panics if `index` is not present in the list, or if the slot reports more
/// than two preceding cutoffs.
fn shift_forward<T: Send + Sync + 'static, B: Strategy<T>>(
    pool: &FBPool<T, B>,
    index: cutoff_list::Index,
) {
    // Fast path: a shared lock is enough to see that no move is needed.
    {
        let list = pool.entries.read();
        if list.preceding_cutoffs(index).unwrap() == 0 {
            return;
        }
        // The shared lock is released here, before the exclusive one is requested.
    }

    let mut write_guard = pool.entries.write();
    // Re-check: the list may have changed between the two lock acquisitions.
    let preceding_cutoffs = write_guard.preceding_cutoffs(index).unwrap();
    if preceding_cutoffs == 0 {
        return;
    }
    write_guard.shift_to_front(index);
    if preceding_cutoffs == 1 {
        return;
    }
    assert!(preceding_cutoffs == 2);

    // Mutation is done; continue with shared access only.
    let list = parking_lot::RwLockWriteGuard::downgrade(write_guard);
    let boundary = list.index_following_qth_cutoff(1);
    let displaced = list.get(boundary).unwrap();
    displaced.try_dump_to_disk(&pool.store);
}
/// Shared-access guard returned by `Fb::load`, `Fb::try_load`,
/// `Fb::blocking_load` and `Fb::load_in_place`. Dereferences to `T`.
pub struct ReadGuard<'a, T: Send + Sync + 'static, B: Strategy<T>> {
// The tokio read guard on the value. Fields drop in declaration order, so
// this guard is released before `_on_drop` is dropped.
data_guard: RwLockReadGuard<'a, T>,
// Hook wrapping a `GuardDropper`; never read, it exists for what happens when
// it is dropped (presumably `GuardDropper::consume` — see `consume_on_drop`).
_on_drop: ConsumeOnDrop<GuardDropper<'a, T, B>>,
}
impl<T: Send + Sync + 'static, B: Strategy<T>> Deref for ReadGuard<'_, T, B> {
    type Target = T;

    /// Borrows the guarded value.
    fn deref(&self) -> &T {
        &*self.data_guard
    }
}
/// Exclusive-access guard returned by `Fb::load_mut`, `Fb::try_load_mut` and
/// `Fb::blocking_load_mut`. Dereferences (mutably) to `T`.
pub struct WriteGuard<'a, T: Send + Sync + 'static, B: Strategy<T>> {
// The tokio mapped write guard on the value. Fields drop in declaration
// order, so this guard is released before `_on_drop` is dropped.
data_guard: RwLockMappedWriteGuard<'a, T>,
// Hook wrapping a `GuardDropper`; never read, it exists for what happens when
// it is dropped (presumably `GuardDropper::consume` — see `consume_on_drop`).
_on_drop: ConsumeOnDrop<GuardDropper<'a, T, B>>,
}
impl<T: Send + Sync + 'static, B: Strategy<T>> Deref for WriteGuard<'_, T, B> {
    type Target = T;

    /// Borrows the guarded value immutably.
    fn deref(&self) -> &T {
        &*self.data_guard
    }
}
impl<T: Send + Sync + 'static, B: Strategy<T>> DerefMut for WriteGuard<'_, T, B> {
    /// Borrows the guarded value mutably.
    fn deref_mut(&mut self) -> &mut T {
        &mut *self.data_guard
    }
}
/// Payload of the drop hook carried by `ReadGuard` and `WriteGuard`.
///
/// Its `Consume` impl re-checks where the slot sits in the pool's list and may
/// offer the entry to `try_dump_to_disk`.
struct GuardDropper<'a, T: Send + Sync + 'static, B: Strategy<T>> {
// Pool whose entry list is inspected when the hook fires.
pool: &'a FBPool<T, B>,
// Slot of the entry the guard was created for.
index: cutoff_list::Index,
}
impl<T: Send + Sync + 'static, B: Strategy<T>> Consume for GuardDropper<'_, T, B> {
    /// Offers the entry to `try_dump_to_disk` if its slot currently sits behind
    /// both cutoffs of the pool's list.
    ///
    /// # Panics
    ///
    /// Panics if the slot is missing from the list or reports more than two
    /// preceding cutoffs.
    fn consume(self) {
        let pool = self.pool;
        let index = self.index;

        let list = pool.entries.read();
        let preceding_cutoffs = list.preceding_cutoffs(index).unwrap();
        assert!(preceding_cutoffs <= 2);
        if preceding_cutoffs != 2 {
            return;
        }
        let own_entry = list.get(index).unwrap();
        own_entry.try_dump_to_disk(&pool.store);
    }
}
impl<'a, T: Send + Sync + 'static, B: Strategy<T>> GuardDropper<'a, T, B> {
    /// Builds the dropper for the slot `index` of `pool`, already wrapped in
    /// `ConsumeOnDrop`.
    pub fn new(pool: &'a FBPool<T, B>, index: cutoff_list::Index) -> ConsumeOnDrop<Self> {
        let dropper = Self { pool, index };
        ConsumeOnDrop::new(dropper)
    }
}