pub struct Queue<E: Clock + Storage + Metrics, V: CodecShared> { /* private fields */ }Expand description
A durable, at-least-once delivery queue with per-item acknowledgment.
Items are durably stored in a journal and survive crashes. The reader must acknowledge each item individually after processing. Items can be acknowledged out of order, enabling parallel processing.
§Operations
- append / commit: Write items to the journal buffer, then persist. Items are readable immediately after append (before commit), but are lost on restart if not committed.
- enqueue: Append + commit in one step; the item is durable before return.
- dequeue: Return the next unacked item in FIFO order.
- ack / ack_up_to: Mark items as processed (in-memory only).
- sync: Commit, then prune completed sections below the ack floor.
§Acknowledgment
Acks are tracked in-memory with an ack_floor (all positions below are acked)
plus an RMap of acked positions above the floor. When items are acked
contiguously from the floor, the floor advances automatically.
Acks are not persisted. The durable equivalent is the journal’s pruning boundary, advanced by sync. On restart, all non-pruned items are re-delivered regardless of prior ack state.
§Crash Recovery
On restart, ack_floor is set to the journal’s pruning boundary.
Items that were pruned are gone; everything else is re-delivered.
Applications must handle duplicates (idempotent processing).
Implementations§
Source§impl<E: Clock + Storage + Metrics, V: CodecShared> Queue<E, V>
impl<E: Clock + Storage + Metrics, V: CodecShared> Queue<E, V>
Sourcepub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error>
pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error>
Initialize a queue from storage.
On first initialization, creates an empty queue. On restart, begins reading from the journal’s pruning boundary (providing at-least-once delivery for all non-pruned items).
§Errors
Returns an error if the underlying journal cannot be initialized.
Sourcepub fn is_acked(&self, position: u64) -> bool
pub fn is_acked(&self, position: u64) -> bool
Returns whether a specific position has been acknowledged.
Sourcepub async fn append(&mut self, item: V) -> Result<u64, Error>
pub async fn append(&mut self, item: V) -> Result<u64, Error>
Append an item without persisting. Call Self::commit or Self::sync
afterwards to make it durable. The item is readable immediately but
is not guaranteed to survive a crash until committed or the journal
auto-syncs at a section boundary (see variable::Journal invariant 1).
§Errors
Returns an error if the underlying storage operation fails.
Sourcepub async fn enqueue(&mut self, item: V) -> Result<u64, Error>
pub async fn enqueue(&mut self, item: V) -> Result<u64, Error>
Append and commit an item in one step, returning its position. The item is durable before this method returns.
§Errors
Returns an error if the underlying storage operation fails.
Sourcepub async fn dequeue(&mut self) -> Result<Option<(u64, V)>, Error>
pub async fn dequeue(&mut self) -> Result<Option<(u64, V)>, Error>
Dequeue the next unacknowledged item, returning its position and value.
Returns None when all items have been read or acknowledged.
Already-acked items are skipped automatically.
§Errors
Returns an error if the underlying storage operation fails.
Sourcepub async fn ack(&mut self, position: u64) -> Result<(), Error>
pub async fn ack(&mut self, position: u64) -> Result<(), Error>
Mark the item at position as processed (in-memory only).
The item will be skipped on subsequent dequeues. If this creates a
contiguous run from the ack floor, the floor advances automatically.
§Errors
Returns Error::PositionOutOfRange if position >= queue size.
Sourcepub async fn ack_up_to(&mut self, up_to: u64) -> Result<(), Error>
pub async fn ack_up_to(&mut self, up_to: u64) -> Result<(), Error>
Acknowledge all items in [ack_floor, up_to) by advancing the floor
directly. More efficient than calling Self::ack in a loop.
§Errors
Returns Error::PositionOutOfRange if up_to > queue size.
Sourcepub const fn read_position(&self) -> u64
pub const fn read_position(&self) -> u64
Returns the current read position.
This is the position of the next item that will be checked by Queue::dequeue.
Sourcepub const fn ack_floor(&self) -> u64
pub const fn ack_floor(&self) -> u64
Returns the current ack floor.
All items at positions less than this value are considered acknowledged.
Sourcepub async fn size(&self) -> u64
pub async fn size(&self) -> u64
Returns the total number of items that have been enqueued.
This count is not affected by pruning. It represents the position that the next enqueued item will receive.
Sourcepub async fn is_empty(&self) -> bool
pub async fn is_empty(&self) -> bool
Returns whether all enqueued items have been acknowledged.
Sourcepub fn reset(&mut self)
pub fn reset(&mut self)
Reset the read position to the ack floor so Self::dequeue re-delivers all unacknowledged items from the beginning.
Trait Implementations§
Source§impl<E: Clock + Storage + Metrics + Send, V: CodecShared + Send> Persistable for Queue<E, V>
impl<E: Clock + Storage + Metrics + Send, V: CodecShared + Send> Persistable for Queue<E, V>
Source§type Error = Error
type Error = Error
Source§async fn commit(&self) -> Result<(), Error>
async fn commit(&self) -> Result<(), Error>
Auto Trait Implementations§
impl<E, V> !Freeze for Queue<E, V>
impl<E, V> !RefUnwindSafe for Queue<E, V>
impl<E, V> Send for Queue<E, V>
impl<E, V> Sync for Queue<E, V>
impl<E, V> Unpin for Queue<E, V>
impl<E, V> UnsafeUnpin for Queue<E, V>
impl<E, V> !UnwindSafe for Queue<E, V>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
Source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more