pub mod config;
pub mod factory;
pub mod in_memory;
pub mod loader;
pub mod slate;
pub mod util;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use crate::BytesRange;
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
pub enum Ttl {
#[default]
Default,
NoExpiry,
ExpireAfter(u64),
}
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
pub struct PutOptions {
pub ttl: Ttl,
}
#[derive(Clone, Debug)]
pub struct PutRecordOp {
pub record: Record,
pub options: PutOptions,
}
impl PutRecordOp {
pub fn new(record: Record) -> Self {
Self {
record,
options: PutOptions::default(),
}
}
pub fn new_with_options(record: Record, options: PutOptions) -> Self {
Self { record, options }
}
pub fn with_options(self, options: PutOptions) -> Self {
Self {
record: self.record,
options,
}
}
}
impl From<Record> for PutRecordOp {
fn from(record: Record) -> Self {
Self::new(record)
}
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct MergeOptions {
pub ttl: Ttl,
}
#[derive(Clone, Debug)]
pub struct MergeRecordOp {
pub record: Record,
pub options: MergeOptions,
}
impl MergeRecordOp {
pub fn new(record: Record) -> Self {
Self {
record,
options: MergeOptions::default(),
}
}
pub fn new_with_ttl(record: Record, options: MergeOptions) -> Self {
Self { record, options }
}
}
impl From<Record> for MergeRecordOp {
fn from(record: Record) -> Self {
Self::new(record)
}
}
#[derive(Clone, Debug)]
pub struct Record {
pub key: Bytes,
pub value: Bytes,
}
impl Record {
pub fn new(key: Bytes, value: Bytes) -> Self {
Self { key, value }
}
pub fn empty(key: Bytes) -> Self {
Self::new(key, Bytes::new())
}
}
#[derive(Clone, Debug)]
pub enum RecordOp {
Put(PutRecordOp),
Merge(MergeRecordOp),
Delete(Bytes),
}
#[derive(Debug, Clone, Default)]
pub struct WriteOptions {
pub await_durable: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageError {
Storage(String),
Internal(String),
}
impl std::error::Error for StorageError {}
impl std::fmt::Display for StorageError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
StorageError::Storage(msg) => write!(f, "Storage error: {}", msg),
StorageError::Internal(msg) => write!(f, "Internal error: {}", msg),
}
}
}
impl StorageError {
pub fn from_storage(e: impl std::fmt::Display) -> Self {
StorageError::Storage(e.to_string())
}
}
pub type StorageResult<T> = std::result::Result<T, StorageError>;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct WriteResult {
pub seqnum: u64,
}
pub trait MergeOperator: Send + Sync {
fn merge_batch(&self, key: &Bytes, existing_value: Option<Bytes>, operands: &[Bytes]) -> Bytes;
}
pub fn default_merge_batch(
key: &Bytes,
existing_value: Option<Bytes>,
operands: &[Bytes],
merge: impl Fn(&Bytes, Option<Bytes>, Bytes) -> Bytes,
) -> Bytes {
let mut result = existing_value;
for operand in operands {
result = Some(merge(key, result, operand.clone()));
}
result.expect("merge_batch called with no existing value and no operands")
}
#[async_trait]
pub trait StorageIterator {
async fn next(&mut self) -> StorageResult<Option<Record>>;
}
#[async_trait]
pub trait StorageRead: Send + Sync {
async fn get(&self, key: Bytes) -> StorageResult<Option<Record>>;
async fn scan_iter(
&self,
range: BytesRange,
) -> StorageResult<Box<dyn StorageIterator + Send + 'static>>;
#[tracing::instrument(level = "trace", skip_all)]
async fn scan(&self, range: BytesRange) -> StorageResult<Vec<Record>> {
let mut iter = self.scan_iter(range).await?;
let mut records = Vec::new();
while let Some(record) = iter.next().await? {
records.push(record);
}
Ok(records)
}
}
#[async_trait]
pub trait StorageSnapshot: StorageRead {}
#[async_trait]
pub trait Storage: StorageRead {
async fn apply(&self, ops: Vec<RecordOp>) -> StorageResult<WriteResult> {
self.apply_with_options(ops, WriteOptions::default()).await
}
async fn apply_with_options(
&self,
ops: Vec<RecordOp>,
options: WriteOptions,
) -> StorageResult<WriteResult>;
async fn put(&self, records: Vec<PutRecordOp>) -> StorageResult<WriteResult> {
self.put_with_options(records, WriteOptions::default())
.await
}
async fn put_with_options(
&self,
records: Vec<PutRecordOp>,
options: WriteOptions,
) -> StorageResult<WriteResult>;
async fn merge(&self, records: Vec<MergeRecordOp>) -> StorageResult<WriteResult> {
self.merge_with_options(records, WriteOptions::default())
.await
}
async fn merge_with_options(
&self,
records: Vec<MergeRecordOp>,
options: WriteOptions,
) -> StorageResult<WriteResult>;
async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>>;
async fn flush(&self) -> StorageResult<()>;
async fn close(&self) -> StorageResult<()>;
fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64>;
#[cfg(feature = "metrics")]
fn register_metrics(&self, _registry: &mut prometheus_client::registry::Registry) {}
}