Skip to main content

eventbus_core/stream/
backend.rs

1use std::{future::Future, sync::Arc, time::Duration};
2
3use crate::{EventBusError, Message, PartialDeliveryState};
4
5#[derive(Debug, Clone)]
6pub struct ClaimedMessage {
7    pub id: String,
8    pub message: Arc<Message>,
9    /// Backend-supplied half of the delivery state. The bus layer combines
10    /// this with the subscription's retry budget to produce the full
11    /// [`crate::DeliveryState`] handed to handlers.
12    pub state: PartialDeliveryState,
13}
14
15/// One result per Redis Stream entry returned from `read_new` / `reclaim_idle`.
16///
17/// Backends report decode failures **per entry** instead of poisoning the
18/// whole batch with a single `Result<Vec<_>, _>` short-circuit. Otherwise a
19/// single corrupt PEL entry would loop forever:
20///
21/// - `XREADGROUP` already moved the entry into the consumer's PEL.
22/// - The bus would receive `Err`, log/observe, back off, retry.
23/// - On the next cycle the same entry would re-decode → fail again.
24///
25/// With per-entry results the bus can ack the bad entry (so it leaves the
26/// PEL), publish a synthetic dead-letter envelope (when `dead_letter_topic`
27/// is configured), and surface the error to the [`crate::ErrorObserver`].
28#[derive(Debug)]
29pub enum FetchedEntry {
30    Decoded(ClaimedMessage),
31    Malformed { id: String, error: EventBusError },
32}
33
34pub trait StreamBackend: Send + Sync + 'static {
35    fn create_group(
36        &self,
37        stream: &str,
38        group: &str,
39        start_id: &str,
40    ) -> impl Future<Output = Result<(), EventBusError>> + Send;
41
42    fn publish(
43        &self,
44        stream: &str,
45        message: Message,
46    ) -> impl Future<Output = Result<String, EventBusError>> + Send;
47
48    fn reclaim_idle(
49        &self,
50        stream: &str,
51        group: &str,
52        consumer: &str,
53        min_idle: Duration,
54        count: usize,
55    ) -> impl Future<Output = Result<Vec<FetchedEntry>, EventBusError>> + Send;
56
57    fn read_new(
58        &self,
59        stream: &str,
60        group: &str,
61        consumer: &str,
62        count: usize,
63        timeout: Duration,
64    ) -> impl Future<Output = Result<Vec<FetchedEntry>, EventBusError>> + Send;
65
66    fn ack(
67        &self,
68        stream: &str,
69        group: &str,
70        message_id: &str,
71    ) -> impl Future<Output = Result<(), EventBusError>> + Send;
72
73    /// Batch-acknowledge multiple message IDs in a single round-trip.
74    ///
75    /// Redis Streams `XACK` accepts N IDs in one command, turning what would
76    /// be N RTTs into one. Backends without native batch support can rely on
77    /// the default implementation (a serial loop over [`Self::ack`]), but the point
78    /// of this method is to let the Redis backend collapse the round-trips.
79    ///
80    /// # Contract
81    /// - Returning `Ok(())` means the whole batch was accepted by the server.
82    ///   It does *not* prove every ID existed in the PEL — Redis silently
83    ///   ignores unknown IDs. Callers must treat this as at-least-once.
84    /// - An `Err` signals the batch did not reach the server, so the caller
85    ///   should surface the same error to every waiter; messages will be
86    ///   re-claimed on the next cycle.
87    fn ack_many(
88        &self,
89        stream: &str,
90        group: &str,
91        message_ids: &[String],
92    ) -> impl Future<Output = Result<(), EventBusError>> + Send {
93        async move {
94            for id in message_ids {
95                self.ack(stream, group, id).await?;
96            }
97            Ok(())
98        }
99    }
100
101    /// Drop any per-(stream, group, consumer) state cached inside the backend
102    /// (e.g., XAUTOCLAIM cursors). Called by [`crate::stream::StreamBus`] on subscription
103    /// shutdown so backends do not accumulate cursor entries indefinitely
104    /// under churn (auto-generated consumer names, restarting pods, etc.).
105    ///
106    /// The default impl is a no-op; backends without per-consumer state can
107    /// ignore it.
108    #[allow(unused_variables)]
109    fn forget_consumer(
110        &self,
111        stream: &str,
112        group: &str,
113        consumer: &str,
114    ) -> impl Future<Output = ()> + Send {
115        async {}
116    }
117}
118
/// Crate-internal shorthand for an [`Arc`]-wrapped value of type `B`.
///
/// The name indicates `B` is meant to be a backend; the alias itself places
/// no bound on `B`.
pub(crate) type SharedBackend<B> = Arc<B>;