Trait StreamBackend 

pub trait StreamBackend:
    Send
    + Sync
    + 'static {
    // Required methods
    fn create_group(
        &self,
        stream: &str,
        group: &str,
        start_id: &str,
    ) -> impl Future<Output = Result<(), EventBusError>> + Send;
    fn publish(
        &self,
        stream: &str,
        message: Message,
    ) -> impl Future<Output = Result<String, EventBusError>> + Send;
    fn reclaim_idle(
        &self,
        stream: &str,
        group: &str,
        consumer: &str,
        min_idle: Duration,
        count: usize,
    ) -> impl Future<Output = Result<Vec<FetchedEntry>, EventBusError>> + Send;
    fn read_new(
        &self,
        stream: &str,
        group: &str,
        consumer: &str,
        count: usize,
        timeout: Duration,
    ) -> impl Future<Output = Result<Vec<FetchedEntry>, EventBusError>> + Send;
    fn ack(
        &self,
        stream: &str,
        group: &str,
        message_id: &str,
    ) -> impl Future<Output = Result<(), EventBusError>> + Send;

    // Provided methods
    fn ack_many(
        &self,
        stream: &str,
        group: &str,
        message_ids: &[String],
    ) -> impl Future<Output = Result<(), EventBusError>> + Send { ... }
    fn forget_consumer(
        &self,
        stream: &str,
        group: &str,
        consumer: &str,
    ) -> impl Future<Output = ()> + Send { ... }
}

Required Methods§

fn create_group( &self, stream: &str, group: &str, start_id: &str, ) -> impl Future<Output = Result<(), EventBusError>> + Send

fn publish( &self, stream: &str, message: Message, ) -> impl Future<Output = Result<String, EventBusError>> + Send

fn reclaim_idle( &self, stream: &str, group: &str, consumer: &str, min_idle: Duration, count: usize, ) -> impl Future<Output = Result<Vec<FetchedEntry>, EventBusError>> + Send

fn read_new( &self, stream: &str, group: &str, consumer: &str, count: usize, timeout: Duration, ) -> impl Future<Output = Result<Vec<FetchedEntry>, EventBusError>> + Send

fn ack( &self, stream: &str, group: &str, message_id: &str, ) -> impl Future<Output = Result<(), EventBusError>> + Send

Provided Methods§

fn ack_many( &self, stream: &str, group: &str, message_ids: &[String], ) -> impl Future<Output = Result<(), EventBusError>> + Send

Batch-acknowledge multiple message IDs, in a single round-trip where the backend supports it.

Redis Streams XACK accepts N IDs in one command, turning what would be N RTTs into one. Backends without native batch support can rely on the default implementation (a serial loop over Self::ack), but the point of this method is to let the Redis backend collapse the round-trips.

§Contract
  • Returning Ok(()) means the whole batch was accepted by the server. It does not prove that every ID existed in the pending entries list (PEL), because Redis silently ignores unknown IDs. Callers must treat delivery as at-least-once.
  • An Err signals that the batch did not reach the server, so the caller should surface the same error to every waiter. The unacknowledged messages will be reclaimed on the next cycle.
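
The serial fallback described above can be sketched as follows. This is a minimal illustration, not the crate's actual default body: `AckBackend`, `MemoryBackend`, the simplified `EventBusError`, and the tiny `block_on` helper are hypothetical stand-ins introduced only so the example is self-contained.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
pub struct EventBusError(pub String);

// Hypothetical, reduced trait: only the two ack methods are modelled.
pub trait AckBackend: Send + Sync + 'static {
    fn ack(
        &self,
        stream: &str,
        group: &str,
        message_id: &str,
    ) -> impl Future<Output = Result<(), EventBusError>> + Send;

    // Assumed shape of a serial default: one `ack` per ID, stopping at the
    // first error. A Redis-style backend would override this with one call.
    fn ack_many(
        &self,
        stream: &str,
        group: &str,
        message_ids: &[String],
    ) -> impl Future<Output = Result<(), EventBusError>> + Send {
        async move {
            for id in message_ids {
                self.ack(stream, group, id).await?;
            }
            Ok(())
        }
    }
}

// In-memory backend that records acknowledged IDs and keeps the default.
#[derive(Default)]
pub struct MemoryBackend {
    pub acked: Mutex<Vec<String>>,
}

impl AckBackend for MemoryBackend {
    fn ack(
        &self,
        _stream: &str,
        _group: &str,
        message_id: &str,
    ) -> impl Future<Output = Result<(), EventBusError>> + Send {
        let id = message_id.to_owned();
        async move {
            self.acked.lock().unwrap().push(id);
            Ok(())
        }
    }
}

// Minimal executor for futures that never wait on external wake-ups.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

With this sketch, `block_on(backend.ack_many("orders", "workers", &ids))` acknowledges each ID in order through `ack`. A backend with native batching would override `ack_many` and leave `ack` for the single-message path.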

fn forget_consumer( &self, stream: &str, group: &str, consumer: &str, ) -> impl Future<Output = ()> + Send

Drop any per-(stream, group, consumer) state cached inside the backend (e.g., XAUTOCLAIM cursors). Called by crate::stream::StreamBus on subscription shutdown so backends do not accumulate cursor entries indefinitely under churn (auto-generated consumer names, restarting pods, etc.).

The default impl is a no-op; backends without per-consumer state can ignore it.
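
As an illustration of the kind of state this hook is meant to clear, the sketch below keeps one cursor per (stream, group, consumer) key and drops it on request. `CursorCache` and its methods are hypothetical names, not part of the crate; a backend that overrides forget_consumer could delegate to something similar.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Key identifying one subscription: (stream, group, consumer).
type ConsumerKey = (String, String, String);

// Hypothetical per-consumer cache, e.g. for XAUTOCLAIM-style cursors.
#[derive(Default)]
pub struct CursorCache {
    cursors: Mutex<HashMap<ConsumerKey, String>>,
}

impl CursorCache {
    fn key(stream: &str, group: &str, consumer: &str) -> ConsumerKey {
        (stream.to_owned(), group.to_owned(), consumer.to_owned())
    }

    // Remember the cursor to resume from on the next reclaim pass.
    pub fn set(&self, stream: &str, group: &str, consumer: &str, cursor: &str) {
        self.cursors
            .lock()
            .unwrap()
            .insert(Self::key(stream, group, consumer), cursor.to_owned());
    }

    // Look up the stored cursor, if any.
    pub fn get(&self, stream: &str, group: &str, consumer: &str) -> Option<String> {
        self.cursors
            .lock()
            .unwrap()
            .get(&Self::key(stream, group, consumer))
            .cloned()
    }

    // What a `forget_consumer` override might do: drop the entry so the map
    // does not grow as short-lived consumers come and go.
    pub fn forget(&self, stream: &str, group: &str, consumer: &str) {
        self.cursors
            .lock()
            .unwrap()
            .remove(&Self::key(stream, group, consumer));
    }

    // Number of consumers currently tracked.
    pub fn len(&self) -> usize {
        self.cursors.lock().unwrap().len()
    }
}
```

Forgetting a key that was never stored is harmless here, which matches the method's infallible `()` output.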

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety".
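
Methods that return `impl Future` are one reason a trait cannot be used as `dyn Trait`, so code that holds a backend is typically generic over it instead. The sketch below shows that pattern under stated assumptions: `Publisher`, `EchoPublisher`, `Bus`, and `block_on` are hypothetical names used only for illustration and do not reflect how the crate itself is structured.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical one-method trait with an `impl Future` return type.
pub trait Publisher: Send + Sync + 'static {
    fn publish(
        &self,
        stream: &str,
        payload: String,
    ) -> impl Future<Output = Result<String, String>> + Send;
}

// Toy implementor: fabricates an ID from the stream name and payload length.
pub struct EchoPublisher;

impl Publisher for EchoPublisher {
    fn publish(
        &self,
        stream: &str,
        payload: String,
    ) -> impl Future<Output = Result<String, String>> + Send {
        let id = format!("{stream}:{}", payload.len());
        async move { Ok(id) }
    }
}

// Generic over the backend type, resolved at compile time.
// A field of type `Box<dyn Publisher>` would be rejected by the compiler.
pub struct Bus<B: Publisher> {
    backend: Arc<B>,
}

impl<B: Publisher> Bus<B> {
    pub fn new(backend: B) -> Self {
        Self { backend: Arc::new(backend) }
    }

    pub async fn send(&self, stream: &str, payload: &str) -> Result<String, String> {
        self.backend.publish(stream, payload.to_owned()).await
    }
}

// Minimal executor for futures that never wait on external wake-ups.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The trade-off is that each backend type produces its own monomorphized `Bus<B>`, in exchange for static dispatch and no boxing of the returned futures.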

Implementors§