1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
use redis::AsyncTypedCommands;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use anyhow::Error;
use crate::{
Job,
runtime::{Runtime, SelectedRuntime},
task::{
Claimer, Queue, QueueBuilder, QueueHandle, Task,
helpers::{ack, parse_message, process_and_ack},
},
};
impl<I, E, F, Fut, DE, DF, DFut> Queue<I, E, F, Fut, DE, DF, DFut>
where
    I: Job + Send + Sync + 'static,
    F: Fn(I) -> Fut + 'static + Send + Sync,
    E: std::fmt::Display + Send + 'static,
    Fut: Future<Output = Result<(), E>> + Send,
    DF: Fn(I, usize) -> DFut + 'static + Send + Sync,
    DE: std::fmt::Display + Send + 'static,
    DFut: Future<Output = Result<(), DE>> + Send,
{
    /// Construct a new `Queue` from a [`QueueBuilder`].
    ///
    /// Fields are moved over as-is; the optional claimer configuration is
    /// converted into a [`Claimer`] via `Claimer::from`.
    pub fn new(builder: QueueBuilder<I, E, F, Fut, DE, DF, DFut>) -> Self {
        Queue {
            name: builder.name,
            consumer_group: builder.consumer_group,
            consumer_id: builder.consumer_id,
            block_timeout: builder.block_timeout,
            max_concurrent_tasks: builder.max_concurrent_tasks,
            worker: builder.worker,
            claimer: builder.claimer.map(Claimer::from),
            conn: builder.conn,
            read_conn: builder.read_conn,
            _marker: builder._marker,
            starting_id: builder.starting_id,
        }
    }

    // ── Cross-submodule accessors ───────────────────────────────────────────
    //
    // `pub(crate)` getters that let other submodules in the crate read
    // these fields without widening the field visibility past `pub(super)`.

    /// Stream key.
    pub(crate) fn name(&self) -> &str {
        &self.name
    }

    /// Consumer group name.
    pub(crate) fn consumer_group(&self) -> &str {
        &self.consumer_group
    }

    /// Configured starting ID, if any.
    pub(crate) fn starting_id(&self) -> Option<&str> {
        self.starting_id.as_deref()
    }

    /// Clone of the shared (non-blocking) connection.
    pub(crate) fn conn(&self) -> redis::aio::MultiplexedConnection {
        self.conn.clone()
    }

    /// Create the consumer group on the stream (via `XGROUP CREATE ... MKSTREAM`).
    ///
    /// Safe to call multiple times — silently ignores the `BUSYGROUP` error
    /// if the group already exists.
    ///
    /// Where the group begins reading is controlled by the `starting_id`
    /// field on the queue (see [`QueueBuilder::starting_id`]). When unset,
    /// it defaults to `"0"` (read from the beginning).
    ///
    /// # Errors
    ///
    /// Returns any Redis error other than `BUSYGROUP`.
    pub async fn init(&self) -> Result<(), Error> {
        let mut conn = self.conn();
        // MKSTREAM also creates the stream itself if it doesn't exist yet.
        let created = conn
            .xgroup_create_mkstream(
                self.name(),
                self.consumer_group(),
                self.starting_id().unwrap_or("0"),
            )
            .await;
        match created {
            Ok(_) => Ok(()),
            // BUSYGROUP means the group already exists — safe to ignore.
            Err(e) if e.to_string().contains("BUSYGROUP") => Ok(()),
            Err(e) => Err(anyhow::anyhow!(e)),
        }
    }

    /// Publish a [`Task`] to the stream via `XADD`.
    ///
    /// The task's payload is serialized to pairs via [`Job::try_to_pairs()`],
    /// and each `BulkString` value is written as a raw field. Pairs whose value
    /// is not a `BulkString` are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails or the `XADD` is rejected.
    pub async fn enqueue(&mut self, task: Task<I>) -> Result<(), Error> {
        // Serialize the payload struct into (field_name, redis::Value) pairs
        let pairs = task
            .payload
            .try_to_pairs()
            .map_err(|e| anyhow::anyhow!(e))?;
        // Borrow the raw bytes of each BulkString — no need to copy them,
        // `pairs` outlives the XADD call.
        let items: Vec<(&str, &[u8])> = pairs
            .iter()
            .filter_map(|(k, v)| match v {
                redis::Value::BulkString(bytes) => Some((k.as_str(), bytes.as_slice())),
                _ => None,
            })
            .collect();
        // Append the message to the stream
        self.conn.xadd(&self.name, &task.id, &items).await?;
        Ok(())
    }

    /// Publish multiple [`Task`]s to the stream in a single Redis pipeline.
    ///
    /// Each task is serialized and added via `XADD`. The entire batch is sent
    /// as one round-trip, making this significantly more efficient than
    /// calling [`enqueue()`](Self::enqueue) in a loop. An empty batch is a
    /// no-op and does not contact Redis.
    ///
    /// # Errors
    ///
    /// Returns the first serialization error (before anything is sent), or
    /// the pipeline's Redis error.
    pub async fn enqueue_bulk(&mut self, tasks: Vec<Task<I>>) -> Result<(), Error> {
        if tasks.is_empty() {
            return Ok(());
        }
        // Pre-serialize all tasks so we can reference their data when building
        // the pipeline commands
        let serialized: Vec<(String, Vec<(String, Vec<u8>)>)> = tasks
            .into_iter()
            .map(|task| {
                let pairs = task
                    .payload
                    .try_to_pairs()
                    .map_err(|e| anyhow::anyhow!(e))?;
                let items: Vec<(String, Vec<u8>)> = pairs
                    .into_iter()
                    .filter_map(|(k, v)| match v {
                        redis::Value::BulkString(bytes) => Some((k, bytes)),
                        _ => None,
                    })
                    .collect();
                Ok((task.id, items))
            })
            .collect::<Result<_, Error>>()?;
        let mut pipe = redis::pipe();
        for (id, items) in &serialized {
            let refs: Vec<(&str, &[u8])> = items
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_slice()))
                .collect();
            pipe.xadd(&self.name, id.as_str(), &refs);
        }
        // Send all XADDs in a single round-trip
        pipe.query_async::<()>(&mut self.conn).await?;
        Ok(())
    }

    /// Consume the `Queue` and spawn the consumer (and optional claimer) loops.
    ///
    /// Returns a [`QueueHandle`] that can be used to trigger graceful
    /// shutdown. The consumer loop:
    ///
    /// 1. Checks the shutdown flag each iteration.
    /// 2. Queries available semaphore permits to determine `COUNT`.
    /// 3. Calls `XREADGROUP` with blocking. A read error that is not a
    ///    timeout is logged and followed by a back-off sleep, so a broken
    ///    connection or missing group does not spin the loop.
    /// 4. For each received message, acquires a permit and spawns a task
    ///    that parses the payload, calls the worker, and `XACK`s on success.
    /// 5. On shutdown, drains all in-flight tasks before returning.
    ///
    /// If a [`Claimer`] is configured, a second loop is spawned that uses
    /// `XAUTOCLAIM` to reclaim idle messages and retries or dead-letters them.
    /// The claimer follows the `XAUTOCLAIM` cursor across calls so the whole
    /// pending-entries list is eventually scanned, and looks up the exact
    /// delivery count of every claimed message. Messages delivered more than
    /// `max_retries` times are handed to the DLQ worker (when configured and
    /// the payload parses) and then `XACK`ed.
    pub fn run(self) -> QueueHandle {
        // Lower bound on the back-off after a failed read, so a block
        // timeout of 0 ("block forever") cannot turn into a 0 ms busy loop.
        const MIN_READ_ERROR_BACKOFF_MS: u64 = 100;

        let shutdown = Arc::new(AtomicBool::new(false));
        // Wrap shared state in Arcs so both loops and their spawned tasks
        // can reference them without lifetime issues
        let name = Arc::new(self.name);
        let consumer_group = Arc::new(self.consumer_group);
        let consumer_id = Arc::new(self.consumer_id);
        let worker = self.worker;
        let conn = self.conn;
        // Dedicated connection for the blocking XREADGROUP — kept separate
        // from `conn` so that blocking the reader does not stall XACKs and
        // other non-blocking ops queued on the multiplexer.
        let read_conn = self.read_conn;

        // ── Main consumer loop ────────────────────────────────────────────────
        let main_join = {
            // Clone Arcs for the spawned loop task to own
            let shutdown_flag = Arc::clone(&shutdown);
            let semaphore = SelectedRuntime::new_semaphore(self.max_concurrent_tasks);
            let name = Arc::clone(&name);
            let consumer_group = Arc::clone(&consumer_group);
            let consumer_id = Arc::clone(&consumer_id);
            let worker = Arc::clone(&worker);
            let conn = conn.clone();
            let mut read_conn = read_conn;
            let block_timeout = self.block_timeout;
            let read_error_backoff = std::time::Duration::from_millis(
                (block_timeout as u64).max(MIN_READ_ERROR_BACKOFF_MS),
            );
            SelectedRuntime::spawn(async move {
                use redis::streams::{StreamReadOptions, StreamReadReply};
                let mut set = SelectedRuntime::new_task_set();
                loop {
                    // Step 1: Check if shutdown was requested
                    if shutdown_flag.load(Ordering::Relaxed) {
                        break;
                    }
                    // Step 2: Check how many tasks we can accept right now
                    let available = SelectedRuntime::available_permits(&semaphore);
                    if available == 0 {
                        // All permits in use — block until one frees up, then re-check
                        SelectedRuntime::wait_for_permit(&semaphore).await;
                        continue;
                    }
                    // Step 3: XREADGROUP — read up to `available` new messages,
                    // blocking for `block_timeout` ms if none are ready. Uses the
                    // dedicated `read_conn` so this block does not stall XACKs.
                    let opts = StreamReadOptions::default()
                        .count(available)
                        .block(block_timeout)
                        .group(consumer_group.as_str(), consumer_id.as_str());
                    let reply: Option<StreamReadReply> = match read_conn
                        .xread_options(&[name.as_str()], &[">"], &opts)
                        .await
                    {
                        Ok(r) => r,
                        Err(e) => {
                            // An empty stream surfaces the BLOCK timeout as an Err;
                            // that's expected, not a real failure, so swallow it.
                            // Any other error is likely persistent (connection
                            // down, group missing): back off instead of spinning.
                            if !e.is_timeout() {
                                eprintln!("failed to read from stream: {e}");
                                SelectedRuntime::sleep(read_error_backoff).await;
                            }
                            continue;
                        }
                    };
                    // Step 4: Dispatch each message as a concurrent task
                    let Some(reply) = reply else {
                        continue;
                    };
                    for stream_key in reply.keys {
                        for message in stream_key.ids {
                            // Acquire a semaphore permit before spawning — this
                            // reserves a concurrency slot for the task
                            let permit =
                                SelectedRuntime::acquire_permit(Arc::clone(&semaphore)).await;
                            let mut conn = conn.clone();
                            let name = Arc::clone(&name);
                            let consumer_group = Arc::clone(&consumer_group);
                            let worker = Arc::clone(&worker);
                            SelectedRuntime::spawn_task(&mut set, async move {
                                // Hold the permit until the task finishes
                                let _permit = permit;
                                // Parse stream fields into the payload struct. An
                                // unparseable message is left un-acked; the claimer
                                // (if any) dead-letters it after `max_retries`.
                                let Some(input) = parse_message::<I>(message.map) else {
                                    return;
                                };
                                // Run the worker and XACK on success
                                process_and_ack(
                                    input,
                                    worker.as_ref(),
                                    &mut conn,
                                    name.as_str(),
                                    consumer_group.as_str(),
                                    &message.id,
                                )
                                .await;
                            });
                        }
                    }
                }
                // Step 5: Shutdown — wait for all in-flight tasks to finish
                SelectedRuntime::drain_task_set(&mut set).await;
            })
        };

        // ── Claimer loop ──────────────────────────────────────────────────────
        let claimer_join = if let Some(claimer) = self.claimer {
            let dlq_worker = claimer.dlq_worker();
            let max_retries = claimer.max_retries();
            let claimer_block_timeout = claimer.block_timeout();
            let min_idle_time = claimer.min_idle_time();
            // Clone Arcs for the claimer's own spawned loop
            let shutdown_flag = Arc::clone(&shutdown);
            let semaphore = SelectedRuntime::new_semaphore(claimer.max_concurrent_tasks());
            let name = Arc::clone(&name);
            let consumer_group = Arc::clone(&consumer_group);
            let consumer_id = Arc::clone(&consumer_id);
            let worker = Arc::clone(&worker);
            let dlq_worker = dlq_worker.as_ref().map(Arc::clone);
            let conn = conn.clone();
            Some(SelectedRuntime::spawn(async move {
                use redis::streams::{
                    StreamAutoClaimOptions, StreamAutoClaimReply, StreamPendingCountReply,
                };
                // Pause used between full PEL scans and after Redis errors
                let idle_sleep = std::time::Duration::from_millis(claimer_block_timeout as u64);
                // Connection for the loop's own XAUTOCLAIM / XPENDING calls
                let mut ctl_conn = conn.clone();
                // XAUTOCLAIM cursor. Redis only scans a bounded slice of the PEL
                // per call and returns where to resume; "0-0" means "start over".
                let mut cursor = String::from("0-0");
                let mut set = SelectedRuntime::new_task_set();
                loop {
                    // Step 1: Check if shutdown was requested
                    if shutdown_flag.load(Ordering::Relaxed) {
                        break;
                    }
                    // Step 2: Backpressure — wait if all concurrency slots are in use
                    let available = SelectedRuntime::available_permits(&semaphore);
                    if available == 0 {
                        SelectedRuntime::wait_for_permit(&semaphore).await;
                        continue;
                    }
                    // Step 3: XAUTOCLAIM — claim messages idle for longer than
                    // `min_idle_time`, resuming from the previous cursor
                    let opts = StreamAutoClaimOptions::default().count(available);
                    let claim_result = ctl_conn
                        .xautoclaim_options(
                            name.as_str(),
                            consumer_group.as_str(),
                            consumer_id.as_str(),
                            min_idle_time,
                            cursor.as_str(),
                            opts,
                        )
                        .await;
                    let reply: StreamAutoClaimReply = match claim_result {
                        Ok(r) => r,
                        Err(e) => {
                            eprintln!("failed to autoclaim from stream: {e}");
                            // Restart the scan from the top and back off before retrying
                            cursor = String::from("0-0");
                            SelectedRuntime::sleep(idle_sleep).await;
                            continue;
                        }
                    };
                    cursor = reply.next_stream_id;
                    if reply.claimed.is_empty() {
                        // Only sleep once the whole PEL has been scanned; otherwise
                        // keep walking it so idle entries further down aren't starved.
                        if cursor == "0-0" {
                            SelectedRuntime::sleep(idle_sleep).await;
                        }
                        continue;
                    }
                    let claimed = reply.claimed;

                    // Step 4: XPENDING — fetch the exact delivery count of every
                    // claimed message. One single-ID query per message (pipelined),
                    // because a start..end range query can also match unclaimed
                    // entries that sit between the claimed IDs in the PEL.
                    let mut pending_pipe = redis::pipe();
                    for message in &claimed {
                        pending_pipe.xpending_count(
                            name.as_str(),
                            consumer_group.as_str(),
                            message.id.as_str(),
                            message.id.as_str(),
                            1usize,
                        );
                    }
                    let pending_result = pending_pipe
                        .query_async::<Vec<StreamPendingCountReply>>(&mut ctl_conn)
                        .await;
                    let pending = match pending_result {
                        Ok(p) => p,
                        Err(e) => {
                            eprintln!("failed to get pending info: {e}");
                            // The claimed messages stay in our PEL and are picked up
                            // again after `min_idle_time`; back off meanwhile.
                            SelectedRuntime::sleep(idle_sleep).await;
                            continue;
                        }
                    };

                    // Step 5: Dispatch each claimed message as a concurrent task.
                    // The pipeline returns one reply per command, in order.
                    for (message, info) in claimed.into_iter().zip(pending) {
                        // Fall back to 1 if the entry left the PEL between the calls
                        let times_delivered =
                            info.ids.first().map_or(1, |p| p.times_delivered);
                        let permit =
                            SelectedRuntime::acquire_permit(Arc::clone(&semaphore)).await;
                        let mut conn = conn.clone();
                        let name = Arc::clone(&name);
                        let consumer_group = Arc::clone(&consumer_group);
                        let worker = Arc::clone(&worker);
                        let dlq_worker = dlq_worker.clone();
                        SelectedRuntime::spawn_task(&mut set, async move {
                            let _permit = permit;
                            // Parse stream fields into the payload struct
                            let input = parse_message::<I>(message.map);
                            // Dead-letter path: too many delivery attempts. Checked
                            // before bailing on a parse failure so poison messages
                            // are eventually removed instead of reclaimed forever.
                            if times_delivered > max_retries {
                                match (input, dlq_worker.as_ref()) {
                                    // Invoke the DLQ callback if configured
                                    (Some(input), Some(dlq)) => {
                                        if let Err(e) = (dlq)(input, times_delivered).await {
                                            eprintln!("dlq worker failed: {e}");
                                        }
                                    }
                                    (None, _) => {
                                        eprintln!(
                                            "dropping unparseable message {} after {times_delivered} deliveries",
                                            message.id
                                        );
                                    }
                                    (Some(_), None) => {}
                                }
                                // XACK to remove from PEL regardless of DLQ outcome
                                ack(
                                    &mut conn,
                                    name.as_str(),
                                    consumer_group.as_str(),
                                    &message.id,
                                )
                                .await;
                                return;
                            }
                            // Unparseable but still under the retry limit: leave it
                            // pending so it is counted again on the next claim.
                            let Some(input) = input else {
                                return;
                            };
                            // Normal retry path: run the worker, XACK on success
                            process_and_ack(
                                input,
                                worker.as_ref(),
                                &mut conn,
                                name.as_str(),
                                consumer_group.as_str(),
                                &message.id,
                            )
                            .await;
                        });
                    }
                }
                // Step 6: Shutdown — drain in-flight claimer tasks
                SelectedRuntime::drain_task_set(&mut set).await;
            }))
        } else {
            None
        };

        QueueHandle {
            shutdown,
            main_join,
            claimer_join,
        }
    }
}