//! Consumer configuration ([`ConsumerOptions`]) and the backend-agnostic
//! [`Consumer`] trait.
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use crate::error::{Result, ShoveError};
use crate::handler::MessageHandler;
use crate::topic::{SequencedTopic, Topic};
/// Largest payload accepted by default: 10 MiB (10 × 2^20 bytes).
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 10 << 20;

/// How long a handler may run by default before it is timed out: 30 seconds.
pub const DEFAULT_HANDLER_TIMEOUT: Duration = Duration::new(30, 0);

/// Default cap on locally buffered messages per sequence key in sequenced
/// consumers.
pub const DEFAULT_MAX_PENDING_PER_KEY: usize = 1000;
/// Checks a payload length against an optional upper bound.
///
/// Returns `Ok(())` when `max` is `None` (no limit configured) or when
/// `len` is less than or equal to the limit. A `len` strictly greater than
/// the limit yields [`ShoveError::Validation`] with a message naming both
/// values.
///
/// Called by [`ConsumerOptions::validate_payload_message_size`], and directly
/// by backends that destructure their options before entering reconnect
/// closures.
pub(crate) fn validate_message_size(len: usize, max: Option<usize>) -> Result<()> {
    // No configured limit: every size is acceptable.
    let Some(max) = max else {
        return Ok(());
    };

    // The limit is inclusive — a payload of exactly `max` bytes passes.
    if len <= max {
        return Ok(());
    }

    Err(ShoveError::Validation(format!(
        "message size {len} exceeds max_message_size {max}"
    )))
}
/// Options for consumer behavior.
///
/// Build an instance with [`ConsumerOptions::new`] and refine it with the
/// `with_*` / `without_*` builder methods. The type is `Clone` so one
/// configuration can be handed to several consumer tasks, and `Debug` so it
/// can be logged or embedded in other `#[derive(Debug)]` types.
#[derive(Clone, Debug)]
pub struct ConsumerOptions {
    /// Maximum retries before automatically rejecting to DLQ.
    ///
    /// Each time a handler returns [`Outcome::Retry`](crate::Outcome::Retry),
    /// the retry counter increments and the message is routed to a hold queue
    /// selected by `hold_queues[min(retry_count, len - 1)]`. This gives
    /// escalating backoff when multiple hold queues are defined — once the
    /// counter exceeds the number of hold queues, retries keep using the
    /// last (longest-delay) hold queue.
    ///
    /// When `retry_count >= max_retries`, the message is sent to the DLQ
    /// instead of another hold queue.
    ///
    /// # Example
    ///
    /// With `max_retries = 5` and hold queues `[1s, 5s, 30s]`:
    ///
    /// | retry | hold queue | delay |
    /// |-------|------------|-------|
    /// | 0     | `[0]`      | 1s    |
    /// | 1     | `[1]`      | 5s    |
    /// | 2     | `[2]`      | 30s   |
    /// | 3     | `[2]`      | 30s   |
    /// | 4     | `[2]`      | 30s   |
    /// | 5     | — DLQ —    |       |
    pub max_retries: u32,

    /// Prefetch count (number of unacked messages the broker will deliver).
    pub prefetch_count: u16,

    /// Cancellation token for graceful shutdown. When triggered, the consumer
    /// finishes processing the current message, acks it, and returns `Ok(())`.
    pub shutdown: CancellationToken,

    /// Flag that indicates the consumer is currently processing a message.
    /// Used by the autoscaler to avoid scaling down busy consumers.
    ///
    /// The flag lives behind an [`Arc`], so clones of these options share the
    /// same underlying flag.
    pub processing: Arc<AtomicBool>,

    /// Maximum time a handler may spend processing a single message.
    /// If the handler exceeds this duration the message is retried.
    ///
    /// Default: [`DEFAULT_HANDLER_TIMEOUT`] (30 s). Set to `None` to disable.
    pub handler_timeout: Option<Duration>,

    /// Maximum number of locally buffered messages per sequence key in
    /// concurrent-sequenced consumers. When the limit is reached, new
    /// deliveries for that key are rejected to the DLQ.
    ///
    /// Default: [`DEFAULT_MAX_PENDING_PER_KEY`] (1 000). Set to `None` to
    /// disable.
    pub max_pending_per_key: Option<usize>,

    /// Maximum allowed message payload size in bytes. Messages exceeding this
    /// limit are rejected to the DLQ (or discarded in DLQ consumers) **before**
    /// deserialization, preventing JSON-bomb OOM attacks.
    ///
    /// Default: [`DEFAULT_MAX_MESSAGE_SIZE`] (10 MiB). Set to `None` to
    /// disable the check.
    pub max_message_size: Option<usize>,

    /// Enable exactly-once delivery via AMQP transactions (RabbitMQ only).
    ///
    /// Requires the `rabbitmq-transactional` Cargo feature. When enabled, the
    /// consumer channel is put in AMQP transaction mode (`tx_select`). Every
    /// routing decision (retry, defer, ack, reject) is wrapped in a `tx_commit`,
    /// making publish-to-hold-queue and ack/nack of the original delivery
    /// **atomic**. This eliminates the publish-then-ack race that can produce a
    /// duplicate delivery under at-least-once semantics.
    ///
    /// **Trade-off**: AMQP transactions disable publisher confirms and add a
    /// round-trip per message. Expect roughly 10–15× lower throughput per channel
    /// compared to the default confirm mode. Use [`ConsumerOptions::with_exactly_once`]
    /// to opt in.
    #[cfg(feature = "rabbitmq-transactional")]
    pub exactly_once: bool,

    /// Number of messages to request per SQS `ReceiveMessage` poll, independent
    /// of how many handlers may run concurrently (`prefetch_count`).
    ///
    /// When non-zero, the SQS consumer fetches this many messages per API call
    /// and buffers them locally, dispatching them to handlers one-by-one (serial
    /// mode) or in parallel (concurrent mode) up to `prefetch_count` at a time.
    /// This allows batching SQS receives even in non-concurrent mode, reducing
    /// API call overhead significantly when multiple consumers share the same queue.
    ///
    /// Zero means "use `prefetch_count`" (the default).
    #[cfg(feature = "aws-sns-sqs")]
    pub(crate) receive_batch_size: u16,

    /// Override for JetStream `max_ack_pending` on the durable consumer.
    ///
    /// When multiple consumer tasks share a single JetStream durable consumer
    /// (as in consumer groups), `max_ack_pending` must account for the total
    /// in-flight capacity across all tasks — not just the per-task prefetch.
    /// `None` means use `prefetch_count` (the default for standalone consumers).
    #[cfg(feature = "nats")]
    pub(crate) max_ack_pending: Option<i64>,
}
impl ConsumerOptions {
    /// Create consumer options with the given shutdown token.
    ///
    /// All other settings start at their defaults:
    ///
    /// - `max_retries = 10`
    /// - `prefetch_count = 10`
    /// - `processing` — a fresh flag initialised to `false`
    /// - `handler_timeout = Some(DEFAULT_HANDLER_TIMEOUT)`
    /// - `max_pending_per_key = Some(DEFAULT_MAX_PENDING_PER_KEY)`
    /// - `max_message_size = Some(DEFAULT_MAX_MESSAGE_SIZE)`
    /// - feature-gated fields: `exactly_once = false`,
    ///   `receive_batch_size = 0`, `max_ack_pending = None`
    #[must_use]
    pub fn new(shutdown: CancellationToken) -> Self {
        Self {
            max_retries: 10,
            prefetch_count: 10,
            shutdown,
            processing: Arc::new(AtomicBool::new(false)),
            handler_timeout: Some(DEFAULT_HANDLER_TIMEOUT),
            max_pending_per_key: Some(DEFAULT_MAX_PENDING_PER_KEY),
            max_message_size: Some(DEFAULT_MAX_MESSAGE_SIZE),
            #[cfg(feature = "rabbitmq-transactional")]
            exactly_once: false,
            #[cfg(feature = "aws-sns-sqs")]
            receive_batch_size: 0,
            #[cfg(feature = "nats")]
            max_ack_pending: None,
        }
    }

    // Every builder below consumes `self` and returns the updated value.
    // `#[must_use]` turns an accidentally discarded result — e.g. a bare
    // `opts.with_max_retries(3);` statement — into a compiler warning instead
    // of a silently ignored setting.

    /// Set the maximum number of retries before rejecting to DLQ.
    ///
    /// This controls the total retry budget, independent of how many hold
    /// queues are configured. See [`max_retries`](Self::max_retries) for
    /// details on how retries escalate through hold queues.
    #[must_use]
    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
        self.max_retries = max_retries;
        self
    }

    /// Set the prefetch count (number of unacked messages the broker will deliver).
    #[must_use]
    pub fn with_prefetch_count(mut self, prefetch_count: u16) -> Self {
        self.prefetch_count = prefetch_count;
        self
    }

    /// Set the handler timeout. If a handler exceeds this duration the message
    /// is retried automatically.
    #[must_use]
    pub fn with_handler_timeout(mut self, timeout: Duration) -> Self {
        self.handler_timeout = Some(timeout);
        self
    }

    /// Disable the handler timeout entirely (handlers may run indefinitely).
    #[must_use]
    pub fn without_handler_timeout(mut self) -> Self {
        self.handler_timeout = None;
        self
    }

    /// Set the maximum number of locally buffered messages per sequence key.
    /// When exceeded, new deliveries for that key are rejected to the DLQ.
    #[must_use]
    pub fn with_max_pending_per_key(mut self, limit: usize) -> Self {
        self.max_pending_per_key = Some(limit);
        self
    }

    /// Disable the per-key pending buffer limit entirely (unbounded).
    #[must_use]
    pub fn without_max_pending_per_key(mut self) -> Self {
        self.max_pending_per_key = None;
        self
    }

    /// Set the maximum allowed message payload size in bytes.
    /// Messages exceeding this limit are rejected before deserialization.
    #[must_use]
    pub fn with_max_message_size(mut self, max: usize) -> Self {
        self.max_message_size = Some(max);
        self
    }

    /// Disable the message size limit entirely.
    #[must_use]
    pub fn without_message_size_limit(mut self) -> Self {
        self.max_message_size = None;
        self
    }

    /// Returns `Ok(())` if the payload is within the configured
    /// [`max_message_size`](Self::max_message_size), or an error if it
    /// exceeds the limit. Always succeeds when no limit is set.
    ///
    /// Thin wrapper over [`validate_message_size`].
    pub(crate) fn validate_payload_message_size(&self, len: usize) -> Result<()> {
        validate_message_size(len, self.max_message_size)
    }

    /// Enable exactly-once delivery via AMQP transactions.
    ///
    /// Requires the `rabbitmq-transactional` Cargo feature. See
    /// [`ConsumerOptions::exactly_once`] for the full trade-off description.
    ///
    /// # Example
    /// ```rust,ignore
    /// let options = ConsumerOptions::new(shutdown)
    ///     .with_exactly_once();
    /// ```
    #[cfg(feature = "rabbitmq-transactional")]
    #[must_use]
    pub fn with_exactly_once(mut self) -> Self {
        self.exactly_once = true;
        self
    }
}
/// Consume messages from a topic's queues.
///
/// This trait is intentionally **not object-safe** — methods are generic over
/// `T: Topic`. Backends are always concrete types (e.g., `RabbitMqConsumer`),
/// not `dyn Consumer`. For test doubles, implement the trait on a mock struct
/// or use an in-memory backend.
///
/// Implementors must be `Send + Sync + 'static`, and every method returns a
/// `Send` future resolving to the crate's [`Result`].
pub trait Consumer: Send + Sync + 'static {
    /// Run the consumer loop — the default mode. Blocks until shutdown signal.
    ///
    /// Processes up to `prefetch_count` messages concurrently within the same
    /// consumer task, while **always acknowledging messages in delivery order**.
    /// Set `prefetch_count = 1` for sequential processing.
    ///
    /// This significantly improves throughput for handlers with I/O latency
    /// (HTTP calls, database queries, etc.) without requiring additional
    /// consumer instances.
    ///
    /// `handler` receives the messages of topic `T`; `options` carries the
    /// retry, prefetch, timeout, size-limit and shutdown settings (see
    /// [`ConsumerOptions`]).
    fn run<T: Topic>(
        &self,
        handler: impl MessageHandler<T>,
        options: ConsumerOptions,
    ) -> impl Future<Output = Result<()>> + Send;

    /// Run the consumer loop with FIFO (per-key ordered) delivery.
    /// Blocks until shutdown signal.
    ///
    /// Messages sharing the same sequence key are delivered in strict order.
    /// Different sequence keys are independent and may be processed concurrently
    /// within the same shard.
    ///
    /// Each shard prefetches up to `ConsumerOptions::prefetch_count` messages.
    /// Messages for idle keys are processed immediately; messages for busy keys
    /// (in-flight handler or awaiting retry) are buffered locally and drained
    /// sequentially when the key becomes free. This avoids redelivery storms
    /// while consuming prefetch slots as natural back-pressure.
    ///
    /// # Errors
    ///
    /// Returns `Err(ShoveError::Topology)` if `T::topology().sequencing` is `None`.
    fn run_fifo<T: SequencedTopic>(
        &self,
        handler: impl MessageHandler<T>,
        options: ConsumerOptions,
    ) -> impl Future<Output = Result<()>> + Send;

    /// Run a DLQ consumer loop for the topic. Blocks until shutdown signal.
    ///
    /// Calls `handler.handle_dead()` for each message, then always acks.
    ///
    /// Unlike [`run`](Self::run) and [`run_fifo`](Self::run_fifo), this method
    /// takes no [`ConsumerOptions`].
    ///
    /// # Errors
    ///
    /// Returns `Err(ShoveError::Topology)` if `T::topology().dlq` is `None`.
    fn run_dlq<T: Topic>(
        &self,
        handler: impl MessageHandler<T>,
    ) -> impl Future<Output = Result<()>> + Send;
}
#[cfg(test)]
mod tests {
    //! Unit tests for [`ConsumerOptions`] defaults and builders, and for the
    //! payload-size validation helpers.

    use super::*;
    use std::sync::atomic::Ordering;

    /// Options with every setting at its default and a fresh shutdown token.
    fn default_options() -> ConsumerOptions {
        ConsumerOptions::new(CancellationToken::new())
    }

    #[test]
    fn defaults_are_correct() {
        let opts = default_options();
        assert_eq!(opts.max_retries, 10);
        assert_eq!(opts.prefetch_count, 10);
        assert_eq!(opts.handler_timeout, Some(DEFAULT_HANDLER_TIMEOUT));
        assert_eq!(opts.max_pending_per_key, Some(DEFAULT_MAX_PENDING_PER_KEY));
        assert_eq!(opts.max_message_size, Some(DEFAULT_MAX_MESSAGE_SIZE));
        assert!(!opts.processing.load(Ordering::Acquire));
    }

    #[test]
    fn with_max_retries_overrides() {
        let opts = default_options().with_max_retries(5);
        assert_eq!(opts.max_retries, 5);
    }

    #[test]
    fn with_prefetch_count_overrides() {
        let opts = default_options().with_prefetch_count(50);
        assert_eq!(opts.prefetch_count, 50);
    }

    #[test]
    fn with_handler_timeout_sets_timeout() {
        // Deliberately different from DEFAULT_HANDLER_TIMEOUT: using the
        // default value here would let a no-op builder pass unnoticed.
        let custom = Duration::from_secs(45);
        assert_ne!(custom, DEFAULT_HANDLER_TIMEOUT);

        let opts = default_options().with_handler_timeout(custom);
        assert_eq!(opts.handler_timeout, Some(custom));
    }

    #[test]
    fn without_handler_timeout_disables_timeout() {
        let opts = default_options().without_handler_timeout();
        assert_eq!(opts.handler_timeout, None);
    }

    #[test]
    fn builder_chains() {
        let opts = default_options()
            .with_max_retries(3)
            .with_prefetch_count(20)
            .with_handler_timeout(Duration::from_secs(5))
            .with_max_pending_per_key(100)
            .with_max_message_size(5 * 1024 * 1024);
        assert_eq!(opts.max_retries, 3);
        assert_eq!(opts.prefetch_count, 20);
        assert_eq!(opts.handler_timeout, Some(Duration::from_secs(5)));
        assert_eq!(opts.max_pending_per_key, Some(100));
        assert_eq!(opts.max_message_size, Some(5 * 1024 * 1024));
    }

    #[test]
    fn shutdown_token_propagated() {
        let token = CancellationToken::new();
        let opts = ConsumerOptions::new(token.clone());
        assert!(!opts.shutdown.is_cancelled());
        token.cancel();
        assert!(opts.shutdown.is_cancelled());
    }

    #[test]
    fn with_max_pending_per_key_sets_value() {
        let opts = default_options().with_max_pending_per_key(50);
        assert_eq!(opts.max_pending_per_key, Some(50));
    }

    #[test]
    fn without_max_pending_per_key_disables_limit() {
        let opts = default_options().without_max_pending_per_key();
        assert_eq!(opts.max_pending_per_key, None);
    }

    #[test]
    fn with_max_message_size_overrides_default() {
        let opts = default_options().with_max_message_size(1024 * 1024);
        assert_eq!(opts.max_message_size, Some(1024 * 1024));
    }

    #[test]
    fn without_message_size_limit_disables_check() {
        let opts = default_options().without_message_size_limit();
        assert_eq!(opts.max_message_size, None);
    }

    #[test]
    fn validate_message_size_accepts_sizes_up_to_limit() {
        // The limit is inclusive: a payload of exactly `max` bytes passes.
        assert!(validate_message_size(0, Some(0)).is_ok());
        assert!(validate_message_size(9, Some(10)).is_ok());
        assert!(validate_message_size(10, Some(10)).is_ok());
    }

    #[test]
    fn validate_message_size_rejects_sizes_over_limit() {
        assert!(matches!(
            validate_message_size(11, Some(10)),
            Err(ShoveError::Validation(_))
        ));
        assert!(matches!(
            validate_message_size(1, Some(0)),
            Err(ShoveError::Validation(_))
        ));
    }

    #[test]
    fn validate_message_size_without_limit_accepts_anything() {
        assert!(validate_message_size(0, None).is_ok());
        assert!(validate_message_size(usize::MAX, None).is_ok());
    }

    #[test]
    fn validate_payload_message_size_uses_configured_limit() {
        let limited = default_options().with_max_message_size(4);
        assert!(limited.validate_payload_message_size(4).is_ok());
        assert!(limited.validate_payload_message_size(5).is_err());

        let unlimited = default_options().without_message_size_limit();
        assert!(unlimited.validate_payload_message_size(usize::MAX).is_ok());
    }

    #[cfg(feature = "rabbitmq-transactional")]
    #[test]
    fn exactly_once_defaults_to_false() {
        let opts = default_options();
        assert!(!opts.exactly_once);
    }

    #[cfg(feature = "rabbitmq-transactional")]
    #[test]
    fn with_exactly_once_sets_flag() {
        let opts = default_options().with_exactly_once();
        assert!(opts.exactly_once);
    }

    #[cfg(feature = "rabbitmq-transactional")]
    #[test]
    fn exactly_once_chains_with_other_builders() {
        let opts = default_options()
            .with_max_retries(5)
            .with_exactly_once()
            .with_prefetch_count(1);
        assert!(opts.exactly_once);
        assert_eq!(opts.max_retries, 5);
        assert_eq!(opts.prefetch_count, 1);
    }
}