Struct Queue 

pub struct Queue<E: Clock + Storage + Metrics, V: CodecShared> { /* private fields */ }

A durable, at-least-once delivery queue with per-item acknowledgment.

Committed items are stored durably in a journal and survive crashes. The reader must acknowledge each item individually after processing. Items can be acknowledged out of order, which enables parallel processing.

§Operations

  • append / commit: Write items to the journal buffer, then persist. Items are readable immediately after append (before commit), but are lost on restart if not committed.
  • enqueue: Append + commit in one step; the item is durable before return.
  • dequeue: Return the next unacked item in FIFO order.
  • ack / ack_up_to: Mark items as processed (in-memory only).
  • sync: Commit, then prune completed sections below the ack floor.
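The difference between append and enqueue can be illustrated with a small in-memory model. This is a sketch, not the library's journal: `DurabilityModel` and its methods are hypothetical names, and the model ignores the journal's automatic sync at section boundaries.

```rust
/// Hypothetical in-memory model of the append/commit split; not the library's journal.
#[derive(Debug, Default)]
struct DurabilityModel {
    /// Items that have been committed and would survive a restart.
    committed: Vec<String>,
    /// Items appended since the last commit: readable, but not yet durable.
    buffered: Vec<String>,
}

impl DurabilityModel {
    /// Append without persisting; returns the item's position.
    fn append(&mut self, item: &str) -> u64 {
        self.buffered.push(item.to_string());
        (self.committed.len() + self.buffered.len() - 1) as u64
    }

    /// Persist everything appended so far.
    fn commit(&mut self) {
        self.committed.append(&mut self.buffered);
    }

    /// Append and commit in one step, mirroring `enqueue`.
    fn enqueue(&mut self, item: &str) -> u64 {
        let position = self.append(item);
        self.commit();
        position
    }

    /// Read an item whether or not it has been committed.
    fn get(&self, position: u64) -> Option<&str> {
        let position = position as usize;
        self.committed
            .get(position)
            .or_else(|| self.buffered.get(position.checked_sub(self.committed.len())?))
            .map(String::as_str)
    }

    /// Simulate a crash followed by a restart: uncommitted items are dropped.
    fn crash_and_restart(&mut self) {
        self.buffered.clear();
    }
}
```

In this model an item written with `append` is visible through `get` right away, but only items that passed through `commit` (or `enqueue`) remain after `crash_and_restart`.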

§Acknowledgment

Acks are tracked in-memory with an ack_floor (all positions below are acked) plus an RMap of acked positions above the floor. When items are acked contiguously from the floor, the floor advances automatically.

Acks are not persisted. The durable equivalent is the journal’s pruning boundary, advanced by sync. On restart, all non-pruned items are re-delivered regardless of prior ack state.
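The floor-plus-set bookkeeping described above can be sketched as follows. `AckTracker` is a hypothetical type, and a `BTreeSet` stands in for the `RMap` the queue uses; the real data structure and its API may differ.

```rust
use std::collections::BTreeSet;

/// Hypothetical ack tracker: a floor plus a set of acked positions above it.
/// A `BTreeSet` stands in for the range map the queue uses internally.
#[derive(Debug, Default)]
struct AckTracker {
    /// Every position below this value is acked.
    floor: u64,
    /// Acked positions at or above the floor (out-of-order acks).
    above: BTreeSet<u64>,
}

impl AckTracker {
    /// A position is acked if the floor covers it or it was acked individually.
    fn is_acked(&self, position: u64) -> bool {
        position < self.floor || self.above.contains(&position)
    }

    /// Record an ack and advance the floor across any contiguous run.
    fn ack(&mut self, position: u64) {
        if position < self.floor {
            return; // already covered by the floor
        }
        self.above.insert(position);
        // `remove` returns true only while the floor position itself is acked.
        while self.above.remove(&self.floor) {
            self.floor += 1;
        }
    }
}
```

Acking positions 2 and 1 leaves the floor at 0 with two entries in the set; acking position 0 then moves the floor to 3 and empties the set.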

§Crash Recovery

On restart, ack_floor is set to the journal’s pruning boundary. Items that were pruned are gone; everything else is re-delivered. Applications must handle duplicates (idempotent processing).
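One way to make processing idempotent is to record an idempotency key for each applied item and skip keys that were seen before. The sketch below is an application-side assumption, not part of the queue: `Ledger` is a hypothetical consumer, the key could be an identifier carried in the item, and a real application would persist `applied` together with its own state.

```rust
use std::collections::HashSet;

/// Hypothetical consumer state that tolerates re-delivery.
#[derive(Debug, Default)]
struct Ledger {
    /// Idempotency keys of items whose effect has already been applied.
    applied: HashSet<u64>,
    /// Application state that must not be updated twice for the same item.
    balance: i64,
}

impl Ledger {
    /// Apply `delta` once per `key`; returns false when the item is a duplicate.
    fn apply(&mut self, key: u64, delta: i64) -> bool {
        // `insert` returns false if the key was already present.
        if !self.applied.insert(key) {
            return false;
        }
        self.balance += delta;
        true
    }
}
```

With this guard, an item that is delivered again after a restart leaves `balance` unchanged and can be acked immediately.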

Implementations§

impl<E: Clock + Storage + Metrics, V: CodecShared> Queue<E, V>

pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error>

Initialize a queue from storage.

On first initialization, creates an empty queue. On restart, begins reading from the journal’s pruning boundary (providing at-least-once delivery for all non-pruned items).

§Errors

Returns an error if the underlying journal cannot be initialized.

pub fn is_acked(&self, position: u64) -> bool

Returns whether a specific position has been acknowledged.

pub async fn append(&mut self, item: V) -> Result<u64, Error>

Append an item without persisting. Call Self::commit or Self::sync afterwards to make it durable. The item is readable immediately but is not guaranteed to survive a crash until committed or the journal auto-syncs at a section boundary (see variable::Journal invariant 1).

§Errors

Returns an error if the underlying storage operation fails.

pub async fn enqueue(&mut self, item: V) -> Result<u64, Error>

Append and commit an item in one step, returning its position. The item is durable before this method returns.

§Errors

Returns an error if the underlying storage operation fails.

pub async fn dequeue(&mut self) -> Result<Option<(u64, V)>, Error>

Dequeue the next unacknowledged item, returning its position and value. Returns None when all items have been read or acknowledged. Already-acked items are skipped automatically.

§Errors

Returns an error if the underlying storage operation fails.

pub async fn ack(&mut self, position: u64) -> Result<(), Error>

Mark the item at position as processed (in-memory only). The item will be skipped on subsequent dequeues. If this creates a contiguous run from the ack floor, the floor advances automatically.

§Errors

Returns Error::PositionOutOfRange if position >= queue size.

pub async fn ack_up_to(&mut self, up_to: u64) -> Result<(), Error>

Acknowledge all items in [ack_floor, up_to) by advancing the floor directly. More efficient than calling Self::ack in a loop.

§Errors

Returns Error::PositionOutOfRange if up_to > queue size.
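The two bounds differ by one because ack names a single existing item while ack_up_to takes an exclusive upper bound. A minimal sketch of the checks, assuming a hypothetical `PositionOutOfRange` struct in place of the crate's error variant:

```rust
/// Hypothetical stand-in for the queue's out-of-range error.
#[derive(Debug, PartialEq)]
struct PositionOutOfRange {
    position: u64,
    size: u64,
}

/// `ack` targets one existing item, so `position` must be strictly below `size`.
fn check_ack(position: u64, size: u64) -> Result<(), PositionOutOfRange> {
    if position >= size {
        return Err(PositionOutOfRange { position, size });
    }
    Ok(())
}

/// `ack_up_to` covers the half-open range ending at `up_to`, so `up_to == size`
/// is valid and acknowledges every item enqueued so far.
fn check_ack_up_to(up_to: u64, size: u64) -> Result<(), PositionOutOfRange> {
    if up_to > size {
        return Err(PositionOutOfRange { position: up_to, size });
    }
    Ok(())
}
```

For a queue of size 3, positions 0 through 2 are valid arguments to ack, and any value from 0 through 3 is a valid bound for ack_up_to.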

pub const fn read_position(&self) -> u64

Returns the current read position.

This is the position of the next item that will be checked by Queue::dequeue.

pub const fn ack_floor(&self) -> u64

Returns the current ack floor.

All items at positions less than this value are considered acknowledged.

pub async fn size(&self) -> u64

Returns the total number of items that have been enqueued.

This count is not affected by pruning. It represents the position that the next enqueued item will receive.

pub async fn is_empty(&self) -> bool

Returns whether all enqueued items have been acknowledged.

pub fn reset(&mut self)

Reset the read position to the ack floor so Self::dequeue re-delivers all unacknowledged items from the beginning.
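How the read position, the ack floor, and reset interact can be sketched with an in-memory model. `CursorModel` is hypothetical and keeps items and ack flags in plain vectors; unlike the real queue it is synchronous and panics on an out-of-range ack instead of returning an error.

```rust
/// Hypothetical model of the read cursor; items and ack flags live in memory.
struct CursorModel {
    items: Vec<&'static str>,
    acked: Vec<bool>,
    ack_floor: usize,
    read_position: usize,
}

impl CursorModel {
    fn new(items: Vec<&'static str>) -> Self {
        let acked = vec![false; items.len()];
        Self { items, acked, ack_floor: 0, read_position: 0 }
    }

    /// Return the next unacked item at or after the read position.
    fn dequeue(&mut self) -> Option<(usize, &'static str)> {
        while self.read_position < self.items.len() {
            let position = self.read_position;
            self.read_position += 1;
            if !self.acked[position] {
                return Some((position, self.items[position]));
            }
        }
        None
    }

    /// Mark one item as processed and advance the floor over contiguous acks.
    fn ack(&mut self, position: usize) {
        self.acked[position] = true;
        while self.ack_floor < self.acked.len() && self.acked[self.ack_floor] {
            self.ack_floor += 1;
        }
    }

    /// Rewind the cursor so unacked items are delivered again.
    fn reset(&mut self) {
        self.read_position = self.ack_floor;
    }
}
```

In this model an item that was dequeued but never acked is not returned again until `reset` rewinds the cursor, which matches the description of the read position as the next position checked by dequeue.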

Trait Implementations§

impl<E: Clock + Storage + Metrics + Send, V: CodecShared + Send> Persistable for Queue<E, V>

type Error = Error

The error type returned when there is a failure from the underlying storage system.

async fn commit(&self) -> Result<(), Error>

Durably persist the structure, guaranteeing the current state will survive a crash.

async fn sync(&self) -> Result<(), Error>

Durably persist the structure, guaranteeing the current state will survive a crash, and that no recovery will be needed on startup. For Queue, sync commits and then prunes completed sections below the ack floor (see Operations).

async fn destroy(self) -> Result<(), Error>

Destroy the structure, removing all associated storage.

Auto Trait Implementations§

impl<E, V> !Freeze for Queue<E, V>

impl<E, V> !RefUnwindSafe for Queue<E, V>

impl<E, V> Send for Queue<E, V>

impl<E, V> Sync for Queue<E, V>

impl<E, V> Unpin for Queue<E, V>
where <V as Read>::Cfg: Unpin, E: Unpin, <E as Storage>::Blob: Unpin,

impl<E, V> UnsafeUnpin for Queue<E, V>
where <V as Read>::Cfg: UnsafeUnpin, E: UnsafeUnpin, <E as Storage>::Blob: UnsafeUnpin,

impl<E, V> !UnwindSafe for Queue<E, V>
