shove 0.10.4

Async tasks via pubsub on steroids. Comes with built-in support for complex queue configurations, audit logs, autoscaling consumer groups and more.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
//! Hold queue requeuer for delayed message redelivery.
//!
//! Hold queues are modelled as Redis Sorted Sets where:
//!   key   = `{hold_queue_name}:pending`  (see `RedisTopologyDeclarer::hold_set_name`)
//!   score = Unix timestamp in milliseconds at which the entry should be redelivered
//!   value = JSON-serialized `HoldEntry`
//!
//! This task polls each hold set every `POLL_INTERVAL`, moves up to
//! `REQUEUE_BATCH_SIZE` entries per tick whose score ≤ now_ms back to the
//! appropriate stream via XADD, and removes each entry from the set only
//! after a successful XADD (at-least-once delivery).
//!
//! ## Concurrent requeuer instances
//!
//! This module provides **at-least-once** redelivery semantics. If two requeuer
//! instances run concurrently (e.g., during a rolling restart), both may XADD
//! the same entry before either ZREM completes, resulting in duplicate delivery.
//! This is expected behaviour: consumers must be idempotent, which is already
//! required by the broader shove at-least-once contract.

use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio_util::sync::CancellationToken;

use super::client::{RedisClient, RedisConnection};
use super::constants::{REQUEUE_BATCH_SIZE, REQUEUE_POLL_MS};
use super::topology::RedisTopologyDeclarer;
use crate::error::{Result, ShoveError};
use crate::metrics::{BackendErrorKind, BackendLabel, record_backend_error};
use crate::retry::Backoff;

// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

/// Interval between requeue polling ticks.
///
/// Built from `REQUEUE_POLL_MS` (milliseconds); `spawn_requeuer` sleeps for
/// this long between polling passes over the hold sets.
const POLL_INTERVAL: Duration = Duration::from_millis(REQUEUE_POLL_MS);

// ---------------------------------------------------------------------------
// Data model
// ---------------------------------------------------------------------------

/// A message entry in a hold sorted set, awaiting delayed redelivery.
///
/// Serialized as JSON and stored in a Redis Sorted Set with a score
/// equal to the Unix timestamp (in milliseconds) when the message should
/// be redelivered. `enqueue_hold` writes entries; `poll_hold_set` parses them
/// back and replays `fields` onto `stream` with XADD.
///
/// NOTE(review): the serialized JSON is the sorted-set member, so two entries
/// that serialize to identical JSON would presumably collapse into a single
/// member — confirm that callers always include a distinguishing field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct HoldEntry {
    /// Target stream to XADD back into (the main stream or shard stream).
    pub stream: String,
    /// All fields (payload + metadata) to restore on redelivery, as key-value
    /// pairs. Stored as an ordered list; `poll_hold_set` appends the pairs to
    /// the XADD command in this order.
    pub fields: Vec<(String, String)>,
}

// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------

/// Park a message in a hold sorted set until its redelivery time.
///
/// The entry is JSON-encoded and added with ZADD to the sorted set whose key
/// is derived from `hold_queue_name` via `RedisTopologyDeclarer::hold_set_name`.
/// Its score is the current Unix time in milliseconds plus `delay`; both the
/// millisecond conversion and the addition saturate at `u64::MAX`.
///
/// # Errors
///
/// Returns an error if JSON encoding fails or the Redis ZADD fails.
pub(crate) async fn enqueue_hold(
    conn: &mut RedisConnection,
    hold_queue_name: &str,
    entry: HoldEntry,
    delay: Duration,
) -> Result<()> {
    // Score: the wall-clock instant (ms) at which the entry becomes due.
    let due_at_ms = {
        let delay_ms = u64::try_from(delay.as_millis()).unwrap_or(u64::MAX);
        now_ms().saturating_add(delay_ms)
    };
    let score = due_at_ms as f64;

    // Member: the JSON form of the entry.
    let member = serde_json::to_string(&entry)?;

    let mut zadd = redis::cmd("ZADD");
    zadd.arg(RedisTopologyDeclarer::hold_set_name(hold_queue_name))
        .arg(score)
        .arg(&member);

    // The integer reply from ZADD is not needed.
    conn.query::<i64>(&mut zadd).await?;
    Ok(())
}

/// Spawn a background requeuer task that periodically drains due entries
/// from all hold sets back to their target streams.
///
/// The task acquires one connection and reuses it across poll ticks. When a
/// poll fails, the remaining hold sets are skipped for that pass, a fresh
/// connection is acquired (retrying with backoff until one is obtained or
/// `shutdown` is cancelled), and the task then waits one `POLL_INTERVAL`
/// before polling again.
///
/// Every pass — including one that ended in a reconnect — goes through the
/// shutdown-aware sleep. This keeps the task cancellable and rate-limited
/// when polling keeps failing even though reconnecting succeeds (for example
/// a server that accepts connections but rejects the command); retrying
/// immediately in that situation would spin without ever observing
/// `shutdown`.
///
/// The task runs until `shutdown` is cancelled.
///
/// **At-least-once semantics:** see module-level documentation for the
/// concurrent-instance duplicate-delivery note.
pub(crate) fn spawn_requeuer(
    client: RedisClient,
    hold_queue_names: Vec<String>,
    shutdown: CancellationToken,
) -> tokio::task::JoinHandle<()> {
    // Pre-compute the sorted-set key for each hold queue once.
    let hold_set_keys: Vec<String> = hold_queue_names
        .iter()
        .map(|n| RedisTopologyDeclarer::hold_set_name(n))
        .collect();

    tokio::spawn(async move {
        let mut conn = match acquire_conn_with_retry(&client, &shutdown).await {
            Some(c) => c,
            None => return,
        };

        loop {
            let mut needs_reconnect = false;
            for (hold_queue_name, set_key) in hold_queue_names.iter().zip(hold_set_keys.iter()) {
                if let Err(e) = poll_hold_set(&mut conn, hold_queue_name, set_key).await {
                    tracing::warn!("requeuer: poll failed for {}: {}", hold_queue_name, e);
                    needs_reconnect = true;
                    break;
                }
            }

            if needs_reconnect {
                match acquire_conn_with_retry(&client, &shutdown).await {
                    // Deliberately fall through to the sleep below instead of
                    // re-polling at once: a persistent poll error with a
                    // healthy connection must not become a hot loop.
                    Some(c) => conn = c,
                    None => break,
                }
            }

            tokio::select! {
                _ = shutdown.cancelled() => break,
                _ = tokio::time::sleep(POLL_INTERVAL) => {}
            }
        }
    })
}

// ---------------------------------------------------------------------------
// Private helpers
// ---------------------------------------------------------------------------

/// Obtain a multiplexed Redis connection, retrying until one is available.
///
/// Each failed attempt is logged and followed by a wait taken from
/// `Backoff::default()`. Returns `None` if `shutdown` is found cancelled after
/// a failed attempt or becomes cancelled during the wait; otherwise returns
/// the first connection that is established.
async fn acquire_conn_with_retry(
    client: &RedisClient,
    shutdown: &CancellationToken,
) -> Option<RedisConnection> {
    let mut backoff = Backoff::default();
    loop {
        let err = match client.multiplexed_conn().await {
            Ok(conn) => return Some(conn),
            Err(err) => err,
        };

        // Do not schedule another attempt once shutdown has been requested.
        if shutdown.is_cancelled() {
            return None;
        }

        let wait = backoff.next().expect("backoff is infinite");
        tracing::warn!(
            "requeuer: connection failed ({}), retrying in {:.1}s",
            err,
            wait.as_secs_f64()
        );

        // Sleep out the backoff, but give up promptly on shutdown.
        tokio::select! {
            _ = shutdown.cancelled() => return None,
            _ = tokio::time::sleep(wait) => {}
        }
    }
}

/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// A clock set before the epoch yields `0`; a millisecond count that does not
/// fit in `u64` yields `u64::MAX`.
fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    let millis: u128 = since_epoch.as_millis();
    match u64::try_from(millis) {
        Ok(ms) => ms,
        Err(_) => u64::MAX,
    }
}

/// Poll a single hold set for due entries and requeue them.
///
/// Fetches up to `REQUEUE_BATCH_SIZE` entries with score ≤ now_ms,
/// XADDs each to its origin stream, and removes it from the hold set only on
/// success, so a failed XADD leaves the entry in place for a later tick.
///
/// Entries that can never be delivered are removed from the set to avoid
/// perpetual re-fetch, and logged as warnings:
/// * corrupt entries (JSON parse failures);
/// * entries with an empty `fields` list — XADD requires at least one
///   field/value pair, so such an entry would be rejected on every tick.
///
/// # Errors
///
/// Returns `ShoveError::Connection` when the initial ZRANGE fails. Failures of
/// individual XADD / ZREM commands are logged and do not abort the batch.
async fn poll_hold_set(
    conn: &mut RedisConnection,
    hold_queue_name: &str,
    set_key: &str,
) -> Result<()> {
    let now = now_ms();

    let entries: Vec<String> = match conn
        .query(
            // ZRANGE with BYSCORE replaces the deprecated ZRANGEBYSCORE (Redis 6.2+).
            redis::cmd("ZRANGE")
                .arg(set_key)
                .arg(0f64)
                .arg(now as f64)
                .arg("BYSCORE")
                .arg("LIMIT")
                .arg(0i64)
                .arg(REQUEUE_BATCH_SIZE),
        )
        .await
    {
        Ok(entries) => entries,
        Err(e) => {
            tracing::error!(
                hold_queue = hold_queue_name,
                set_key = set_key,
                error = %e,
                "requeuer: ZRANGE failed, cannot poll hold set"
            );
            return Err(ShoveError::Connection(format!(
                "ZRANGE failed for hold set '{set_key}': {e}"
            )));
        }
    };

    for raw_json in entries {
        let entry: HoldEntry = match serde_json::from_str(&raw_json) {
            Ok(e) => e,
            Err(e) => {
                tracing::warn!(
                    "requeuer: corrupt hold entry in {} (removing): {}",
                    hold_queue_name,
                    e
                );
                discard_hold_entry(conn, hold_queue_name, set_key, &raw_json).await;
                continue;
            }
        };

        // An XADD without any field/value pair is always rejected by Redis,
        // so this entry would otherwise stay at the head of the set forever.
        if entry.fields.is_empty() {
            tracing::warn!(
                "requeuer: hold entry in {} for stream {} has no fields (removing)",
                hold_queue_name,
                entry.stream
            );
            discard_hold_entry(conn, hold_queue_name, set_key, &raw_json).await;
            continue;
        }

        let mut cmd = redis::cmd("XADD");
        cmd.arg(&entry.stream).arg("*");
        for (k, v) in &entry.fields {
            cmd.arg(k).arg(v);
        }

        match conn.query::<String>(&mut cmd).await {
            Ok(_) => {
                // Remove only after the XADD succeeded (at-least-once): if this
                // ZREM fails the entry is redelivered on a later tick.
                if let Err(e) = conn
                    .query::<i64>(redis::cmd("ZREM").arg(set_key).arg(&raw_json))
                    .await
                {
                    tracing::warn!(
                        "requeuer: ZREM failed for entry in {}: {}",
                        hold_queue_name,
                        e
                    );
                    record_backend_error(BackendLabel::Redis, BackendErrorKind::Ack);
                }
            }
            Err(e) => {
                tracing::warn!("requeuer: XADD failed for stream {}: {}", entry.stream, e);
            }
        }
    }

    Ok(())
}

/// Best-effort removal of an undeliverable member from a hold set.
///
/// A failed ZREM is logged rather than propagated; the member is then simply
/// fetched (and discarded) again on a later tick.
async fn discard_hold_entry(
    conn: &mut RedisConnection,
    hold_queue_name: &str,
    set_key: &str,
    raw_json: &str,
) {
    if let Err(e) = conn
        .query::<i64>(redis::cmd("ZREM").arg(set_key).arg(raw_json))
        .await
    {
        tracing::warn!(
            "requeuer: ZREM failed for undeliverable entry in {}: {}",
            hold_queue_name,
            e
        );
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::ShoveError;

    /// 2020-01-01 00:00:00 UTC expressed in Unix milliseconds.
    const JAN_2020_MS: u64 = 1_577_836_800_000;

    /// Encode `entry` as JSON and decode it straight back.
    fn roundtrip(entry: &HoldEntry) -> HoldEntry {
        let encoded = serde_json::to_string(entry).unwrap();
        serde_json::from_str(&encoded).unwrap()
    }

    // -----------------------------------------------------------------------
    // HoldEntry serialization
    // -----------------------------------------------------------------------

    #[test]
    fn hold_entry_roundtrips() {
        use super::super::constants::{PAYLOAD_FIELD, X_RETRY_COUNT};
        let expected: Vec<(String, String)> = vec![
            (PAYLOAD_FIELD.into(), "{}".into()),
            (X_RETRY_COUNT.into(), "1".into()),
        ];
        let decoded = roundtrip(&HoldEntry {
            stream: "orders".into(),
            fields: expected.clone(),
        });
        assert_eq!(decoded.stream, "orders");
        assert_eq!(decoded.fields[0], expected[0]);
        assert_eq!(decoded.fields[1], expected[1]);
    }

    #[test]
    fn now_ms_is_nonzero() {
        assert_ne!(now_ms(), 0);
    }

    #[test]
    fn hold_entry_with_empty_fields_roundtrips() {
        let decoded = roundtrip(&HoldEntry {
            stream: "my-stream".into(),
            fields: Vec::new(),
        });
        assert_eq!(decoded.stream, "my-stream");
        assert!(decoded.fields.is_empty());
    }

    #[test]
    fn hold_entry_fields_order_preserved() {
        // The decoded fields must come back in the order they were inserted.
        let expected: Vec<(String, String)> = [("alpha", "1"), ("beta", "2"), ("gamma", "3")]
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();
        let decoded = roundtrip(&HoldEntry {
            stream: "order-stream".into(),
            fields: expected.clone(),
        });
        assert_eq!(decoded.fields[0], expected[0]);
        assert_eq!(decoded.fields[1], expected[1]);
        assert_eq!(decoded.fields[2], expected[2]);
    }

    // -----------------------------------------------------------------------
    // now_ms
    // -----------------------------------------------------------------------

    #[test]
    fn now_ms_is_positive_and_recent() {
        let before = now_ms();
        let after = now_ms();
        // Two back-to-back readings must not go backwards.
        assert!(after >= before);
        // The reading must look like a real Unix timestamp (year 2020 or later).
        assert!(before > JAN_2020_MS, "timestamp too small: {before}");
    }

    // -----------------------------------------------------------------------
    // Fix #14 — ZRANGE error propagation
    // -----------------------------------------------------------------------

    #[test]
    fn zrange_error_is_connection_variant() {
        // poll_hold_set reports ZRANGE failures as ShoveError::Connection; this
        // pins the variant used for a message of that shape.
        let set_key = "orders-hold-5s:pending";
        let message = format!("ZRANGE failed for hold set '{set_key}': connection refused");
        let err = ShoveError::Connection(message);
        assert!(
            matches!(err, ShoveError::Connection(_)),
            "ZRANGE error must be ShoveError::Connection"
        );
    }

    #[test]
    fn zrange_error_message_contains_set_key_and_cause() {
        // The message has to carry the hold set name (operator context) as well
        // as the underlying Redis error (root-cause diagnosis).
        let set_key = "orders-hold-5s:pending";
        let cause = "connection timed out";
        let msg = format!("ZRANGE failed for hold set '{set_key}': {cause}");
        assert!(
            msg.contains(set_key),
            "error message must name the hold set; got: {msg}"
        );
        assert!(
            msg.contains(cause),
            "error message must preserve the original error; got: {msg}"
        );
    }

    // -----------------------------------------------------------------------
    // Fix #24 — acquire_conn_with_retry backoff contract
    // -----------------------------------------------------------------------

    #[test]
    fn backoff_is_infinite_for_retry_loop() {
        // acquire_conn_with_retry calls backoff.next().expect("backoff is infinite"),
        // so Backoff::default() must keep yielding delays for as long as it is asked.
        let yielded = Backoff::default().take(500).count();
        assert_eq!(
            yielded,
            500,
            "Backoff must never return None; the .expect() in acquire_conn_with_retry would panic"
        );
    }

    #[test]
    fn backoff_default_delay_stays_within_bounds() {
        // Every delay yielded by Backoff::default() is expected to fall within
        // [0.5 s, 45 s].
        let min_expected = Duration::from_millis(500);
        let max_expected = Duration::from_millis(45_000);
        Backoff::default().take(50).for_each(|delay| {
            assert!(
                delay >= min_expected,
                "delay {delay:?} is below the minimum expected bound"
            );
            assert!(
                delay <= max_expected,
                "delay {delay:?} exceeds the maximum expected bound"
            );
        });
    }
}