mod db;
mod meta;
mod options;
mod scope_fs;
use std::sync::{Arc, Mutex};
use rustc_hash::FxHashMap as HashMap;
pub use self::options::FileSystemOptions;
use self::{db::DB, meta::Meta, scope_fs::ScopeFileSystem};
use crate::{Result, Storage};
/// Pending changes for a single scope, keyed by the raw entry key.
///
/// A `Some(value)` entry records a write of `value`; a `None` entry records
/// that the key was removed.
type BucketChangesMap = HashMap<Vec<u8>, Option<Vec<u8>>>;
/// A [`Storage`] implementation that buffers changes in memory and hands them
/// to a version-scoped [`DB`] when `save` is called.
#[derive(Debug)]
pub struct FileSystemStorage {
/// Scoped file system built from `options.directory` and `options.fs`;
/// used for meta handling and for removing expired version directories.
fs: ScopeFileSystem,
/// Database created on the child file system named after `options.version`.
db: DB,
/// Changes recorded by `set`/`remove` since the last `save`, grouped by scope name.
updates: HashMap<String, BucketChangesMap>,
/// The options this storage was created with.
options: FileSystemOptions,
/// Timestamp at (or after) which `save` refreshes the meta data again.
/// Starts at the default value `0`; shared with the background task spawned by `save`.
next_meta_refresh_time: Arc<Mutex<u64>>,
}
impl FileSystemStorage {
pub fn new(options: FileSystemOptions) -> Self {
let fs = ScopeFileSystem::new(options.directory.clone(), options.fs.clone());
Self {
db: DB::new(fs.child_fs(&options.version)),
updates: Default::default(),
next_meta_refresh_time: Default::default(),
fs,
options,
}
}
}
#[async_trait::async_trait]
impl Storage for FileSystemStorage {
    /// Loads the key/value pairs of `scope` from the underlying DB.
    ///
    /// Only `self.db` is consulted; changes recorded through `set`/`remove`
    /// that have not been passed to `save` yet are not looked at here.
    ///
    /// # Errors
    /// Propagates any error returned by the DB load.
    async fn load(&self, scope: &'static str) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let data = self.db.load(scope).await?;
        Ok(data)
    }

    /// Records that `key` in `scope` should be written with `value` on the
    /// next `save`. A later `set`/`remove` for the same key overrides this one.
    fn set(&mut self, scope: &'static str, key: Vec<u8>, value: Vec<u8>) {
        // Look the bucket up by `&str` first so an owned scope name is only
        // allocated the first time a scope is touched after a `save`/`reset`.
        if let Some(bucket) = self.updates.get_mut(scope) {
            bucket.insert(key, Some(value));
        } else {
            self.updates
                .entry(scope.to_string())
                .or_default()
                .insert(key, Some(value));
        }
    }

    /// Records that `key` in `scope` should be removed on the next `save`.
    /// A later `set`/`remove` for the same key overrides this one.
    fn remove(&mut self, scope: &'static str, key: &[u8]) {
        // Same allocation-avoiding lookup as in `set`.
        if let Some(bucket) = self.updates.get_mut(scope) {
            bucket.insert(key.to_vec(), None);
        } else {
            self.updates
                .entry(scope.to_string())
                .or_default()
                .insert(key.to_vec(), None);
        }
    }

    /// Hands all pending changes to the DB and, when the refresh time has
    /// been reached, refreshes the meta data in a background task.
    ///
    /// The pending change set is emptied by this call. The background task
    /// is best effort: load/refresh/save/remove failures are ignored, except
    /// that a "not found" error while loading the meta starts from a default
    /// meta.
    ///
    /// # Panics
    /// `tokio::spawn` is used for the background task, so when a refresh is
    /// due this must be called from within a Tokio runtime.
    fn save(&mut self) {
        let updates = std::mem::take(&mut self.updates);
        self.db.save(
            updates
                .into_iter()
                .map(|(k, v)| (k, v.into_iter().collect()))
                .collect(),
            self.options.max_pack_size,
        );

        // Decide whether a refresh is due before cloning anything or
        // spawning a task, so the common "not yet due" case costs nothing
        // more than a lock and a comparison.
        let now = Meta::current_timestamp();
        let refresh_due = {
            let next_time = self
                .next_meta_refresh_time
                .lock()
                .expect("should get lock");
            *next_time <= now
        };
        if !refresh_due {
            return;
        }

        let fs = self.fs.clone();
        let version = self.options.version.clone();
        let expire = self.options.expire;
        let next_meta_refresh_time_lock = self.next_meta_refresh_time.clone();
        tokio::spawn(async move {
            let mut meta = match Meta::load(&fs).await {
                Ok(meta) => meta,
                // No meta yet: start from an empty one.
                Err(e) if e.is_not_found() => Default::default(),
                // Any other load failure: skip this refresh round.
                Err(_) => return,
            };
            if let Ok((expired_versions, next_refresh_time)) = meta.refresh(&version, expire).await
            {
                let _ = meta.save(&fs).await;
                for v in expired_versions {
                    let _ = fs.child_fs(&v).remove().await;
                }
                // The guard is taken only after all awaits, so the std mutex
                // is never held across an await point.
                let mut next_time = next_meta_refresh_time_lock
                    .lock()
                    .expect("should get lock");
                *next_time = next_refresh_time;
            }
        });
    }

    /// Discards the pending changes of `scope` and resets that scope in the DB.
    fn reset(&mut self, scope: &'static str) {
        self.updates.remove(scope);
        self.db.reset(scope);
    }

    /// Awaits the DB's `flush`.
    async fn flush(&self) {
        self.db.flush().await;
    }

    /// Returns the bucket names reported by the DB.
    ///
    /// # Errors
    /// Propagates any error returned by the DB.
    async fn scopes(&self) -> Result<Vec<String>> {
        let names = self.db.bucket_names().await?;
        Ok(names)
    }
}