//! Observability hooks for engine internals.
//!
//! The engine emits structured numeric events through the [`MetricsSink`] trait
//! and ships only a no-op default plus an in-memory sink for tests. Concrete
//! Prometheus / OTel adapters live outside this crate so we don't pin downstream
//! users to a specific metrics ecosystem.
//!
//! # Module layout
//!
//! - This module defines the public event types and the [`MetricsSink`] trait.
//! - [`dispatch`] is the engine-internal panic-safe sink invocation helper —
//! not part of the public surface, but documented here because all engine
//! call sites must use it.
//! - [`testing::InMemorySink`] accumulates every event for assertions in
//! integration tests; not intended for production.
//!
//! # Event surface
//!
//! - [`PromoterTick`] / [`LockOutcome`]: delayed-job promoter telemetry.
//! - [`ReaderBatch`]: per-`XREADGROUP` batch shape (size, reclaimed count).
//! - [`JobOutcome`]: per-handler-invocation outcome (Ok / Err / Panic) +
//! wall-clock duration.
//! - [`RetryScheduled`]: emitted only when a retry was actually rescheduled
//! (the atomic gate in `RETRY_RESCHEDULE_SCRIPT` returned `1`).
//! - [`DlqRouted`]: emitted after a DLQ relocation succeeds, carrying the
//! reason and the attempt count that just gave up.
//!
//! # What counts as "the job"
//!
//! Operators reading dashboards: the metrics map onto handler invocations,
//! not raw inbound jobs. Specifically:
//!
//! - **Reader-side DLQ paths** (malformed entry, oversize payload, decode
//! failure, retries-exhausted-on-arrival): emit [`DlqRouted`] only.
//! No [`JobOutcome`] — the handler never ran. These events carry
//! `attempt: 0`.
//! - **Worker handler runs**: emit exactly one [`JobOutcome`].
//! - **Worker-side DLQ** (handler failed enough times): emit
//! [`JobOutcome::Err`] (or [`JobOutcomeKind::Panic`]) AND a downstream
//! [`DlqRouted`].
//!
//! So `chasquimq_jobs_completed_total + chasquimq_jobs_failed_total` is the
//! count of handler invocations; total inbound jobs = handler invocations +
//! `chasquimq_dlq_routed_total{reason!="retries_exhausted"}` (roughly —
//! `retries_exhausted` shows up on both worker-side and reader-side paths).
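//!
//! A worked example with illustrative numbers: 900 increments of
//! `chasquimq_jobs_completed_total` plus 100 of `chasquimq_jobs_failed_total`
//! is 1,000 handler invocations; adding 25 reader-side routes counted by
//! `chasquimq_dlq_routed_total{reason!="retries_exhausted"}` gives roughly
//! 1,025 inbound jobs.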
use std::sync::Arc;
mod dispatch;
pub(crate) use dispatch::dispatch;
/// Shape of the most recent non-empty `XREADGROUP` response. Empty responses
/// are not emitted — at low traffic with `block_ms` polling, that would
/// produce tens of events per second per consumer with no useful signal.
/// Track Redis-side metrics if you need raw poll rate.
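// A minimal sketch of the event struct the doc above describes; the field
// names and types are assumptions derived from the "size, reclaimed count"
// wording in the module docs, not the crate's authoritative definition.
#[derive(Debug, Clone, Copy)]
pub struct ReaderBatch {
    /// Number of entries in the non-empty `XREADGROUP` response.
    pub size: usize,
    /// How many of those entries were reclaimed from other consumers.
    pub reclaimed: usize,
}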
/// Outcome of a single handler invocation.
///
/// `name` carries the job's dispatch name (the `n` field on the source
/// stream entry). Adapters typically render this as a `name="..."`
/// Prometheus / OTel label so operators can histogram handler duration by
/// job kind without msgpack-decoding payload bytes — this is the primary
/// observability payoff of putting `name` at the Streams framing layer
/// (Option B in `docs/name-on-wire-design.md`). Empty for jobs without a
/// name on the wire; adapters should render an empty label rather than
/// dropping the metric.
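// A minimal sketch of the outcome event and its kind enum. The Ok / Err /
// Panic variants are named in the module docs; the field names and the
// String / Duration types are assumptions.
#[derive(Debug, Clone)]
pub struct JobOutcome {
    /// Dispatch name from the entry's `n` field; empty if absent on the wire.
    pub name: String,
    /// How the handler invocation ended.
    pub kind: JobOutcomeKind,
    /// Wall-clock duration of the handler invocation.
    pub duration: std::time::Duration,
}

/// How a single handler invocation ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobOutcomeKind {
    Ok,
    Err,
    Panic,
}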
/// Emitted after the retry relocator has atomically removed the in-flight
/// stream entry and re-added it to the delayed ZSET (the
/// `RETRY_RESCHEDULE_SCRIPT` returned `1`). When the script returns `0`
/// (XACKDEL race lost — entry was already removed by a concurrent path),
/// no event fires.
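// A minimal sketch only: the doc above pins down when this event fires, but
// not its payload, so every field here is an assumption.
#[derive(Debug, Clone)]
pub struct RetryScheduled {
    /// Dispatch name of the rescheduled job; empty if absent on the wire.
    pub name: String,
    /// The attempt number that just failed and triggered the reschedule.
    pub attempt: u32,
}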
/// Emitted after the DLQ relocator has atomically moved an entry to the
/// DLQ stream and acked it from the main group.
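// A minimal sketch: the module docs say this event carries the DLQ reason and
// the attempt count that gave up; the exact field names are assumptions.
#[derive(Debug, Clone, Copy)]
pub struct DlqRouted {
    /// Why the entry was routed to the DLQ.
    pub reason: DlqReason,
    /// Attempts consumed before giving up; `0` on reader-side routes where the
    /// handler never ran.
    pub attempt: u32,
}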
/// Why an entry ended up in the DLQ. Lives in the metrics module because it
/// is part of the public observability surface; the consumer module
/// re-exports it for internal use at the routing call sites.
///
/// `Malformed { reason }` carries `&'static str` (not `String`) so the whole
/// enum is `Copy`.
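// A minimal sketch: `Malformed { reason: &'static str }` and the `Copy`
// requirement come from the doc above; the remaining variants are assumptions
// inferred from the reader-side DLQ paths listed in the module docs.
#[derive(Debug, Clone, Copy)]
pub enum DlqReason {
    /// The stream entry could not be interpreted at all.
    Malformed { reason: &'static str },
    /// The payload exceeded the configured size limit.
    OversizePayload,
    /// The payload bytes failed to decode.
    DecodeFailure,
    /// The job arrived with, or reached, an exhausted retry budget.
    RetriesExhausted,
}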
/// Receiver of engine-internal observability events.
///
/// **Implementations must be cheap and non-blocking.** Events fire on hot
/// loops — promoter ticks, every handler invocation, every retry/DLQ
/// transition — and any latency added here directly raises end-to-end
/// scheduling lag and lowers throughput. Specifically:
///
/// - Do **not** call `block_on` or anything else that parks the runtime.
/// - Do **not** acquire long-held locks; per-call work should be O(1).
/// - **Safe**: incrementing a Prometheus `Counter` / setting a `Gauge`,
/// recording into a `Histogram`, pushing into an unbounded
/// `crossbeam::channel`, atomic increments.
/// - **Unsafe**: synchronous network I/O, `tokio::sync::Mutex::lock().await`
/// via `block_on`, anything that allocates per-event in the millions.
///
/// The default [`NoopSink`] is zero-cost; use it (the configs default to it)
/// unless you've wired a real metrics backend.
///
/// All trait methods have default no-op bodies so adding new events here
/// in future does not break downstream `MetricsSink` implementations.
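// A minimal sketch of the trait shape: one method per event type, each with a
// default no-op body as the doc above requires. The method names and the
// `Send + Sync` bound are assumptions, and methods for the promoter events
// (`PromoterTick` / `LockOutcome`) are omitted from this sketch.
pub trait MetricsSink: Send + Sync {
    fn on_reader_batch(&self, _event: ReaderBatch) {}
    fn on_job_outcome(&self, _event: JobOutcome) {}
    fn on_retry_scheduled(&self, _event: RetryScheduled) {}
    fn on_dlq_routed(&self, _event: DlqRouted) {}
}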
/// Default sink — drops every event.
pub struct NoopSink;
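// With every trait method defaulted to a no-op, the impl body is empty.
impl MetricsSink for NoopSink {}

// A hypothetical sink in the spirit of the "Safe" list above: plain atomic
// counters, no locks, no I/O. Not part of the crate; the name and structure
// are assumptions.
#[derive(Debug, Default)]
pub struct CountingSink {
    completed: std::sync::atomic::AtomicU64,
    failed: std::sync::atomic::AtomicU64,
}

impl MetricsSink for CountingSink {
    fn on_job_outcome(&self, event: JobOutcome) {
        use std::sync::atomic::Ordering;
        // Relaxed suffices: independent monotonic counters read only for display.
        let counter = match event.kind {
            JobOutcomeKind::Ok => &self.completed,
            JobOutcomeKind::Err | JobOutcomeKind::Panic => &self.failed,
        };
        counter.fetch_add(1, Ordering::Relaxed);
    }
}

// Typical wiring (hypothetical): hold the sink behind `Arc<dyn MetricsSink>`
// so engine tasks can share it, e.g.
// `let sink: Arc<dyn MetricsSink> = Arc::new(CountingSink::default());`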