use std::{
io,
path::{Path, PathBuf},
sync::{Arc, LazyLock},
};
use derive_more::Debug;
use eyre::Result;
use sqlx::PgPool;
use tantivy::{
Directory, TantivyError,
directory::{
DirectoryLock, FileHandle, Lock, WatchCallback, WatchHandle, WritePtr,
error::{DeleteError, LockError, OpenReadError, OpenWriteError},
},
};
use tokio::runtime::Handle;
use uuid::Uuid;
use crate::{
cache::Cache,
file::File,
metadata::MetadataStore,
operator::Operator,
utils::{PathExt, WrapIoErrorExt},
writer::Writer,
};
/// Path of tantivy's `meta.json`, which is stored as an atomic metadata blob.
static META_JSON: LazyLock<&'static Path> = LazyLock::new(|| "meta.json".as_ref());
/// Path of tantivy's `.managed.json`, also stored as an atomic metadata blob.
static MANAGED_JSON: LazyLock<&'static Path> = LazyLock::new(|| ".managed.json".as_ref());
/// A tantivy `Directory` whose segment files live in a remote object store
/// (via opendal) while file existence and atomic metadata blobs are tracked
/// in Postgres.
///
/// NOTE(review): `Clone` is derived — presumably `Cache`/`Operator`/
/// `MetadataStore` are cheap shared handles; confirm before cloning in hot
/// paths.
#[derive(Clone, Debug)]
#[debug("RemoteDirectory {{ index: {index} }}")]
pub struct RemoteDirectory {
    // Identifier of the index; also forms the `idx-{uuid}` storage prefix.
    index: Uuid,
    // Tokio runtime handle used to `block_on` async work from tantivy's
    // synchronous `Directory` API.
    rt: Handle,
    // In-memory cache of open file handles, remote metadata, and files
    // created (but not yet synced) in this session.
    cache: Cache,
    // Remote object-store client.
    operator: Operator,
    // Postgres-backed store for file rows and atomic metadata blobs.
    metadata: MetadataStore,
    // Optional read chunk size; `None` uses the operator default.
    read_chunks: Option<usize>,
    // Optional write chunk size; `None` uses the operator default.
    write_chunks: Option<usize>,
    // Optional read concurrency; `None` uses the operator default.
    read_concurrency: Option<usize>,
    // Optional write concurrency; `None` uses the operator default.
    write_concurrency: Option<usize>,
}
impl RemoteDirectory {
    /// Opens the directory for the given index, backed by `operator` for
    /// object storage and `pool` for metadata.
    ///
    /// Must be called from within a tokio runtime (captures the current
    /// runtime handle for later `block_on` calls).
    pub async fn open(index: Uuid, operator: opendal::Operator, pool: PgPool) -> Result<Self> {
        let metadata = MetadataStore::open(index, pool).await?;
        let directory = Self {
            index,
            rt: Handle::current(),
            cache: Cache::default(),
            operator: Operator::from(operator),
            metadata,
            read_chunks: None,
            write_chunks: None,
            read_concurrency: None,
            write_concurrency: None,
        };
        Ok(directory)
    }
    /// Sets the chunk size used when reading remote files.
    pub fn with_read_chunks(self, chunks: usize) -> Self {
        Self { read_chunks: Some(chunks), ..self }
    }
    /// Sets the chunk size used when writing remote files.
    pub fn with_write_chunks(self, chunks: usize) -> Self {
        Self { write_chunks: Some(chunks), ..self }
    }
    /// Sets the concurrency used when reading remote files.
    pub fn with_read_concurrency(self, concurrency: usize) -> Self {
        Self { read_concurrency: Some(concurrency), ..self }
    }
    /// Sets the concurrency used when writing remote files.
    pub fn with_write_concurrency(self, concurrency: usize) -> Self {
        Self { write_concurrency: Some(concurrency), ..self }
    }
    /// Maps a logical file path to its storage path under the per-index
    /// `idx-{uuid}` prefix.
    fn path(&self, path: impl AsRef<Path>) -> PathBuf {
        PathBuf::from(format!("idx-{}", self.index)).join(path)
    }
}
impl Directory for RemoteDirectory {
    /// Opens `filepath` for reads, serving from the cache when possible.
    ///
    /// Existence is checked against the cache's created-set first (files
    /// written this session are only persisted to the metadata store by
    /// `sync_directory`), then against the metadata store.
    fn get_file_handle(&self, filepath: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> {
        self.rt.block_on(async {
            // Logical path (as passed by tantivy) — used for metadata lookups.
            let path = filepath.try_to_str::<OpenReadError>()?;
            // Storage path — carries the `idx-{uuid}` prefix; used as the
            // cache key and for the object-store operator.
            let filepath = self.path(filepath);
            if let Some(file) = self.cache.get_file(&filepath).await {
                return Ok(file);
            }
            let exists = if self.cache.is_created(&filepath).await {
                true
            } else {
                self.metadata
                    .file_exists(path)
                    .await
                    .map_err(OpenReadError::wrapper(path))?
            };
            if !exists {
                return Err(OpenReadError::FileDoesNotExist(path.into()));
            }
            // Lazily construct the handle; the cache memoizes it so repeated
            // opens of the same file share one `File`.
            let open = async || {
                let metadata = self
                    .cache
                    .metadata(&filepath, || self.operator.metadata(&filepath))
                    .await?;
                let path = filepath.try_to_str::<OpenReadError>()?;
                let file = File::open(
                    path,
                    metadata,
                    self.rt.clone(),
                    self.operator.clone(),
                    self.read_chunks,
                    self.read_concurrency,
                );
                Ok(file)
            };
            self.cache.file(&filepath, open).await
        })
    }
    /// Deletes `filepath` from the metadata store.
    ///
    /// NOTE(review): only the metadata row is removed here; the remote
    /// object itself is presumably reclaimed elsewhere — confirm.
    fn delete(&self, filepath: &Path) -> Result<(), DeleteError> {
        let path = filepath.try_to_str::<DeleteError>()?;
        let deleted = self
            .rt
            .block_on(self.metadata.delete_file(path))
            .map_err(DeleteError::wrapper(filepath))?;
        if !deleted {
            return Err(DeleteError::FileDoesNotExist(filepath.into()));
        }
        Ok(())
    }
    /// Reports whether `filepath` exists.
    ///
    /// `meta.json` and `.managed.json` are stored as atomic metadata blobs,
    /// so they are looked up in the metadata table rather than the file table.
    fn exists(&self, filepath: &Path) -> Result<bool, OpenReadError> {
        let path = filepath.try_to_str::<OpenReadError>()?;
        if filepath == *META_JSON || filepath == *MANAGED_JSON {
            return self
                .rt
                .block_on(self.metadata.metadata_exists(path))
                .map_err(OpenReadError::wrapper(filepath));
        }
        self.rt
            .block_on(self.metadata.file_exists(path))
            .map_err(OpenReadError::wrapper(filepath))
    }
    /// Creates a new write handle for `filepath`.
    ///
    /// # Errors
    ///
    /// Returns `OpenWriteError::FileAlreadyExists` when the file is already
    /// recorded in the metadata store or already present in remote storage.
    fn open_write(&self, filepath: &Path) -> Result<WritePtr, OpenWriteError> {
        let path = filepath.try_to_str::<OpenWriteError>()?;
        let exists = self
            .rt
            .block_on(self.metadata.file_exists(path))
            .map_err(OpenWriteError::wrapper(filepath))?;
        if exists {
            return Err(OpenWriteError::FileAlreadyExists(filepath.into()));
        }
        // Storage path carries the `idx-{uuid}` prefix; keep the logical
        // `filepath` unshadowed so errors report the path callers passed.
        let storage_path = self.path(filepath);
        let path = storage_path.try_to_str::<OpenWriteError>()?;
        let writer = self.rt.block_on(async {
            let mut writer = self.operator.writer_with(path).append(false);
            if let Some(chunks) = self.write_chunks {
                writer = writer.chunk(chunks);
            }
            if let Some(concurrency) = self.write_concurrency {
                writer = writer.concurrent(concurrency);
            }
            let writer = match writer.await {
                Ok(writer) => writer,
                Err(error) => {
                    // Report the logical path, consistent with the
                    // metadata-existence error above (previously this leaked
                    // the internal `idx-{uuid}/…` storage path).
                    let filepath = filepath.to_path_buf();
                    if error.kind() == opendal::ErrorKind::AlreadyExists {
                        return Err(OpenWriteError::FileAlreadyExists(filepath));
                    } else {
                        return Err(OpenWriteError::wrap_other(error, filepath));
                    }
                }
            };
            // Record the new file in the cache so `get_file_handle` sees it
            // before `sync_directory` persists it to the metadata store.
            let entry = self.cache.created(storage_path.clone()).await?;
            Ok(Writer::new(entry, writer, self.rt.clone()))
        })?;
        let writer = Box::new(writer);
        let ptr = WritePtr::new(writer);
        Ok(ptr)
    }
    /// Reads the full contents of an atomic metadata blob (e.g. `meta.json`).
    fn atomic_read(&self, filepath: &Path) -> Result<Vec<u8>, OpenReadError> {
        let path = filepath.try_to_str::<OpenReadError>()?;
        self.rt
            .block_on(self.metadata.read_metadata(path))
            .map_err(OpenReadError::wrapper(filepath))?
            .ok_or_else(|| OpenReadError::FileDoesNotExist(filepath.into()))
    }
    /// Atomically writes `data` as a metadata blob in the metadata store.
    fn atomic_write(&self, filepath: &Path, data: &[u8]) -> io::Result<()> {
        let path = filepath.try_to_str::<io::Error>()?;
        self.rt
            .block_on(self.metadata.write_metadata(path, data))
            .map_err(io::Error::wrapper(filepath))
    }
    /// Flushes pending cache writes and records each flushed file in the
    /// metadata store, making it durably visible.
    fn sync_directory(&self) -> io::Result<()> {
        let flushed = self.rt.block_on(self.cache.sync());
        if flushed.is_empty() {
            return Ok(());
        }
        for path in flushed {
            // Cache paths carry the `idx-{uuid}` prefix; strip the first
            // component to recover the logical path for the metadata store.
            let mut components = path.components();
            components.next();
            let filepath = components.as_path();
            let path = filepath.try_to_str::<io::Error>()?;
            self.rt
                .block_on(self.metadata.create_file(path))
                .map_err(io::Error::wrapper(filepath))?;
        }
        Ok(())
    }
    /// Watching is unsupported; readers must reload manually.
    fn watch(&self, _cb: WatchCallback) -> tantivy::Result<WatchHandle> {
        let error =
            "watching is not supported by this directory, use `ReloadingPolicy::Manual`".into();
        Err(TantivyError::InternalError(error))
    }
    /// Always succeeds with a no-op lock.
    ///
    /// NOTE(review): there is no real inter-process locking here — callers
    /// presumably guarantee a single writer per index; confirm.
    fn acquire_lock(&self, _lock: &Lock) -> Result<DirectoryLock, LockError> {
        Ok(DirectoryLock::from(Box::new(())))
    }
}