use crate::{
db::{consts, Database, DatabaseKey},
fs::FileId,
id_lock::IdLock,
Context, ErrorKind, Result,
};
use serde::{de::DeserializeOwned, Serialize};
use std::{
collections::HashMap,
sync::{Arc, Condvar, Mutex, RwLock, RwLockWriteGuard},
time::Duration,
};
use tracing::error;
#[derive(Default)]
struct State<T> {
updated: HashMap<FileId, T>,
immediate: bool,
stopped: bool,
}
pub struct CachedStorage<T> {
db: Arc<Database>,
lock: IdLock<T>,
shared: Arc<(Mutex<State<T>>, Condvar)>,
derive: &'static [u8],
}
impl<T> CachedStorage<T>
where
T: Serialize + DeserializeOwned + Clone + Default + Send + std::fmt::Debug + 'static,
{
const BATCH_DELAY: Duration = Duration::from_millis(100);
pub fn new(db: Arc<Database>, derive: &'static [u8]) -> Self {
let shared = Arc::new((Mutex::default(), Condvar::new()));
std::thread::spawn({
let db = Arc::clone(&db);
let shared = Arc::clone(&shared);
move || loop {
let (lock, cvar) = &*shared;
let guard = lock.lock().unwrap();
let mut guard = cvar
.wait_while(guard, |guard: &mut State<T>| {
!guard.stopped && guard.updated.is_empty()
})
.unwrap();
if guard.stopped {
break;
}
if !guard.immediate {
drop(guard);
std::thread::sleep(Self::BATCH_DELAY);
guard = lock.lock().unwrap();
} else {
guard.immediate = false;
}
for (id, value) in guard.updated.drain() {
if let Err(err) = db
.key(consts::FILE_ROOT)
.derive(id)
.derive(derive)
.typed()
.put(&value)
{
error!("failed to persist object: {}", err);
}
}
}
});
Self {
db,
lock: IdLock::new(),
shared,
derive,
}
}
fn db_key(&self, id: FileId) -> DatabaseKey<T> {
self.db
.key(consts::FILE_ROOT)
.derive(id)
.derive(self.derive)
.typed()
}
fn fetch(&self, id: FileId) -> Result<T> {
self.db_key(id).get()?.kind(ErrorKind::NotFound)
}
pub fn store(&self, id: FileId, meta: T) {
self.lock.insert(id, meta.clone());
let mut guard = self.shared.0.lock().unwrap();
guard.updated.insert(id, meta);
guard.immediate = true;
self.shared.1.notify_one();
}
pub fn touch(&self, id: FileId) {
self.store(id, T::default());
}
pub fn stat(&self, id: FileId) -> Result<T> {
self.lock
.get_opt(id)
.map_or_else(|| self.fetch(id), |lock| Ok(lock.read().unwrap().clone()))
}
pub fn exists(&self, id: FileId) -> Result<bool> {
self.db_key(id).exists()
}
pub fn delete(&self, id: FileId) -> Result<()> {
self.db_key(id).delete()
}
pub fn key(&self, id: FileId) -> Result<CachedStorageKey<T>> {
Ok(CachedStorageKey {
id,
lock: self.lock.get_or_try_insert(id, || self.fetch(id))?,
shared: Arc::clone(&self.shared),
})
}
}
pub struct CachedStorageKey<T> {
id: FileId,
lock: Arc<RwLock<T>>,
shared: Arc<(Mutex<State<T>>, Condvar)>,
}
impl<T> CachedStorageKey<T>
where
T: Clone + Send + 'static,
{
pub fn write(&self) -> RwLockWriteGuard<T> {
self.lock.write().unwrap()
}
pub fn update(&self, guard: RwLockWriteGuard<T>) {
self.shared
.0
.lock()
.unwrap()
.updated
.insert(self.id, guard.clone());
self.shared.1.notify_one();
}
}