pub struct Consumer { /* private fields */ }
Reads batches of ingested entries from object storage via a queue consumer.
The consumer iterates over entries in the queue manifest in ingestion order, fetches the corresponding data batches from object storage, and makes them available to the caller. Epoch-based fencing ensures only a single active consumer processes entries at any time.
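The fencing behaviour described above can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `Manifest`, `ToyConsumer`, `check_epoch`, and this local `Error` enum are hypothetical stand-ins, not types from this crate, and the real manifest lives in object storage rather than behind a `Mutex`.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the queue manifest; only the epoch matters here.
#[derive(Debug, Default)]
struct Manifest {
    epoch: u64,
}

#[derive(Debug, PartialEq)]
enum Error {
    Fenced,
}

/// Toy consumer that remembers the epoch it claimed at start-up.
struct ToyConsumer {
    manifest: Arc<Mutex<Manifest>>,
    epoch: u64,
}

impl ToyConsumer {
    /// Claiming the queue bumps the manifest epoch, which fences older holders.
    fn new(manifest: Arc<Mutex<Manifest>>) -> Self {
        let epoch = {
            let mut m = manifest.lock().unwrap();
            m.epoch += 1;
            m.epoch
        };
        ToyConsumer { manifest, epoch }
    }

    /// Assumed check performed before every manifest operation:
    /// the consumer's epoch must still match the manifest's.
    fn check_epoch(&self) -> Result<(), Error> {
        if self.manifest.lock().unwrap().epoch == self.epoch {
            Ok(())
        } else {
            Err(Error::Fenced)
        }
    }
}
```

In this model, creating a second `ToyConsumer` over the same manifest makes every later `check_epoch` on the first one return `Err(Error::Fenced)`, which is the single-active-consumer property in miniature.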
Implementations
impl Consumer
pub async fn new( config: ConsumerConfig, last_acked_sequence: Option<u64>, ) -> Result<Self>
Create a new consumer from the given configuration.
Initializes the queue consumer (fencing any previous instance) and spawns
the garbage collector. If last_acked_sequence is Some(seq), the
consumer resumes after that sequence; if None, it discovers the first
available entry.
pub async fn with_object_store( config: ConsumerConfig, object_store: Arc<dyn ObjectStore>, last_acked_sequence: Option<u64>, ) -> Result<Self>
pub async fn next_batch(&mut self) -> Result<Option<ConsumedBatch>>
Read the next data batch from object storage.
Serial fetch of the next batch: peeks the next manifest entry
past the last handed-out / fetched sequence, fetches the
corresponding object, and returns it. Returns None if no
entry is available. May be called repeatedly to walk successive
batches; the read cursor is independent of the ack frontier.
Cancellation-safe and fetch-failure-safe. The cursor
(last_handed_out_sequence) advances only after a
successful fetch. If the fetch fails, or if the future is
dropped mid-fetch, the cursor is unchanged and a subsequent
next_batch re-fetches the same entry.
This is written as an inline peek-fetch-advance rather than a
wrapper over Consumer::next_descriptors +
Consumer::fetch_descriptor because next_descriptors
advances the cursor at handout time (a handed-out descriptor is
not reissued within a process; see the Descriptor Handout
Contract). Inlining keeps the cursor advance after the fetch
await, so a failed or dropped fetch leaves the cursor
untouched — the behavior next_batch callers expect.
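The peek-fetch-advance ordering can be illustrated with a synchronous toy. It is a sketch, not the crate's implementation: `ToyReader`, `FetchError`, and the `fetch` closure (standing in for the object-store GET) are hypothetical, and it models only the fetch-failure case, since a dropped future has no synchronous equivalent.

```rust
#[derive(Debug, PartialEq)]
enum FetchError {
    Storage,
}

/// Toy reader: `manifest` holds entry sequences in ingestion order.
struct ToyReader {
    manifest: Vec<u64>,
    last_handed_out_sequence: Option<u64>,
}

impl ToyReader {
    /// Peek the first entry past the cursor without moving the cursor.
    fn peek(&self) -> Option<u64> {
        self.manifest
            .iter()
            .copied()
            .find(|&seq| self.last_handed_out_sequence.map_or(true, |last| seq > last))
    }

    /// Peek, fetch, then advance. `fetch` stands in for the object-store GET.
    fn next_batch<F>(&mut self, fetch: F) -> Result<Option<(u64, Vec<u8>)>, FetchError>
    where
        F: FnOnce(u64) -> Result<Vec<u8>, FetchError>,
    {
        let Some(seq) = self.peek() else {
            return Ok(None);
        };
        // An early return through `?` leaves the cursor untouched.
        let data = fetch(seq)?;
        // Only a successful fetch moves the cursor.
        self.last_handed_out_sequence = Some(seq);
        Ok(Some((seq, data)))
    }
}
```

Because the cursor assignment sits after the fallible call, a failed `fetch` followed by a retry yields the same sequence again.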
pub async fn next_descriptors( &mut self, max: usize, ) -> Result<Vec<BatchDescriptor>>
Read the manifest once and return up to max contiguous
BatchDescriptors past the consumer’s read-ahead cursor.
Does not perform any object-store GET. Does not mutate the
durable ack frontier. Advances the in-memory read-ahead cursor
(last_handed_out_sequence) by the number of descriptors
returned.
Returns an empty Vec if no new entries are available;
returns Err(Error::Fenced) if the consumer’s epoch no longer
matches the manifest’s.
Caller contract: once a descriptor is returned, the caller
is responsible for either fetching and processing it or
accepting that it will be re-handed-out only via process
restart (Consumer::new with a last_acked_sequence argument).
Lost descriptors are not reissued within a process. See RFC
0003 “Descriptor Handout Contract” for the full rules.
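A minimal model of the handout-time cursor advance, for contrast with next_batch. `ToyQueue` and `ToyDescriptor` are hypothetical names, and the sketch assumes the manifest is a sorted, gap-free list of sequences; fencing and manifest I/O are left out.

```rust
#[derive(Debug, Clone, PartialEq)]
struct ToyDescriptor {
    sequence: u64,
}

/// Toy queue: `manifest` is assumed sorted by sequence with no gaps.
struct ToyQueue {
    manifest: Vec<u64>,
    last_handed_out_sequence: Option<u64>,
}

impl ToyQueue {
    /// Return up to `max` descriptors past the cursor and advance the cursor
    /// immediately, before any fetch has happened.
    fn next_descriptors(&mut self, max: usize) -> Vec<ToyDescriptor> {
        let out: Vec<ToyDescriptor> = self
            .manifest
            .iter()
            .copied()
            .filter(|&seq| self.last_handed_out_sequence.map_or(true, |last| seq > last))
            .take(max)
            .map(|sequence| ToyDescriptor { sequence })
            .collect();
        // The cursor moves at handout time, so these descriptors are never
        // returned again by this instance, whether or not they get fetched.
        if let Some(last) = out.last() {
            self.last_handed_out_sequence = Some(last.sequence);
        }
        out
    }
}
```

Dropping the returned `Vec` without fetching loses those descriptors for the lifetime of the toy queue, mirroring the caller contract above.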
pub fn fetch_handle(&self) -> ConsumerFetchHandle
Construct a cloneable fetch handle. O(1); each call returns a
fresh handle, and the handle itself implements Clone for
further duplication into worker tasks.
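The clone-per-worker pattern can be sketched with standard-library threads. Everything here is an assumption made for illustration: `ToyFetchHandle` wraps an in-memory map instead of an object store, its `fetch` is synchronous and keyed by sequence number, and `fetch_all` is a hypothetical helper; the real handle is used from async tasks.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

/// Toy handle: cloning only bumps a reference count on the shared store.
#[derive(Clone)]
struct ToyFetchHandle {
    store: Arc<HashMap<u64, Vec<u8>>>,
}

impl ToyFetchHandle {
    /// Look up the bytes stored for `sequence`, if any.
    fn fetch(&self, sequence: u64) -> Option<Vec<u8>> {
        self.store.get(&sequence).cloned()
    }
}

/// Fetch every sequence on its own worker thread; results keep input order.
fn fetch_all(handle: &ToyFetchHandle, sequences: &[u64]) -> Vec<Option<Vec<u8>>> {
    let workers: Vec<_> = sequences
        .iter()
        .map(|&seq| {
            // Each worker owns its own clone of the handle.
            let h = handle.clone();
            thread::spawn(move || h.fetch(seq))
        })
        .collect();
    workers
        .into_iter()
        .map(|w| w.join().expect("worker panicked"))
        .collect()
}
```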
pub async fn fetch_descriptor( &mut self, descriptor: BatchDescriptor, ) -> Result<ConsumedBatch>
Fetch and decode a single batch via the consumer’s serial
wrapper. Equivalent to self.fetch_handle().fetch(descriptor),
but additionally maintains the consumer’s serial lag cursor
(last_fetched_sequence and the consumer_lag_seconds gauge)
used by the legacy next_batch path.
Parallel-fetch callers should use Consumer::fetch_handle
directly; the runtime owns its own per-stage latency
histograms (RFC 0002).
pub async fn ack(&mut self, sequence: u64) -> Result<()>
Acknowledge that the batch with the given sequence number has been processed.
Acks must be in order: the sequence must immediately follow the last acked
sequence, otherwise an error is returned. To amortize manifest writes, the
consumer calls dequeue() on the underlying queue consumer only once every
100 acks.
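The in-order check and the amortized dequeue can be modelled in a few lines. This is a sketch under assumptions: `ToyAcker`, `AckError`, `pending_acks`, and `dequeue_calls` are hypothetical names, the manifest write is simulated by pushing onto a `Vec`, and the first ack on a consumer with no prior ack is accepted unconditionally, which the text above does not specify.

```rust
#[derive(Debug, PartialEq)]
enum AckError {
    OutOfOrder { expected: u64, got: u64 },
}

/// Batch size taken from the description above.
const ACKS_PER_DEQUEUE: u64 = 100;

struct ToyAcker {
    last_acked_sequence: Option<u64>,
    pending_acks: u64,
    /// Records the sequence passed to each simulated manifest write.
    dequeue_calls: Vec<u64>,
}

impl ToyAcker {
    fn ack(&mut self, sequence: u64) -> Result<(), AckError> {
        // Reject anything that does not immediately follow the last ack.
        if let Some(last) = self.last_acked_sequence {
            let expected = last + 1;
            if sequence != expected {
                return Err(AckError::OutOfOrder { expected, got: sequence });
            }
        }
        self.last_acked_sequence = Some(sequence);
        self.pending_acks += 1;
        // Write to the manifest only once per full batch of acks.
        if self.pending_acks == ACKS_PER_DEQUEUE {
            self.flush();
        }
        Ok(())
    }

    /// Dequeue up to the last acked sequence if any acks are pending.
    fn flush(&mut self) {
        if self.pending_acks > 0 {
            if let Some(last) = self.last_acked_sequence {
                self.dequeue_calls.push(last);
            }
            self.pending_acks = 0;
        }
    }
}
```

In the toy, 99 consecutive acks produce no manifest write, the 100th produces one, and an explicit `flush` covers whatever remains.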
pub async fn ack_through(&mut self, sequence: u64) -> Result<()>
Advance the durable ack frontier through (and including)
sequence. Performs dequeue(sequence) against the manifest
first, then updates in-memory state on success.
On error (storage, fence), last_acked_sequence and the
buffer.acks counter remain at their pre-call values. A retry
against the same sequence is safe.
Errors if sequence <= last_acked_sequence (the frontier is
monotonic).
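The dequeue-first ordering is what makes a retry safe, and a small model shows why. `ToyFrontier`, this `AckError` enum, and the `dequeue` closure (standing in for the manifest write) are hypothetical; the sketch tracks only last_acked_sequence and ignores the ack counter.

```rust
#[derive(Debug, PartialEq)]
enum AckError {
    NotMonotonic,
    Storage,
}

struct ToyFrontier {
    last_acked_sequence: Option<u64>,
}

impl ToyFrontier {
    /// `dequeue` stands in for the durable manifest write.
    fn ack_through<F>(&mut self, sequence: u64, dequeue: F) -> Result<(), AckError>
    where
        F: FnOnce(u64) -> Result<(), AckError>,
    {
        // The frontier never moves backwards or stays in place.
        if let Some(last) = self.last_acked_sequence {
            if sequence <= last {
                return Err(AckError::NotMonotonic);
            }
        }
        // Durable write first; on error we return before touching state.
        dequeue(sequence)?;
        // In-memory state is updated only after the write succeeded.
        self.last_acked_sequence = Some(sequence);
        Ok(())
    }
}
```

A failed call leaves `last_acked_sequence` at its previous value, so repeating the call with the same sequence passes the monotonicity check and tries the write again.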
pub async fn flush(&mut self) -> Result<()>
Flush any pending acks by dequeueing up to the last acked sequence.
pub async fn close(self) -> Result<()>
Flush pending acks, shut down the garbage collector, and consume the handle.
pub fn len(&self) -> usize
Return the number of entries in the queue as of the last manifest read or write.
pub fn is_empty(&self) -> bool
Return true if the queue had no entries as of the last manifest read or write.
pub fn conflict_rate(&self) -> f64
Return the percentage of manifest writes that encountered a conflict.
Auto Trait Implementations
impl !Freeze for Consumer
impl !RefUnwindSafe for Consumer
impl Send for Consumer
impl Sync for Consumer
impl Unpin for Consumer
impl UnsafeUnpin for Consumer
impl !UnwindSafe for Consumer