Skip to main content

StreamingBackend

Trait StreamingBackend 

Source
pub trait StreamingBackend: Send + Sync {
    // Required methods
    fn stream_add<'a>(
        &'a self,
        stream_key: &'a str,
        fields: Vec<(String, String)>,
        maxlen: Option<usize>,
    ) -> BoxFuture<'a, CacheResult<String>>;
    fn stream_read_latest<'a>(
        &'a self,
        stream_key: &'a str,
        count: usize,
    ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>>;
    fn stream_read<'a>(
        &'a self,
        stream_key: &'a str,
        last_id: &'a str,
        count: usize,
        block_ms: Option<usize>,
    ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>>;
    fn stream_create_group<'a>(
        &'a self,
        stream_key: &'a str,
        group_name: &'a str,
        id: &'a str,
    ) -> BoxFuture<'a, CacheResult<()>>;
    fn stream_read_group<'a>(
        &'a self,
        stream_key: &'a str,
        group_name: &'a str,
        consumer_name: &'a str,
        count: usize,
        block_ms: Option<usize>,
    ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>>;
    fn stream_ack<'a>(
        &'a self,
        stream_key: &'a str,
        group_name: &'a str,
        ids: &'a [String],
    ) -> BoxFuture<'a, CacheResult<()>>;
}
Expand description

Optional trait for cache backends that support event streaming

This trait defines operations for event-driven architectures using streaming data structures like Redis Streams.

§Capabilities

  • Publish events to streams with automatic trimming
  • Read latest entries (newest first)
  • Read entries with blocking support

§Backend Requirements

Not all cache backends support streaming. This trait is optional and should only be implemented by backends with native streaming support (e.g., Redis Streams, Kafka, Pulsar).

§Example

```rust,ignore
use multi_tier_cache::error::{CacheError, CacheResult};
use multi_tier_cache::{StreamingBackend, StreamEntry};
use futures_util::future::BoxFuture;

struct MyStreamingCache;

impl StreamingBackend for MyStreamingCache {
    fn stream_add<'a>(
        &'a self,
        _stream_key: &'a str,
        _fields: Vec<(String, String)>,
        _maxlen: Option<usize>,
    ) -> BoxFuture<'a, CacheResult<String>> {
        Box::pin(async move { Ok("entry-id".to_string()) })
    }

    fn stream_read_latest<'a>(
        &'a self,
        _stream_key: &'a str,
        _count: usize,
    ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>> {
        Box::pin(async move { Ok(vec![]) })
    }

    fn stream_read<'a>(
        &'a self,
        _stream_key: &'a str,
        _last_id: &'a str,
        _count: usize,
        _block_ms: Option<usize>,
    ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>> {
        Box::pin(async move { Ok(vec![]) })
    }

    fn stream_create_group<'a>(&'a self, _: &'a str, _: &'a str, _: &'a str) -> BoxFuture<'a, CacheResult<()>> {
        Box::pin(async move { Ok(()) })
    }

    fn stream_read_group<'a>(&'a self, _: &'a str, _: &'a str, _: &'a str, _: usize, _: Option<usize>) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>> {
        Box::pin(async move { Ok(vec![]) })
    }

    fn stream_ack<'a>(&'a self, _: &'a str, _: &'a str, _: &'a [String]) -> BoxFuture<'a, CacheResult<()>> {
        Box::pin(async move { Ok(()) })
    }
}

Required Methods§

Source

fn stream_add<'a>( &'a self, stream_key: &'a str, fields: Vec<(String, String)>, maxlen: Option<usize>, ) -> BoxFuture<'a, CacheResult<String>>

Add an entry to a stream

Source

fn stream_read_latest<'a>( &'a self, stream_key: &'a str, count: usize, ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>>

Read the latest N entries from a stream (newest first)

Source

fn stream_read<'a>( &'a self, stream_key: &'a str, last_id: &'a str, count: usize, block_ms: Option<usize>, ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>>

Read entries from a stream with optional blocking

Source

fn stream_create_group<'a>( &'a self, stream_key: &'a str, group_name: &'a str, id: &'a str, ) -> BoxFuture<'a, CacheResult<()>>

Create a consumer group for a stream

Source

fn stream_read_group<'a>( &'a self, stream_key: &'a str, group_name: &'a str, consumer_name: &'a str, count: usize, block_ms: Option<usize>, ) -> BoxFuture<'a, CacheResult<Vec<StreamEntry>>>

Read entries from a stream as a consumer group

Source

fn stream_ack<'a>( &'a self, stream_key: &'a str, group_name: &'a str, ids: &'a [String], ) -> BoxFuture<'a, CacheResult<()>>

Acknowledge entry processing

Implementors§

Source§

impl StreamingBackend for RedisStreams

Implement StreamingBackend trait for RedisStreams