use std::collections::HashSet;
use std::fmt::Debug;
use std::io::{Cursor, ErrorKind};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use feldera_types::checkpoint::{CheckpointMetadata, PSpineBatches};
use feldera_types::config::{StorageBackendConfig, StorageConfig, StorageOptions};
use feldera_types::constants::CREATE_FILE_EXTENSION;
use serde::de::DeserializeOwned;
use tracing::warn;
use uuid::Uuid;
use crate::block::BlockLocation;
use crate::error::StorageError;
use crate::fbuf::FBuf;
use crate::file::FileId;
pub use object_store::path::{Path as StoragePath, PathPart as StoragePathPart};
pub mod block;
pub mod checkpoint_synchronizer;
pub mod error;
pub mod fbuf;
pub mod file;
pub mod histogram;
pub mod metrics;
pub mod tokio;
/// Returns `p` with the literal string `s` appended to its final component.
///
/// No path separator is inserted: `append_to_path("a/b".into(), ".mut")`
/// yields `a/b.mut`, not `a/b/.mut`.
pub fn append_to_path(p: PathBuf, s: &str) -> PathBuf {
    let mut raw = p.into_os_string();
    raw.push(s);
    PathBuf::from(raw)
}
/// Factory that constructs a [`StorageBackend`] from configuration.
///
/// Implementations are registered via `inventory` (see the `collect!`
/// invocation below) so that `<dyn StorageBackend>::new` can discover them at
/// runtime.
pub trait StorageBackendFactory: Sync {
    /// The backend name this factory serves; compared against the string form
    /// of the configured backend when selecting a factory.
    fn backend(&self) -> &'static str;
    /// Builds a backend instance from the given storage and backend
    /// configuration.
    fn create(
        &self,
        storage_config: &StorageConfig,
        backend_config: &StorageBackendConfig,
    ) -> Result<Arc<dyn StorageBackend>, StorageError>;
}
// Link-time registry of every `StorageBackendFactory` compiled into the
// binary; `<dyn StorageBackend>::new` iterates it to find a matching backend.
inventory::collect!(&'static dyn StorageBackendFactory);
/// A storage backend: a collection of named files that can be created,
/// opened, listed, and deleted.
pub trait StorageBackend: Send + Sync {
    /// Creates a file with the exact path `name`, open for writing.
    fn create_named(&self, name: &StoragePath) -> Result<Box<dyn FileWriter>, StorageError>;
    /// Creates a file with a freshly generated name at the storage root.
    fn create(&self) -> Result<Box<dyn FileWriter>, StorageError> {
        self.create_with_prefix(&StoragePath::default())
    }
    /// Creates a file named `prefix` followed by a new v7 UUID and the
    /// standard extension for files still being created.
    fn create_with_prefix(
        &self,
        prefix: &StoragePath,
    ) -> Result<Box<dyn FileWriter>, StorageError> {
        let name = format!("{}{}{}", prefix, Uuid::now_v7(), CREATE_FILE_EXTENSION);
        self.create_named(&name.into())
    }
    /// Opens the existing file `name` for reading.
    fn open(&self, name: &StoragePath) -> Result<Arc<dyn FileReader>, StorageError>;
    /// This backend's location on the local file system, if it has one.
    fn file_system_path(&self) -> Option<&Path> {
        None
    }
    /// Invokes `cb` with the path and type of each entry under `parent`.
    fn list(
        &self,
        parent: &StoragePath,
        cb: &mut dyn FnMut(&StoragePath, StorageFileType),
    ) -> Result<(), StorageError>;
    /// Deletes the file `name`.
    fn delete(&self, name: &StoragePath) -> Result<(), StorageError>;
    /// Deletes `name` and everything beneath it.
    fn delete_recursive(&self, name: &StoragePath) -> Result<(), StorageError>;
    /// Deletes `name`, treating "not found" as success.
    fn delete_if_exists(&self, name: &StoragePath) -> Result<(), StorageError> {
        match self.delete(name) {
            Err(error) => {
                if error.kind() == ErrorKind::NotFound {
                    Ok(())
                } else {
                    Err(error)
                }
            }
            ok => ok,
        }
    }
    /// Reports whether `name` can be opened.
    fn exists(&self, name: &StoragePath) -> Result<bool, StorageError> {
        match self.open(name) {
            Ok(_) => Ok(true),
            Err(error) if error.kind() == ErrorKind::NotFound => Ok(false),
            Err(error) => Err(error),
        }
    }
    /// Reads the entire contents of `name` into memory as a single block.
    fn read(&self, name: &StoragePath) -> Result<Arc<FBuf>, StorageError> {
        let reader = self.open(name)?;
        let size = reader.get_size()?;
        reader.read_block(BlockLocation {
            offset: 0,
            size: size.try_into().unwrap(),
        })
    }
    /// Writes `content` as the full contents of a new file `name`, marks it
    /// for checkpointing, and returns a handle that can later commit it.
    fn write(
        &self,
        name: &StoragePath,
        content: FBuf,
    ) -> Result<Arc<dyn FileCommitter>, StorageError> {
        let mut writer = self.create_named(name)?;
        writer.write_block(content)?;
        let completed = writer.complete()?;
        completed.mark_for_checkpoint();
        Ok(completed)
    }
    /// Shared counter tracking this backend's storage usage (units are
    /// implementation-defined; presumably bytes — confirm with the backend).
    fn usage(&self) -> Arc<AtomicI64>;
}
impl dyn StorageBackend {
    /// Instantiates the storage backend selected by `options.backend`,
    /// searching the `inventory` registry of [`StorageBackendFactory`]s.
    ///
    /// # Errors
    /// Returns [`StorageError::BackendNotSupported`] if no registered factory
    /// reports a name matching the configured backend.
    pub fn new(
        config: &StorageConfig,
        options: &StorageOptions,
    ) -> Result<Arc<Self>, StorageError> {
        Self::warn_about_tmpfs(config.path());
        // Hoisted out of the loop: `to_string()` allocates, and the factory
        // names are `&'static str`s we can compare against the same string
        // every iteration.
        let wanted = options.backend.to_string();
        for factory in inventory::iter::<&dyn StorageBackendFactory> {
            if factory.backend() == wanted {
                return factory.create(config, &options.backend);
            }
        }
        Err(StorageError::BackendNotSupported(Box::new(
            options.backend.clone(),
        )))
    }
    /// Whether `_path` resides on a `tmpfs` file system. Only meaningful on
    /// Linux; always `false` elsewhere.
    fn is_tmpfs(_path: &Path) -> bool {
        #[cfg(target_os = "linux")]
        {
            use nix::sys::statfs;
            statfs::statfs(_path).is_ok_and(|s| s.filesystem_type() == statfs::TMPFS_MAGIC)
        }
        #[cfg(not(target_os = "linux"))]
        false
    }
    /// Logs a one-time warning if `path` is on an in-memory tmpfs, where
    /// stored data will not survive a reboot.
    fn warn_about_tmpfs(path: &Path) {
        if Self::is_tmpfs(path) {
            // `Once` ensures the warning appears at most once per process,
            // even if multiple backends are initialized.
            static ONCE: std::sync::Once = std::sync::Once::new();
            ONCE.call_once(|| {
                warn!("initializing storage on in-memory tmpfs filesystem at {}; consider configuring physical storage", path.display())
            });
        }
    }
    /// Collects the batch-file paths referenced by every `pspine-batches*`
    /// file found under the checkpoint directory named after `cpm`.
    ///
    /// # Panics
    /// Panics if `cpm` is the nil UUID.
    pub fn gather_batches_for_checkpoint_uuid(
        &self,
        cpm: uuid::Uuid,
    ) -> Result<HashSet<StoragePath>, StorageError> {
        assert!(!cpm.is_nil());
        // First pass: list the checkpoint directory and remember every
        // spine-batches metadata file.
        let mut spines = Vec::new();
        self.list(&cpm.to_string().into(), &mut |path, _file_type| {
            if path
                .filename()
                .is_some_and(|filename| filename.starts_with("pspine-batches"))
            {
                spines.push(path.clone());
            }
        })?;
        // Second pass: parse each metadata file and union the batch paths it
        // references.
        let mut batch_files_in_commit: HashSet<StoragePath> = HashSet::new();
        for spine in spines {
            let pspine_batches = self.read_json::<PSpineBatches>(&spine)?;
            for file in pspine_batches.files {
                batch_files_in_commit.insert(file.into());
            }
        }
        Ok(batch_files_in_commit)
    }
    /// Like [`Self::gather_batches_for_checkpoint_uuid`], taking the UUID
    /// from checkpoint metadata.
    pub fn gather_batches_for_checkpoint(
        &self,
        cpm: &CheckpointMetadata,
    ) -> Result<HashSet<StoragePath>, StorageError> {
        self.gather_batches_for_checkpoint_uuid(cpm.uuid)
    }
}
impl dyn StorageBackend + '_ {
    /// Serializes `value` as JSON and writes it as the full contents of
    /// `name`, returning a handle that can later commit the file.
    pub fn write_json<V>(
        &self,
        name: &StoragePath,
        value: &V,
    ) -> Result<Arc<dyn FileCommitter>, StorageError>
    where
        V: serde::Serialize,
    {
        let mut buffer = FBuf::new();
        // Serializing into an in-memory buffer; failure here means a broken
        // `Serialize` impl, which is a bug.
        serde_json::to_writer(&mut buffer, value).unwrap();
        self.write(name, buffer)
    }
    /// Reads the whole of `name` and deserializes its contents from JSON.
    pub fn read_json<V>(&self, name: &StoragePath) -> Result<V, StorageError>
    where
        V: DeserializeOwned,
    {
        let bytes = self.read(name)?;
        match serde_json::from_reader(Cursor::new(bytes.as_ref())) {
            Ok(value) => Ok(value),
            Err(e) => Err(StorageError::JsonError(e.to_string())),
        }
    }
}
/// Identity accessors common to file readers and writers.
pub trait FileRw {
    /// This file's unique identifier.
    fn file_id(&self) -> FileId;
    /// This file's path within its storage backend.
    fn path(&self) -> &StoragePath;
}
/// A file being written, one block at a time.
pub trait FileWriter: Send + Sync + FileRw {
    /// Writes `data` as the next block of the file, returning the stored
    /// buffer.
    fn write_block(&mut self, data: FBuf) -> Result<Arc<FBuf>, StorageError>;
    /// Finishes writing and converts this file into a reader.
    fn complete(self: Box<Self>) -> Result<Arc<dyn FileReader>, StorageError>;
}
/// A file that can be committed (semantics are backend-specific).
pub trait FileCommitter: Send + Sync + Debug + FileRw {
    /// Commits the file.
    fn commit(&self) -> Result<(), StorageError>;
}
/// A completed file open for reading.
pub trait FileReader: Send + Sync + Debug + FileRw + FileCommitter {
    /// Flags this file as belonging to a checkpoint (effect is
    /// implementation-defined; see the concrete backend).
    fn mark_for_checkpoint(&self);
    /// Reads the block at `location`.
    fn read_block(&self, location: BlockLocation) -> Result<Arc<FBuf>, StorageError>;
    /// Reads `blocks` and passes the per-block results to `callback`. The
    /// default implementation reads synchronously, one block at a time, and
    /// invokes `callback` before returning.
    #[allow(clippy::type_complexity)]
    fn read_async(
        &self,
        blocks: Vec<BlockLocation>,
        callback: Box<dyn FnOnce(Vec<Result<Arc<FBuf>, StorageError>>) + Send>,
    ) {
        default_read_async(self, blocks, callback);
    }
    /// The file's size in bytes.
    fn get_size(&self) -> Result<u64, StorageError>;
}
/// Fallback implementation of [`FileReader::read_async`]: reads every block
/// synchronously via [`FileReader::read_block`] and hands the collected
/// results to `callback` before returning.
#[allow(clippy::type_complexity)]
pub fn default_read_async<R>(
    reader: &R,
    blocks: Vec<BlockLocation>,
    callback: Box<dyn FnOnce(Vec<Result<Arc<FBuf>, StorageError>>) + Send>,
) where
    R: FileReader + ?Sized,
{
    let mut results = Vec::with_capacity(blocks.len());
    for location in blocks {
        results.push(reader.read_block(location));
    }
    callback(results)
}
/// The kind of an entry reported by [`StorageBackend::list`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StorageFileType {
    /// A regular file.
    File {
        /// The file's size in bytes.
        size: u64,
    },
    /// A directory (or directory-like prefix).
    Directory,
    /// Anything that is neither a regular file nor a directory.
    Other,
}