eventbus_core/stream/backend.rs
1use std::{future::Future, sync::Arc, time::Duration};
2
3use crate::{EventBusError, Message, PartialDeliveryState};
4
5#[derive(Debug, Clone)]
6pub struct ClaimedMessage {
7 pub id: String,
8 pub message: Arc<Message>,
9 /// Backend-supplied half of the delivery state. The bus layer combines
10 /// this with the subscription's retry budget to produce the full
11 /// [`crate::DeliveryState`] handed to handlers.
12 pub state: PartialDeliveryState,
13}
14
15/// One result per Redis Stream entry returned from `read_new` / `reclaim_idle`.
16///
17/// Backends report decode failures **per entry** instead of poisoning the
18/// whole batch with a single `Result<Vec<_>, _>` short-circuit. Otherwise a
19/// single corrupt PEL entry would loop forever:
20///
21/// - `XREADGROUP` already moved the entry into the consumer's PEL.
22/// - The bus would receive `Err`, log/observe, back off, retry.
23/// - On the next cycle the same entry would re-decode → fail again.
24///
25/// With per-entry results the bus can ack the bad entry (so it leaves the
26/// PEL), publish a synthetic dead-letter envelope (when `dead_letter_topic`
27/// is configured), and surface the error to the [`crate::ErrorObserver`].
28#[derive(Debug)]
29pub enum FetchedEntry {
30 Decoded(ClaimedMessage),
31 Malformed { id: String, error: EventBusError },
32}
33
34pub trait StreamBackend: Send + Sync + 'static {
35 fn create_group(
36 &self,
37 stream: &str,
38 group: &str,
39 start_id: &str,
40 ) -> impl Future<Output = Result<(), EventBusError>> + Send;
41
42 fn publish(
43 &self,
44 stream: &str,
45 message: Message,
46 ) -> impl Future<Output = Result<String, EventBusError>> + Send;
47
48 fn reclaim_idle(
49 &self,
50 stream: &str,
51 group: &str,
52 consumer: &str,
53 min_idle: Duration,
54 count: usize,
55 ) -> impl Future<Output = Result<Vec<FetchedEntry>, EventBusError>> + Send;
56
57 fn read_new(
58 &self,
59 stream: &str,
60 group: &str,
61 consumer: &str,
62 count: usize,
63 timeout: Duration,
64 ) -> impl Future<Output = Result<Vec<FetchedEntry>, EventBusError>> + Send;
65
66 fn ack(
67 &self,
68 stream: &str,
69 group: &str,
70 message_id: &str,
71 ) -> impl Future<Output = Result<(), EventBusError>> + Send;
72
73 /// Batch-acknowledge multiple message IDs in a single round-trip.
74 ///
75 /// Redis Streams `XACK` accepts N IDs in one command, turning what would
76 /// be N RTTs into one. Backends without native batch support can rely on
77 /// the default implementation (a serial loop over [`Self::ack`]), but the point
78 /// of this method is to let the Redis backend collapse the round-trips.
79 ///
80 /// # Contract
81 /// - Returning `Ok(())` means the whole batch was accepted by the server.
82 /// It does *not* prove every ID existed in the PEL — Redis silently
83 /// ignores unknown IDs. Callers must treat this as at-least-once.
84 /// - An `Err` signals the batch did not reach the server, so the caller
85 /// should surface the same error to every waiter; messages will be
86 /// re-claimed on the next cycle.
87 fn ack_many(
88 &self,
89 stream: &str,
90 group: &str,
91 message_ids: &[String],
92 ) -> impl Future<Output = Result<(), EventBusError>> + Send {
93 async move {
94 for id in message_ids {
95 self.ack(stream, group, id).await?;
96 }
97 Ok(())
98 }
99 }
100
101 /// Drop any per-(stream, group, consumer) state cached inside the backend
102 /// (e.g., XAUTOCLAIM cursors). Called by [`crate::stream::StreamBus`] on subscription
103 /// shutdown so backends do not accumulate cursor entries indefinitely
104 /// under churn (auto-generated consumer names, restarting pods, etc.).
105 ///
106 /// The default impl is a no-op; backends without per-consumer state can
107 /// ignore it.
108 #[allow(unused_variables)]
109 fn forget_consumer(
110 &self,
111 stream: &str,
112 group: &str,
113 consumer: &str,
114 ) -> impl Future<Output = ()> + Send {
115 async {}
116 }
117}
118
/// Crate-internal shorthand for a backend shared through an [`Arc`].
pub(crate) type SharedBackend<B> = Arc<B>;