use std::{
path::{Path, PathBuf},
sync::Arc,
};
use derive_more::Deref;
use opendal::Metadata;
use scc::hash_map::Entry;
use tantivy::directory::{
FileHandle,
error::{OpenReadError, OpenWriteError},
};
use crate::utils::FastConcurrentMap;
/// Bundle of the three path-keyed caches used by this module.
///
/// Every field is behind an `Arc`, so the derived `Clone` produces a handle
/// that shares the same underlying maps rather than copying them.
#[derive(Clone, Debug, Default)]
pub(crate) struct Cache {
/// Paths registered through `Cache::created`, each with a completion flag.
created: Arc<CreatedCache>,
/// Opened file handles, keyed by path.
files: Arc<FilesCache>,
/// Fetched metadata, keyed by path.
metadata: Arc<MetadataCache>,
}
/// Map from a created file's path to a completion flag.
///
/// `Cache::created` inserts the flag as `false`, `CreatedEntry::done` sets it
/// to `true`, and `Cache::sync` consumes the entries whose flag is `true`.
/// Dereferences to the inner map via `derive_more::Deref`.
#[derive(Debug, Default, Deref)]
struct CreatedCache {
#[deref]
cache: FastConcurrentMap<PathBuf, bool>,
}
/// Handle returned by `Cache::created` for one registered path.
///
/// Calling `done` flips the path's flag in the shared `CreatedCache`.
pub(crate) struct CreatedEntry {
/// Path this handle was registered for; used as the key when updating the cache.
path: PathBuf,
/// Shared map holding the completion flag for `path`.
cache: Arc<CreatedCache>,
/// Local latch so that `done` updates the shared map at most once per handle.
done: bool,
}
/// Map from a path to its opened, shareable file handle.
///
/// Dereferences to the inner map via `derive_more::Deref`.
#[derive(Debug, Default, Deref)]
pub(crate) struct FilesCache {
#[deref]
cache: FastConcurrentMap<PathBuf, Arc<dyn FileHandle>>,
}
/// Map from a path to its fetched `opendal::Metadata`, shared behind an `Arc`.
///
/// Dereferences to the inner map via `derive_more::Deref`.
#[derive(Debug, Default, Deref)]
struct MetadataCache {
#[deref]
cache: FastConcurrentMap<PathBuf, Arc<Metadata>>,
}
impl Cache {
    /// Returns the cached file handle for `path`, or `None` when nothing is
    /// stored under that key.
    pub async fn get_file(&self, path: &Path) -> Option<Arc<dyn FileHandle>> {
        self.files
            .read_async(path, |_, file| Arc::clone(file))
            .await
    }

    /// Returns the metadata cached for `path`, calling `fetch` to populate
    /// the cache on a miss.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `fetch`.
    pub async fn metadata(
        &self,
        path: &Path,
        fetch: impl AsyncFnOnce() -> Result<Metadata, OpenReadError>,
    ) -> Result<Arc<Metadata>, OpenReadError> {
        self.metadata.fetch(path, fetch).await
    }

    /// Returns the file handle cached for `path`, calling `open` to create
    /// and store it on a miss.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `open`; nothing is inserted in that
    /// case.
    pub async fn file(
        &self,
        path: &Path,
        open: impl AsyncFnOnce() -> Result<Arc<dyn FileHandle>, OpenReadError>,
    ) -> Result<Arc<dyn FileHandle>, OpenReadError> {
        // Fast path: a plain read avoids allocating an owned key.
        if let Some(file) = self.get_file(path).await {
            return Ok(file);
        }
        // The entry is held across `open().await`, so it must be acquired
        // with the async variant: waiting for it then yields to the executor
        // instead of blocking the thread another task may need to finish.
        let entry = match self.files.entry_async(path.to_path_buf()).await {
            Entry::Occupied(entry) => entry,
            Entry::Vacant(entry) => {
                let file = open().await?;
                entry.insert_entry(file)
            }
        };
        Ok(Arc::clone(entry.get()))
    }

    /// Reports whether `filepath` is currently registered in the created map.
    pub async fn is_created(&self, filepath: &Path) -> bool {
        self.created.contains_async(filepath).await
    }

    /// Registers `filepath` as newly created, with its completion flag unset,
    /// and returns a handle that can later mark it done.
    ///
    /// # Errors
    ///
    /// Returns `OpenWriteError::FileAlreadyExists` when the path is already
    /// registered.
    pub async fn created(&self, filepath: PathBuf) -> Result<CreatedEntry, OpenWriteError> {
        // `filepath` is already owned; one clone goes into the map and the
        // original is moved into whichever result is returned.
        match self.created.insert_async(filepath.clone(), false).await {
            Ok(_) => Ok(CreatedEntry {
                path: filepath,
                cache: Arc::clone(&self.created),
                done: false,
            }),
            Err(_) => Err(OpenWriteError::FileAlreadyExists(filepath)),
        }
    }

    /// Consumes every registered entry whose completion flag is set and
    /// returns the paths of those entries. Entries whose flag is still unset
    /// are left untouched.
    pub async fn sync(&self) -> Vec<PathBuf> {
        let mut flushed = vec![];
        self.created
            .iter_mut_async(|entry| {
                if *entry {
                    let (path, _) = entry.consume();
                    flushed.push(path);
                }
                // Always continue so that every entry is visited.
                true
            })
            .await;
        flushed
    }
}
impl CreatedEntry {
    /// Marks this entry's path as done in the shared created map.
    ///
    /// Only the first call updates the map; later calls on the same handle
    /// return immediately.
    pub fn done(&mut self) {
        if self.done {
            return;
        }
        self.done = true;
        self.cache.update_sync(&self.path, |_, flag| *flag = true);
    }
}
impl MetadataCache {
    /// Returns the metadata stored for `path`, or `None` when nothing is
    /// stored under that key.
    async fn get(&self, path: &Path) -> Option<Arc<Metadata>> {
        self.read_async(path, |_, metadata| Arc::clone(metadata))
            .await
    }

    /// Returns the metadata cached for `path`, calling `fetch` to obtain and
    /// store it on a miss.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `fetch`; nothing is inserted in that
    /// case.
    async fn fetch(
        &self,
        path: &Path,
        fetch: impl AsyncFnOnce() -> Result<Metadata, OpenReadError>,
    ) -> Result<Arc<Metadata>, OpenReadError> {
        // Fast path: a plain read avoids allocating an owned key.
        if let Some(metadata) = self.get(path).await {
            return Ok(metadata);
        }
        // The entry is held across `fetch().await`, so it must be acquired
        // with the async variant: waiting for it then yields to the executor
        // instead of blocking the thread another task may need to finish.
        let entry = match self.entry_async(path.to_path_buf()).await {
            Entry::Occupied(entry) => entry,
            Entry::Vacant(entry) => {
                let metadata = fetch().await.map(Arc::new)?;
                entry.insert_entry(metadata)
            }
        };
        Ok(Arc::clone(entry.get()))
    }
}