pub struct Consumer { /* private fields */ }
A consumer of events from a stream
Implementations
impl Consumer
pub fn new(
    db: Arc<AzothDb>,
    stream: String,
    name: String,
    wake_strategy: WakeStrategy,
) -> Result<Self>
Create a new consumer
pub fn with_filter(self, filter: EventFilter) -> Self
Add an additional filter on top of the stream filter
pub fn position(&self) -> Result<Option<u64>>
Get the current cursor position (last acknowledged event ID)
Returns None if no events have been acknowledged yet.
pub fn seek(&mut self, event_id: u64) -> Result<()>
Seek to read from a specific event ID
The next call to next() will return the event at event_id.
If event_id is 0, resets to the beginning.
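The relationship between position() (last acknowledged ID) and seek() (next ID to read) can be pictured with a small model. This is only a sketch under assumptions the documentation does not confirm: event IDs are taken to be dense and to start at 0, and both function names are hypothetical helpers, not part of the crate.

```rust
// Hypothetical model, not the crate's implementation.
// Assumption: event IDs are dense and start at 0.

/// Cursor value that makes the next read return `event_id`.
/// Seeking to 0 yields `None`, i.e. "nothing acknowledged yet".
pub fn cursor_for_seek(event_id: u64) -> Option<u64> {
    event_id.checked_sub(1)
}

/// ID that a read would return for a given cursor.
pub fn next_id_to_read(cursor: Option<u64>) -> u64 {
    match cursor {
        None => 0,               // start of the stream
        Some(last) => last + 1,  // first event after the last acknowledged one
    }
}
```

Under this model, seeking to 0 and never having acknowledged anything are the same state, which matches the "resets to the beginning" wording above.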
pub fn next(&mut self) -> Result<Option<Event>>
Read the next event (blocking poll)
Returns the next unprocessed event that matches the filter.
Call ack() to advance the cursor after processing.
Note: This method intentionally does not implement Iterator, because it returns a Result and callers must handle errors.
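The read-then-acknowledge pattern described above can be sketched with an in-memory stand-in. ToyConsumer, its Event type, and drain are hypothetical names used only for illustration; they are not the crate's types. The sketch also assumes that an unacknowledged event is returned again by the next read, which the documentation implies but does not state outright.

```rust
// Hypothetical in-memory stand-in; it models only the cursor rules,
// not storage, filters, or the wake strategy.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub id: u64,
    pub payload: String,
}

pub struct ToyConsumer {
    log: Vec<Event>,     // events in ascending ID order
    cursor: Option<u64>, // last acknowledged event ID
}

impl ToyConsumer {
    pub fn new(log: Vec<Event>) -> Self {
        Self { log, cursor: None }
    }

    /// Last acknowledged event ID, or None before the first ack.
    pub fn position(&self) -> Option<u64> {
        self.cursor
    }

    /// First event past the cursor. Reading does not move the cursor.
    pub fn next(&mut self) -> Option<Event> {
        let cursor = self.cursor;
        self.log
            .iter()
            .find(|e| cursor.map_or(true, |last| e.id > last))
            .cloned()
    }

    /// Record that `event_id` has been processed.
    pub fn ack(&mut self, event_id: u64) {
        self.cursor = Some(event_id);
    }
}

/// Process all pending events, acknowledging each only after the handler succeeds.
pub fn drain<F>(consumer: &mut ToyConsumer, mut handle: F) -> Result<usize, String>
where
    F: FnMut(&Event) -> Result<(), String>,
{
    let mut processed = 0;
    while let Some(event) = consumer.next() {
        handle(&event)?; // on failure the cursor stays put
        consumer.ack(event.id);
        processed += 1;
    }
    Ok(processed)
}
```

Acknowledging after the handler returns means a failed handler leaves the cursor unchanged, so the same event is seen again on the next read.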
pub fn ack(&mut self, event_id: EventId) -> Result<()>
Acknowledge processing of an event (update cursor)
pub async fn ack_async(&mut self, event_id: EventId) -> Result<()>
Acknowledge processing of an event asynchronously (safe for async context)
This is the async-safe version of ack(). Use this when calling from
async code to avoid blocking the runtime.
pub fn metadata(&self) -> Result<ConsumerMetadata>
Get consumer metadata
pub async fn next_async(&mut self) -> Result<Option<Event>>
Read the next event asynchronously (non-blocking)
This method will wait for new events using the configured wake strategy. With polling (default), it checks periodically. With notifications, it waits for explicit wake-up signals.
Returns the next unprocessed event that matches the filter.
Call ack() to advance the cursor after processing.
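The difference between the two wake strategies can be illustrated with a blocking, standard-library analogue. ToyWake and its variants are hypothetical and are not the crate's WakeStrategy; the real next_async() is asynchronous, whereas this sketch blocks a thread purely to keep the example dependency-free.

```rust
use std::sync::mpsc::Receiver;
use std::time::Duration;

// Hypothetical analogue of a wake strategy, using blocking std primitives.
pub enum ToyWake {
    /// Re-check the stream after a fixed interval.
    Poll(Duration),
    /// Re-check only when a producer sends an explicit signal.
    Notify(Receiver<()>),
}

impl ToyWake {
    /// Wait until it is worth checking for new events again.
    /// Returns false if the notifying side has gone away.
    pub fn wait(&self) -> bool {
        match self {
            ToyWake::Poll(interval) => {
                std::thread::sleep(*interval);
                true
            }
            ToyWake::Notify(rx) => rx.recv().is_ok(),
        }
    }
}
```

Polling trades latency (up to one interval) for simplicity, while notification wakes the reader as soon as a signal arrives but requires every producer to send one.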
impl Consumer
pub fn into_stream(self) -> ConsumerStream
Convert this consumer into an async Stream
The returned stream yields Result<Event> items.
After processing each event, call ack() on the stream.
Note: For true async wake-up behavior with notifications,
use next_async() directly in a loop instead.
Example
use futures::StreamExt;

let consumer = bus.subscribe("orders", "processor")?;
let mut stream = consumer.into_stream();

while let Some(result) = stream.next().await {
    let event = result?;
    process(&event).await?;
    stream.ack(event.id)?;
}