Trait StreamBackend 

pub trait StreamBackend:
    Send
    + Sync
    + 'static {
    // Required methods
    fn create_group(
        &self,
        stream: &str,
        group: &str,
        start_id: &str,
    ) -> impl Future<Output = Result<(), EventBusError>> + Send;
    fn publish(
        &self,
        stream: &str,
        message: Message,
    ) -> impl Future<Output = Result<String, EventBusError>> + Send;
    fn reclaim_idle(
        &self,
        stream: &str,
        group: &str,
        consumer: &str,
        min_idle: Duration,
        count: usize,
    ) -> impl Future<Output = Result<Vec<FetchedEntry>, EventBusError>> + Send;
    fn read_new(
        &self,
        stream: &str,
        group: &str,
        consumer: &str,
        count: usize,
        timeout: Duration,
    ) -> impl Future<Output = Result<Vec<FetchedEntry>, EventBusError>> + Send;
    fn ack(
        &self,
        stream: &str,
        group: &str,
        message_id: &str,
    ) -> impl Future<Output = Result<(), EventBusError>> + Send;

    // Provided methods
    fn ack_many(
        &self,
        stream: &str,
        group: &str,
        message_ids: &[String],
    ) -> impl Future<Output = Result<(), EventBusError>> + Send { ... }
    fn forget_consumer(
        &self,
        stream: &str,
        group: &str,
        consumer: &str,
    ) -> impl Future<Output = ()> + Send { ... }
}

Required Methods

fn create_group( &self, stream: &str, group: &str, start_id: &str, ) -> impl Future<Output = Result<(), EventBusError>> + Send

fn publish( &self, stream: &str, message: Message, ) -> impl Future<Output = Result<String, EventBusError>> + Send

fn reclaim_idle( &self, stream: &str, group: &str, consumer: &str, min_idle: Duration, count: usize, ) -> impl Future<Output = Result<Vec<FetchedEntry>, EventBusError>> + Send

fn read_new( &self, stream: &str, group: &str, consumer: &str, count: usize, timeout: Duration, ) -> impl Future<Output = Result<Vec<FetchedEntry>, EventBusError>> + Send

fn ack( &self, stream: &str, group: &str, message_id: &str, ) -> impl Future<Output = Result<(), EventBusError>> + Send

Provided Methods

fn ack_many( &self, stream: &str, group: &str, message_ids: &[String], ) -> impl Future<Output = Result<(), EventBusError>> + Send

Batch-acknowledge multiple message IDs in a single round-trip.

Redis Streams XACK accepts N IDs in one command, turning what would be N round-trips into one. Backends without native batch support can rely on the default implementation (a serial loop over Self::ack); the method exists so that the Redis backend can collapse those round-trips.

Contract
  • Returning Ok(()) means the server accepted the whole batch. It does not prove that every ID existed in the pending entries list (PEL), because Redis silently ignores unknown IDs. Callers must therefore treat delivery as at-least-once.
  • An Err signals that the batch did not reach the server, so the caller should surface the same error to every waiter; the unacknowledged messages will be re-claimed on the next cycle.
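
The serial fallback described above could look roughly like the sketch below. It is not the crate's actual source: AckBackend is a reduced, hypothetical stand-in for StreamBackend with only the two ack methods, EventBusError is replaced by a placeholder struct, and RecordingBackend and block_on are illustration-only helpers (the executor busy-polls and only suits futures that never suspend).

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Placeholder for the crate's `EventBusError` (hypothetical shape).
#[derive(Debug, PartialEq)]
pub struct EventBusError(pub String);

/// Reduced, hypothetical version of the trait: only the ack methods.
pub trait AckBackend: Send + Sync + 'static {
    fn ack(
        &self,
        stream: &str,
        group: &str,
        message_id: &str,
    ) -> impl Future<Output = Result<(), EventBusError>> + Send;

    /// Serial fallback: one `ack` per ID, stopping at the first error.
    fn ack_many(
        &self,
        stream: &str,
        group: &str,
        message_ids: &[String],
    ) -> impl Future<Output = Result<(), EventBusError>> + Send {
        async move {
            for id in message_ids {
                self.ack(stream, group, id).await?;
            }
            // Annotated so `?` above has a concrete error type to convert into.
            Ok::<(), EventBusError>(())
        }
    }
}

/// In-memory backend that records acknowledged IDs and rejects empty ones.
#[derive(Default)]
pub struct RecordingBackend {
    pub acked: Mutex<Vec<String>>,
}

impl AckBackend for RecordingBackend {
    fn ack(
        &self,
        _stream: &str,
        _group: &str,
        message_id: &str,
    ) -> impl Future<Output = Result<(), EventBusError>> + Send {
        let id = message_id.to_owned();
        async move {
            if id.is_empty() {
                return Err(EventBusError("empty message id".into()));
            }
            self.acked.lock().unwrap().push(id);
            Ok(())
        }
    }
}

/// Waker that does nothing; enough for futures that complete without suspending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor for the example only.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

With this fallback, a batch that fails partway leaves the earlier IDs acknowledged and the later ones pending, which is one more reason callers should assume at-least-once delivery. A backend with a native batch command would override ack_many instead of inheriting the loop.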

fn forget_consumer( &self, stream: &str, group: &str, consumer: &str, ) -> impl Future<Output = ()> + Send

Drop any per-(stream, group, consumer) state cached inside the backend (e.g., XAUTOCLAIM cursors). Called by crate::stream::StreamBus on subscription shutdown so backends do not accumulate cursor entries indefinitely under churn (auto-generated consumer names, restarting pods, etc.).

The default implementation is a no-op; backends without per-consumer state can ignore it.
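
As an illustration of the kind of state this method is meant to release, here is a minimal sketch of a per-consumer cursor cache. CursorCache and all of its methods are hypothetical names, not part of this crate; a backend that keeps XAUTOCLAIM cursors could hold something similar and call forget from its forget_consumer override. The "0-0" default follows the Redis convention for starting an XAUTOCLAIM scan from the beginning.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// (stream, group, consumer) identifies one subscription's cached state.
type Key = (String, String, String);

/// Hypothetical cache of XAUTOCLAIM cursors, one per subscription.
#[derive(Default)]
pub struct CursorCache {
    cursors: Mutex<HashMap<Key, String>>,
}

impl CursorCache {
    fn key(stream: &str, group: &str, consumer: &str) -> Key {
        (stream.to_owned(), group.to_owned(), consumer.to_owned())
    }

    /// Returns the stored cursor, or "0-0" (scan from the start) if none is cached.
    pub fn cursor(&self, stream: &str, group: &str, consumer: &str) -> String {
        self.cursors
            .lock()
            .unwrap()
            .get(&Self::key(stream, group, consumer))
            .cloned()
            .unwrap_or_else(|| "0-0".to_owned())
    }

    /// Remembers where the next reclaim pass should resume.
    pub fn set_cursor(&self, stream: &str, group: &str, consumer: &str, cursor: &str) {
        self.cursors
            .lock()
            .unwrap()
            .insert(Self::key(stream, group, consumer), cursor.to_owned());
    }

    /// Drops the entry; this is what a `forget_consumer` override would call.
    pub fn forget(&self, stream: &str, group: &str, consumer: &str) {
        self.cursors
            .lock()
            .unwrap()
            .remove(&Self::key(stream, group, consumer));
    }

    /// Number of subscriptions that currently have a cached cursor.
    pub fn tracked(&self) -> usize {
        self.cursors.lock().unwrap().len()
    }
}
```

Without the forget call, every auto-generated consumer name would leave one entry behind, so the map would grow with subscription churn rather than with the number of live consumers.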

Dyn Compatibility

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.
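
Traits whose methods return impl Future cannot be used as dyn Trait, so code that holds a backend is typically generic over it instead. The sketch below shows that pattern under stated assumptions: Backend is a one-method, hypothetical stand-in for StreamBackend with String as a placeholder error type, and Bus, CountingBackend, and block_on are illustration-only names rather than this crate's API.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical one-method stand-in for the real trait.
pub trait Backend: Send + Sync + 'static {
    fn ack(&self, message_id: &str) -> impl Future<Output = Result<(), String>> + Send;
}

/// Backend that only counts how many acks it has seen.
#[derive(Default)]
pub struct CountingBackend {
    pub acks: AtomicUsize,
}

impl Backend for CountingBackend {
    fn ack(&self, _message_id: &str) -> impl Future<Output = Result<(), String>> + Send {
        async move {
            self.acks.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }
}

/// Generic holder: the backend type is fixed at compile time.
/// A field of type `Box<dyn Backend>` would be rejected by the compiler.
pub struct Bus<B: Backend> {
    backend: B,
}

impl<B: Backend> Bus<B> {
    pub fn new(backend: B) -> Self {
        Self { backend }
    }

    pub fn backend(&self) -> &B {
        &self.backend
    }

    pub async fn ack_one(&self, message_id: &str) -> Result<(), String> {
        self.backend.ack(message_id).await
    }
}

/// Waker that does nothing; enough for futures that complete without suspending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor for the example only.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The trade-off is static dispatch: each backend type produces its own monomorphized Bus, and choosing a backend at runtime needs an enum or a hand-written wrapper rather than a trait object.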

Implementors