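/// Configuration for [`crate::Producer`].
///
/// A construction sketch (the `Producer::new` call shape is illustrative;
/// only the config struct below is defined in this module):
///
/// ```ignore
/// let cfg = ProducerConfig {
///     queue_name: "emails".to_string(),
///     ..Default::default()
/// };
/// let producer = Producer::new(redis_url, cfg).await?;
/// ```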
#[derive(Debug, Clone)]
pub struct ProducerConfig {
    pub queue_name: String,
    /// Size of the producer's Redis connection pool.
    pub pool_size: usize,
    /// `MAXLEN ~` cap applied to the queue stream's enqueue `XADD`s.
    pub max_stream_len: u64,
    /// Upper bound on a job's requested delay, in seconds (default: 30 days).
    pub max_delay_secs: u64,
}

impl Default for ProducerConfig {
    fn default() -> Self {
        Self {
            queue_name: "default".to_string(),
            pool_size: 8,
            max_stream_len: 1_000_000,
            max_delay_secs: 30 * 24 * 3600,
        }
    }
}
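
/// Retry backoff policy for failed jobs, consumed via
/// [`ConsumerConfig::retry`].
///
/// The field names imply capped exponential backoff with additive jitter; a
/// sketch of that conventional computation (the engine's exact formula may
/// differ):
///
/// ```
/// fn backoff_ms(initial: u64, max: u64, multiplier: f64, jitter: u64, attempt: u32) -> u64 {
///     let base = (initial as f64 * multiplier.powi(attempt as i32)) as u64;
///     // Jitter shown at its upper bound; a real implementation randomizes in 0..=jitter.
///     base.min(max) + jitter
/// }
/// assert_eq!(backoff_ms(100, 30_000, 2.0, 0, 3), 800); // 100 * 2^3, under the 30s cap
/// ```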
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Backoff before the first retry.
    pub initial_backoff_ms: u64,
    /// Ceiling on the computed backoff.
    pub max_backoff_ms: u64,
    /// Exponential growth factor applied per attempt.
    pub multiplier: f64,
    /// Random jitter added on top of the computed backoff.
    pub jitter_ms: u64,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            initial_backoff_ms: 100,
            max_backoff_ms: 30_000,
            multiplier: 2.0,
            jitter_ms: 100,
        }
    }
}
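
/// Configuration for [`crate::Consumer`].
///
/// Every field has a default; override the handful you care about with
/// struct-update syntax (values illustrative):
///
/// ```ignore
/// let cfg = ConsumerConfig {
///     queue_name: "emails".to_string(),
///     group: "workers".to_string(),
///     concurrency: 32,
///     ..Default::default()
/// };
/// ```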
#[derive(Clone)]
pub struct ConsumerConfig {
    pub queue_name: String,
    pub group: String,
    pub consumer_id: String,
    pub batch: usize,
    pub block_ms: u64,
    /// Minimum idle time before pending entries abandoned by other
    /// consumers are claimed.
    pub claim_min_idle_ms: u64,
    pub concurrency: usize,
    /// Delivery attempts before a job is dead-lettered.
    pub max_attempts: u32,
    pub ack_batch: usize,
    pub ack_idle_ms: u64,
    pub shutdown_deadline_secs: u64,
    pub max_payload_bytes: usize,
    pub dlq_inflight: usize,
    pub dlq_max_stream_len: u64,
    pub retry: RetryConfig,
    pub retry_inflight: usize,
    /// Whether `Consumer::run` spawns an embedded [`crate::Promoter`]; the
    /// `delayed_*` fields below are forwarded to its [`PromoterConfig`].
    pub delayed_enabled: bool,
    pub delayed_poll_interval_ms: u64,
    pub delayed_promote_batch: usize,
    pub delayed_max_stream_len: u64,
    pub delayed_lock_ttl_secs: u64,
    /// Whether the engine writes to the per-queue events stream
    /// (`{chasqui:<queue>}:events`). Sibling to `MetricsSink`: events fire
    /// on the same hot-path occurrences, but cross-process subscribers can
    /// observe them with a plain `XREAD`. Default `true` so the BullMQ
    /// `QueueEvents` class works against a default-config consumer; set
    /// `false` to skip every events-stream `XADD`.
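    ///
    /// Subscribers need nothing beyond stock Redis; for example, from
    /// `redis-cli` (queue name illustrative):
    ///
    /// ```text
    /// XREAD BLOCK 5000 STREAMS {chasqui:default}:events $
    /// ```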
    pub events_enabled: bool,
    /// `MAXLEN ~` cap applied to the events stream's `XADD`s. Trimming is
    /// approximate (the `~`) so Redis can do it cheaply; expect the actual
    /// length to oscillate up to a few hundred entries above the cap.
    pub events_max_stream_len: u64,
    /// Whether `Consumer::run` auto-spawns an embedded [`crate::Scheduler`]
    /// task alongside the reader / promoter / relocators. Mirrors
    /// `delayed_enabled` for the [`crate::Promoter`]: a worker process that
    /// loads the consumer also gets repeatable / cron firing for free,
    /// without the user managing a second task. Default `true`. Set to
    /// `false` for deployments that run a separate scheduler process; the
    /// standalone [`crate::Scheduler`] API is unaffected. Multiple
    /// in-process schedulers cooperate via the engine's existing leader
    /// election (`SET NX EX` on `{chasqui:<queue>}:scheduler:lock`).
    pub run_scheduler: bool,
    /// Configuration for the embedded scheduler when `run_scheduler` is
    /// `true`. The `queue_name` field is overridden from
    /// `ConsumerConfig::queue_name` at spawn time; everything else
    /// (`tick_interval_ms`, `batch`, `max_stream_len`, `lock_ttl_secs`,
    /// `holder_id`, `metrics`) is forwarded as-is. Defaults to
    /// [`SchedulerConfig::default`].
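    ///
    /// A tuning sketch (values illustrative):
    ///
    /// ```ignore
    /// // Fire repeatable specs with finer granularity in-process:
    /// let cfg = ConsumerConfig {
    ///     scheduler: SchedulerConfig { tick_interval_ms: 250, ..Default::default() },
    ///     ..Default::default()
    /// };
    /// // Or run the scheduler as a separate process instead:
    /// let cfg = ConsumerConfig { run_scheduler: false, ..Default::default() };
    /// ```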
    pub scheduler: SchedulerConfig,
    /// Opt-in result backend. When `true`, the engine writes each handler's
    /// non-empty `Bytes` return value to a per-job result key
    /// (`{chasqui:<queue>}:result:<job_id>`) with TTL `result_ttl_secs`,
    /// readable via [`crate::Producer::get_result`]. The write is gated on
    /// the same XACKDEL inside a single Lua round trip, so no orphan result
    /// is left behind when a CLAIM removed the entry first. Default `false`
    /// so the BullMQ-style "discard handler return" default holds and the
    /// hot path stays a plain batched XACKDEL for users who never call
    /// `get_result`.
    pub store_results: bool,
    /// TTL applied to result keys when `store_results = true`. Default
    /// `3600` (one hour). `Producer::get_result` returns `None` for expired
    /// keys (indistinguishable from "never existed" / "not yet completed"),
    /// so set this to comfortably exceed any `wait_for_result` polling
    /// timeout your shim uses.
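    ///
    /// A usage sketch (the `get_result` call shape is illustrative):
    ///
    /// ```ignore
    /// let cfg = ConsumerConfig {
    ///     store_results: true,
    ///     result_ttl_secs: 24 * 3600, // keep results for a full day
    ///     ..Default::default()
    /// };
    /// // Producer side, after the job completes:
    /// if let Some(bytes) = producer.get_result(&job_id).await? {
    ///     println!("handler returned {} bytes", bytes.len());
    /// }
    /// ```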
    pub result_ttl_secs: u64,
    /// Forwarded to the inline promoter the consumer spawns when
    /// `delayed_enabled` is true. Defaults to [`crate::metrics::NoopSink`].
    pub metrics: std::sync::Arc<dyn crate::metrics::MetricsSink>,
}

impl std::fmt::Debug for ConsumerConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ConsumerConfig")
            .field("queue_name", &self.queue_name)
            .field("group", &self.group)
            .field("consumer_id", &self.consumer_id)
            .field("batch", &self.batch)
            .field("block_ms", &self.block_ms)
            .field("claim_min_idle_ms", &self.claim_min_idle_ms)
            .field("concurrency", &self.concurrency)
            .field("max_attempts", &self.max_attempts)
            .field("ack_batch", &self.ack_batch)
            .field("ack_idle_ms", &self.ack_idle_ms)
            .field("shutdown_deadline_secs", &self.shutdown_deadline_secs)
            .field("max_payload_bytes", &self.max_payload_bytes)
            .field("dlq_inflight", &self.dlq_inflight)
            .field("dlq_max_stream_len", &self.dlq_max_stream_len)
            .field("retry", &self.retry)
            .field("retry_inflight", &self.retry_inflight)
            .field("delayed_enabled", &self.delayed_enabled)
            .field("delayed_poll_interval_ms", &self.delayed_poll_interval_ms)
            .field("delayed_promote_batch", &self.delayed_promote_batch)
            .field("delayed_max_stream_len", &self.delayed_max_stream_len)
            .field("delayed_lock_ttl_secs", &self.delayed_lock_ttl_secs)
            .field("events_enabled", &self.events_enabled)
            .field("events_max_stream_len", &self.events_max_stream_len)
            .field("run_scheduler", &self.run_scheduler)
            .field("scheduler", &self.scheduler)
            .field("store_results", &self.store_results)
            .field("result_ttl_secs", &self.result_ttl_secs)
            .field("metrics", &"<dyn MetricsSink>")
            .finish()
    }
}

impl Default for ConsumerConfig {
    fn default() -> Self {
        Self {
            queue_name: "default".to_string(),
            group: "default".to_string(),
            consumer_id: format!("c-{}", uuid::Uuid::new_v4()),
            batch: 64,
            block_ms: 5_000,
            claim_min_idle_ms: 30_000,
            concurrency: 100,
            max_attempts: 3,
            ack_batch: 256,
            ack_idle_ms: 5,
            shutdown_deadline_secs: 30,
            max_payload_bytes: 1_048_576,
            dlq_inflight: 32,
            dlq_max_stream_len: 100_000,
            retry: RetryConfig::default(),
            retry_inflight: 64,
            delayed_enabled: true,
            delayed_poll_interval_ms: 100,
            delayed_promote_batch: 256,
            delayed_max_stream_len: 1_000_000,
            delayed_lock_ttl_secs: 5,
            events_enabled: true,
            events_max_stream_len: 100_000,
            run_scheduler: true,
            scheduler: SchedulerConfig::default(),
            store_results: false,
            result_ttl_secs: 3600,
            metrics: crate::metrics::noop_sink(),
        }
    }
}

/// Configuration for the standalone [`crate::Promoter`], which moves due
/// jobs from the per-queue delayed ZSET into the stream.
#[derive(Clone)]
pub struct PromoterConfig {
    pub queue_name: String,
    pub poll_interval_ms: u64,
    pub promote_batch: usize,
    pub max_stream_len: u64,
    pub lock_ttl_secs: u64,
    pub holder_id: String,
    /// Mirrors [`ConsumerConfig::events_enabled`]: when `true`, the promoter
    /// writes a `waiting` event to `{chasqui:<queue>}:events` for each job
    /// it has just promoted from the delayed ZSET into the stream. When the
    /// promoter is spawned by `Consumer::run`, this field is forwarded from
    /// `ConsumerConfig::events_enabled`.
    pub events_enabled: bool,
    /// `MAXLEN ~` cap for events-stream `XADD`s. Forwarded from
    /// `ConsumerConfig::events_max_stream_len` by the embedded promoter.
    pub events_max_stream_len: u64,
    /// Receiver for promoter tick / lock-outcome events. Defaults to
    /// [`crate::metrics::NoopSink`]; swap in your own [`MetricsSink`] to
    /// bridge into Prometheus, OpenTelemetry, etc.
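    ///
    /// A wiring sketch (`MyPrometheusSink` is hypothetical):
    ///
    /// ```ignore
    /// let cfg = PromoterConfig {
    ///     metrics: std::sync::Arc::new(MyPrometheusSink::new(registry)),
    ///     ..Default::default()
    /// };
    /// ```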
    pub metrics: std::sync::Arc<dyn crate::metrics::MetricsSink>,
}

impl std::fmt::Debug for PromoterConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PromoterConfig")
            .field("queue_name", &self.queue_name)
            .field("poll_interval_ms", &self.poll_interval_ms)
            .field("promote_batch", &self.promote_batch)
            .field("max_stream_len", &self.max_stream_len)
            .field("lock_ttl_secs", &self.lock_ttl_secs)
            .field("holder_id", &self.holder_id)
            .field("events_enabled", &self.events_enabled)
            .field("events_max_stream_len", &self.events_max_stream_len)
            .field("metrics", &"<dyn MetricsSink>")
            .finish()
    }
}

/// Configuration for the standalone [`crate::Scheduler`] (slice 10).
///
/// The scheduler tails the per-queue repeat ZSET (`{chasqui:<queue>}:repeat`)
/// at `tick_interval_ms`, materializes one fire of each due spec, schedules
/// the resulting job (immediately to the stream or to the delayed ZSET if
/// dispatch should still wait), and updates the spec's next-fire score in
/// the same Lua round trip. Leader-elected via `SET NX EX` on
/// `{chasqui:<queue>}:scheduler:lock` so multiple replicas can hot-spare
/// without double-firing.
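///
/// Leader election is a single lock write; conceptually (queue name
/// illustrative):
///
/// ```text
/// SET {chasqui:default}:scheduler:lock <holder_id> NX EX <lock_ttl_secs>
/// ```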
#[derive(Clone)]
pub struct SchedulerConfig {
    pub queue_name: String,
    /// How often the leader drains due specs from the repeat ZSET. Default
    /// 1000ms; the lower bound on per-spec fire jitter is roughly this
    /// interval (a spec scheduled for 100ms from now still has to wait for
    /// the next tick to be picked up).
    pub tick_interval_ms: u64,
    /// Max specs hydrated per tick. Specs beyond this batch wait for the
    /// next tick, which keeps a single fat tick from monopolizing the
    /// leader.
    pub batch: usize,
    /// `MAXLEN ~` cap forwarded to the script's XADD on the
    /// immediate-dispatch path.
    pub max_stream_len: u64,
    pub lock_ttl_secs: u64,
    pub holder_id: String,
    /// Receiver for scheduler tick / lock-outcome events. Defaults to
    /// [`crate::metrics::NoopSink`]. The scheduler currently emits the same
    /// `LockOutcome` events as the promoter; spec-level metrics (fires per
    /// tick, exhaustion events) are intentionally reserved for a follow-up
    /// slice.
    pub metrics: std::sync::Arc<dyn crate::metrics::MetricsSink>,
}

impl std::fmt::Debug for SchedulerConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SchedulerConfig")
            .field("queue_name", &self.queue_name)
            .field("tick_interval_ms", &self.tick_interval_ms)
            .field("batch", &self.batch)
            .field("max_stream_len", &self.max_stream_len)
            .field("lock_ttl_secs", &self.lock_ttl_secs)
            .field("holder_id", &self.holder_id)
            .field("metrics", &"<dyn MetricsSink>")
            .finish()
    }
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            queue_name: "default".to_string(),
            tick_interval_ms: 1_000,
            batch: 256,
            max_stream_len: 1_000_000,
            lock_ttl_secs: 5,
            holder_id: format!("s-{}", uuid::Uuid::new_v4()),
            metrics: crate::metrics::noop_sink(),
        }
    }
}

impl Default for PromoterConfig {
    fn default() -> Self {
        Self {
            queue_name: "default".to_string(),
            poll_interval_ms: 100,
            promote_batch: 256,
            max_stream_len: 1_000_000,
            lock_ttl_secs: 5,
            holder_id: format!("p-{}", uuid::Uuid::new_v4()),
            events_enabled: true,
            events_max_stream_len: 100_000,
            metrics: crate::metrics::noop_sink(),
        }
    }
}