pub struct Journal<E: Storage + Metrics, V: Codec> { /* private fields */ }
A position-based journal for variable-length items.
This journal manages section assignment automatically, allowing callers to append items sequentially without manually tracking section numbers.
§Invariants
§1. Section Fullness
All non-final sections are full (items_per_section items) and synced. This ensures
that on init(), we only need to replay the last section to determine the exact size.
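The arithmetic behind this invariant can be sketched as follows. This is an illustration only: it assumes sections are numbered from zero and that position p lives in section p / items_per_section. The helper names section_of and recovered_size are hypothetical and are not part of this crate's API.

```rust
/// Section that holds `pos`, assuming sections are numbered from zero
/// and every non-final section holds exactly `items_per_section` items.
fn section_of(pos: u64, items_per_section: u64) -> u64 {
    pos / items_per_section
}

/// Journal size implied by the index of the final section and the number
/// of items found by replaying only that section.
fn recovered_size(last_section: u64, items_in_last: u64, items_per_section: u64) -> u64 {
    last_section * items_per_section + items_in_last
}
```

Because every earlier section is known to be full, counting the items in the final section is enough to recover the size.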
§2. Data Journal is Source of Truth
The data journal is always the source of truth. The offsets journal is an index that may temporarily diverge during crashes. Divergences are automatically aligned during init():
- If offsets.size() < data.size(): Rebuild missing offsets by replaying data. (This can happen if we crash after writing the data journal but before writing the offsets journal.)
- If offsets.size() > data.size(): Rewind offsets to match data size. (This can happen if we crash after rewinding the data journal but before rewinding the offsets journal.)
- If offsets.oldest_retained_pos() < data.oldest_retained_pos(): Prune offsets to match. (This can happen if we crash after pruning the data journal but before pruning the offsets journal.)
Note that we don’t recover from the case where offsets.oldest_retained_pos() > data.oldest_retained_pos(). This should never occur because we always prune the data journal before the offsets journal.
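The alignment rules above can be read as a small decision procedure. The sketch below models only the decision, not the I/O. The Repair, Bounds, and plan_alignment names are hypothetical, the order of the repair steps is an assumption, and both journals are assumed to be non-empty so that an oldest retained position exists.

```rust
/// Repair steps applied to the offsets journal (hypothetical names).
#[derive(Debug, PartialEq)]
enum Repair {
    /// Drop offsets for positions the data journal has already pruned.
    PruneOffsetsTo(u64),
    /// Replay data to rebuild offsets for positions `from..to`.
    RebuildOffsets { from: u64, to: u64 },
    /// Truncate offsets so that their size equals the data size.
    RewindOffsetsTo(u64),
}

/// Snapshot of one journal. Assumes the journal is non-empty, so the
/// oldest retained position exists.
#[derive(Clone, Copy)]
struct Bounds {
    oldest: u64,
    size: u64,
}

fn plan_alignment(offsets: Bounds, data: Bounds) -> Result<Vec<Repair>, &'static str> {
    // The one case with no recovery path: offsets pruned further than data.
    if offsets.oldest > data.oldest {
        return Err("offsets pruned ahead of data");
    }
    let mut plan = Vec::new();
    if offsets.oldest < data.oldest {
        plan.push(Repair::PruneOffsetsTo(data.oldest));
    }
    if offsets.size < data.size {
        plan.push(Repair::RebuildOffsets { from: offsets.size, to: data.size });
    } else if offsets.size > data.size {
        plan.push(Repair::RewindOffsetsTo(data.size));
    }
    Ok(plan)
}
```

Every repair targets the offsets journal, which reflects the rule that the data journal is never adjusted to match the index.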
Implementations§
impl<E: Storage + Metrics, V: Codec> Journal<E, V>
pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error>
Initialize a contiguous variable journal.
§Crash Recovery
The data journal is the source of truth. If the offsets journal is inconsistent, it will be updated to match the data journal.
pub async fn init_at_size(
    context: E,
    cfg: Config<V::Cfg>,
    size: u64,
) -> Result<Self, Error>
Initialize a Journal in a fully pruned state at a specific logical size.
This creates a journal that reports size() as size but contains no data.
The oldest_retained_pos() will return None, indicating all positions before
size have been pruned. This is useful for state sync when starting from
a non-zero position without historical data.
§Arguments
- size: The logical size to initialize at.
§Post-conditions
- size() returns size
- oldest_retained_pos() returns None (fully pruned)
- Next append receives position size
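The post-conditions can be illustrated with a small in-memory model. This is a sketch of the bookkeeping only: Model and its fields are hypothetical stand-ins, not the real Journal, and no storage is involved.

```rust
/// In-memory stand-in for the journal's bookkeeping (not the real type).
struct Model {
    /// Position that the first retained item has, or would have.
    first: u64,
    /// Items retained in memory; item `i` has position `first + i`.
    items: Vec<String>,
}

impl Model {
    /// Mirrors `init_at_size`: nothing retained, logical size already `size`.
    fn at_size(size: u64) -> Self {
        Model { first: size, items: Vec::new() }
    }

    fn size(&self) -> u64 {
        self.first + self.items.len() as u64
    }

    fn oldest_retained_pos(&self) -> Option<u64> {
        if self.items.is_empty() { None } else { Some(self.first) }
    }

    /// Appends an item and returns the position it was assigned.
    fn append(&mut self, item: &str) -> u64 {
        let pos = self.size();
        self.items.push(item.to_string());
        pos
    }
}
```

In this model, a journal created with at_size(100) reports a size of 100 with nothing retained, and the first append returns position 100.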
pub async fn rewind(&mut self, size: u64) -> Result<(), Error>
Rewind the journal to the given size, discarding items from the end.
After rewinding to size N, the journal will contain exactly N items, and the next append will receive position N.
§Errors
Returns Error::InvalidRewind if size is invalid (too large or points to pruned data).
Errors from underlying storage operations may leave the journal in an inconsistent state. If that happens, the journal should be closed and reopened to trigger alignment in Journal::init.
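The validity check for a rewind target can be sketched as below. The RewindError variants, the check_rewind helper, and the pruned_before parameter (the first position that has not been pruned) are hypothetical. The exact boundary the library enforces for pruned data is not stated here, so treating a target equal to pruned_before as valid is an assumption.

```rust
#[derive(Debug, PartialEq)]
enum RewindError {
    /// Target is larger than the current size.
    TooLarge,
    /// Target falls below the first position that has not been pruned.
    Pruned,
}

/// Returns the number of items a rewind to `target` would discard.
/// `pruned_before` is a hypothetical value: the first unpruned position.
fn check_rewind(size: u64, pruned_before: u64, target: u64) -> Result<u64, RewindError> {
    if target > size {
        return Err(RewindError::TooLarge);
    }
    if target < pruned_before {
        return Err(RewindError::Pruned);
    }
    Ok(size - target)
}
```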
pub async fn append(&mut self, item: V) -> Result<u64, Error>
Append a new item to the journal, returning its position.
The position returned is a stable, consecutively increasing value starting from 0. This position remains constant after pruning.
When a section becomes full, both the data journal and offsets journal are synced to maintain the invariant that all non-final sections are full and consistent.
§Errors
Returns an error if the underlying storage operation fails or if the item cannot be encoded.
Errors may leave the journal in an inconsistent state. The journal should be closed and reopened to trigger alignment in Journal::init.
pub fn size(&self) -> u64
Return the total number of items that have been appended to the journal.
This count is NOT affected by pruning. The next appended item will be assigned this value as its position.
pub fn oldest_retained_pos(&self) -> Option<u64>
Return the position of the oldest item still retained in the journal.
Returns None if the journal is empty or if all items have been pruned.
pub async fn prune(&mut self, min_position: u64) -> Result<bool, Error>
Prune items at positions strictly less than min_position.
Returns true if any data was pruned, false otherwise.
§Errors
Returns an error if the underlying storage operation fails.
Errors may leave the journal in an inconsistent state. The journal should be closed and reopened to trigger alignment in Journal::init.
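How pruning interacts with size() and oldest_retained_pos() can be sketched with an in-memory model. Model is a hypothetical stand-in, not the real Journal. It prunes item by item, while the real journal may retain more than this (for instance, if it prunes whole sections at a time); that detail is not stated here.

```rust
use std::collections::VecDeque;

/// In-memory stand-in for the journal (not the real type).
struct Model {
    /// Position of the front item, or the next position if nothing is retained.
    first: u64,
    items: VecDeque<u32>,
}

impl Model {
    fn new() -> Self {
        Model { first: 0, items: VecDeque::new() }
    }

    fn size(&self) -> u64 {
        self.first + self.items.len() as u64
    }

    fn oldest_retained_pos(&self) -> Option<u64> {
        if self.items.is_empty() { None } else { Some(self.first) }
    }

    fn append(&mut self, item: u32) -> u64 {
        let pos = self.size();
        self.items.push_back(item);
        pos
    }

    /// Drops every retained item whose position is below `min_position`
    /// and reports whether anything was dropped.
    fn prune(&mut self, min_position: u64) -> bool {
        let mut pruned = false;
        while self.first < min_position && self.items.pop_front().is_some() {
            self.first += 1;
            pruned = true;
        }
        pruned
    }
}
```

After appending five items and pruning below position 3, the model still reports a size of 5, the oldest retained position is 3, and the next append returns 5.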
pub async fn read(&self, position: u64) -> Result<V, Error>
Read the item at the given position.
§Errors
- Returns Error::ItemPruned if the item at position has been pruned.
- Returns Error::ItemOutOfRange if position is beyond the journal size.
- Returns other errors if storage or decoding fails.
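The first two error cases can be sketched as a bounds check. The variant names mirror the ones listed above, but the ReadError type, its payloads, the check_read helper, and the order of the checks are assumptions made for illustration.

```rust
#[derive(Debug, PartialEq)]
enum ReadError {
    ItemPruned(u64),
    ItemOutOfRange(u64),
}

/// Decides whether `position` is readable, given the journal size and
/// `oldest`, the value reported by `oldest_retained_pos()`.
fn check_read(position: u64, size: u64, oldest: Option<u64>) -> Result<(), ReadError> {
    if position >= size {
        return Err(ReadError::ItemOutOfRange(position));
    }
    match oldest {
        Some(first) if position >= first => Ok(()),
        // Below the oldest retained item, or nothing is retained at all.
        _ => Err(ReadError::ItemPruned(position)),
    }
}
```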
pub async fn sync_data(&mut self) -> Result<(), Error>
Sync only the data journal to storage, without syncing the offsets journal.
This is faster than sync() and can be used to ensure data durability without
the overhead of syncing the offsets journal.
append calls sync whenever a section becomes full, so the offsets journal will eventually be
synced as well, maintaining the invariant that all non-final sections are fully synced.
In other words, the journal will remain in a consistent, recoverable state even if a crash
occurs after calling this method but before calling sync.
pub async fn sync(&mut self) -> Result<(), Error>
Sync all pending writes to storage.
This syncs both the data journal and the offsets journal concurrently.
impl<E: Storage + Metrics, V: Codec + Send> Journal<E, V>
pub async fn replay(
    &self,
    start_pos: u64,
    buffer_size: NonZeroUsize,
) -> Result<Pin<Box<dyn Stream<Item = Result<(u64, V), Error>> + '_>>, Error>
Return a stream of all items in the journal starting from start_pos.
Each item is yielded as a tuple (position, item) where position is the item’s
position in the journal.
§Errors
Returns an error if start_pos exceeds the journal size or if any storage/decoding
errors occur during replay.
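The shape of the replayed items can be sketched with a synchronous stand-in. This is not the real method: it uses a plain Iterator over an in-memory slice instead of an async Stream, ignores buffer_size, assumes nothing has been pruned, and reports errors as a String. Treating start_pos equal to the size as an empty replay follows from the wording "exceeds the journal size" and is otherwise an assumption.

```rust
/// Synchronous stand-in for `replay`: yields `(position, item)` pairs
/// from `start_pos` to the end of `items`, where `items[0]` has position 0.
fn replay<T>(
    items: &[T],
    start_pos: u64,
) -> Result<impl Iterator<Item = (u64, &T)> + '_, String> {
    let size = items.len() as u64;
    if start_pos > size {
        return Err(format!("start_pos {start_pos} exceeds size {size}"));
    }
    Ok(items[start_pos as usize..]
        .iter()
        .enumerate()
        // Offsets within the tail are shifted back to absolute positions.
        .map(move |(i, item)| (start_pos + i as u64, item)))
}
```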
Trait Implementations§
impl<E: Storage + Metrics, V: Codec + Send> Contiguous for Journal<E, V>
async fn append(&mut self, item: Self::Item) -> Result<u64, Error>
async fn size(&self) -> u64
async fn oldest_retained_pos(&self) -> Result<Option<u64>, Error>
async fn prune(&mut self, min_position: u64) -> Result<bool, Error>
async fn replay(
    &self,
    start_pos: u64,
    buffer: NonZeroUsize,
) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error>
async fn read(&self, position: u64) -> Result<Self::Item, Error>
async fn close(self) -> Result<(), Error>
Auto Trait Implementations§
impl<E, V> Freeze for Journal<E, V>
impl<E, V> !RefUnwindSafe for Journal<E, V>
impl<E, V> Send for Journal<E, V>
where
    V: Send,
impl<E, V> Sync for Journal<E, V>
where
    V: Sync,
impl<E, V> Unpin for Journal<E, V>
impl<E, V> !UnwindSafe for Journal<E, V>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.