kiomq 0.2.0

An all-in-one task-queue and orchestration library for Rust.
use std::sync::Arc;

use crate::{
    error::QueueError, BackOffJobOptions, FailedDetails, JobMetrics, JobState, JobToken,
    RemoveOnCompletionOrFailure, Repeat, Trace,
};
#[cfg(feature = "redis-store")]
use redis::{FromRedisValue, ParsingError, ToRedisArgs, ToSingleRedisArg, Value};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(untagged)]
/// The outcome of a single processor invocation.
///
/// Serialised with `#[serde(untagged)]`, so only the variant payload appears
/// on the wire; deserialisation picks whichever variant's shape matches.
///
/// `Debug` here resolves to `derive_more::Debug` (imported later in this
/// file), which is what enables the variant-level `#[debug]` attribute.
pub enum ProcessedResult<R> {
    /// The processor returned an error.
    Failed(FailedDetails),
    #[debug("{_1:?}")]
    /// The processor succeeded, returning a value and timing metrics.
    Success(R, JobMetrics),
}
/// A typed field update applied to a job record in the store.
///
/// Each variant corresponds to one hash field; [`JobField::name`] gives the
/// field key. Serialised with `#[serde(untagged)]`, so only the payload is
/// written.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(untagged)]
pub enum JobField<R> {
    /// Worker lock token.
    Token(JobToken),
    /// Processor outcome (success value or failure details).
    Payload(ProcessedResult<R>),
    /// Unix timestamp (µs) when the processor started.
    ProcessedOn(u64),
    /// Unix timestamp (µs) when the job reached a terminal state.
    FinishedOn(u64),
    /// New lifecycle state.
    State(JobState),
    /// Stack-trace entry captured on failure.
    BackTrace(Trace),
}
impl<R> JobField<R> {
    /// Returns the store field name (key) for this variant.
    ///
    /// A `Payload` maps to `"returnedValue"` on success and `"failedReason"`
    /// on failure; every other variant has a fixed key.
    pub const fn name(&self) -> &'static str {
        match self {
            Self::Token(_) => "token",
            Self::Payload(ProcessedResult::Success(_, _)) => "returnedValue",
            Self::Payload(ProcessedResult::Failed(_)) => "failedReason",
            Self::ProcessedOn(_) => "processedOn",
            Self::FinishedOn(_) => "finishedOn",
            Self::State(_) => "state",
            Self::BackTrace(_) => "stackTrace",
        }
    }
}

use derive_more::{Debug, Display};
/// Identifies a named collection (list, set, sorted-set, hash, or key) in the
/// backing store.
///
/// Each queue owns a set of collections whose keys are formed as
/// `{prefix}:{name}:{suffix}`.  The suffix comes from this enum's `Display`
/// implementation via [`CollectionSuffix::to_collection_name`].
///
/// `Display` is generated by `derive_more`; variants without a `#[display]`
/// attribute render as their variant name, which `to_collection_name`
/// lowercases.
#[derive(Display, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
pub enum CollectionSuffix {
    /// List of jobs that are ready to be processed.
    Active,
    /// Sorted set of completed jobs ordered by finish time.
    Completed,
    /// Sorted set of delayed jobs ordered by scheduled time.
    Delayed,
    /// Set of jobs whose lock has expired (stalled).
    Stalled,
    /// Sorted set of high-priority jobs waiting for a worker slot.
    Prioritized,
    /// Hash storing the monotonically-increasing priority counter.
    PriorityCounter,
    /// Hash storing the auto-increment job ID counter.
    Id,
    /// Hash storing queue metadata (processing count, pause flag, etc.).
    Meta,
    /// The event stream or pub-sub channel for this queue.
    Events,
    /// List of jobs waiting to be picked up by a worker.
    Wait,
    /// List of jobs held while the queue is paused.
    Paused,
    /// Sorted set of permanently failed jobs ordered by failure time.
    Failed,
    /// Sentinel marker used internally for queue state signalling.
    Marker,
    /// The hash that stores all fields for a single job.
    #[display("{_0}")]
    Job(u64),
    /// The queue's top-level prefix key (displays as empty).
    #[display("")]
    Prefix,
    /// The distributed lock key for a specific job.
    #[display("{_0}:lock")]
    Lock(u64),
    /// Key storing the last stall-check timestamp.
    #[display("stalled_check")]
    StalledCheck,
    /// Key storing serialised metrics for a specific worker.
    #[display("worker_metrics")]
    WorkerMetrics,
}

impl CollectionSuffix {
    /// Builds the full collection key as `{prefix}:{name}:{suffix}`
    /// (lowercased), where the suffix is this variant's `Display` output.
    ///
    /// Variants whose `Display` output is empty (e.g. [`Self::Prefix`])
    /// produce a key with a trailing `:`.
    #[must_use]
    pub fn to_collection_name(&self, prefix: &str, name: &str) -> String {
        format!("{}:{}:{}", prefix, name, &self).to_lowercase()
    }
    /// Returns the stable 8-bit identifier for this variant (1..=18).
    ///
    /// Used as the high byte of [`Self::tag`]; must stay in sync with the
    /// decoder in [`Self::from_tag`].
    const fn discriminant(&self) -> u8 {
        match self {
            Self::Active => 1,
            Self::Completed => 2,
            Self::Delayed => 3,
            Self::Stalled => 4,
            Self::Prioritized => 5,
            Self::PriorityCounter => 6,
            Self::Id => 7,
            Self::Meta => 8,
            Self::Events => 9,
            Self::Wait => 10,
            Self::Paused => 11,
            Self::Failed => 12,
            Self::Marker => 13,
            Self::Job(_) => 14,
            Self::Prefix => 15,
            Self::Lock(_) => 16,
            Self::StalledCheck => 17,
            Self::WorkerMetrics => 18,
        }
    }
    /// Encodes this variant as a compact `u64` tag.
    ///
    /// The top 8 bits identify the variant and the lower 56 bits hold any
    /// payload (job ID, UUID fragment, etc.).  Used for O(1) membership checks
    /// in in-memory sets.
    ///
    /// Note: a job ID above `2^56 - 1` is truncated to fit the payload bits,
    /// so `from_tag(x.tag())` only round-trips IDs within that range.
    #[must_use]
    pub fn tag(&self) -> u64 {
        let top = u64::from(self.discriminant()) << 56; // high 8 bits for variant id
        match self {
            // Fieldless variants → just top bits
            Self::Active
            | Self::Completed
            | Self::Delayed
            | Self::Stalled
            | Self::Prioritized
            | Self::PriorityCounter
            | Self::Id
            | Self::Meta
            | Self::Events
            | Self::Wait
            | Self::Paused
            | Self::Failed
            | Self::Marker
            | Self::Prefix
            | Self::StalledCheck
            | Self::WorkerMetrics => top,

            // Tagged variants → combine variant id + payload in lower 56 bits
            Self::Job(id) | Self::Lock(id) => top | (id & 0x00FF_FFFF_FFFF_FFFF),
        }
    }
    /// Returns the tag as a big-endian byte array.
    #[must_use]
    pub fn to_bytes(&self) -> [u8; 8] {
        self.tag().to_be_bytes()
    }
    /// Decodes a tag produced by [`CollectionSuffix::tag`] back into the
    /// corresponding enum variant, or `None` if the discriminant is unknown.
    #[must_use]
    pub const fn from_tag(tag: u64) -> Option<Self> {
        let disc = (tag >> 56) as u8;
        let payload = tag & 0x00FF_FFFF_FFFF_FFFF;

        Some(match disc {
            1 => Self::Active,
            2 => Self::Completed,
            3 => Self::Delayed,
            4 => Self::Stalled,
            5 => Self::Prioritized,
            6 => Self::PriorityCounter,
            7 => Self::Id,
            8 => Self::Meta,
            9 => Self::Events,
            10 => Self::Wait,
            11 => Self::Paused,
            12 => Self::Failed,
            13 => Self::Marker,
            14 => Self::Job(payload),
            15 => Self::Prefix,
            16 => Self::Lock(payload),
            17 => Self::StalledCheck,
            // Bug fix: discriminant 18 was missing, so
            // `from_tag(WorkerMetrics.tag())` incorrectly returned `None`.
            18 => Self::WorkerMetrics,
            _ => return None,
        })
    }
}
impl From<JobState> for CollectionSuffix {
    /// Maps a job lifecycle state to the collection that holds jobs in that
    /// state.
    ///
    /// NOTE(review): `Progress`, `Processing`, and `Obliterated` map to
    /// `Prefix`, `Meta`, and `Events` respectively — these look like
    /// signalling/bookkeeping targets rather than job lists; confirm against
    /// the call sites.
    fn from(state: JobState) -> Self {
        match state {
            JobState::Active | JobState::Resumed => Self::Active,
            JobState::Completed => Self::Completed,
            JobState::Delayed => Self::Delayed,
            JobState::Failed => Self::Failed,
            JobState::Obliterated => Self::Events,
            JobState::Prioritized => Self::Prioritized,
            JobState::Processing => Self::Meta,
            JobState::Progress => Self::Prefix,
            JobState::Stalled | JobState::Paused => Self::Paused,
            JobState::Wait => Self::Wait,
        }
    }
}

#[cfg(feature = "redis-store")]
use redis::RedisWrite;
#[cfg(feature = "redis-store")]
impl ToRedisArgs for CollectionSuffix {
    /// Writes the lowercased `Display` form of the suffix as one Redis
    /// argument.
    fn write_redis_args<W>(&self, out: &mut W)
    where
        W: ?Sized + RedisWrite,
    {
        let suffix = self.to_string().to_lowercase();
        out.write_arg_fmt(suffix);
    }
}
#[cfg(feature = "redis-store")]
// Marker impl: a `CollectionSuffix` serialises to a single Redis argument
// (per the trait name; no methods are required here).
impl ToSingleRedisArg for CollectionSuffix {}
#[cfg(feature = "redis-store")]
// Marker impl: a `QueueEventMode` serialises to a single Redis argument.
impl ToSingleRedisArg for QueueEventMode {}
/// Controls how events are published and consumed within a queue.
///
/// Set this via [`QueueOpts::event_mode`].
///
/// `#[repr(u8)]` fixes the wire values (`Stream = 0`, `PubSub = 1`) used by
/// the `TryFrom<u8>` and Redis conversions below.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, Eq, PartialEq)]
#[repr(u8)]
pub enum QueueEventMode {
    /// Broadcast-only delivery. Listeners that connect after an event is fired
    /// will not receive it.
    PubSub = 1,
    /// Persistent append-only stream (default). New listeners can replay past
    /// events.
    #[default]
    Stream = 0,
}
impl TryFrom<u8> for QueueEventMode {
    type Error = QueueError;

    /// Decodes the `#[repr(u8)]` discriminant: `0` → `Stream`, `1` → `PubSub`.
    ///
    /// # Errors
    /// Returns [`QueueError::UnKnownEventMode`] for any other value.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(Self::Stream),
            1 => Ok(Self::PubSub),
            _ => Err(QueueError::UnKnownEventMode),
        }
    }
}
#[cfg(feature = "redis-store")]
impl FromRedisValue for QueueEventMode {
    /// Parses the event mode from a Redis value.
    ///
    /// A `Nil` value (key absent) is treated as `0`, i.e. the default
    /// [`QueueEventMode::Stream`]. Any unrecognised byte also silently falls
    /// back to the default instead of erroring — NOTE(review): confirm this
    /// leniency is intentional rather than masking corrupt data.
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        let value = if matches!(v, Value::Nil) {
            0
        } else {
            // Non-numeric values still surface a parse error here.
            u8::from_redis_value(v)?
        };
        let mode = value.try_into().unwrap_or_default();
        Ok(mode)
    }
}
#[cfg(feature = "redis-store")]
impl ToRedisArgs for QueueEventMode {
    /// Writes the `#[repr(u8)]` discriminant as a single Redis argument.
    fn write_redis_args<W>(&self, out: &mut W)
    where
        W: ?Sized + redis::RedisWrite,
    {
        out.write_arg_fmt(*self as u8);
    }
}

/// Specifies how a job should be retried after a failure or completion.
///
/// Passed to [`crate::Queue::retry_job`]. Borrows its options, so it can be
/// built cheaply from queue- or job-level configuration via the `From` impls
/// below.
#[derive(Clone, Debug)]
pub enum RetryOptions<'a> {
    /// Retry a failed job using the given backoff options.
    Failed(&'a BackOffJobOptions),
    /// Re-enqueue a job according to a [`Repeat`] policy.
    WithRepeat(&'a Repeat),
}
impl<'a> From<&'a BackOffJobOptions> for RetryOptions<'a> {
    fn from(value: &'a BackOffJobOptions) -> Self {
        RetryOptions::Failed(value)
    }
}
impl<'a> From<&'a Repeat> for RetryOptions<'a> {
    /// Wraps a repeat policy as a retry-with-repeat.
    fn from(repeat: &'a Repeat) -> Self {
        Self::WithRepeat(repeat)
    }
}
/// Queue-level configuration.
///
/// Pass this to [`crate::Queue::new`] to customise the queue's behaviour.
/// Job-level options take precedence over the defaults set here.
///
/// # Examples
///
/// ```rust
/// use kiomq::{BackOffJobOptions, BackOffOptions, KeepJobs, QueueEventMode, QueueOpts,
///             RemoveOnCompletionOrFailure};
///
/// let opts = QueueOpts {
///     attempts: 3,
///     default_backoff: Some(BackOffJobOptions::Opts(BackOffOptions {
///         type_: Some("exponential".to_owned()),
///         delay: Some(500),
///     })),
///     remove_on_complete: Some(RemoveOnCompletionOrFailure::Bool(true)),
///     event_mode: Some(QueueEventMode::Stream),
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone)]
pub struct QueueOpts {
    /// Policy for removing jobs after they fail.  `None` keeps them forever.
    pub remove_on_fail: Option<RemoveOnCompletionOrFailure>,
    /// Policy for removing jobs after they complete.  `None` keeps them forever.
    pub remove_on_complete: Option<RemoveOnCompletionOrFailure>,
    /// Default number of attempts for jobs that don't specify their own.
    /// Defaults to `1`.
    pub attempts: u64,
    /// Default backoff strategy applied to all jobs in this queue unless
    /// overridden at the job level.
    pub default_backoff: Option<BackOffJobOptions>,
    /// Controls how events are delivered (stream vs pub/sub).
    pub event_mode: Option<QueueEventMode>,
    /// Default repeat policy applied to all jobs unless overridden.
    pub repeat: Option<Repeat>,
}
impl Default for QueueOpts {
    /// Defaults: one attempt, stream event delivery, jobs kept forever, and
    /// no backoff or repeat policy.
    fn default() -> Self {
        Self {
            remove_on_fail: None,
            remove_on_complete: None,
            attempts: 1,
            default_backoff: None,
            event_mode: Some(QueueEventMode::default()),
            repeat: None,
        }
    }
}
use crossbeam::atomic::AtomicCell;

/// A shared atomic counter used to track job IDs and other queue counters.
pub type Counter = Arc<AtomicCell<u64>>;
fn create_counter(count: u64) -> Counter {
    Counter::new(count.into())
}
/// A live snapshot of queue state counts.
///
/// Counters are stored as `Arc<AtomicCell<u64>>` so they can be cheaply shared
/// and updated across threads. The values are refreshed from the backing store
/// whenever [`crate::Queue::get_metrics`] is called; between calls the counts
/// may be slightly stale.
///
/// Prefer the helper methods like [`all_jobs_completed`](QueueMetrics::all_jobs_completed)
/// and [`is_idle`](QueueMetrics::is_idle) over reading individual fields directly.
#[derive(Debug, Clone, Default)]
pub struct QueueMetrics {
    /// The highest job ID ever assigned in this queue.
    pub last_id: Counter,
    /// Number of jobs currently being processed by workers.
    pub processing: Counter,
    /// Number of jobs in the priority sorted-set waiting to become active.
    pub prioritized: Counter,
    /// Number of jobs currently in the `Active` state.
    pub active: Counter,
    /// Number of jobs in the `Stalled` state pending recovery.
    pub stalled: Counter,
    /// Number of jobs scheduled to run in the future.
    pub delayed: Counter,
    /// Total number of jobs that have completed successfully.
    pub completed: Counter,
    /// Total number of jobs that have permanently failed.
    pub failed: Counter,
    /// Number of jobs in the paused list (queue is paused).
    pub paused: Counter,
    /// Number of jobs waiting to be picked up by a worker.
    pub waiting: Counter,
    /// Whether the queue is currently in the paused state.
    pub is_paused: Arc<AtomicCell<bool>>,
    /// The active event-delivery mode for this queue.
    /// (`QueueEventMode` is `Copy`, which `AtomicCell` requires for `load`.)
    pub event_mode: Arc<AtomicCell<QueueEventMode>>,
}
impl QueueMetrics {
    /// Returns `true` when every enqueued job has completed.
    ///
    /// Specifically this is `true` when:
    /// - `last_id > 0` (at least one job was ever enqueued),
    /// - `completed == last_id` (all jobs have finished),
    /// - `active == 0`, and
    /// - the queue is otherwise idle (no waiting, delayed, stalled, or
    ///   prioritized jobs and no in-flight workers).
    #[must_use]
    pub fn all_jobs_completed(&self) -> bool {
        let last_id = self.last_id.load();
        last_id > 0 && self.completed.load() == last_id && self.active.load() == 0 && self.is_idle()
    }
    #[allow(clippy::too_many_arguments)]
    /// Constructs a `QueueMetrics` from raw counter values read from the store.
    #[must_use]
    pub fn new(
        last_id: u64,
        processing: u64,
        active: u64,
        stalled: u64,
        completed: u64,
        delayed: u64,
        prioritized: u64,
        paused: u64,
        failed: u64,
        waiting: u64,
        is_paused: bool,
        event_mode: QueueEventMode,
    ) -> Self {
        Self {
            last_id: create_counter(last_id),
            prioritized: create_counter(prioritized),
            processing: create_counter(processing),
            active: create_counter(active),
            stalled: create_counter(stalled),
            completed: create_counter(completed),
            waiting: create_counter(waiting),
            delayed: create_counter(delayed),
            paused: create_counter(paused),
            failed: create_counter(failed),
            is_paused: Arc::new(is_paused.into()),
            event_mode: Arc::new(AtomicCell::new(event_mode)),
        }
    }
    /// Replaces every field with the values from `other`.
    ///
    /// Each field is swapped atomically on its own; the update as a whole is
    /// not a single atomic snapshot, so concurrent readers may briefly observe
    /// a mix of old and new values.
    pub fn update(&self, other: &Self) {
        self.paused.swap(other.paused.load());
        self.completed.swap(other.completed.load());
        self.stalled.swap(other.stalled.load());
        self.active.swap(other.active.load());
        self.last_id.swap(other.last_id.load());
        self.delayed.swap(other.delayed.load());
        self.failed.swap(other.failed.load());
        self.waiting.swap(other.waiting.load());
        self.processing.swap(other.processing.load());
        self.prioritized.swap(other.prioritized.load());
        // Bug fix: `is_paused` was previously not copied, so the pause flag
        // went stale between refreshes and `clear()` did not reset it even
        // though its docs promise a freshly-created-queue state.
        self.is_paused.swap(other.is_paused.load());
        self.event_mode.swap(other.event_mode.load());
    }
    /// Returns `true` if there are delayed jobs ready or waiting to run.
    #[must_use]
    pub fn has_delayed(&self) -> bool {
        self.delayed.load() > 0
    }
    /// Returns `true` if there are jobs waiting to be picked up by a worker
    /// (waiting, delayed, stalled, or prioritized).
    #[must_use]
    pub fn queue_has_work(&self) -> bool {
        self.waiting.load() > 0
            || self.delayed.load() > 0
            || self.stalled.load() > 0
            || self.prioritized.load() > 0
    }
    /// Returns `true` if the queue is currently in the paused state.
    #[must_use]
    pub fn queue_is_paused(&self) -> bool {
        self.is_paused.load()
    }
    /// Returns `true` when no workers are currently processing a job.
    #[must_use]
    pub fn workers_idle(&self) -> bool {
        self.processing.load() == 0
    }
    /// Returns `true` if at least one job is in the active state.
    #[must_use]
    pub fn has_active_jobs(&self) -> bool {
        self.active.load() > 0
    }
    /// Returns `true` when the queue is in a fully quiescent state:
    /// no work waiting, no active jobs, and no workers are processing.
    ///
    /// Also requires that `last_id > 0` (i.e. at least one job was ever enqueued).
    #[must_use]
    pub fn is_idle(&self) -> bool {
        !self.queue_has_work()
            && !self.has_active_jobs()
            && self.workers_idle()
            && self.last_id.load() > 0
    }
    /// Resets all counters (and the pause flag and event mode) to their
    /// defaults, equivalent to a freshly created queue.
    pub fn clear(&self) {
        let default = Self::default();
        self.update(&default);
    }
}