Journal

Struct Journal 

Source
pub struct Journal<E: Storage + Metrics, V: Codec> { /* private fields */ }
Expand description

A position-based journal for variable-length items.

This journal manages section assignment automatically, allowing callers to append items sequentially without manually tracking section numbers.

§Invariants

§1. Section Fullness

All non-final sections are full (items_per_section items) and synced. This ensures that on init(), we only need to replay the last section to determine the exact size.

§2. Data Journal is Source of Truth

The data journal is always the source of truth. The offsets journal is an index that may temporarily diverge during crashes. Divergences are automatically aligned during init():

  • If offsets.size() < data.size(): Rebuild missing offsets by replaying data. (This can happen if we crash after writing data journal but before writing offsets journal)
  • If offsets.size() > data.size(): Rewind offsets to match data size. (This can happen if we crash after rewinding data journal but before rewinding offsets journal)
  • If offsets.oldest_retained_pos() < data.oldest_retained_pos(): Prune offsets to match (This can happen if we crash after pruning data journal but before pruning offsets journal)

Note that we don’t recover from the case where offsets.oldest_retained_pos() > data.oldest_retained_pos(). This should never occur because we always prune the data journal before the offsets journal.

Implementations§

Source§

impl<E: Storage + Metrics, V: Codec> Journal<E, V>

Source

pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error>

Initialize a contiguous variable journal.

§Crash Recovery

The data journal is the source of truth. If the offsets journal is inconsistent it will be updated to match the data journal.

Source

pub async fn init_at_size( context: E, cfg: Config<V::Cfg>, size: u64, ) -> Result<Self, Error>

Initialize a Journal in a fully pruned state at a specific logical size.

This creates a journal that reports size() as size but contains no data. The oldest_retained_pos() will return None, indicating all positions before size have been pruned. This is useful for state sync when starting from a non-zero position without historical data.

§Arguments
  • size - The logical size to initialize at.
§Post-conditions
  • size() returns size
  • oldest_retained_pos() returns None (fully pruned)
  • Next append receives position size
Source

pub async fn rewind(&mut self, size: u64) -> Result<(), Error>

Rewind the journal to the given size, discarding items from the end.

After rewinding to size N, the journal will contain exactly N items, and the next append will receive position N.

§Errors

Returns Error::InvalidRewind if size is invalid (too large or points to pruned data).

Underlying storage operations may leave the journal in an inconsistent state. The journal should be closed and reopened to trigger alignment in Journal::init.

Source

pub async fn append(&mut self, item: V) -> Result<u64, Error>

Append a new item to the journal, returning its position.

The position returned is a stable, consecutively increasing value starting from 0. This position remains constant after pruning.

When a section becomes full, both the data journal and offsets journal are synced to maintain the invariant that all non-final sections are full and consistent.

§Errors

Returns an error if the underlying storage operation fails or if the item cannot be encoded.

Errors may leave the journal in an inconsistent state. The journal should be closed and reopened to trigger alignment in Journal::init.

Source

pub fn size(&self) -> u64

Return the total number of items that have been appended to the journal.

This count is NOT affected by pruning. The next appended item will receive this position as its value.

Source

pub fn oldest_retained_pos(&self) -> Option<u64>

Return the position of the oldest item still retained in the journal.

Returns None if the journal is empty or if all items have been pruned.

Source

pub async fn prune(&mut self, min_position: u64) -> Result<bool, Error>

Prune items at positions strictly less than min_position.

Returns true if any data was pruned, false otherwise.

§Errors

Returns an error if the underlying storage operation fails.

Errors may leave the journal in an inconsistent state. The journal should be closed and reopened to trigger alignment in Journal::init.

Source

pub async fn read(&self, position: u64) -> Result<V, Error>

Read the item at the given position.

§Errors
Source

pub async fn sync_data(&mut self) -> Result<(), Error>

Sync only the data journal to storage, without syncing the offsets journal.

This is faster than sync() and can be used to ensure data durability without the overhead of syncing the offsets journal.

We call sync when appending to a section, so the offsets journal will eventually be synced as well, maintaining the invariant that all nonfinal sections are fully synced. In other words, the journal will remain in a consistent, recoverable state even if a crash occurs after calling this method but before calling sync.

Source

pub async fn sync(&mut self) -> Result<(), Error>

Sync all pending writes to storage.

This syncs both the data journal and the offsets journal concurrently.

Source

pub async fn close(self) -> Result<(), Error>

Close the journal, syncing all pending writes.

This closes both the data journal and the offsets journal.

Source

pub async fn destroy(self) -> Result<(), Error>

Remove any underlying blobs created by the journal.

This destroys both the data journal and the offsets journal.

Source§

impl<E: Storage + Metrics, V: Codec + Send> Journal<E, V>

Source

pub async fn replay( &self, start_pos: u64, buffer_size: NonZeroUsize, ) -> Result<Pin<Box<dyn Stream<Item = Result<(u64, V), Error>> + '_>>, Error>

Return a stream of all items in the journal starting from start_pos.

Each item is yielded as a tuple (position, item) where position is the item’s position in the journal.

§Errors

Returns an error if start_pos exceeds the journal size or if any storage/decoding errors occur during replay.

Trait Implementations§

Source§

impl<E: Storage + Metrics, V: Codec + Send> Contiguous for Journal<E, V>

Source§

type Item = V

The type of items stored in the journal.
Source§

async fn append(&mut self, item: Self::Item) -> Result<u64, Error>

Append a new item to the journal, returning its position. Read more
Source§

async fn size(&self) -> u64

Return the total number of items that have been appended to the journal. Read more
Source§

async fn oldest_retained_pos(&self) -> Result<Option<u64>, Error>

Return the position of the oldest item still retained in the journal. Read more
Source§

async fn prune(&mut self, min_position: u64) -> Result<bool, Error>

Prune items at positions strictly less than min_position. Read more
Source§

async fn replay( &self, start_pos: u64, buffer: NonZeroUsize, ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error>

Return a stream of all items in the journal starting from start_pos. Read more
Source§

async fn read(&self, position: u64) -> Result<Self::Item, Error>

Read the item at the given position. Read more
Source§

async fn sync(&mut self) -> Result<(), Error>

Sync all pending writes to storage. Read more
Source§

async fn close(self) -> Result<(), Error>

Close the journal, syncing all pending writes and releasing resources.
Source§

async fn destroy(self) -> Result<(), Error>

Destroy the journal, removing all associated storage. Read more
Source§

async fn rewind(&mut self, size: u64) -> Result<(), Error>

Rewind the journal to the given size, discarding items from the end. Read more

Auto Trait Implementations§

§

impl<E, V> Freeze for Journal<E, V>
where E: Freeze, <V as Read>::Cfg: Freeze, <E as Storage>::Blob: Freeze,

§

impl<E, V> !RefUnwindSafe for Journal<E, V>

§

impl<E, V> Send for Journal<E, V>
where V: Send,

§

impl<E, V> Sync for Journal<E, V>
where V: Sync,

§

impl<E, V> Unpin for Journal<E, V>
where E: Unpin, <V as Read>::Cfg: Unpin, V: Unpin, <E as Storage>::Blob: Unpin,

§

impl<E, V> !UnwindSafe for Journal<E, V>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FutureExt for T

Source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
Source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<A, B, T> HttpServerConnExec<A, B> for T
where B: Body,