use super::Error;
use futures::Stream;
use std::{future::Future, num::NonZeroUsize, ops::Range};
use tracing::warn;
pub mod fixed;
pub mod variable;
#[cfg(test)]
mod tests;
pub trait Reader: Send + Sync {
type Item;
fn bounds(&self) -> Range<u64>;
fn read(&self, position: u64) -> impl Future<Output = Result<Self::Item, Error>> + Send;
fn try_read_sync(&self, _position: u64) -> Option<Self::Item> {
None
}
fn replay(
&self,
buffer: NonZeroUsize,
start_pos: u64,
) -> impl Future<
Output = Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + Send, Error>,
> + Send;
}
pub trait Contiguous: Send + Sync {
type Item;
fn reader(&self) -> impl Future<Output = impl Reader<Item = Self::Item> + '_> + Send;
fn size(&self) -> impl Future<Output = u64> + Send;
}
pub enum Many<'a, T> {
Flat(&'a [T]),
Nested(&'a [&'a [T]]),
}
impl<T> Many<'_, T> {
pub fn is_empty(&self) -> bool {
match self {
Self::Flat(items) => items.is_empty(),
Self::Nested(nested_items) => nested_items.iter().all(|items| items.is_empty()),
}
}
}
pub trait Mutable: Contiguous + Send + Sync {
fn append(
&mut self,
item: &Self::Item,
) -> impl std::future::Future<Output = Result<u64, Error>> + Send;
fn append_many<'a>(
&'a mut self,
items: Many<'a, Self::Item>,
) -> impl std::future::Future<Output = Result<u64, Error>> + Send + 'a
where
Self::Item: Sync,
{
async move {
if items.is_empty() {
return Err(Error::EmptyAppend);
}
let mut last_pos = self.size().await;
match items {
Many::Flat(items) => {
for item in items {
last_pos = self.append(item).await?;
}
}
Many::Nested(nested_items) => {
for items in nested_items {
for item in *items {
last_pos = self.append(item).await?;
}
}
}
}
Ok(last_pos)
}
}
fn prune(
&mut self,
min_position: u64,
) -> impl std::future::Future<Output = Result<bool, Error>> + Send;
fn rewind(&mut self, size: u64) -> impl std::future::Future<Output = Result<(), Error>> + Send;
fn rewind_to<'a, P>(
&'a mut self,
mut predicate: P,
) -> impl std::future::Future<Output = Result<u64, Error>> + Send + 'a
where
P: FnMut(&Self::Item) -> bool + Send + 'a,
{
async move {
let (bounds, rewind_size) = {
let reader = self.reader().await;
let bounds = reader.bounds();
let mut rewind_size = bounds.end;
while rewind_size > bounds.start {
let item = reader.read(rewind_size - 1).await?;
if predicate(&item) {
break;
}
rewind_size -= 1;
}
(bounds, rewind_size)
};
if rewind_size != bounds.end {
let rewound_items = bounds.end - rewind_size;
warn!(
journal_size = bounds.end,
rewound_items, "rewinding journal items"
);
self.rewind(rewind_size).await?;
}
Ok(rewind_size)
}
}
}