Skip to main content

pyra_streams/
handler.rs

1use async_trait::async_trait;
2
3/// Trait for handling messages from a Redis Stream.
4///
5/// Services implement this trait with their domain-specific processing logic.
6/// The consumer framework handles XREADGROUP, XACK, XCLAIM, and dead-letter
7/// automatically based on the return value:
8///
9/// - `Ok(())` — message is ACKed and removed from pending.
10/// - `Err(_)` — message stays pending for retry via XCLAIM.
11#[async_trait]
12pub trait StreamHandler: Send + Sync + 'static {
13    /// The error type returned by handler processing.
14    type Error: std::fmt::Display + Send + Sync + 'static;
15
16    /// Process a single stream message.
17    ///
18    /// `msg_id` is the Redis stream message ID (e.g. "1234567890-0").
19    /// `data` is the value of the "data" field from the stream entry.
20    ///
21    /// Return `Ok(())` to acknowledge the message, or `Err` to leave it
22    /// pending for retry.
23    async fn handle_message(&self, msg_id: &str, data: &str) -> Result<(), Self::Error>;
24
25    /// Called when a message exceeds max retries and is moved to the dead-letter stream.
26    /// Default implementation is a no-op.
27    async fn on_dead_letter(&self, _msg_id: &str, _data: &str) {}
28}