net/consumer/merge.rs
1//! Cross-shard poll merge layer.
2//!
3//! This module handles polling from multiple shards and merging the results
4//! into a unified stream with proper cursor management.
5//!
6//! # Composite Cursor
7//!
8//! When polling multiple shards, we track position in each shard using a
9//! composite cursor encoded as base64 JSON:
10//!
11//! ```json
12//! {"0": "1702123456789-0", "1": "1702123456790-0", ...}
13//! ```
14
15use std::cmp::Ordering as CmpOrdering;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
20use serde::{Deserialize, Serialize};
21
22use crate::adapter::{Adapter, ShardPollResult};
23use crate::consumer::filter::{CompiledFilter, Filter};
24use crate::error::{AdapterError, ConsumerError};
25use crate::event::StoredEvent;
26
27/// Compare two adapter-emitted stream ids using numeric semantics for
28/// the formats both built-in adapters produce, with a lex fallback for
29/// already-lex-comparable opaque ids (ULID, UUIDv7, fixed-width hex).
30///
31/// A raw `str::cmp` would invert ordering on the unpadded numeric
32/// ids the JetStream adapter emits (`seq.to_string()`) and the
33/// Redis Streams server-side `{ms}-{seq}` format — `"9" > "10"`
34/// lexicographically would wedge the cursor at every decade
35/// boundary. The structured comparator below handles both formats
36/// numerically. Mixed-padding comparisons across upgrades still
37/// compare correctly because parse-then-compare ignores leading
38/// zeros.
39///
40/// Order of attempts:
41/// 1. Both ids parse as `<u64>-<u64>` (Redis Streams).
42/// 2. Both ids parse as `<u128>` (raw numeric, padded or unpadded).
43/// 3. Lex compare (ULID, UUID, hex digests, etc.).
44///
45/// We deliberately do not try to mix formats — if one side is `123`
46/// and the other is `456-0`, the lex fallback kicks in. In practice
47/// a single adapter emits a single format, so this only matters for
48/// pathological mixed-source streams.
49pub(crate) fn compare_stream_ids(a: &str, b: &str) -> CmpOrdering {
50 // Redis Streams `<ms>-<seq>`.
51 if let (Some((a_ms, a_seq)), Some((b_ms, b_seq))) = (split_redis_id(a), split_redis_id(b)) {
52 return (a_ms, a_seq).cmp(&(b_ms, b_seq));
53 }
54 // Plain numeric (JetStream `seq.to_string()` or future zero-padded form).
55 if let (Ok(an), Ok(bn)) = (a.parse::<u128>(), b.parse::<u128>()) {
56 return an.cmp(&bn);
57 }
58 // Opaque id — assumed already lex-comparable.
59 a.cmp(b)
60}
61
/// Parse a Redis Streams style id of the form `<ms>-<seq>`.
///
/// The string is split at its first `-`; both halves must parse as `u64`.
/// Returns `None` when there is no `-` or either half is not a valid `u64`
/// (which includes a second `-` appearing in the sequence half).
fn split_redis_id(s: &str) -> Option<(u64, u64)> {
    s.split_once('-').and_then(|(ms_part, seq_part)| {
        let ms = ms_part.parse::<u64>().ok()?;
        let seq = seq_part.parse::<u64>().ok()?;
        Some((ms, seq))
    })
}
66
/// Coarse shape of a stream id, as reported by `id_format`.
///
/// Two ids with the same format can be ordered safely by
/// `compare_stream_ids`. Two ids with different formats end up in that
/// function's lexicographic fallback, which may wedge the cursor (see
/// `update_from_events`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum IdFormat {
    /// Redis Streams id: `<ms>-<seq>`.
    Redis,
    /// Bare number (JetStream `seq.to_string()`).
    Numeric,
    /// Everything else; assumed to be lexicographically comparable.
    Opaque,
}
80
81pub(crate) fn id_format(s: &str) -> IdFormat {
82 if split_redis_id(s).is_some() {
83 IdFormat::Redis
84 } else if s.parse::<u128>().is_ok() {
85 IdFormat::Numeric
86 } else {
87 IdFormat::Opaque
88 }
89}
90
/// Storage type for a single shard's cursor position.
///
/// An `Arc<str>` is used so that cloning a cursor (including the internal
/// copies made while merging polls) bumps a reference count instead of
/// copying the id's bytes.
type CursorPos = Arc<str>;
95
/// How the events of a poll are ordered before being returned.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Ordering {
    /// No ordering is applied (fastest). This is the default.
    ///
    /// **Cross-shard order is non-deterministic.** Within one shard the
    /// events keep their per-shard arrival order, but how shards are
    /// interleaved depends on the completion order of
    /// `futures::join_all`, which varies with runtime scheduling, adapter
    /// latency, and concurrent load. Callers that need an order that is
    /// stable across polls must either select [`Ordering::InsertionTs`] or
    /// sort the response themselves (e.g. by `(shard_id, id)`).
    #[default]
    None,
    /// Order events by insertion timestamp, giving a cross-shard ordering.
    InsertionTs,
}
113
114/// Composite cursor tracking position across multiple shards.
115#[derive(Debug, Clone, Default, Serialize, Deserialize)]
116pub struct CompositeCursor {
117 /// Per-shard positions (shard_id -> stream_id).
118 ///
119 /// Stored as `Arc<str>` so internal copies (e.g. `cursor.clone()`
120 /// inside the poll merger) bump a refcount instead of duplicating
121 /// each id's bytes.
122 #[serde(flatten)]
123 pub positions: HashMap<u16, CursorPos>,
124}
125
126impl CompositeCursor {
127 /// Create an empty cursor.
128 pub fn new() -> Self {
129 Self::default()
130 }
131
132 /// Encode the cursor as a base64 string.
133 ///
134 /// Pre-fix used `unwrap_or_default()`, which silently
135 /// produced an empty string on serialization failure. The
136 /// empty cursor then base64-encoded to an empty string and
137 /// the consumer's next poll restarted from the beginning of
138 /// the stream — silent rewind. For the current `positions`
139 /// schema (`HashMap<u16, Arc<str>>`), serialization is
140 /// infallible, so the failure path is unreachable.
141 ///
142 /// We surface that as a `ConsumerError::InvalidCursor` rather
143 /// than a panic. `poll()` is an `async fn`, and a panic that
144 /// propagates from there can abort the surrounding tokio
145 /// runtime worker. Returning `Err` lets `poll()` stay
146 /// non-panicking even if a future schema change breaks the
147 /// invariant; the caller will see a structured error and can
148 /// retry / log instead of taking down a worker.
149 pub fn encode(&self) -> Result<String, ConsumerError> {
150 let json = serde_json::to_string(&self.positions).map_err(|e| {
151 ConsumerError::InvalidCursor(format!(
152 "CompositeCursor::encode failed to serialize positions \
153 (HashMap<u16, Arc<str>> should be infallible): {e}"
154 ))
155 })?;
156 Ok(BASE64.encode(json.as_bytes()))
157 }
158
159 /// Decode a cursor from a base64 string.
160 pub fn decode(s: &str) -> Result<Self, ConsumerError> {
161 let bytes = BASE64
162 .decode(s)
163 .map_err(|e| ConsumerError::InvalidCursor(e.to_string()))?;
164
165 // Two-pass parse so non-canonical shard-id keys (e.g.
166 // `"00"` aliasing `"0"`) are rejected explicitly. Pre-fix
167 // we deserialized straight into `HashMap<u16, _>`; serde
168 // parses each string key as u16, so `"0"` and `"00"`
169 // both produce key 0 and the second insert silently
170 // overwrites the first. The collision is benign in
171 // production (no caller emits non-canonical keys) but
172 // a malicious or buggy producer could inject a hostile
173 // cursor that ambiguates which shard's position a
174 // consumer ended up with on round-trip.
175 let raw_positions: HashMap<String, CursorPos> = serde_json::from_slice(&bytes)
176 .map_err(|e| ConsumerError::InvalidCursor(e.to_string()))?;
177 let mut positions: HashMap<u16, CursorPos> = HashMap::with_capacity(raw_positions.len());
178 for (key, val) in raw_positions {
179 let id: u16 = key.parse().map_err(|_| {
180 ConsumerError::InvalidCursor(format!("shard key {key:?} is not a valid u16"))
181 })?;
182 // Reject non-canonical stringifications. The
183 // round-trip `u16 → String` is the canonical form;
184 // any other string that parses to the same u16 is
185 // a non-canonical alias.
186 if id.to_string() != key {
187 return Err(ConsumerError::InvalidCursor(format!(
188 "non-canonical shard key {key:?} (parses to {id}, \
189 canonical form is {id})"
190 )));
191 }
192 if positions.insert(id, val).is_some() {
193 // Defensive: if non-canonical detection above
194 // missed something (it shouldn't), a duplicate
195 // canonical key is also a structural error.
196 return Err(ConsumerError::InvalidCursor(format!(
197 "duplicate shard key {id} after canonicalization"
198 )));
199 }
200 }
201
202 Ok(Self { positions })
203 }
204
205 /// Get the position for a specific shard.
206 pub fn get(&self, shard_id: u16) -> Option<&str> {
207 self.positions.get(&shard_id).map(|s| s.as_ref())
208 }
209
210 /// Set the position for a specific shard.
211 ///
212 /// Accepts anything that converts into an `Arc<str>` — notably
213 /// `String`, `&str`, and `Arc<str>` itself. This lets adapters
214 /// hand us a freshly-allocated `String` (becomes a single boxed
215 /// allocation) without forcing a second copy for the cursor.
216 ///
217 /// **Unconditional**: skips the backend-format guard and the
218 /// monotonicity guard. Use [`Self::set_checked`] from production
219 /// poll paths — the documented "refuse to advance across a
220 /// JetStream → Redis cursor format change" protection lives in
221 /// the checked variant, not here. The unchecked variant is kept
222 /// public for tests that need to seed a cursor directly.
223 pub fn set(&mut self, shard_id: u16, position: impl Into<CursorPos>) {
224 self.positions.insert(shard_id, position.into());
225 }
226
227 /// Set the position for a specific shard, applying the same
228 /// backend-format mismatch guard as [`Self::update_from_events`].
229 /// Returns `true` if the write happened, `false` if it was
230 /// refused (format mismatch — error-logged so operators see the
231 /// migration). No-existing-position is treated as accept (first
232 /// write).
233 ///
234 /// Production poll paths must use this instead of [`Self::set`]
235 /// so a mid-stream backend migration (JetStream → Redis or vice
236 /// versa) is surfaced and refused at the cursor write site
237 /// rather than silently overwriting the format and stalling the
238 /// caller.
239 pub fn set_checked(&mut self, shard_id: u16, new_id: &str) -> bool {
240 match self.positions.get(&shard_id) {
241 Some(existing) => {
242 let existing_fmt = id_format(existing.as_ref());
243 let new_fmt = id_format(new_id);
244 if existing_fmt != new_fmt {
245 tracing::error!(
246 shard_id,
247 existing = %existing,
248 new = %new_id,
249 existing_format = ?existing_fmt,
250 new_format = ?new_fmt,
251 "stream id format change detected — likely a \
252 backend migration (e.g. JetStream → Redis). \
253 Refusing to advance the cursor; operator must \
254 explicitly reset to consume from the new \
255 backend.",
256 );
257 return false;
258 }
259 self.positions.insert(shard_id, Arc::from(new_id));
260 true
261 }
262 None => {
263 self.positions.insert(shard_id, Arc::from(new_id));
264 true
265 }
266 }
267 }
268
269 /// Update positions from consumed events.
270 ///
271 /// Per-shard CAS routed through `compare_stream_ids`, which
272 /// understands the Redis (`<ms>-<seq>`) and JetStream (`<u64>`)
273 /// formats numerically and falls back to lex for opaque ids
274 /// (ULID, UUIDv7, hex digests). The cursor cannot regress and
275 /// decade-rollovers cannot freeze it. Unconditional inserts
276 /// would let whichever event for a given `shard_id` appeared
277 /// *last* in the slice win regardless of stream order; a plain
278 /// `str::cmp` CAS would wedge on the unpadded numeric ids both
279 /// built-in adapters emit (`"9" > "10"` lexicographically).
280 pub fn update_from_events(&mut self, events: &[StoredEvent]) {
281 for event in events {
282 let new_id = event.id.as_str();
283 match self.positions.get(&event.shard_id) {
284 Some(existing) => {
285 // Detect a backend-format change before
286 // calling the comparator. Pre-fix a cursor
287 // at `"42"` (JetStream numeric) confronted
288 // with a new `"1700-0"` (Redis) fell through
289 // both structured branches of
290 // `compare_stream_ids`, hit the lex fallback
291 // (`'4' > '1'`), and the CAS guard refused
292 // to update — silent stall requiring manual
293 // cursor reset. Detect the mismatch
294 // explicitly: surface a loud error and
295 // refuse the update so operators see the
296 // backend migration in logs and reset the
297 // cursor deliberately. Keeping the existing
298 // value (rather than blindly accepting the
299 // new one) avoids a potential regression in
300 // the other direction.
301 let existing_fmt = id_format(existing.as_ref());
302 let new_fmt = id_format(new_id);
303 if existing_fmt != new_fmt {
304 tracing::error!(
305 shard_id = event.shard_id,
306 existing = %existing,
307 new = %new_id,
308 existing_format = ?existing_fmt,
309 new_format = ?new_fmt,
310 "stream id format change detected — likely a \
311 backend migration (e.g. JetStream → Redis). \
312 Refusing to advance the cursor; operator must \
313 explicitly reset to consume from the new \
314 backend.",
315 );
316 continue;
317 }
318 if compare_stream_ids(existing.as_ref(), new_id) == CmpOrdering::Less {
319 self.positions.insert(event.shard_id, Arc::from(new_id));
320 }
321 // Existing is >= new_id under the structured
322 // comparator — don't regress.
323 }
324 None => {
325 self.positions.insert(event.shard_id, Arc::from(new_id));
326 }
327 }
328 }
329 }
330}
331
332/// Request for consuming events.
333#[derive(Debug, Clone, Default)]
334pub struct ConsumeRequest {
335 /// Start cursor (opaque to caller). None means from the beginning.
336 pub from_id: Option<String>,
337 /// Maximum number of events to return.
338 pub limit: usize,
339 /// Optional filter to apply.
340 pub filter: Option<Filter>,
341 /// Ordering mode.
342 pub ordering: Ordering,
343 /// Specific shards to poll. None means all shards.
344 pub shards: Option<Vec<u16>>,
345}
346
347impl ConsumeRequest {
348 /// Create a new consume request.
349 pub fn new(limit: usize) -> Self {
350 Self {
351 limit,
352 ..Default::default()
353 }
354 }
355
356 /// Set the starting cursor.
357 pub fn from(mut self, cursor: impl Into<String>) -> Self {
358 self.from_id = Some(cursor.into());
359 self
360 }
361
362 /// Set the filter.
363 pub fn filter(mut self, filter: Filter) -> Self {
364 self.filter = Some(filter);
365 self
366 }
367
368 /// Set the ordering mode.
369 pub fn ordering(mut self, ordering: Ordering) -> Self {
370 self.ordering = ordering;
371 self
372 }
373
374 /// Set specific shards to poll.
375 pub fn shards(mut self, shards: Vec<u16>) -> Self {
376 self.shards = Some(shards);
377 self
378 }
379}
380
381/// Response from consuming events.
382#[derive(Debug, Clone)]
383pub struct ConsumeResponse {
384 /// Events matching the request.
385 pub events: Vec<StoredEvent>,
386 /// Cursor for the next poll. None if no events returned.
387 pub next_id: Option<String>,
388 /// True if there are more events available.
389 pub has_more: bool,
390 /// `true` if the per-shard fetch was clamped by the internal
391 /// `PER_SHARD_FETCH_CAP` (10 000). Callers requesting very
392 /// large `limit` values across few shards may receive fewer
393 /// events than `limit` per `poll()` even when the underlying
394 /// streams have more — pagination via `next_id` still works.
395 ///
396 /// Pre-fix this clamp was silent. The default is
397 /// `false`; tools building observability around large polls
398 /// can detect under-delivery via this flag.
399 pub truncated_at_per_shard_cap: bool,
400 /// Shards that reported `has_more=true` but contributed no
401 /// events and no cursor advance to this poll. The merger
402 /// suppresses the aggregate `has_more` flag for caller-
403 /// protection (preventing infinite loops), but operators
404 /// monitoring adapter health should know which shards are
405 /// stuck.
406 ///
407 /// Pre-fix the suppression was logged at warn but
408 /// invisible to callers. Empty on the happy path; populated
409 /// only when a stall was detected and suppressed.
410 pub stalled_shards: Vec<u16>,
411 /// Shards whose adapter call returned an error during this
412 /// poll. The merger logs each error at WARN and continues
413 /// with the surviving shards, so the response's `events`
414 /// can come from a strict subset of the configured shards
415 /// and silently miss data the operator expected to see.
416 /// Operators monitoring adapter health need to know WHICH
417 /// shards failed (not just that something logged a warn) so
418 /// they can correlate alerts with specific Redis / JetStream
419 /// nodes.
420 ///
421 /// Pre-fix this signal lived only in the warn log; an
422 /// observer parsing `ConsumeResponse` saw a clean partial-
423 /// shards response with no field indicating *which* shards
424 /// were missing, in contrast to `stalled_shards` which IS
425 /// surfaced. Empty on the happy path; populated only when at
426 /// least one shard's poll errored.
427 ///
428 /// **Recovery-latency note:** when a previously-failed shard
429 /// returns to health, its backlog drains at the per-shard limit
430 /// of the next poll (`limit / shards.len() * over_fetch_factor`).
431 /// A 4-shard poll where one shard was down for 60 s and the
432 /// others kept ingesting can take several normal-cadence polls
433 /// to fully drain the recovered shard. No backlog-size hint is
434 /// surfaced on the response; callers that need to detect a
435 /// stuck-recovery condition should monitor cursor advance per
436 /// shard across consecutive polls.
437 pub failed_shards: Vec<u16>,
438}
439
440impl ConsumeResponse {
441 /// Create an empty response.
442 pub fn empty() -> Self {
443 Self {
444 events: Vec::new(),
445 next_id: None,
446 has_more: false,
447 truncated_at_per_shard_cap: false,
448 stalled_shards: Vec::new(),
449 failed_shards: Vec::new(),
450 }
451 }
452}
453
/// Upper bound on how many events one shard is asked for in a single
/// `direct_get` / `XRANGE` fetch; applied in `PollMerger::poll`.
///
/// The cap limits the memory pressure a single poll can put on the
/// adapter. Callers that need a larger effective limit should paginate.
///
/// The constant is `#[doc(hidden)]` because its value is an internal
/// tuning knob rather than part of the consumer's public API: documenting
/// it would invite downstream code to match against it and turn a quiet
/// tuning change into a breaking-change negotiation. To find out whether a
/// poll was truncated, read `ConsumeResponse::truncated_at_per_shard_cap`
/// instead of comparing against this constant.
#[doc(hidden)]
pub const PER_SHARD_FETCH_CAP: usize = 10_000;
468
469/// Match a `StoredEvent` against a filter, surfacing parse failures.
470///
471/// Returns `true` iff the event parses as JSON AND the filter matches
472/// the parsed value. A parse failure is logged at WARN with the event's
473/// id and shard so on-disk corruption or framing bugs in upstream
474/// adapters are observable from the filtered-poll path; without this,
475/// corrupt events were silently dropped from filtered results while the
476/// unfiltered path still returned them — a confusing inconsistency.
477///
478/// Takes a [`CompiledFilter`] rather than a [`Filter`] so the dot-path
479/// split + per-segment integer parse happens once per poll, not once
480/// per event — see perf #15 / #16.
481fn event_matches_filter(event: &StoredEvent, filter: &CompiledFilter) -> bool {
482 match event.parse() {
483 Ok(value) => filter.matches(&value),
484 Err(e) => {
485 tracing::warn!(
486 event_id = %event.id,
487 shard_id = event.shard_id,
488 error = %e,
489 "dropping unparseable event from filtered poll result"
490 );
491 false
492 }
493 }
494}
495
496/// Poll merger for cross-shard aggregation.
497pub struct PollMerger {
498 /// Adapter for polling shards.
499 adapter: Arc<dyn Adapter>,
500 /// Active shard IDs to poll when the request omits an explicit
501 /// `shards` list.
502 ///
503 /// Previously stored only `num_shards: u16` and generated
504 /// `(0..num_shards)` on every default-shards poll. After a dynamic
505 /// scale-down (`ShardMapper::scale_down` evicts the lowest-weight
506 /// shard, not necessarily the highest id), the active id set can
507 /// become sparse — e.g. `{1, 2}` after id 0 was drained — but
508 /// `num_shards == 2` still produces `[0, 1]`, polling a stale or
509 /// nonexistent shard 0 and skipping the live shard 2 entirely.
510 /// Captured at construction; the bus replaces the merger via
511 /// `ArcSwap` whenever topology changes (`add_shard`,
512 /// `remove_shard_internal`).
513 shard_ids: Vec<u16>,
514}
515
516impl PollMerger {
517 /// Create a new poll merger.
518 ///
519 /// `shard_ids` should be the snapshot of currently-active shard IDs
520 /// (e.g. `ShardManager::shard_ids()`). Passing `0..num_shards` is
521 /// only correct when ids are guaranteed dense from 0 — i.e. the
522 /// static-shards path with no scaling.
523 pub fn new(adapter: Arc<dyn Adapter>, shard_ids: Vec<u16>) -> Self {
524 Self { adapter, shard_ids }
525 }
526
527 /// Poll events according to the request.
528 pub async fn poll(&self, request: ConsumeRequest) -> Result<ConsumeResponse, ConsumerError> {
529 if request.limit == 0 {
530 return Ok(ConsumeResponse::empty());
531 }
532
533 // Decode cursor
534 let cursor = match &request.from_id {
535 Some(s) => CompositeCursor::decode(s)?,
536 None => CompositeCursor::new(),
537 };
538
539 // Determine which shards to poll. Dedup the list so a caller
540 // that supplies a non-deduped slice (e.g. forwarded from a
541 // fan-out source) doesn't double-fetch a shard and return
542 // duplicate events in the response payload. The cursor would
543 // stay correct either way, but the duplicate events would
544 // survive filtering + ordering + the per-poll limit and reach
545 // the caller.
546 let mut shards: Vec<u16> = request
547 .shards
548 .clone()
549 .unwrap_or_else(|| self.shard_ids.clone());
550 shards.sort_unstable();
551 shards.dedup();
552
553 if shards.is_empty() {
554 return Ok(ConsumeResponse::empty());
555 }
556
557 // Calculate per-shard limit (over-fetch to account for filtering)
558 // Use ceiling division to avoid truncating to 0 when limit < shard count.
559 //
560 // Pre-fix this `min(10_000)` clamp was silent —
561 // a caller with `limit=200_000` over 10 shards expected
562 // 20 000/shard plus over-fetch but got 10 000/shard with
563 // no diagnostic. Track whether the clamp triggered and
564 // surface it on the response so callers building
565 // observability around large polls can detect under-
566 // delivery.
567 let over_fetch_factor = if request.filter.is_some() { 3 } else { 2 };
568 let unclamped_per_shard = request
569 .limit
570 .div_ceil(shards.len())
571 .max(1)
572 .saturating_mul(over_fetch_factor);
573 let per_shard_limit = unclamped_per_shard.min(PER_SHARD_FETCH_CAP);
574 let truncated_at_per_shard_cap = unclamped_per_shard > PER_SHARD_FETCH_CAP;
575
576 // Poll all shards in parallel. Each future borrows its start
577 // position directly from `cursor` (which outlives `join_all` below),
578 // avoiding a per-shard `String` allocation on every poll.
579 let poll_futures: Vec<_> = shards
580 .iter()
581 .map(|&shard_id| {
582 let adapter = self.adapter.clone();
583 let from: Option<&str> = cursor.get(shard_id);
584 async move {
585 let result = adapter.poll_shard(shard_id, from, per_shard_limit).await;
586 (shard_id, result)
587 }
588 })
589 .collect();
590
591 let shard_results: Vec<(u16, Result<ShardPollResult, AdapterError>)> =
592 futures::future::join_all(poll_futures).await;
593
594 // Collect results, tracking errors. Pre-allocate to the exact total
595 // event count so extend() below never reallocates.
596 let total_events: usize = shard_results
597 .iter()
598 .filter_map(|(_, r)| r.as_ref().ok().map(|sr| sr.events.len()))
599 .sum();
600 let mut all_events = Vec::with_capacity(total_events);
601 let mut any_has_more = false;
602 // Track which shards reported `has_more=true` so
603 // we can surface them on the response when the merger
604 // suppresses has_more for caller-protection. Pre-fix, an
605 // adapter stuck reporting `has_more=true` with no events
606 // and no cursor advance was logged at warn but invisible
607 // to callers — they saw a clean "no more events" and
608 // exited.
609 let mut shards_reporting_has_more: Vec<u16> = Vec::new();
610 // Per-shard adapter errors. Pre-fix these were logged at
611 // warn and then dropped on the floor; the response's
612 // `events` was a strict subset of the configured shards
613 // with no field indicating WHICH shards were missing.
614 // Surface the failed shard ids so observers can correlate
615 // alerts with specific Redis / JetStream nodes (parallel
616 // to the existing `stalled_shards` field).
617 let mut failed_shards: Vec<u16> = Vec::new();
618 // Shards whose `set_checked` cursor write was refused due to
619 // a backend-format change mid-stream. Their fetched events
620 // MUST be dropped from the response and excluded from any
621 // subsequent cursor override; otherwise the caller would
622 // receive events whose cursor never advances, re-fetch the
623 // same events on the next poll, and burn into an infinite
624 // duplicate-delivery loop. Surface them through the same
625 // `failed_shards` channel so operators see the migration.
626 let mut format_refused_shards: std::collections::HashSet<u16> =
627 std::collections::HashSet::new();
628 // `new_cursor` (fetched-position tracking) is only consulted on the
629 // filter path — building it for unfiltered polls wastes a full
630 // HashMap clone plus a `set()` per shard every poll.
631 let mut new_cursor = if request.filter.is_some() {
632 Some(cursor.clone())
633 } else {
634 None
635 };
636
637 for (shard_id, result) in shard_results {
638 match result {
639 Ok(shard_result) => {
640 // Destructure to move `next_id` out without cloning the
641 // String that the adapter already allocated for us.
642 let ShardPollResult {
643 events,
644 next_id,
645 has_more,
646 } = shard_result;
647 // Route through set_checked so a mid-stream
648 // backend migration (JetStream → Redis cursor
649 // format change) is refused with a loud error
650 // log rather than silently overwriting the
651 // format on the cursor. On refusal, the shard's
652 // events must not enter the response — otherwise
653 // the caller would receive events without a
654 // matching cursor advance and re-fetch them
655 // forever. Track the refusal and skip merging.
656 let refused = if let (Some(nc), Some(next_id)) =
657 (new_cursor.as_mut(), next_id.as_ref())
658 {
659 !nc.set_checked(shard_id, next_id)
660 } else {
661 false
662 };
663 if refused {
664 format_refused_shards.insert(shard_id);
665 failed_shards.push(shard_id);
666 // Drop the shard's events for this poll.
667 // The error log was emitted inside
668 // `set_checked`; the operator-facing
669 // failed_shards entry above is the
670 // structured signal.
671 continue;
672 }
673 if has_more {
674 any_has_more = true;
675 shards_reporting_has_more.push(shard_id);
676 }
677 all_events.extend(events);
678 }
679 Err(e) => {
680 tracing::warn!(
681 shard_id = shard_id,
682 error = %e,
683 "Failed to poll shard, skipping"
684 );
685 failed_shards.push(shard_id);
686 // Continue with other shards
687 }
688 }
689 }
690
691 // Apply filter.
692 //
693 // IMPORTANT: Use `new_cursor` (which tracks fetched positions) as
694 // the base cursor so that shards whose events are entirely filtered
695 // out still advance past those events. Without this, filtered-out
696 // events would be re-fetched on every subsequent poll, causing an
697 // infinite loop.
698 //
699 // Parse failures: a `StoredEvent` whose `raw` bytes don't
700 // deserialize as JSON cannot match a filter, so it is dropped
701 // from the filtered result. Previously this drop was silent
702 // (`unwrap_or(false)`), making on-disk corruption or
703 // adapter-side framing bugs invisible to operators — only
704 // *unfiltered* polls would surface the bad event.
705 //
706 // The previous `Ordering::None` path had a `break` once
707 // `kept.len() >= limit + 1`, which discarded events from later
708 // shards without ever filtering them. Combined with the cursor
709 // advancing past every fetched event, that meant matching
710 // events on un-inspected shards were silently lost. The fix
711 // uses a single full `retain` pass for both ordering modes;
712 // the `lazy parse` micro-optimization is gone, but per-event
713 // filter-matching is cheap and consistent semantics with the
714 // sort path is worth more than parse-skip on over-fetches.
715 if let Some(filter) = &request.filter {
716 // Compile once per poll — pre-splits every dot-path and
717 // pre-parses each segment's integer form. Pre-fix this
718 // work happened on every event in the retain loop
719 // (perf #15 / #16).
720 let compiled = filter.compile();
721 all_events.retain(|e| event_matches_filter(e, &compiled));
722 }
723
724 // Apply ordering
725 match request.ordering {
726 Ordering::None => {
727 // Keep arbitrary order
728 }
729 Ordering::InsertionTs => {
730 // `insertion_ts` is monotonic *per shard*, not
731 // globally (see `event.rs:233`), so two events from
732 // different shards can carry the same timestamp. With
733 // a stable sort on `insertion_ts` alone, ties were
734 // broken by the input order — which depends on
735 // `futures::future::join_all`'s completion ordering
736 // and is non-deterministic across polls. Combined
737 // with `truncate(limit)` and the cursor-rollback step,
738 // the same logical event could be returned twice or
739 // skipped at the limit boundary across consecutive
740 // polls.
741 //
742 // Add `(shard_id, id)` as deterministic
743 // tiebreakers. `id` is the storage backend's
744 // identifier and is unique within a shard, so the
745 // composite is a strict total order.
746 //
747 // The id tiebreak routes through
748 // `compare_stream_ids`, which understands the
749 // unpadded numeric formats both built-in adapters
750 // emit (Redis `<ms>-<seq>`, JetStream `<u64>`) and
751 // falls back to lex for opaque ids (ULID, UUIDv7,
752 // hex digests). The `(insertion_ts, shard_id)`
753 // chain resolves the common cases first; when two
754 // events from the same shard land at the same
755 // `insertion_ts` (rare but legal at millisecond
756 // granularity), the structured id compare is
757 // correct on every adapter format.
758 all_events.sort_by(|a, b| {
759 a.insertion_ts
760 .cmp(&b.insertion_ts)
761 .then(a.shard_id.cmp(&b.shard_id))
762 .then(compare_stream_ids(&a.id, &b.id))
763 });
764 }
765 }
766
767 // Track per-shard match counts *before* truncate. After
768 // truncation, any shard whose match count shrank means matches
769 // were dropped — and those matches must be re-fetched on the
770 // next poll, otherwise they are silently lost (the cursor
771 // would otherwise advance past them via `new_cursor`).
772 let mut matched_per_shard: std::collections::HashMap<u16, usize> =
773 std::collections::HashMap::new();
774 if request.filter.is_some() {
775 for e in &all_events {
776 *matched_per_shard.entry(e.shard_id).or_insert(0) += 1;
777 }
778 }
779
780 // Truncate to requested limit
781 let had_extra = all_events.len() > request.limit;
782 all_events.truncate(request.limit);
783
784 // Build the final cursor.
785 //
786 // With filtering: start from `new_cursor` (fetched positions) so
787 // shards whose events were entirely filtered out advance past
788 // them. Then:
789 // 1. For shards that had matches truncated (returned <
790 // total_matched), roll the cursor *back* to the original
791 // pre-poll position. The override step then bumps it
792 // forward to the last returned match for that shard, so
793 // the unreturned matches re-appear on the next poll.
794 // 2. Override with the last *returned* event id per shard —
795 // this also prevents skipping matching events that were
796 // fetched but truncated by the limit on shards that did
797 // land in the returned set.
798 //
799 // Without filtering: start from the original `cursor` so shards
800 // with no returned events (due to limit truncation) don't skip
801 // ahead.
802 let mut final_cursor = match new_cursor {
803 Some(nc) => nc,
804 None => cursor.clone(),
805 };
806
807 // Step 1: rollback for shards with truncated matches. We
808 // track which shards we rolled back here so Step 2 only
809 // overrides those shards (rather than every shard with a
810 // returned event, which would throw away the adapter's
811 // `next_id` advance for shards that returned all their
812 // matches).
813 let mut rolled_back: std::collections::HashSet<u16> = std::collections::HashSet::new();
814 if request.filter.is_some() && had_extra {
815 let mut returned_per_shard: std::collections::HashMap<u16, usize> =
816 std::collections::HashMap::new();
817 for e in &all_events {
818 *returned_per_shard.entry(e.shard_id).or_insert(0) += 1;
819 }
820 for (shard_id, &total_matched) in &matched_per_shard {
821 let returned = returned_per_shard.get(shard_id).copied().unwrap_or(0);
822 if returned < total_matched {
823 // Some matches for this shard were truncated. Roll
824 // back to the original cursor so they're re-fetched.
825 // The override below will move us forward to the
826 // last *returned* match (if any), so we still make
827 // progress per poll.
828 match cursor.positions.get(shard_id) {
829 Some(orig) => final_cursor.set(*shard_id, orig.clone()),
830 None => {
831 final_cursor.positions.remove(shard_id);
832 }
833 }
834 rolled_back.insert(*shard_id);
835 }
836 }
837 }
838
839 // Step 2: override to last returned event id per shard.
840 // Only the last returned event per shard matters for the
841 // cursor, so iterate in reverse and skip shards already seen.
842 // This reduces id clones from O(all_events.len()) to
843 // O(shards.len()).
844 //
845 // When the filter path is active, Step 1 has already
846 // populated `final_cursor` with the adapter's `next_id`
847 // (a position past the last *fetched* event for each
848 // shard). A blanket Step 2 override here would move the
849 // cursor back to the last *matched* event id, which is
850 // BEHIND the last fetched event for any shard with
851 // non-matched events — subsequent polls would re-fetch and
852 // re-filter those non-matches, wasting work proportional
853 // to `over_fetch_factor` on low-match-rate streams. So we
854 // only Step 2-override for shards that were actually
855 // rolled back in Step 1 (those need a forward push past
856 // the last *returned* match), OR when the filter path
857 // wasn't used at all (filter is None).
858 //
859 // For filter=None, the previous Step-2 behavior was correct
860 // — `final_cursor` started as `cursor.clone()` (no
861 // `new_cursor` advance) and the only progress signal is
862 // the last returned event id.
863 let mut seen_shards: std::collections::HashSet<u16> =
864 std::collections::HashSet::with_capacity(shards.len());
865 for event in all_events.iter().rev() {
866 if seen_shards.insert(event.shard_id) {
867 let should_override =
868 request.filter.is_none() || rolled_back.contains(&event.shard_id);
869 if should_override {
870 // Same backend-format guard as the Step-1
871 // adapter-cursor write site above. Surfaces a
872 // mid-stream backend migration deterministically
873 // rather than overwriting silently. If the
874 // write is refused, surface the shard in
875 // failed_shards so the caller sees the
876 // migration as a structured signal rather than
877 // only as an event-mismatched cursor.
878 if !final_cursor.set_checked(event.shard_id, &event.id)
879 && !format_refused_shards.contains(&event.shard_id)
880 {
881 format_refused_shards.insert(event.shard_id);
882 failed_shards.push(event.shard_id);
883 }
884 }
885 // Else: the adapter's `next_id` (already in
886 // `final_cursor` via the `new_cursor` initial
887 // value) is more advanced than the last matched
888 // event — preserve it.
889 }
890 }
891
892 // Drop events for any shard whose Step-1 OR Step-2 cursor
893 // write was refused due to a backend-format mismatch.
894 // Returning these events without a matching cursor advance
895 // would burn the caller into an infinite duplicate-delivery
896 // loop on the next poll. `failed_shards` already carries the
897 // structured signal to the caller.
898 //
899 // No-op when `format_refused_shards` is empty (the common
900 // case), so the retain is a single bool check per event in
901 // the happy path.
902 if !format_refused_shards.is_empty() {
903 all_events.retain(|e| !format_refused_shards.contains(&e.shard_id));
904 }
905
906 let cursor_advanced = final_cursor.positions != cursor.positions;
907 // When filtering removed everything but we did advance past fetched
908 // events, signal has_more so the caller keeps polling forward.
909 let all_filtered = request.filter.is_some() && all_events.is_empty() && cursor_advanced;
910 // Previously `has_more = any_has_more || had_extra ||
911 // all_filtered`. If a single adapter returned
912 // `ShardPollResult { events: [], next_id: None,
913 // has_more: true }` (legal under the trait contract — nothing
914 // forbids it), then `any_has_more=true` propagated even
915 // though we made *no* progress. The caller observed
916 // `(has_more=true, next_id=None)` and re-polled from the
917 // same starting cursor indefinitely.
918 //
919 // Suppress `has_more` when the merger itself made no progress
920 // at all (no events returned AND the cursor didn't advance).
921 // The caller then sees a clean "nothing to do right now"
922 // response and must back off rather than spin.
923 let we_made_progress = !all_events.is_empty() || cursor_advanced;
924 let has_more = (any_has_more || had_extra || all_filtered) && we_made_progress;
925 // When we suppress has_more for caller-protection
926 // (an adapter stuck at has_more=true with no progress),
927 // surface the offending shard ids on the response so
928 // operators can alert. Pre-fix the suppression was warn-
929 // logged only.
930 let stalled_shards: Vec<u16> = if any_has_more && !we_made_progress {
931 tracing::warn!(
932 stalled_shards = ?shards_reporting_has_more,
933 "PollMerger: an adapter reported has_more=true with no events \
934 and no cursor advance — suppressing to avoid caller infinite-loop"
935 );
936 shards_reporting_has_more
937 } else {
938 Vec::new()
939 };
940 // Return the cursor even when all events were filtered out, so the
941 // caller advances past the filtered region instead of re-fetching
942 // the same events forever. When the poll made no progress at
943 // all, echo back the caller's input cursor (if any) instead of
944 // returning `None` — pre-fix a stalled poll dropped the cursor,
945 // so a caller that interpreted `next_id == None` as "no events,
946 // restart from the beginning" silently regressed pagination
947 // across the stall. Echoing preserves the cursor across stalls.
948 let next_id = if we_made_progress {
949 Some(final_cursor.encode()?)
950 } else {
951 request.from_id.clone()
952 };
953
954 Ok(ConsumeResponse {
955 events: all_events,
956 next_id,
957 has_more,
958 truncated_at_per_shard_cap,
959 stalled_shards,
960 failed_shards,
961 })
962 }
963}
964
965#[cfg(test)]
966mod tests {
967 #![allow(
968 clippy::disallowed_methods,
969 reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
970 )]
971 use super::*;
972 use serde_json::json;
973
#[test]
fn test_cursor_encode_decode() {
    // Sparse shard set (0, 1, 5) so the round trip also covers gaps.
    let entries = [
        (0u16, "1702123456789-0"),
        (1, "1702123456790-0"),
        (5, "1702123456795-0"),
    ];

    let mut original = CompositeCursor::new();
    for &(shard, id) in &entries {
        original.set(shard, id.to_string());
    }

    let token = original.encode().unwrap();
    let restored = CompositeCursor::decode(&token).unwrap();

    // Every position that went in must come back out unchanged.
    for &(shard, id) in &entries {
        assert_eq!(restored.get(shard), Some(id));
    }
    // A shard that was never set stays absent after the round trip.
    assert_eq!(restored.get(2), None);
}
989
#[test]
fn test_cursor_update_from_events() {
    // Shard 0 contributes `100-0` and, after an interleaved shard-1
    // event, `150-0`. Shard 1 contributes only `200-0`. Both shard-0
    // ids are ascending, so shard 0 simply advances to `150-0`.
    let batch = vec![
        StoredEvent::from_value("100-0".to_string(), json!({}), 100, 0),
        StoredEvent::from_value("200-0".to_string(), json!({}), 200, 1),
        StoredEvent::from_value("150-0".to_string(), json!({}), 150, 0),
    ];

    let mut cursor = CompositeCursor::new();
    cursor.update_from_events(&batch);

    // Each shard ends on the highest id observed for that shard.
    let shard0 = cursor.get(0);
    let shard1 = cursor.get(1);
    assert_eq!(shard0, Some("150-0"));
    assert_eq!(shard1, Some("200-0"));
}
1011
/// The cursor keeps the highest id per shard, not the id that
/// happens to come last in the input slice: feeding shard 0
/// `200-0` followed by `100-0` must leave it on `200-0` rather
/// than regressing to `100-0`.
#[test]
fn cursor_does_not_regress_on_unsorted_per_shard_events() {
    // Deliberately reversed relative to stream order for shard 0.
    let out_of_order = [
        StoredEvent::from_value("200-0".to_string(), json!({}), 200, 0),
        StoredEvent::from_value("100-0".to_string(), json!({}), 100, 0),
    ];

    let mut cursor = CompositeCursor::new();
    cursor.update_from_events(&out_of_order);

    let position = cursor.get(0);
    assert_eq!(
        position,
        Some("200-0"),
        "cursor must hold the highest id, not the last-in-slice id",
    );
}
1033
/// The no-regress rule is applied shard by shard: within a single
/// update, a stale id for one shard is ignored while a newer id for
/// another shard is still accepted.
#[test]
fn cursor_compare_and_set_is_per_shard() {
    let event = |id: &str, ts: u64, shard: u16| {
        StoredEvent::from_value(id.to_string(), json!({}), ts, shard)
    };

    // Seed both shards at `500-0`.
    let mut cursor = CompositeCursor::new();
    cursor.update_from_events(&[event("500-0", 500, 0), event("500-0", 500, 1)]);

    // Second batch: shard 0 tries to go backwards, shard 1 moves on.
    cursor.update_from_events(&[
        event("100-0", 100, 0), // regress
        event("700-0", 700, 1), // advance
    ]);

    assert_eq!(cursor.get(0), Some("500-0"), "shard 0 must not regress");
    assert_eq!(cursor.get(1), Some("700-0"), "shard 1 must advance");
}
1053
/// CR-1: unpadded numeric ids (the `seq.to_string()` shape) must be
/// compared numerically. Under a plain `str::cmp`, `"10" < "9"`, so
/// a cursor fed 1..=20 one event at a time would stop at `"9"`.
#[test]
fn cursor_does_not_wedge_on_jetstream_decade_rollover() {
    let mut cursor = CompositeCursor::new();

    // Feed sequence numbers one event per update, crossing 9 -> 10.
    (1u64..=20)
        .map(|seq| StoredEvent::from_value(seq.to_string(), json!({}), seq, 0))
        .for_each(|event| {
            cursor.update_from_events(&[event]);
        });

    assert_eq!(
        cursor.get(0),
        Some("20"),
        "cursor must reach 20; lex compare would wedge at \"9\""
    );
}
1071
/// CR-1: the `<ms>-<seq>` id shape has the same hazard when the
/// `seq` half crosses a decade while the `ms` half stays fixed.
#[test]
fn cursor_does_not_wedge_on_redis_seq_decade_rollover() {
    let mut cursor = CompositeCursor::new();

    // Twenty ids sharing one millisecond; only the seq suffix changes.
    (1u64..=20)
        .map(|seq| {
            let id = format!("1700000000000-{}", seq);
            StoredEvent::from_value(id, json!({}), 1700000000000, 0)
        })
        .for_each(|event| {
            cursor.update_from_events(&[event]);
        });

    assert_eq!(
        cursor.get(0),
        Some("1700000000000-20"),
        "cursor must reach -20; lex compare would wedge at -9"
    );
}
1090
/// Regression: when an incoming id has a different format from the
/// id already stored for that shard (numeric vs `<ms>-<seq>`), the
/// cursor must keep its existing value in both directions.
///
/// A lex fallback would also have kept `"42"` against
/// `"1700000000000-0"`, but only because `'4' > '1'`. This test
/// pins the "stays at existing value" outcome for both directions.
/// Any diagnostic logging that accompanies the refusal is
/// observability rather than behavior and is not asserted here.
#[test]
fn cursor_refuses_to_advance_across_backend_format_change() {
    let event = |id: &str, ts: u64| StoredEvent::from_value(id.to_string(), json!({}), ts, 0);

    // Direction 1: cursor holds a bare numeric id, then an
    // `<ms>-<seq>` id arrives for the same shard.
    let mut numeric_cursor = CompositeCursor::new();
    numeric_cursor.update_from_events(&[event("42", 42)]);
    assert_eq!(numeric_cursor.get(0), Some("42"));

    numeric_cursor.update_from_events(&[event("1700000000000-0", 1700000000000)]);

    // Neither replaced by the new-format id nor cleared.
    assert_eq!(
        numeric_cursor.get(0),
        Some("42"),
        "regression: cursor must not silently advance through a \
         backend-format change. The pre-fix lex fallback also \
         happened to keep the existing value (by `'4' > '1'`), \
         but only by accident; this test pins the explicit \
         format-mismatch refusal."
    );

    // Direction 2: cursor holds an `<ms>-<seq>` id, then a bare
    // numeric id arrives for the same shard.
    let mut redis_cursor = CompositeCursor::new();
    redis_cursor.update_from_events(&[event("1700000000000-0", 1700000000000)]);
    redis_cursor.update_from_events(&[event("9000", 9000)]);
    assert_eq!(
        redis_cursor.get(0),
        Some("1700000000000-0"),
        "regression (reverse direction): Redis cursor must not be \
         silently overwritten by an incoming numeric id"
    );
}
1164
/// CR-1: minimal decade-boundary case — `"9"` followed by `"10"`
/// must advance the cursor.
#[test]
fn cursor_advances_from_unpadded_9_to_unpadded_10() {
    let nine = StoredEvent::from_value("9".to_string(), json!({}), 9, 0);
    let ten = StoredEvent::from_value("10".to_string(), json!({}), 10, 0);

    let mut cursor = CompositeCursor::new();
    // Two separate updates, so the second is compared against the first.
    cursor.update_from_events(&[nine]);
    cursor.update_from_events(&[ten]);

    assert_eq!(cursor.get(0), Some("10"));
}
1173
/// CR-1: opaque, lex-sortable ids (ULID-shaped here) must keep
/// working: the cursor advances to the lexically later id and does
/// not regress when the earlier id is replayed.
#[test]
fn cursor_advances_on_ulid_ids_via_lex_fallback() {
    // Ascending pair; they differ in the timestamp prefix so lex
    // order and stream order agree.
    let earlier = "01HZ0000000000000000000000";
    let later = "01HZ0000010000000000000000";
    let event = |id: &str, ts: u64| StoredEvent::from_value(id.to_string(), json!({}), ts, 0);

    let mut cursor = CompositeCursor::new();
    cursor.update_from_events(&[event(earlier, 1)]);
    cursor.update_from_events(&[event(later, 2)]);
    assert_eq!(cursor.get(0), Some(later));

    // Replaying the older id must leave the cursor where it is.
    cursor.update_from_events(&[event(earlier, 1)]);
    assert_eq!(cursor.get(0), Some(later));
}
1201
/// CR-1: table-style unit coverage for `compare_stream_ids` across
/// the id shapes the module documents: bare numeric, `<ms>-<seq>`,
/// and opaque lex-sortable ids.
#[test]
fn compare_stream_ids_handles_known_formats() {
    // Bare numeric ids compare by value; the last row shows that
    // leading zeros do not affect the result.
    let numeric_cases = [
        ("9", "10", CmpOrdering::Less),
        ("10", "9", CmpOrdering::Greater),
        ("100", "100", CmpOrdering::Equal),
        ("00000010", "9", CmpOrdering::Greater),
    ];
    for &(lhs, rhs, expected) in &numeric_cases {
        assert_eq!(compare_stream_ids(lhs, rhs), expected);
    }

    // `<ms>-<seq>` ids compare as an (ms, seq) tuple.
    let same_ms_seq_rollover = compare_stream_ids("1700-9", "1700-10");
    assert_eq!(
        same_ms_seq_rollover,
        CmpOrdering::Less,
        "seq must compare numerically, not lex"
    );
    let later_ms_smaller_seq = compare_stream_ids("1700-9", "1701-0");
    assert_eq!(later_ms_smaller_seq, CmpOrdering::Less, "ms wins over seq");
    assert_eq!(
        compare_stream_ids("1700-100", "1700-9"),
        CmpOrdering::Greater
    );

    // ULID-shaped ids are ordered lexically.
    let (ulid_a, ulid_b) = ("01HZ0000000000000000000000", "01HZ0000010000000000000000");
    assert_eq!(compare_stream_ids(ulid_a, ulid_b), CmpOrdering::Less);

    // Mixed shapes (one `<ms>-<seq>`, one bare numeric) are a
    // documented limitation; this only checks the call completes,
    // the resulting ordering is intentionally not asserted.
    let _ = compare_stream_ids("1700-0", "9999");
}
1238
/// CR-1: with the (insertion_ts, shard_id, id) comparator, events
/// that tie on both timestamp and shard must be ordered by numeric
/// id value rather than by the lexical order of the id strings.
#[test]
fn insertion_ts_sort_breaks_tie_on_id_numerically() {
    // Same comparator chain as the one under test: timestamp, then
    // shard, then structured id compare.
    let by_ts_shard_then_id = |a: &StoredEvent, b: &StoredEvent| {
        a.insertion_ts
            .cmp(&b.insertion_ts)
            .then(a.shard_id.cmp(&b.shard_id))
            .then(compare_stream_ids(&a.id, &b.id))
    };

    // Identical ts and shard for all three, so only the id decides.
    let mut tied = [
        StoredEvent::from_value("10".to_string(), json!({}), 1000, 0),
        StoredEvent::from_value("9".to_string(), json!({}), 1000, 0),
        StoredEvent::from_value("11".to_string(), json!({}), 1000, 0),
    ];
    tied.sort_by(by_ts_shard_then_id);

    let ordered: Vec<&str> = tied.iter().map(|event| event.id.as_str()).collect();
    assert_eq!(
        ordered,
        vec!["9", "10", "11"],
        "id tiebreak must be numeric, not lex"
    );
}
1263
#[test]
fn test_consume_request_builder() {
    // Exercise every builder method once and check each field landed.
    let type_filter = Filter::eq("type", json!("token"));
    let built = ConsumeRequest::new(100)
        .from("some_cursor")
        .ordering(Ordering::InsertionTs)
        .shards(vec![0, 1, 2])
        .filter(type_filter);

    assert_eq!(built.limit, 100);
    assert_eq!(built.from_id.as_deref(), Some("some_cursor"));
    assert_eq!(built.ordering, Ordering::InsertionTs);
    assert_eq!(built.shards, Some(vec![0, 1, 2]));
    assert!(built.filter.is_some());
}
1278
#[test]
fn test_invalid_cursor() {
    // Not base64 at all.
    assert!(CompositeCursor::decode("not_valid_base64!!!").is_err());

    // Well-formed base64 whose payload is not JSON.
    let not_json = BASE64.encode(b"not json");
    assert!(CompositeCursor::decode(&not_json).is_err());
}
1288
/// Regression: shard keys in an encoded cursor must be in canonical
/// decimal form. A key such as `"00"` parses to the same `u16` as
/// `"0"`, so accepting it would let two distinct spellings collapse
/// onto one shard position; decode must reject it instead.
#[test]
fn cursor_decode_rejects_non_canonical_shard_keys() {
    // `"00"` is a non-canonical spelling of shard 0.
    let aliased = BASE64.encode(br#"{"00":"id_a","1":"id_b"}"#);
    assert!(
        CompositeCursor::decode(&aliased).is_err(),
        "non-canonical shard key `\"00\"` must reject; \
         pre-fix this silently parsed as shard 0"
    );

    // Control case: the canonical spelling `"0"` is accepted and
    // both positions are readable.
    let well_formed = BASE64.encode(br#"{"0":"id_a","1":"id_b"}"#);
    let cursor =
        CompositeCursor::decode(&well_formed).expect("canonical shard keys must decode cleanly");
    assert_eq!(cursor.get(0), Some("id_a"));
    assert_eq!(cursor.get(1), Some("id_b"));
}
1318
#[test]
fn test_composite_cursor_new() {
    // A freshly constructed cursor tracks no shard positions.
    let fresh = CompositeCursor::new();
    assert!(fresh.positions.is_empty());
}
1324
#[test]
fn test_composite_cursor_default() {
    // `Default` yields a cursor with no shard positions.
    let defaulted = CompositeCursor::default();
    assert!(defaulted.positions.is_empty());
}
1330
#[test]
fn test_composite_cursor_get_nonexistent() {
    // Lookups on an empty cursor return `None` for any shard id.
    let empty = CompositeCursor::new();
    for &shard in &[0u16, 100] {
        assert!(empty.get(shard).is_none());
    }
}
1337
#[test]
fn test_composite_cursor_set_overwrites() {
    // Plain `set` replaces whatever was stored for the shard.
    let mut cursor = CompositeCursor::new();
    for &value in &["first", "second"] {
        cursor.set(0, value.to_string());
        assert_eq!(cursor.get(0), Some(value));
    }
}
1347
#[test]
fn test_composite_cursor_empty_encode() {
    // An empty cursor must survive an encode/decode round trip and
    // still be empty afterwards.
    let token = CompositeCursor::new().encode().unwrap();
    let restored = CompositeCursor::decode(&token).unwrap();
    assert!(restored.positions.is_empty());
}
1355
#[test]
fn test_composite_cursor_clone() {
    let mut source = CompositeCursor::new();
    source.set(0, "pos-0".to_string());
    source.set(1, "pos-1".to_string());

    // The clone carries the same per-shard positions.
    let copy = source.clone();
    let (first, second) = (copy.get(0), copy.get(1));
    assert_eq!(first, Some("pos-0"));
    assert_eq!(second, Some("pos-1"));
}
1366
#[test]
fn test_composite_cursor_debug() {
    let mut cursor = CompositeCursor::new();
    cursor.set(0, "test".to_string());

    // Debug output names both the type and its `positions` field.
    let rendered = format!("{:?}", cursor);
    for &needle in &["CompositeCursor", "positions"] {
        assert!(rendered.contains(needle));
    }
}
1375
#[test]
fn test_ordering_default() {
    // The default ordering is `Ordering::None`.
    assert_eq!(Ordering::default(), Ordering::None);
}
1381
#[test]
fn test_ordering_clone_copy() {
    // Binding the value to a second name preserves the variant.
    let original = Ordering::InsertionTs;
    let duplicate = original;
    assert_eq!(duplicate, Ordering::InsertionTs);
}
1388
#[test]
fn test_ordering_debug() {
    // Debug output contains the variant name.
    let none_repr = format!("{:?}", Ordering::None);
    let ts_repr = format!("{:?}", Ordering::InsertionTs);
    assert!(none_repr.contains("None"));
    assert!(ts_repr.contains("InsertionTs"));
}
1394
#[test]
fn test_consume_request_new() {
    // `new` sets only the limit; every optional field starts unset
    // and the ordering starts as `Ordering::None`.
    let fresh = ConsumeRequest::new(50);

    assert_eq!(fresh.limit, 50);
    assert_eq!(fresh.ordering, Ordering::None);
    assert!(fresh.from_id.is_none());
    assert!(fresh.filter.is_none());
    assert!(fresh.shards.is_none());
}
1404
#[test]
fn test_consume_request_default() {
    // `Default` gives a zero limit, no optional fields, and
    // `Ordering::None`.
    let defaulted = ConsumeRequest::default();

    assert_eq!(defaulted.limit, 0);
    assert_eq!(defaulted.ordering, Ordering::None);
    assert!(defaulted.from_id.is_none());
    assert!(defaulted.filter.is_none());
    assert!(defaulted.shards.is_none());
}
1414
#[test]
fn test_consume_request_from_string() {
    // `from` accepts an owned `String` as well as a string slice.
    let owned_cursor = String::from("cursor123");
    let request = ConsumeRequest::new(10).from(owned_cursor);
    assert_eq!(request.from_id, Some("cursor123".to_string()));
}
1420
#[test]
fn test_consume_request_clone() {
    let source = ConsumeRequest::new(100)
        .from("cursor")
        .ordering(Ordering::InsertionTs)
        .shards(vec![0, 1]);

    // Every field populated above must be present on the clone.
    let copy = source.clone();
    assert_eq!(copy.limit, 100);
    assert_eq!(copy.from_id.as_deref(), Some("cursor"));
    assert_eq!(copy.ordering, Ordering::InsertionTs);
    assert_eq!(copy.shards, Some(vec![0, 1]));
}
1434
#[test]
fn test_consume_request_debug() {
    // Debug output names the type and its `limit` field.
    let rendered = format!("{:?}", ConsumeRequest::new(10));
    for &needle in &["ConsumeRequest", "limit"] {
        assert!(rendered.contains(needle));
    }
}
1442
#[test]
fn test_consume_response_empty() {
    // The empty response carries no events, no cursor, and reports
    // that nothing more is pending.
    let blank = ConsumeResponse::empty();
    assert!(blank.events.is_empty());
    assert!(blank.next_id.is_none());
    assert!(!blank.has_more);
}
1450
#[test]
fn test_consume_response_clone() {
    // Start from the empty response and override two fields.
    let source = ConsumeResponse {
        next_id: Some("cursor".to_string()),
        has_more: true,
        ..ConsumeResponse::empty()
    };

    // The clone must carry both overridden fields.
    let copy = source.clone();
    assert_eq!(copy.next_id, Some("cursor".to_string()));
    assert!(copy.has_more);
}
1461
#[test]
fn test_consume_response_debug() {
    // Debug output names the type and its `events` field.
    let rendered = format!("{:?}", ConsumeResponse::empty());
    for &needle in &["ConsumeResponse", "events"] {
        assert!(rendered.contains(needle));
    }
}
1469
#[test]
fn test_cursor_update_from_empty_events() {
    let mut cursor = CompositeCursor::new();
    cursor.set(0, "original".to_string());

    // Updating from an empty batch must leave existing positions alone.
    let no_events: Vec<StoredEvent> = Vec::new();
    cursor.update_from_events(&no_events);

    assert_eq!(cursor.get(0), Some("original"));
}
1481
#[test]
fn test_cursor_many_shards() {
    // Populate 100 shards, each with a position derived from its id.
    let mut wide = CompositeCursor::new();
    (0..100u16).for_each(|shard| wide.set(shard, format!("pos-{}", shard)));

    let token = wide.encode().unwrap();
    let restored = CompositeCursor::decode(&token).unwrap();

    // Every shard's position must come back from the round trip.
    for shard in 0..100u16 {
        let expected = format!("pos-{}", shard);
        assert_eq!(restored.get(shard), Some(expected.as_str()));
    }
}
1496
#[test]
fn test_consume_request_empty_shards() {
    // An explicitly empty shard list is stored as `Some(vec![])`,
    // not collapsed to `None`.
    let no_shards = ConsumeRequest::new(100).shards(Vec::new());
    assert_eq!(no_shards.shards, Some(vec![]));
}
1502
#[test]
fn test_consume_request_ordering_none() {
    // Setting `Ordering::None` explicitly is reflected on the request.
    let unordered = ConsumeRequest::new(100).ordering(Ordering::None);
    assert_eq!(unordered.ordering, Ordering::None);
}
1508
#[test]
fn test_ordering_equality() {
    // Each variant equals itself and differs from the other.
    let (none, by_ts) = (Ordering::None, Ordering::InsertionTs);
    assert_eq!(none, Ordering::None);
    assert_eq!(by_ts, Ordering::InsertionTs);
    assert_ne!(none, by_ts);
}
1515
1516 // Mock adapter for testing PollMerger
1517 use crate::adapter::{Adapter, ShardPollResult};
1518 use crate::error::AdapterError;
1519 use crate::event::Batch;
1520 use async_trait::async_trait;
1521 use std::collections::HashMap;
1522 use std::sync::RwLock;
1523
/// In-memory `Adapter` test double used by the `PollMerger` tests.
struct MockAdapter {
    /// Events served by `poll_shard`, keyed by shard id. Wrapped in
    /// an `RwLock` so tests can add events through `&self`.
    events: RwLock<HashMap<u16, Vec<StoredEvent>>>,
}
1527
1528 impl MockAdapter {
1529 fn new() -> Self {
1530 Self {
1531 events: RwLock::new(HashMap::new()),
1532 }
1533 }
1534
1535 fn add_events(&self, shard_id: u16, events: Vec<StoredEvent>) {
1536 let mut map = self.events.write().unwrap();
1537 map.entry(shard_id).or_default().extend(events);
1538 }
1539 }
1540
1541 #[async_trait]
1542 impl Adapter for MockAdapter {
1543 async fn init(&mut self) -> Result<(), AdapterError> {
1544 Ok(())
1545 }
1546
1547 async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
1548 Ok(())
1549 }
1550
1551 async fn flush(&self) -> Result<(), AdapterError> {
1552 Ok(())
1553 }
1554
1555 async fn shutdown(&self) -> Result<(), AdapterError> {
1556 Ok(())
1557 }
1558
1559 async fn poll_shard(
1560 &self,
1561 shard_id: u16,
1562 from_id: Option<&str>,
1563 limit: usize,
1564 ) -> Result<ShardPollResult, AdapterError> {
1565 let map = self.events.read().unwrap();
1566 let events = map.get(&shard_id).cloned().unwrap_or_default();
1567
1568 // Filter by from_id if provided
1569 let filtered: Vec<_> = if let Some(from) = from_id {
1570 events
1571 .into_iter()
1572 .skip_while(|e| e.id != from)
1573 .skip(1) // Skip the from_id itself
1574 .collect()
1575 } else {
1576 events
1577 };
1578
1579 let has_more = filtered.len() > limit;
1580 let events: Vec<_> = filtered.into_iter().take(limit).collect();
1581 let next_id = events.last().map(|e| e.id.clone());
1582
1583 Ok(ShardPollResult {
1584 events,
1585 next_id,
1586 has_more,
1587 })
1588 }
1589
1590 fn name(&self) -> &'static str {
1591 "mock"
1592 }
1593 }
1594
1595 #[tokio::test]
1596 async fn test_poll_merger_new() {
1597 let adapter = Arc::new(MockAdapter::new());
1598 let merger = PollMerger::new(adapter, vec![0, 1, 2, 3]);
1599 assert_eq!(merger.shard_ids, vec![0, 1, 2, 3]);
1600 }
1601
1602 /// When the active shard id set is sparse (e.g. shard 0
1603 /// was scaled down, leaving `{1, 2}`), a poll with no explicit
1604 /// `request.shards` must hit shards 1 and 2 — not generate
1605 /// `[0, 1]` from a stale count and miss the live shard 2.
1606 ///
1607 /// Pre-fix, `PollMerger` stored only `num_shards: u16` and the
1608 /// default branch generated `(0..num_shards).collect()`, so
1609 /// shard 2's events were silently invisible to default-shards
1610 /// consumers after a scale-down.
1611 #[tokio::test]
1612 async fn poll_merger_default_shards_uses_active_id_set_after_scale_down() {
1613 let adapter = Arc::new(MockAdapter::new());
1614
1615 // Shard 0 (drained / no longer active): would mis-poll
1616 // pre-fix and could return stale data on adapters that
1617 // recreate streams on demand. We don't add events here.
1618 // Shard 1 + Shard 2: active set after a scale-down that
1619 // evicted shard 0.
1620 adapter.add_events(
1621 1,
1622 vec![StoredEvent::from_value(
1623 "1-a".to_string(),
1624 json!({"shard": 1}),
1625 100,
1626 1,
1627 )],
1628 );
1629 adapter.add_events(
1630 2,
1631 vec![StoredEvent::from_value(
1632 "2-a".to_string(),
1633 json!({"shard": 2}),
1634 200,
1635 2,
1636 )],
1637 );
1638
1639 // Sparse id set — shard 0 is NOT in the active list.
1640 let merger = PollMerger::new(adapter, vec![1, 2]);
1641
1642 // Default-shards request (no `shards` override).
1643 let request = ConsumeRequest::new(100);
1644 let response = merger.poll(request).await.unwrap();
1645
1646 let returned: std::collections::HashSet<u16> =
1647 response.events.iter().map(|e| e.shard_id).collect();
1648 assert!(
1649 returned.contains(&1),
1650 "default-shards poll must include shard 1 (active)",
1651 );
1652 assert!(
1653 returned.contains(&2),
1654 "default-shards poll must include shard 2 — pre-fix this was silently \
1655 skipped because the merger generated `0..num_shards` = `[0, 1]`",
1656 );
1657 assert!(
1658 !returned.contains(&0),
1659 "default-shards poll must NOT touch shard 0 — it was evicted",
1660 );
1661 assert_eq!(response.events.len(), 2);
1662 }
1663
1664 #[tokio::test]
1665 async fn test_poll_merger_empty_limit() {
1666 let adapter = Arc::new(MockAdapter::new());
1667 let merger = PollMerger::new(adapter, vec![0, 1, 2, 3]);
1668
1669 let request = ConsumeRequest::new(0);
1670 let response = merger.poll(request).await.unwrap();
1671
1672 assert!(response.events.is_empty());
1673 assert!(response.next_id.is_none());
1674 assert!(!response.has_more);
1675 }
1676
1677 #[tokio::test]
1678 async fn test_poll_merger_empty_shards() {
1679 let adapter = Arc::new(MockAdapter::new());
1680 let merger = PollMerger::new(adapter, vec![0, 1, 2, 3]);
1681
1682 let request = ConsumeRequest::new(100).shards(vec![]);
1683 let response = merger.poll(request).await.unwrap();
1684
1685 assert!(response.events.is_empty());
1686 assert!(response.next_id.is_none());
1687 assert!(!response.has_more);
1688 }
1689
/// `set_checked` is the format-guarded cursor write that `poll()`
/// uses. When a shard's stored id is a bare numeric id and the
/// incoming id has the `<ms>-<seq>` shape, the write must be
/// refused: the call returns `false` and the stored id is left
/// untouched.
#[test]
fn composite_cursor_set_checked_refuses_format_change() {
    let mut cursor = CompositeCursor::new();
    // Seed shard 0 with a bare numeric id.
    cursor.set(0, "42".to_string());

    // Same shard, but the incoming id uses the `<ms>-<seq>` shape.
    let was_accepted = cursor.set_checked(0, "1700000000000-0");
    assert!(
        !was_accepted,
        "format change must be refused — set_checked must return false"
    );

    // The refused write must not have moved the position.
    assert_eq!(cursor.get(0), Some("42"));
}
1711
/// A checked write that keeps the id format (numeric → numeric) and
/// moves forward is accepted and stored.
#[test]
fn composite_cursor_set_checked_accepts_same_format_advance() {
    let mut positions = CompositeCursor::new();
    positions.set(0, String::from("42"));

    // "100" is a later id in the same numeric format as "42".
    let accepted = positions.set_checked(0, "100");

    assert!(accepted);
    assert_eq!(positions.get(0), Some("100"));
}
1720
/// The first checked write for a shard with no stored position is
/// accepted and stored.
#[test]
fn composite_cursor_set_checked_accepts_first_write() {
    let mut positions = CompositeCursor::new();

    let accepted = positions.set_checked(3, "1700000000000-0");

    assert!(accepted);
    assert_eq!(positions.get(3), Some("1700000000000-0"));
}
1727
1728 /// A caller that forwards a non-deduplicated shard list — e.g.
1729 /// `vec![0, 0, 1]` — must not cause `poll()` to fetch the same
1730 /// shard twice and return duplicate events. Pre-fix, the merger
1731 /// consumed `shards` verbatim and issued one `poll_shard(0, …)`
1732 /// per occurrence; both returned the same events; the duplicates
1733 /// survived the per-shard limit and reached the caller.
1734 #[tokio::test]
1735 async fn poll_dedupes_duplicate_shard_ids_in_request() {
1736 let adapter = Arc::new(MockAdapter::new());
1737 adapter.add_events(
1738 0,
1739 vec![
1740 StoredEvent::from_value("s0-a".to_string(), json!({"v": 1}), 1, 0),
1741 StoredEvent::from_value("s0-b".to_string(), json!({"v": 2}), 2, 0),
1742 ],
1743 );
1744 adapter.add_events(
1745 1,
1746 vec![StoredEvent::from_value(
1747 "s1-a".to_string(),
1748 json!({"v": 3}),
1749 3,
1750 1,
1751 )],
1752 );
1753
1754 let merger = PollMerger::new(adapter, vec![0, 1]);
1755 let response = merger
1756 .poll(ConsumeRequest::new(100).shards(vec![0, 0, 1]))
1757 .await
1758 .unwrap();
1759
1760 // 2 events from shard 0 (not 4 — i.e. not fetched twice) plus
1761 // 1 event from shard 1 = 3 total. Pre-fix this was 5.
1762 assert_eq!(response.events.len(), 3);
1763 let s0_count = response.events.iter().filter(|e| e.shard_id == 0).count();
1764 assert_eq!(s0_count, 2, "shard 0 events must not be duplicated");
1765 }
1766
1767 /// Regression: per-shard adapter errors must surface on
1768 /// `ConsumeResponse.failed_shards`, not silently disappear
1769 /// after a `tracing::warn!`. Pre-fix the merger logged the
1770 /// error and continued; the response carried events from
1771 /// the surviving shards with no field indicating WHICH
1772 /// shards failed (in contrast to `stalled_shards` which IS
1773 /// surfaced). Operators correlating alerts with specific
1774 /// Redis / JetStream nodes had to grep logs instead of
1775 /// reading a structured response field.
1776 #[tokio::test]
1777 async fn poll_response_surfaces_failed_shard_ids() {
1778 // Mock that fails on a specific shard id.
1779 struct FailingShardMock {
1780 inner: MockAdapter,
1781 fail_shard: u16,
1782 }
1783
1784 #[async_trait]
1785 impl Adapter for FailingShardMock {
1786 async fn init(&mut self) -> Result<(), AdapterError> {
1787 Ok(())
1788 }
1789 async fn on_batch(&self, _b: Arc<Batch>) -> Result<(), AdapterError> {
1790 Ok(())
1791 }
1792 async fn flush(&self) -> Result<(), AdapterError> {
1793 Ok(())
1794 }
1795 async fn shutdown(&self) -> Result<(), AdapterError> {
1796 Ok(())
1797 }
1798 async fn poll_shard(
1799 &self,
1800 shard_id: u16,
1801 from_id: Option<&str>,
1802 limit: usize,
1803 ) -> Result<ShardPollResult, AdapterError> {
1804 if shard_id == self.fail_shard {
1805 return Err(AdapterError::Transient(format!(
1806 "synthetic failure on shard {shard_id}"
1807 )));
1808 }
1809 self.inner.poll_shard(shard_id, from_id, limit).await
1810 }
1811 fn name(&self) -> &'static str {
1812 "failing-mock"
1813 }
1814 }
1815
1816 let inner = MockAdapter::new();
1817 // Shard 0 has events, shard 1 will fail, shard 2 has events.
1818 inner.add_events(
1819 0,
1820 vec![StoredEvent::from_value(
1821 "0-1".to_string(),
1822 json!({"shard": 0}),
1823 100,
1824 0,
1825 )],
1826 );
1827 inner.add_events(
1828 2,
1829 vec![StoredEvent::from_value(
1830 "2-1".to_string(),
1831 json!({"shard": 2}),
1832 100,
1833 2,
1834 )],
1835 );
1836
1837 let adapter = Arc::new(FailingShardMock {
1838 inner,
1839 fail_shard: 1,
1840 });
1841 let merger = PollMerger::new(adapter, vec![0, 1, 2]);
1842
1843 let response = merger.poll(ConsumeRequest::new(100)).await.unwrap();
1844
1845 // Surviving shards' events still come through.
1846 assert_eq!(
1847 response.events.len(),
1848 2,
1849 "events from non-failing shards must still be returned"
1850 );
1851
1852 // The failed shard id is surfaced on the response.
1853 assert_eq!(
1854 response.failed_shards,
1855 vec![1],
1856 "regression: failed_shards must list the shard whose adapter \
1857 errored. Pre-fix this list didn't exist; observers couldn't \
1858 tell which shard was missing without log scraping."
1859 );
1860 }
1861
1862 #[tokio::test]
1863 async fn test_poll_merger_with_events() {
1864 let adapter = Arc::new(MockAdapter::new());
1865
1866 // Add events to shard 0
1867 adapter.add_events(
1868 0,
1869 vec![
1870 StoredEvent::from_value("0-1".to_string(), json!({"type": "a"}), 100, 0),
1871 StoredEvent::from_value("0-2".to_string(), json!({"type": "b"}), 200, 0),
1872 ],
1873 );
1874
1875 // Add events to shard 1
1876 adapter.add_events(
1877 1,
1878 vec![StoredEvent::from_value(
1879 "1-1".to_string(),
1880 json!({"type": "c"}),
1881 150,
1882 1,
1883 )],
1884 );
1885
1886 let merger = PollMerger::new(adapter, vec![0, 1]);
1887
1888 let request = ConsumeRequest::new(100);
1889 let response = merger.poll(request).await.unwrap();
1890
1891 assert_eq!(response.events.len(), 3);
1892 assert!(response.next_id.is_some());
1893 }
1894
1895 #[tokio::test]
1896 async fn test_poll_merger_with_ordering() {
1897 let adapter = Arc::new(MockAdapter::new());
1898
1899 // Add events with different timestamps
1900 adapter.add_events(
1901 0,
1902 vec![
1903 StoredEvent::from_value("0-1".to_string(), json!({}), 300, 0),
1904 StoredEvent::from_value("0-2".to_string(), json!({}), 100, 0),
1905 ],
1906 );
1907 adapter.add_events(
1908 1,
1909 vec![StoredEvent::from_value(
1910 "1-1".to_string(),
1911 json!({}),
1912 200,
1913 1,
1914 )],
1915 );
1916
1917 let merger = PollMerger::new(adapter, vec![0, 1]);
1918
1919 let request = ConsumeRequest::new(100).ordering(Ordering::InsertionTs);
1920 let response = merger.poll(request).await.unwrap();
1921
1922 // Events should be sorted by insertion_ts
1923 assert_eq!(response.events.len(), 3);
1924 assert_eq!(response.events[0].insertion_ts, 100);
1925 assert_eq!(response.events[1].insertion_ts, 200);
1926 assert_eq!(response.events[2].insertion_ts, 300);
1927 }
1928
1929 #[tokio::test]
1930 async fn test_poll_merger_with_filter() {
1931 let adapter = Arc::new(MockAdapter::new());
1932
1933 adapter.add_events(
1934 0,
1935 vec![
1936 StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
1937 StoredEvent::from_value("0-2".to_string(), json!({"type": "message"}), 200, 0),
1938 StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 300, 0),
1939 ],
1940 );
1941
1942 let merger = PollMerger::new(adapter, vec![0]);
1943
1944 let request = ConsumeRequest::new(100).filter(Filter::eq("type", json!("token")));
1945 let response = merger.poll(request).await.unwrap();
1946
1947 assert_eq!(response.events.len(), 2);
1948 for event in &response.events {
1949 assert!(event.raw_str().unwrap().contains("token"));
1950 }
1951 }
1952
1953 #[tokio::test]
1954 async fn test_poll_merger_with_limit() {
1955 let adapter = Arc::new(MockAdapter::new());
1956
1957 adapter.add_events(
1958 0,
1959 vec![
1960 StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
1961 StoredEvent::from_value("0-2".to_string(), json!({}), 200, 0),
1962 StoredEvent::from_value("0-3".to_string(), json!({}), 300, 0),
1963 ],
1964 );
1965
1966 let merger = PollMerger::new(adapter, vec![0]);
1967
1968 let request = ConsumeRequest::new(2);
1969 let response = merger.poll(request).await.unwrap();
1970
1971 assert_eq!(response.events.len(), 2);
1972 assert!(response.has_more);
1973 }
1974
1975 #[tokio::test]
1976 async fn test_poll_merger_specific_shards() {
1977 let adapter = Arc::new(MockAdapter::new());
1978
1979 adapter.add_events(
1980 0,
1981 vec![StoredEvent::from_value(
1982 "0-1".to_string(),
1983 json!({"shard": 0}),
1984 100,
1985 0,
1986 )],
1987 );
1988 adapter.add_events(
1989 1,
1990 vec![StoredEvent::from_value(
1991 "1-1".to_string(),
1992 json!({"shard": 1}),
1993 100,
1994 1,
1995 )],
1996 );
1997 adapter.add_events(
1998 2,
1999 vec![StoredEvent::from_value(
2000 "2-1".to_string(),
2001 json!({"shard": 2}),
2002 100,
2003 2,
2004 )],
2005 );
2006
2007 let merger = PollMerger::new(adapter, vec![0, 1, 2]);
2008
2009 // Only poll shard 0 and 2
2010 let request = ConsumeRequest::new(100).shards(vec![0, 2]);
2011 let response = merger.poll(request).await.unwrap();
2012
2013 assert_eq!(response.events.len(), 2);
2014 let shard_ids: Vec<_> = response.events.iter().map(|e| e.shard_id).collect();
2015 assert!(shard_ids.contains(&0));
2016 assert!(shard_ids.contains(&2));
2017 assert!(!shard_ids.contains(&1));
2018 }
2019
2020 #[tokio::test]
2021 async fn test_poll_merger_with_cursor() {
2022 let adapter = Arc::new(MockAdapter::new());
2023
2024 adapter.add_events(
2025 0,
2026 vec![
2027 StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
2028 StoredEvent::from_value("0-2".to_string(), json!({}), 200, 0),
2029 StoredEvent::from_value("0-3".to_string(), json!({}), 300, 0),
2030 ],
2031 );
2032
2033 let merger = PollMerger::new(adapter, vec![0]);
2034
2035 // First poll
2036 let request = ConsumeRequest::new(2);
2037 let response1 = merger.poll(request).await.unwrap();
2038 assert_eq!(response1.events.len(), 2);
2039
2040 // Second poll with cursor
2041 let cursor = response1.next_id.unwrap();
2042 let request2 = ConsumeRequest::new(10).from(cursor);
2043 let response2 = merger.poll(request2).await.unwrap();
2044
2045 assert_eq!(response2.events.len(), 1);
2046 assert_eq!(response2.events[0].id, "0-3");
2047 }
2048
2049 #[tokio::test]
2050 async fn test_poll_merger_pagination_multi_shard() {
2051 // Test that pagination across multiple shards doesn't skip events
2052 let adapter = Arc::new(MockAdapter::new());
2053
2054 // Shard 0: 10 events
2055 let shard0_events: Vec<_> = (1..=10)
2056 .map(|i| {
2057 StoredEvent::from_value(
2058 format!("0-{}", i),
2059 json!({"shard": 0, "idx": i}),
2060 i as u64 * 10,
2061 0,
2062 )
2063 })
2064 .collect();
2065 adapter.add_events(0, shard0_events);
2066
2067 // Shard 1: 15 events
2068 let shard1_events: Vec<_> = (1..=15)
2069 .map(|i| {
2070 StoredEvent::from_value(
2071 format!("1-{}", i),
2072 json!({"shard": 1, "idx": i}),
2073 i as u64 * 10 + 5,
2074 1,
2075 )
2076 })
2077 .collect();
2078 adapter.add_events(1, shard1_events);
2079
2080 let merger = PollMerger::new(adapter, vec![0, 1]);
2081
2082 // Poll in pages of 10 and collect all events
2083 let mut all_events = Vec::new();
2084 let mut cursor: Option<String> = None;
2085 let mut iterations = 0;
2086
2087 loop {
2088 iterations += 1;
2089 let request = match &cursor {
2090 Some(c) => ConsumeRequest::new(10).from(c.clone()),
2091 None => ConsumeRequest::new(10),
2092 };
2093
2094 let response = merger.poll(request).await.unwrap();
2095 all_events.extend(response.events);
2096
2097 if !response.has_more {
2098 break;
2099 }
2100 cursor = response.next_id;
2101
2102 // Safety: prevent infinite loop
2103 if iterations > 10 {
2104 panic!("Too many iterations");
2105 }
2106 }
2107
2108 // Should get all 25 events (10 from shard 0 + 15 from shard 1)
2109 assert_eq!(
2110 all_events.len(),
2111 25,
2112 "Expected 25 events, got {}. Iterations: {}",
2113 all_events.len(),
2114 iterations
2115 );
2116
2117 // Verify we got events from both shards
2118 let shard0_count = all_events.iter().filter(|e| e.shard_id == 0).count();
2119 let shard1_count = all_events.iter().filter(|e| e.shard_id == 1).count();
2120 assert_eq!(shard0_count, 10, "Expected 10 events from shard 0");
2121 assert_eq!(shard1_count, 15, "Expected 15 events from shard 1");
2122 }
2123
2124 #[tokio::test]
2125 async fn test_poll_merger_pagination_no_duplicates() {
2126 // Test that pagination doesn't return duplicate events
2127 let adapter = Arc::new(MockAdapter::new());
2128
2129 // Add events to both shards
2130 for shard_id in 0..2u16 {
2131 let events: Vec<_> = (1..=20)
2132 .map(|i| {
2133 StoredEvent::from_value(
2134 format!("{}-{}", shard_id, i),
2135 json!({"shard": shard_id, "idx": i}),
2136 i as u64 * 10,
2137 shard_id,
2138 )
2139 })
2140 .collect();
2141 adapter.add_events(shard_id, events);
2142 }
2143
2144 let merger = PollMerger::new(adapter, vec![0, 1]);
2145
2146 // Poll in small pages
2147 let mut all_event_ids = Vec::new();
2148 let mut cursor: Option<String> = None;
2149
2150 for _ in 0..20 {
2151 let request = match &cursor {
2152 Some(c) => ConsumeRequest::new(5).from(c.clone()),
2153 None => ConsumeRequest::new(5),
2154 };
2155
2156 let response = merger.poll(request).await.unwrap();
2157 all_event_ids.extend(response.events.iter().map(|e| e.id.clone()));
2158
2159 if !response.has_more {
2160 break;
2161 }
2162 cursor = response.next_id;
2163 }
2164
2165 // Check for duplicates
2166 let unique_count = {
2167 let mut ids = all_event_ids.clone();
2168 ids.sort();
2169 ids.dedup();
2170 ids.len()
2171 };
2172
2173 assert_eq!(
2174 unique_count,
2175 all_event_ids.len(),
2176 "Found duplicate events! Total: {}, Unique: {}",
2177 all_event_ids.len(),
2178 unique_count
2179 );
2180
2181 // Should have all 40 events
2182 assert_eq!(all_event_ids.len(), 40);
2183 }
2184
2185 #[tokio::test]
2186 async fn test_poll_merger_pagination_with_ordering() {
2187 // Test pagination with timestamp ordering
2188 let adapter = Arc::new(MockAdapter::new());
2189
2190 // Add events with interleaved timestamps across shards
2191 adapter.add_events(
2192 0,
2193 vec![
2194 StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
2195 StoredEvent::from_value("0-2".to_string(), json!({}), 300, 0),
2196 StoredEvent::from_value("0-3".to_string(), json!({}), 500, 0),
2197 ],
2198 );
2199 adapter.add_events(
2200 1,
2201 vec![
2202 StoredEvent::from_value("1-1".to_string(), json!({}), 200, 1),
2203 StoredEvent::from_value("1-2".to_string(), json!({}), 400, 1),
2204 ],
2205 );
2206
2207 let merger = PollMerger::new(adapter, vec![0, 1]);
2208
2209 // Poll with ordering, page size 2
2210 let mut all_events = Vec::new();
2211 let mut cursor: Option<String> = None;
2212
2213 for _ in 0..5 {
2214 let mut request = ConsumeRequest::new(2).ordering(Ordering::InsertionTs);
2215 if let Some(c) = &cursor {
2216 request = request.from(c.clone());
2217 }
2218
2219 let response = merger.poll(request).await.unwrap();
2220 all_events.extend(response.events);
2221
2222 if !response.has_more {
2223 break;
2224 }
2225 cursor = response.next_id;
2226 }
2227
2228 // Should get all 5 events
2229 assert_eq!(all_events.len(), 5);
2230
2231 // Verify ordering is maintained
2232 let timestamps: Vec<_> = all_events.iter().map(|e| e.insertion_ts).collect();
2233 let mut sorted = timestamps.clone();
2234 sorted.sort();
2235 assert_eq!(timestamps, sorted, "Events should be sorted by timestamp");
2236 }
2237
2238 #[tokio::test]
2239 async fn test_poll_merger_cursor_tracks_returned_events_only() {
2240 // Test that cursor tracks position based on returned events, not fetched events
2241 let adapter = Arc::new(MockAdapter::new());
2242
2243 // Shard 0: 3 events
2244 adapter.add_events(
2245 0,
2246 vec![
2247 StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
2248 StoredEvent::from_value("0-2".to_string(), json!({}), 200, 0),
2249 StoredEvent::from_value("0-3".to_string(), json!({}), 300, 0),
2250 ],
2251 );
2252
2253 // Shard 1: 3 events
2254 adapter.add_events(
2255 1,
2256 vec![
2257 StoredEvent::from_value("1-1".to_string(), json!({}), 150, 1),
2258 StoredEvent::from_value("1-2".to_string(), json!({}), 250, 1),
2259 StoredEvent::from_value("1-3".to_string(), json!({}), 350, 1),
2260 ],
2261 );
2262
2263 let merger = PollMerger::new(adapter, vec![0, 1]);
2264
2265 // First poll with limit 2 - should get 2 events and cursor should reflect only those 2
2266 let response1 = merger.poll(ConsumeRequest::new(2)).await.unwrap();
2267 assert_eq!(response1.events.len(), 2);
2268 assert!(response1.has_more);
2269
2270 // Decode cursor to verify it tracks returned events
2271 let next_id = response1.next_id.clone().unwrap();
2272 let cursor = CompositeCursor::decode(&next_id).unwrap();
2273
2274 // Cursor should only have positions for shards that had events in the returned set
2275 let returned_shard_ids: std::collections::HashSet<_> =
2276 response1.events.iter().map(|e| e.shard_id).collect();
2277
2278 for shard_id in 0..2u16 {
2279 if returned_shard_ids.contains(&shard_id) {
2280 // Shard had returned events, cursor should have position
2281 assert!(
2282 cursor.get(shard_id).is_some(),
2283 "Cursor should have position for shard {} which had returned events",
2284 shard_id
2285 );
2286 }
2287 }
2288
2289 // Second poll should continue from where we left off
2290 let response2 = merger
2291 .poll(ConsumeRequest::new(10).from(next_id))
2292 .await
2293 .unwrap();
2294
2295 // Should get remaining 4 events
2296 assert_eq!(response2.events.len(), 4, "Should get remaining 4 events");
2297 }
2298
2299 /// When the per-shard fetch hits the
2300 /// PER_SHARD_FETCH_CAP clamp, the response must surface
2301 /// `truncated_at_per_shard_cap = true` so callers can detect
2302 /// the silent under-delivery.
2303 #[tokio::test]
2304 async fn poll_merger_surfaces_per_shard_cap_truncation() {
2305 let adapter = Arc::new(MockAdapter::new());
2306 // Single shard — over-fetch factor 2 — request limit
2307 // 50 000 → unclamped per_shard would be 100 000 → clamped
2308 // to PER_SHARD_FETCH_CAP (10 000).
2309 adapter.add_events(
2310 0,
2311 (0..1)
2312 .map(|i| StoredEvent::from_value(format!("0-{}", i), json!({}), 100, 0))
2313 .collect(),
2314 );
2315
2316 let merger = PollMerger::new(adapter, vec![0]);
2317 let response = merger.poll(ConsumeRequest::new(50_000)).await.unwrap();
2318 assert!(
2319 response.truncated_at_per_shard_cap,
2320 "large limit must flag the per-shard cap clamp",
2321 );
2322 }
2323
2324 /// A single-shard request with a filter where every fetched
2325 /// event matches AND the per-shard cap clamps the fetch must
2326 /// not stall — each poll must advance the cursor by
2327 /// `request.limit` matches and the loop must drain in
2328 /// `ceil(total / limit)` iterations.
2329 ///
2330 /// Without the rollback+override pairing, the rollback step
2331 /// would roll the cursor back to the original position with
2332 /// no matching forward override, so the cursor never
2333 /// advances — re-fetching the same events on every poll. The
2334 /// current logic resolves this: Step 1 rolls back for shards
2335 /// with truncated matches; Step 2 overrides to the last
2336 /// *returned* match's id so each poll makes `request.limit`
2337 /// worth of forward progress.
2338 #[tokio::test]
2339 async fn poll_merger_does_not_stall_on_single_shard_filter_under_cap() {
2340 // Adapter holds 50 events on shard 0. With a filter that
2341 // matches every event, request.limit = 10 → per_shard_limit
2342 // could be 30 (limit×3 over_fetch_factor) without the cap;
2343 // here we keep numbers small so the test is deterministic.
2344 let adapter = Arc::new(MockAdapter::new());
2345 let mut events: Vec<StoredEvent> = (0..50)
2346 .map(|i| {
2347 StoredEvent::from_value(
2348 format!("0-{}", i),
2349 json!({"keep": true}),
2350 (i + 1) as u64,
2351 0,
2352 )
2353 })
2354 .collect();
2355 adapter.add_events(0, events.split_off(0));
2356
2357 let merger = PollMerger::new(adapter.clone(), vec![0]);
2358 // Filter that matches everything ('keep': true on all events).
2359 let make_request = |from_id: Option<String>| ConsumeRequest {
2360 limit: 10,
2361 from_id,
2362 shards: None,
2363 filter: Some(Filter::eq("keep", json!(true))),
2364 ordering: Ordering::None,
2365 };
2366
2367 let mut cursor: Option<String> = None;
2368 let mut total = 0;
2369 let mut polls = 0;
2370 loop {
2371 polls += 1;
2372 assert!(
2373 polls < 20,
2374 "poll loop must terminate; got {} polls without draining \
2375 (cursor={:?}, total={})",
2376 polls,
2377 cursor,
2378 total,
2379 );
2380 let response = merger.poll(make_request(cursor.clone())).await.unwrap();
2381 total += response.events.len();
2382 let new_cursor = response.next_id.clone();
2383 assert!(
2384 new_cursor.is_some(),
2385 "response must surface a cursor on every progress poll \
2386 (poll={}, returned={})",
2387 polls,
2388 response.events.len(),
2389 );
2390 // The cursor MUST advance OR has_more must be false.
2391 // Pre-fix the cursor would echo back unchanged on a
2392 // stall; we'd hit the polls<20 watchdog above.
2393 if cursor == new_cursor {
2394 assert!(
2395 !response.has_more,
2396 "cursor stuck at {:?} but has_more=true → stall",
2397 cursor,
2398 );
2399 break;
2400 }
2401 cursor = new_cursor;
2402 if !response.has_more && response.events.is_empty() {
2403 break;
2404 }
2405 }
2406 assert_eq!(
2407 total, 50,
2408 "full draining of 50 events must succeed without stall \
2409 (got {} in {} polls)",
2410 total, polls,
2411 );
2412 }
2413
2414 /// Multi-shard variant. The rollback+override logic must
2415 /// drain every match across multiple shards when each poll
2416 /// truncates matches from BOTH shards (so both shards land
2417 /// in the `rolled_back` set on the same poll). Without it,
2418 /// a stall on either shard would leave matches stranded;
2419 /// this exercises the dual-rollback path the single-shard
2420 /// test doesn't.
2421 #[tokio::test]
2422 async fn poll_merger_does_not_stall_on_multi_shard_filter_truncation() {
2423 let adapter = Arc::new(MockAdapter::new());
2424 // Two shards, 30 matching events each. Limit=10 means
2425 // every poll truncates matches from both shards
2426 // simultaneously (the global sort interleaves them, and
2427 // 10 < 2*30).
2428 for shard_id in 0..2u16 {
2429 let events: Vec<StoredEvent> = (0..30)
2430 .map(|i| {
2431 StoredEvent::from_value(
2432 format!("{}-{}", shard_id, i),
2433 json!({"keep": true}),
2434 // Stagger timestamps so the global sort
2435 // interleaves shards: shard 0 ts even,
2436 // shard 1 ts odd.
2437 (i * 2 + shard_id as usize) as u64 + 1,
2438 shard_id,
2439 )
2440 })
2441 .collect();
2442 adapter.add_events(shard_id, events);
2443 }
2444
2445 let merger = PollMerger::new(adapter, vec![0, 1]);
2446 let make_request = |from_id: Option<String>| ConsumeRequest {
2447 limit: 10,
2448 from_id,
2449 shards: None,
2450 filter: Some(Filter::eq("keep", json!(true))),
2451 ordering: Ordering::None,
2452 };
2453
2454 let mut cursor: Option<String> = None;
2455 let mut returned: std::collections::HashSet<String> = std::collections::HashSet::new();
2456 let mut polls = 0;
2457 loop {
2458 polls += 1;
2459 assert!(
2460 polls < 30,
2461 "multi-shard: poll loop must terminate; \
2462 got {} polls without draining (cursor={:?}, returned={})",
2463 polls,
2464 cursor,
2465 returned.len(),
2466 );
2467 let response = merger.poll(make_request(cursor.clone())).await.unwrap();
2468 for e in &response.events {
2469 returned.insert(e.id.clone());
2470 }
2471 let new_cursor = response.next_id.clone();
2472 // Same stall guard as the single-shard test: the
2473 // cursor must advance OR has_more must be false on
2474 // any poll that returned events.
2475 if cursor == new_cursor {
2476 assert!(
2477 !response.has_more,
2478 "multi-shard: cursor stuck at {:?} but \
2479 has_more=true → stall",
2480 cursor,
2481 );
2482 break;
2483 }
2484 cursor = new_cursor;
2485 if !response.has_more && response.events.is_empty() {
2486 break;
2487 }
2488 }
2489 assert_eq!(
2490 returned.len(),
2491 60,
2492 "multi-shard: every match across both shards must \
2493 surface exactly once (got {} unique in {} polls)",
2494 returned.len(),
2495 polls,
2496 );
2497 }
2498
2499 /// Corollary: a small request that fits well below
2500 /// the cap must NOT flag truncation.
2501 #[tokio::test]
2502 async fn poll_merger_does_not_flag_truncation_on_small_limit() {
2503 let adapter = Arc::new(MockAdapter::new());
2504 adapter.add_events(
2505 0,
2506 vec![StoredEvent::from_value(
2507 "0-1".to_string(),
2508 json!({}),
2509 100,
2510 0,
2511 )],
2512 );
2513 let merger = PollMerger::new(adapter, vec![0]);
2514 let response = merger.poll(ConsumeRequest::new(100)).await.unwrap();
2515 assert!(
2516 !response.truncated_at_per_shard_cap,
2517 "small limits must not flag the cap",
2518 );
2519 }
2520
2521 #[tokio::test]
2522 async fn test_poll_merger_small_limit_many_shards() {
2523 // Regression: limit < shard count caused integer division truncation to 0,
2524 // making per-shard fetch too small. Now uses ceiling division.
2525 let adapter = Arc::new(MockAdapter::new());
2526 let num_shards = 8u16;
2527
2528 for shard_id in 0..num_shards {
2529 adapter.add_events(
2530 shard_id,
2531 vec![StoredEvent::from_value(
2532 format!("{}-1", shard_id),
2533 json!({"shard": shard_id}),
2534 100,
2535 shard_id,
2536 )],
2537 );
2538 }
2539
2540 let merger = PollMerger::new(adapter, (0..num_shards).collect());
2541
2542 // Request fewer events than shards — should still work
2543 let request = ConsumeRequest::new(3);
2544 let response = merger.poll(request).await.unwrap();
2545
2546 assert_eq!(response.events.len(), 3);
2547 assert!(response.has_more);
2548 }
2549
2550 #[tokio::test]
2551 async fn test_regression_filtered_shards_cursor_advances() {
2552 // Bug 3: "Cursor never advances for filtered-out shards"
2553 //
2554 // When shard 1's events are entirely filtered out, the cursor for shard 1
2555 // must still advance past those events. Otherwise, subsequent polls will
2556 // re-fetch the same filtered-out events forever.
2557 let adapter = Arc::new(MockAdapter::new());
2558
2559 // Shard 0: events matching the filter
2560 adapter.add_events(
2561 0,
2562 vec![
2563 StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
2564 StoredEvent::from_value("0-2".to_string(), json!({"type": "token"}), 200, 0),
2565 ],
2566 );
2567
2568 // Shard 1: events that will be filtered out
2569 adapter.add_events(
2570 1,
2571 vec![
2572 StoredEvent::from_value("1-1".to_string(), json!({"type": "message"}), 150, 1),
2573 StoredEvent::from_value("1-2".to_string(), json!({"type": "message"}), 250, 1),
2574 ],
2575 );
2576
2577 let merger = PollMerger::new(adapter, vec![0, 1]);
2578 let filter = Filter::eq("type", json!("token"));
2579
2580 // First poll: should return only the "token" events from shard 0
2581 let response1 = merger
2582 .poll(ConsumeRequest::new(100).filter(filter.clone()))
2583 .await
2584 .unwrap();
2585
2586 assert_eq!(response1.events.len(), 2, "Should get 2 token events");
2587 for event in &response1.events {
2588 assert_eq!(
2589 event.shard_id, 0,
2590 "All returned events should be from shard 0"
2591 );
2592 }
2593
2594 let cursor1 = response1
2595 .next_id
2596 .expect("Should have a cursor after first poll");
2597
2598 // Verify the cursor advanced for shard 1 even though its events were filtered out
2599 let decoded = CompositeCursor::decode(&cursor1).unwrap();
2600 assert!(
2601 decoded.get(1).is_some(),
2602 "Cursor must advance for shard 1 even though all its events were filtered out"
2603 );
2604 assert_eq!(
2605 decoded.get(1),
2606 Some("1-2"),
2607 "Shard 1 cursor should point to its last fetched event"
2608 );
2609
2610 // Second poll with the cursor: should NOT re-fetch shard 1's events
2611 let response2 = merger
2612 .poll(ConsumeRequest::new(100).filter(filter).from(cursor1))
2613 .await
2614 .unwrap();
2615
2616 assert!(
2617 response2.events.is_empty(),
2618 "Second poll should return no events (all events already consumed or filtered)"
2619 );
2620 }
2621
2622 #[tokio::test]
2623 async fn test_regression_poll_merger_filter_does_not_infinite_loop() {
2624 // Regression: when one shard has events matching the filter and another
2625 // shard has events that are all filtered out, polling in pages must
2626 // terminate and return all matching events without looping forever.
2627 let adapter = Arc::new(MockAdapter::new());
2628
2629 // Shard 0: 100 events all matching filter
2630 let shard0_events: Vec<_> = (1..=100)
2631 .map(|i| {
2632 StoredEvent::from_value(
2633 format!("0-{}", i),
2634 json!({"type": "token", "idx": i}),
2635 i as u64 * 10,
2636 0,
2637 )
2638 })
2639 .collect();
2640 adapter.add_events(0, shard0_events);
2641
2642 // Shard 1: 100 events none matching filter
2643 let shard1_events: Vec<_> = (1..=100)
2644 .map(|i| {
2645 StoredEvent::from_value(
2646 format!("1-{}", i),
2647 json!({"type": "message", "idx": i}),
2648 i as u64 * 10 + 5,
2649 1,
2650 )
2651 })
2652 .collect();
2653 adapter.add_events(1, shard1_events);
2654
2655 let merger = PollMerger::new(adapter, vec![0, 1]);
2656 let filter = Filter::eq("type", json!("token"));
2657
2658 let mut all_events = Vec::new();
2659 let mut cursor: Option<String> = None;
2660 let max_iterations = 50;
2661 let mut iterations = 0;
2662
2663 loop {
2664 iterations += 1;
2665 if iterations > max_iterations {
2666 panic!(
2667 "Infinite loop detected after {} iterations! Collected {} events so far.",
2668 max_iterations,
2669 all_events.len()
2670 );
2671 }
2672
2673 let mut request = ConsumeRequest::new(50).filter(filter.clone());
2674 if let Some(c) = &cursor {
2675 request = request.from(c.clone());
2676 }
2677
2678 let response = merger.poll(request).await.unwrap();
2679 all_events.extend(response.events);
2680
2681 if !response.has_more {
2682 break;
2683 }
2684 cursor = response.next_id;
2685 }
2686
2687 // Should have collected exactly 100 matching events from shard 0
2688 assert_eq!(
2689 all_events.len(),
2690 100,
2691 "Expected 100 matching events, got {}. Iterations: {}",
2692 all_events.len(),
2693 iterations
2694 );
2695
2696 // All events should be from shard 0 (the "token" shard)
2697 for event in &all_events {
2698 assert_eq!(
2699 event.shard_id, 0,
2700 "All matching events should come from shard 0"
2701 );
2702 }
2703
2704 // Verify no duplicates
2705 let mut ids: Vec<_> = all_events.iter().map(|e| e.id.clone()).collect();
2706 ids.sort();
2707 ids.dedup();
2708 assert_eq!(ids.len(), 100, "Should have no duplicate events");
2709 }
2710
2711 #[tokio::test]
2712 async fn test_regression_all_events_filtered_returns_cursor() {
2713 // Regression: when every fetched event was filtered out, next_id was
2714 // None, leaving the caller stuck re-fetching the same events forever.
2715 let adapter = Arc::new(MockAdapter::new());
2716
2717 // Only non-matching events
2718 adapter.add_events(
2719 0,
2720 vec![
2721 StoredEvent::from_value("0-1".to_string(), json!({"type": "noise"}), 100, 0),
2722 StoredEvent::from_value("0-2".to_string(), json!({"type": "noise"}), 200, 0),
2723 ],
2724 );
2725
2726 let merger = PollMerger::new(adapter, vec![0]);
2727 let filter = Filter::eq("type", json!("signal"));
2728
2729 let response = merger
2730 .poll(ConsumeRequest::new(100).filter(filter))
2731 .await
2732 .unwrap();
2733
2734 // No events match, but cursor must still advance
2735 assert!(response.events.is_empty());
2736 assert!(
2737 response.next_id.is_some(),
2738 "cursor must advance past filtered events even when none match"
2739 );
2740 }
2741
2742 /// Exercises the non-lazy filter branch: when `Ordering::InsertionTs`
2743 /// is requested we can't short-circuit at `limit + 1` matches (the
2744 /// sort needs every event first), so the code falls through to
2745 /// `retain` → sort → truncate. This test pins:
2746 /// - Results are globally sorted by `insertion_ts` (not input order).
2747 /// - Only filter-matching events come through.
2748 /// - Truncation picks the `limit` *earliest* matches by ts.
2749 /// - `has_more` is set when matches exceed `limit`.
2750 #[tokio::test]
2751 async fn test_poll_merger_filter_insertion_ts_truncates_after_sort() {
2752 let adapter = Arc::new(MockAdapter::new());
2753
2754 // Interleave shards with out-of-order timestamps and a mix of
2755 // matching / non-matching events. Matching timestamps: 120, 200,
2756 // 260, 400. Non-matching timestamps: 100, 300.
2757 adapter.add_events(
2758 0,
2759 vec![
2760 StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 400, 0),
2761 StoredEvent::from_value("0-2".to_string(), json!({"type": "noise"}), 100, 0),
2762 StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 200, 0),
2763 ],
2764 );
2765 adapter.add_events(
2766 1,
2767 vec![
2768 StoredEvent::from_value("1-1".to_string(), json!({"type": "token"}), 260, 1),
2769 StoredEvent::from_value("1-2".to_string(), json!({"type": "noise"}), 300, 1),
2770 StoredEvent::from_value("1-3".to_string(), json!({"type": "token"}), 120, 1),
2771 ],
2772 );
2773
2774 let merger = PollMerger::new(adapter, vec![0, 1]);
2775 let filter = Filter::eq("type", json!("token"));
2776
2777 // 4 matches exist; asking for 2 must yield the two earliest
2778 // after a full sort (120, 200) and signal has_more.
2779 let response = merger
2780 .poll(
2781 ConsumeRequest::new(2)
2782 .filter(filter)
2783 .ordering(Ordering::InsertionTs),
2784 )
2785 .await
2786 .unwrap();
2787
2788 assert_eq!(response.events.len(), 2);
2789 assert_eq!(
2790 response.events[0].insertion_ts, 120,
2791 "earliest match must come first"
2792 );
2793 assert_eq!(response.events[1].insertion_ts, 200);
2794 assert!(
2795 response.has_more,
2796 "two more matching events remain past the limit"
2797 );
2798 }
2799
2800 #[tokio::test]
2801 async fn test_regression_corrupt_event_filter_drop_is_consistent_and_logged() {
2802 // Regression: corrupt events (raw bytes that don't deserialize as
2803 // JSON) used to be silently dropped from the filtered poll path
2804 // via `event.parse().map(...).unwrap_or(false)`, while the
2805 // unfiltered path returned them as-is. That inconsistency hid
2806 // upstream framing/storage corruption from anyone running with a
2807 // filter (i.e. most consumers).
2808 //
2809 // The fix routes parse failures through `event_matches_filter`,
2810 // which emits `tracing::warn!` per dropped event. We don't have
2811 // a tracing-test subscriber wired up so we don't assert on the
2812 // log line itself; instead we pin the behavioral surface so a
2813 // future regression that re-silences corruption (e.g., dropping
2814 // the helper) shows up in code review:
2815 // - filtered poll: corrupt event is dropped, valid event kept
2816 // - unfiltered poll: corrupt event flows through unchanged
2817 //
2818 // If the helper is removed or the warn! is downgraded to debug!,
2819 // this test still passes — but the helper's doc-comment names
2820 // the inconsistency and is the artifact that protects the
2821 // observability requirement.
2822 let adapter = Arc::new(MockAdapter::new());
2823 adapter.add_events(
2824 0,
2825 vec![
2826 StoredEvent::from_value("0-1".to_string(), json!({"type": "ok"}), 100, 0),
2827 // Raw bytes that don't parse as JSON — a torn write or
2828 // upstream framing bug surface.
2829 StoredEvent::new(
2830 "0-2".to_string(),
2831 bytes::Bytes::from_static(b"\xff\xff not json \xff"),
2832 200,
2833 0,
2834 ),
2835 ],
2836 );
2837
2838 let merger = PollMerger::new(adapter, vec![0]);
2839
2840 // Filtered: corrupt event must be dropped, valid event kept.
2841 let filtered = merger
2842 .poll(ConsumeRequest::new(100).filter(Filter::eq("type", json!("ok"))))
2843 .await
2844 .unwrap();
2845 assert_eq!(
2846 filtered.events.len(),
2847 1,
2848 "filtered poll must drop the corrupt event"
2849 );
2850 assert_eq!(filtered.events[0].id, "0-1");
2851
2852 // Unfiltered: corrupt event flows through. Documenting that the
2853 // unfiltered path is the *only* way an operator currently sees
2854 // the corrupt bytes — without the warn! the filtered path is
2855 // a black hole.
2856 let unfiltered = merger.poll(ConsumeRequest::new(100)).await.unwrap();
2857 assert_eq!(
2858 unfiltered.events.len(),
2859 2,
2860 "unfiltered poll must surface the corrupt event verbatim"
2861 );
2862 let ids: Vec<_> = unfiltered.events.iter().map(|e| e.id.as_str()).collect();
2863 assert!(ids.contains(&"0-1"));
2864 assert!(ids.contains(&"0-2"));
2865 }
2866
2867 /// Regression: BUG_REPORT.md #2 — `Ordering::None` filter previously
2868 /// broke out of the drain loop once `kept.len() >= limit + 1`,
2869 /// which silently discarded events from later shards without
2870 /// checking the filter. Combined with `new_cursor` advancing for
2871 /// every polled shard, that meant matching events on un-inspected
2872 /// shards were lost forever.
2873 ///
2874 /// Setup: shard 0 has matches followed by shard 1 with matches.
2875 /// With `limit=2`, shard 0's first three events (two matches plus
2876 /// one extra to trigger has_more) used to satisfy the early break,
2877 /// silently dropping shard 1's matches AND advancing past them.
2878 /// The fix runs a full `retain` pass over every fetched event,
2879 /// then rolls back the cursor for shards whose matches were
2880 /// truncated so they're re-fetched on the next poll.
2881 #[tokio::test]
2882 async fn test_regression_ordering_none_filter_does_not_strand_later_shards() {
2883 let adapter = Arc::new(MockAdapter::new());
2884
2885 // Shard 0: 3 matching events.
2886 adapter.add_events(
2887 0,
2888 vec![
2889 StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
2890 StoredEvent::from_value("0-2".to_string(), json!({"type": "token"}), 110, 0),
2891 StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 120, 0),
2892 ],
2893 );
2894 // Shard 1: 3 matching events.
2895 adapter.add_events(
2896 1,
2897 vec![
2898 StoredEvent::from_value("1-1".to_string(), json!({"type": "token"}), 200, 1),
2899 StoredEvent::from_value("1-2".to_string(), json!({"type": "token"}), 210, 1),
2900 StoredEvent::from_value("1-3".to_string(), json!({"type": "token"}), 220, 1),
2901 ],
2902 );
2903
2904 let merger = PollMerger::new(adapter, vec![0, 1]);
2905 let filter = Filter::eq("type", json!("token"));
2906
2907 // Page through with a small limit — over many polls every
2908 // matching event must surface exactly once. Bound iterations
2909 // to detect either a stall or an explosion.
2910 let mut all_returned: Vec<String> = Vec::new();
2911 let mut cursor: Option<String> = None;
2912 for _ in 0..20 {
2913 let mut req = ConsumeRequest::new(2).filter(filter.clone());
2914 if let Some(c) = &cursor {
2915 req = req.from(c.clone());
2916 }
2917 let resp = merger.poll(req).await.unwrap();
2918 for e in &resp.events {
2919 all_returned.push(e.id.clone());
2920 }
2921 if !resp.has_more {
2922 break;
2923 }
2924 cursor = resp.next_id;
2925 }
2926
2927 all_returned.sort();
2928 all_returned.dedup();
2929 assert_eq!(
2930 all_returned,
2931 vec!["0-1", "0-2", "0-3", "1-1", "1-2", "1-3"],
2932 "every matching event from every shard must be returned exactly once"
2933 );
2934 }
2935
2936 /// Regression: BUG_REPORT.md #23 — `Ordering::InsertionTs` filter
2937 /// previously stranded matches on shards whose matching events
2938 /// all sorted later than `limit` matches from other shards. The
2939 /// global sort+truncate dropped them, the cursor-override only
2940 /// fired for shards present in the *returned* set, and so the
2941 /// cursor for the un-returned shard advanced to its fetched
2942 /// position via `new_cursor` — silently skipping the matches.
2943 ///
2944 /// Setup: shard 0 has 3 early-ts matches and shard 1 has 3
2945 /// late-ts matches. With `limit=2` and `InsertionTs` ordering,
2946 /// the first poll returns the two earliest from shard 0;
2947 /// shard 1's matches must NOT be lost. The fix detects that
2948 /// shard 1 had matches truncated and rolls its cursor back so
2949 /// they're re-fetched on the next poll.
2950 #[tokio::test]
2951 async fn test_regression_insertion_ts_filter_does_not_strand_late_shard() {
2952 let adapter = Arc::new(MockAdapter::new());
2953
2954 adapter.add_events(
2955 0,
2956 vec![
2957 StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
2958 StoredEvent::from_value("0-2".to_string(), json!({"type": "token"}), 110, 0),
2959 StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 120, 0),
2960 ],
2961 );
2962 adapter.add_events(
2963 1,
2964 vec![
2965 StoredEvent::from_value("1-1".to_string(), json!({"type": "token"}), 1000, 1),
2966 StoredEvent::from_value("1-2".to_string(), json!({"type": "token"}), 1010, 1),
2967 StoredEvent::from_value("1-3".to_string(), json!({"type": "token"}), 1020, 1),
2968 ],
2969 );
2970
2971 let merger = PollMerger::new(adapter, vec![0, 1]);
2972 let filter = Filter::eq("type", json!("token"));
2973
2974 let mut all_returned: Vec<String> = Vec::new();
2975 let mut cursor: Option<String> = None;
2976 for _ in 0..20 {
2977 let mut req = ConsumeRequest::new(2)
2978 .filter(filter.clone())
2979 .ordering(Ordering::InsertionTs);
2980 if let Some(c) = &cursor {
2981 req = req.from(c.clone());
2982 }
2983 let resp = merger.poll(req).await.unwrap();
2984 for e in &resp.events {
2985 all_returned.push(e.id.clone());
2986 }
2987 if !resp.has_more {
2988 break;
2989 }
2990 cursor = resp.next_id;
2991 }
2992
2993 all_returned.sort();
2994 all_returned.dedup();
2995 assert_eq!(
2996 all_returned,
2997 vec!["0-1", "0-2", "0-3", "1-1", "1-2", "1-3"],
2998 "matches from the late-ts shard must not be lost to truncation"
2999 );
3000 }
3001
3002 /// Regression: BUG_REPORT.md #50 — if any adapter returns
3003 /// `has_more: true` with no events and no `next_id`, the merger
3004 /// previously forwarded that as `(has_more=true, next_id=None)`,
3005 /// causing the caller to re-poll from the same starting cursor
3006 /// indefinitely. The fix suppresses `has_more` whenever the
3007 /// merger itself made no observable progress (no events AND no
3008 /// cursor advance).
3009 #[tokio::test]
3010 async fn has_more_is_suppressed_when_no_progress() {
3011 struct LiarAdapter;
3012
3013 #[async_trait]
3014 impl Adapter for LiarAdapter {
3015 async fn init(&mut self) -> Result<(), AdapterError> {
3016 Ok(())
3017 }
3018 async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3019 Ok(())
3020 }
3021 async fn flush(&self) -> Result<(), AdapterError> {
3022 Ok(())
3023 }
3024 async fn shutdown(&self) -> Result<(), AdapterError> {
3025 Ok(())
3026 }
3027 async fn poll_shard(
3028 &self,
3029 _shard_id: u16,
3030 _from_id: Option<&str>,
3031 _limit: usize,
3032 ) -> Result<ShardPollResult, AdapterError> {
3033 // The pathological case: claim has_more without
3034 // returning events or advancing the cursor.
3035 Ok(ShardPollResult {
3036 events: Vec::new(),
3037 next_id: None,
3038 has_more: true,
3039 })
3040 }
3041 fn name(&self) -> &'static str {
3042 "liar"
3043 }
3044 }
3045
3046 let adapter: Arc<dyn Adapter> = Arc::new(LiarAdapter);
3047 let merger = PollMerger::new(adapter, vec![0, 1]);
3048 let response = merger.poll(ConsumeRequest::new(100)).await.unwrap();
3049
3050 assert!(
3051 response.events.is_empty(),
3052 "no events were emitted, but merger returned {}",
3053 response.events.len()
3054 );
3055 // The whole point of this fix: don't let a misbehaving
3056 // adapter trick the caller into an infinite re-poll.
3057 assert!(
3058 !response.has_more,
3059 "has_more must be suppressed when merger made no progress (#50)"
3060 );
3061 assert!(
3062 response.next_id.is_none(),
3063 "next_id must remain None when no progress was made (#50)"
3064 );
3065 }
3066
3067 /// Pin: a stalled poll (no events, no cursor advance) that
3068 /// was given an input cursor must echo the cursor back to the
3069 /// caller. Pre-fix the merger returned `next_id = None` on no
3070 /// progress, so a caller that interpreted None as "no events
3071 /// — restart from the beginning" silently re-fetched from the
3072 /// stream's start across the stall, losing pagination
3073 /// continuity.
3074 #[tokio::test]
3075 async fn stalled_poll_echoes_caller_cursor_back() {
3076 struct EmptyAdapter;
3077
3078 #[async_trait]
3079 impl Adapter for EmptyAdapter {
3080 async fn init(&mut self) -> Result<(), AdapterError> {
3081 Ok(())
3082 }
3083 async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3084 Ok(())
3085 }
3086 async fn flush(&self) -> Result<(), AdapterError> {
3087 Ok(())
3088 }
3089 async fn shutdown(&self) -> Result<(), AdapterError> {
3090 Ok(())
3091 }
3092 async fn poll_shard(
3093 &self,
3094 _shard_id: u16,
3095 _from_id: Option<&str>,
3096 _limit: usize,
3097 ) -> Result<ShardPollResult, AdapterError> {
3098 Ok(ShardPollResult {
3099 events: Vec::new(),
3100 next_id: None,
3101 has_more: false,
3102 })
3103 }
3104 fn name(&self) -> &'static str {
3105 "empty"
3106 }
3107 }
3108
3109 let adapter: Arc<dyn Adapter> = Arc::new(EmptyAdapter);
3110 let merger = PollMerger::new(adapter, vec![0, 1]);
3111
3112 // Build a real composite cursor for the request to echo.
3113 let mut cursor = CompositeCursor::new();
3114 cursor.set(0, "1702-0".to_string());
3115 cursor.set(1, "1703-0".to_string());
3116 let encoded = cursor.encode().unwrap();
3117
3118 let mut req = ConsumeRequest::new(100);
3119 req.from_id = Some(encoded.clone());
3120
3121 let response = merger.poll(req).await.unwrap();
3122
3123 assert!(response.events.is_empty());
3124 assert_eq!(
3125 response.next_id.as_deref(),
3126 Some(encoded.as_str()),
3127 "stalled poll with input cursor must echo cursor back \
3128 (got {:?}); pre-fix this was None and callers paged \
3129 back to the stream's start",
3130 response.next_id,
3131 );
3132
3133 // Without an input cursor, no progress + no input → still
3134 // None (preserving the prior behavior of #50).
3135 let response_no_cursor = merger.poll(ConsumeRequest::new(100)).await.unwrap();
3136 assert!(response_no_cursor.next_id.is_none());
3137 }
3138
3139 /// Regression: a shard whose `set_checked` cursor write is refused
3140 /// because of a backend-format mismatch (e.g. a JetStream → Redis
3141 /// migration mid-stream) must NOT have its events delivered to
3142 /// the caller. Pre-fix `nc.set_checked(...)`'s `bool` return was
3143 /// discarded — events were merged into the response while the
3144 /// cursor stayed pinned at the old format, so every subsequent
3145 /// poll re-fetched and re-delivered the same events forever.
3146 ///
3147 /// Post-fix:
3148 /// - the refused shard's events are dropped from the response,
3149 /// - its id is surfaced in `failed_shards` so the caller sees
3150 /// the migration as a structured signal, and
3151 /// - a shard whose write was accepted is unaffected (events
3152 /// still flow through, cursor still advances).
3153 #[tokio::test]
3154 async fn refused_set_checked_drops_shard_events_and_surfaces_failure() {
3155 // Adapter that ignores `from_id` and always emits the same
3156 // events with the same next_id. The test seeds an input
3157 // cursor in a different format so set_checked refuses.
3158 struct FormatAdapter {
3159 events_by_shard: HashMap<u16, Vec<StoredEvent>>,
3160 }
3161 #[async_trait]
3162 impl Adapter for FormatAdapter {
3163 async fn init(&mut self) -> Result<(), AdapterError> {
3164 Ok(())
3165 }
3166 async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3167 Ok(())
3168 }
3169 async fn flush(&self) -> Result<(), AdapterError> {
3170 Ok(())
3171 }
3172 async fn shutdown(&self) -> Result<(), AdapterError> {
3173 Ok(())
3174 }
3175 async fn poll_shard(
3176 &self,
3177 shard_id: u16,
3178 _from_id: Option<&str>,
3179 _limit: usize,
3180 ) -> Result<ShardPollResult, AdapterError> {
3181 let events = self
3182 .events_by_shard
3183 .get(&shard_id)
3184 .cloned()
3185 .unwrap_or_default();
3186 let next_id = events.last().map(|e| e.id.clone());
3187 Ok(ShardPollResult {
3188 events,
3189 next_id,
3190 has_more: false,
3191 })
3192 }
3193 fn name(&self) -> &'static str {
3194 "format-adapter"
3195 }
3196 }
3197
3198 let mut events_by_shard: HashMap<u16, Vec<StoredEvent>> = HashMap::new();
3199 // shard 0 emits Redis-format ids
3200 events_by_shard.insert(
3201 0,
3202 vec![
3203 StoredEvent::from_value("1700-0".to_string(), json!({"shard": 0}), 100, 0),
3204 StoredEvent::from_value("1701-0".to_string(), json!({"shard": 0}), 101, 0),
3205 ],
3206 );
3207 // shard 1 emits numeric (JetStream-format) ids
3208 events_by_shard.insert(
3209 1,
3210 vec![StoredEvent::from_value(
3211 "42".to_string(),
3212 json!({"shard": 1}),
3213 100,
3214 1,
3215 )],
3216 );
3217
3218 let adapter: Arc<dyn Adapter> = Arc::new(FormatAdapter { events_by_shard });
3219 let merger = PollMerger::new(adapter, vec![0, 1]);
3220
3221 // Seed the input cursor in the OPPOSITE format for each
3222 // shard so set_checked is guaranteed to refuse both writes.
3223 // - shard 0 cursor in Numeric format; adapter wants Redis → refused
3224 // - shard 1 cursor in Redis format; adapter wants Numeric → refused
3225 let mut cursor = CompositeCursor::new();
3226 cursor.set(0, "41".to_string());
3227 cursor.set(1, "1600-0".to_string());
3228 let encoded = cursor.encode().unwrap();
3229
3230 // Use a filter so `new_cursor` is populated (set_checked
3231 // path #1 only fires on the filter branch). The filter
3232 // matches every event so it doesn't drop them on its own.
3233 let mut req = ConsumeRequest::new(10);
3234 req.from_id = Some(encoded.clone());
3235 // NOT(empty-AND) is a tautology — `Filter::and(vec![])`
3236 // matches nothing, so the negation matches everything.
3237 // We only need filter=Some(...) so the `new_cursor` clone
3238 // is built and the Step-1 set_checked path runs.
3239 req.filter = Some(crate::consumer::Filter::not(crate::consumer::Filter::and(
3240 Vec::new(),
3241 )));
3242
3243 let response = merger.poll(req).await.unwrap();
3244
3245 assert!(
3246 response.events.is_empty(),
3247 "events from refused shards must be dropped; got {} \
3248 event(s) — pre-fix this is where the duplicate-delivery \
3249 loop began",
3250 response.events.len(),
3251 );
3252 let mut failed = response.failed_shards.clone();
3253 failed.sort_unstable();
3254 assert_eq!(
3255 failed,
3256 vec![0u16, 1],
3257 "both shards must surface in failed_shards so the caller \
3258 sees the backend-format mismatch as a structured signal",
3259 );
3260 }
3261
3262 /// Companion: when set_checked is refused for ONE shard but
3263 /// accepted for another, only the refused shard is dropped — the
3264 /// accepted shard's events flow through normally.
3265 #[tokio::test]
3266 async fn refused_set_checked_does_not_drop_unaffected_shards() {
3267 struct FormatAdapter {
3268 events_by_shard: HashMap<u16, Vec<StoredEvent>>,
3269 }
3270 #[async_trait]
3271 impl Adapter for FormatAdapter {
3272 async fn init(&mut self) -> Result<(), AdapterError> {
3273 Ok(())
3274 }
3275 async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3276 Ok(())
3277 }
3278 async fn flush(&self) -> Result<(), AdapterError> {
3279 Ok(())
3280 }
3281 async fn shutdown(&self) -> Result<(), AdapterError> {
3282 Ok(())
3283 }
3284 async fn poll_shard(
3285 &self,
3286 shard_id: u16,
3287 _from_id: Option<&str>,
3288 _limit: usize,
3289 ) -> Result<ShardPollResult, AdapterError> {
3290 let events = self
3291 .events_by_shard
3292 .get(&shard_id)
3293 .cloned()
3294 .unwrap_or_default();
3295 let next_id = events.last().map(|e| e.id.clone());
3296 Ok(ShardPollResult {
3297 events,
3298 next_id,
3299 has_more: false,
3300 })
3301 }
3302 fn name(&self) -> &'static str {
3303 "format-adapter-partial"
3304 }
3305 }
3306
3307 let mut events_by_shard: HashMap<u16, Vec<StoredEvent>> = HashMap::new();
3308 // Both shards emit Redis-format ids on the adapter side.
3309 events_by_shard.insert(
3310 0,
3311 vec![StoredEvent::from_value(
3312 "1700-0".to_string(),
3313 json!({"shard": 0}),
3314 100,
3315 0,
3316 )],
3317 );
3318 events_by_shard.insert(
3319 1,
3320 vec![StoredEvent::from_value(
3321 "1700-0".to_string(),
3322 json!({"shard": 1}),
3323 100,
3324 1,
3325 )],
3326 );
3327
3328 // Seed cursor: shard 0 in Numeric format (will be refused),
3329 // shard 1 in Redis format (will be accepted).
3330 let mut cursor = CompositeCursor::new();
3331 cursor.set(0, "41".to_string());
3332 cursor.set(1, "1600-0".to_string());
3333 let encoded = cursor.encode().unwrap();
3334
3335 let adapter: Arc<dyn Adapter> = Arc::new(FormatAdapter { events_by_shard });
3336 let merger = PollMerger::new(adapter, vec![0, 1]);
3337 let mut req = ConsumeRequest::new(10);
3338 req.from_id = Some(encoded);
3339 // NOT(empty-AND) is a tautology — `Filter::and(vec![])`
3340 // matches nothing, so the negation matches everything.
3341 // We only need filter=Some(...) so the `new_cursor` clone
3342 // is built and the Step-1 set_checked path runs.
3343 req.filter = Some(crate::consumer::Filter::not(crate::consumer::Filter::and(
3344 Vec::new(),
3345 )));
3346
3347 let response = merger.poll(req).await.unwrap();
3348
3349 assert_eq!(
3350 response.events.len(),
3351 1,
3352 "shard 1's event must still be delivered; shard 0's must be dropped",
3353 );
3354 assert_eq!(response.events[0].shard_id, 1);
3355 assert_eq!(response.failed_shards, vec![0u16]);
3356 }
3357
3358 /// Regression: BUG_REPORT.md #52 — `sort_by_key(|e| e.insertion_ts)`
3359 /// is stable but ties across shards depend on `join_all`'s
3360 /// completion order, which is non-deterministic. Combined with
3361 /// `truncate(limit)`, this could drop or duplicate events at the
3362 /// limit boundary across consecutive polls. The fix breaks ties
3363 /// deterministically on `(shard_id, id)`.
3364 #[tokio::test]
3365 async fn sort_breaks_ties_deterministically_across_shards() {
3366 // Two shards with events that share `insertion_ts` so the
3367 // tiebreaker controls the order.
3368 let adapter = Arc::new(MockAdapter::new());
3369 adapter.add_events(
3370 0,
3371 vec![
3372 StoredEvent::from_value("0-a".to_string(), json!({}), 100, 0),
3373 StoredEvent::from_value("0-b".to_string(), json!({}), 100, 0),
3374 ],
3375 );
3376 adapter.add_events(
3377 1,
3378 vec![
3379 StoredEvent::from_value("1-a".to_string(), json!({}), 100, 1),
3380 StoredEvent::from_value("1-b".to_string(), json!({}), 100, 1),
3381 ],
3382 );
3383
3384 // Poll many times; the order must be stable.
3385 let merger = PollMerger::new(adapter, vec![0, 1]);
3386 let mut prior_order: Option<Vec<String>> = None;
3387 for iter in 0..20 {
3388 let r = merger
3389 .poll(ConsumeRequest::new(10).ordering(Ordering::InsertionTs))
3390 .await
3391 .unwrap();
3392 let ids: Vec<String> = r.events.iter().map(|e| e.id.clone()).collect();
3393 if let Some(prev) = &prior_order {
3394 assert_eq!(
3395 &ids, prev,
3396 "iter {iter}: order is non-deterministic — sort tie-break failed (#52)"
3397 );
3398 }
3399 prior_order = Some(ids);
3400 }
3401
3402 // And the order must match `(shard_id, id)`.
3403 let r = merger
3404 .poll(ConsumeRequest::new(10).ordering(Ordering::InsertionTs))
3405 .await
3406 .unwrap();
3407 let ids: Vec<String> = r.events.iter().map(|e| e.id.clone()).collect();
3408 assert_eq!(ids, vec!["0-a", "0-b", "1-a", "1-b"]);
3409 }
3410}