Skip to main content

net/consumer/
merge.rs

1//! Cross-shard poll merge layer.
2//!
3//! This module handles polling from multiple shards and merging the results
4//! into a unified stream with proper cursor management.
5//!
6//! # Composite Cursor
7//!
8//! When polling multiple shards, we track position in each shard using a
9//! composite cursor encoded as base64 JSON:
10//!
11//! ```json
12//! {"0": "1702123456789-0", "1": "1702123456790-0", ...}
13//! ```
14
15use std::cmp::Ordering as CmpOrdering;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
20use serde::{Deserialize, Serialize};
21
22use crate::adapter::{Adapter, ShardPollResult};
23use crate::consumer::filter::{CompiledFilter, Filter};
24use crate::error::{AdapterError, ConsumerError};
25use crate::event::StoredEvent;
26
27/// Compare two adapter-emitted stream ids using numeric semantics for
28/// the formats both built-in adapters produce, with a lex fallback for
29/// already-lex-comparable opaque ids (ULID, UUIDv7, fixed-width hex).
30///
31/// A raw `str::cmp` would invert ordering on the unpadded numeric
32/// ids the JetStream adapter emits (`seq.to_string()`) and the
33/// Redis Streams server-side `{ms}-{seq}` format — `"9" > "10"`
34/// lexicographically would wedge the cursor at every decade
35/// boundary. The structured comparator below handles both formats
36/// numerically. Mixed-padding comparisons across upgrades still
37/// compare correctly because parse-then-compare ignores leading
38/// zeros.
39///
40/// Order of attempts:
41/// 1. Both ids parse as `<u64>-<u64>` (Redis Streams).
42/// 2. Both ids parse as `<u128>` (raw numeric, padded or unpadded).
43/// 3. Lex compare (ULID, UUID, hex digests, etc.).
44///
45/// We deliberately do not try to mix formats — if one side is `123`
46/// and the other is `456-0`, the lex fallback kicks in. In practice
47/// a single adapter emits a single format, so this only matters for
48/// pathological mixed-source streams.
49pub(crate) fn compare_stream_ids(a: &str, b: &str) -> CmpOrdering {
50    // Redis Streams `<ms>-<seq>`.
51    if let (Some((a_ms, a_seq)), Some((b_ms, b_seq))) = (split_redis_id(a), split_redis_id(b)) {
52        return (a_ms, a_seq).cmp(&(b_ms, b_seq));
53    }
54    // Plain numeric (JetStream `seq.to_string()` or future zero-padded form).
55    if let (Ok(an), Ok(bn)) = (a.parse::<u128>(), b.parse::<u128>()) {
56        return an.cmp(&bn);
57    }
58    // Opaque id — assumed already lex-comparable.
59    a.cmp(b)
60}
61
/// Parse a Redis Streams id of the form `<ms>-<seq>` into its two
/// `u64` components.
///
/// The split happens at the first `-`; returns `None` when there is no
/// `-` or when either side fails to parse as `u64` (so `"1-2-3"` and
/// `"-"` are both rejected).
fn split_redis_id(s: &str) -> Option<(u64, u64)> {
    let (ms_part, seq_part) = s.split_once('-')?;
    let ms = ms_part.parse::<u64>().ok();
    let seq = seq_part.parse::<u64>().ok();
    ms.zip(seq)
}
66
/// Coarse classifier for a stream id, as returned by `id_format`.
///
/// Two ids of the same format compare safely via `compare_stream_ids`;
/// two ids of different formats fall through to a lex compare that may
/// wedge the cursor (see `update_from_events`). The cursor write paths
/// therefore classify both ids first and refuse to advance when the
/// formats differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum IdFormat {
    /// Redis Streams `<ms>-<seq>` (both halves parse as `u64`).
    Redis,
    /// Plain numeric (JetStream `seq.to_string()`); parses as `u128`.
    Numeric,
    /// Anything else — assumed lex-comparable.
    Opaque,
}
80
81pub(crate) fn id_format(s: &str) -> IdFormat {
82    if split_redis_id(s).is_some() {
83        IdFormat::Redis
84    } else if s.parse::<u128>().is_ok() {
85        IdFormat::Numeric
86    } else {
87        IdFormat::Opaque
88    }
89}
90
/// Backing type for per-shard cursor positions.
///
/// `Arc<str>` makes cursor clones (and internal copies during poll
/// merging) cheap by reference-counting the id bytes rather than
/// copying them. Advancing a shard stores a freshly built `Arc<str>`
/// rather than mutating the existing one.
type CursorPos = Arc<str>;
95
/// Ordering mode for consumed events.
///
/// Note that this type shares its name with `std::cmp::Ordering`, which
/// this module imports under the alias `CmpOrdering`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Ordering {
    /// Return events in arbitrary order (fastest).
    ///
    /// **Cross-shard order is non-deterministic.** Events from a
    /// given shard preserve their per-shard arrival order, but the
    /// interleaving across shards depends on `futures::join_all`'s
    /// completion order — which varies with runtime scheduling,
    /// adapter latency, and concurrent load. Callers that need a
    /// stable order across polls must use [`Ordering::InsertionTs`]
    /// or sort the response themselves (e.g. by `(shard_id, id)`).
    ///
    /// NOTE(review): `futures::future::join_all` yields its results in
    /// input order rather than completion order, and the poll path
    /// sorts the shard list before polling — so the explanation above
    /// looks inaccurate. Confirm against the rest of `poll` before
    /// rewording; either way callers should not rely on any particular
    /// cross-shard order in this mode.
    #[default]
    None,
    /// Sort events by insertion timestamp (cross-shard ordering).
    InsertionTs,
}
113
/// Composite cursor tracking position across multiple shards.
///
/// On the wire (see `encode` / `decode`) this is the base64 encoding of
/// a JSON object whose keys are shard ids and whose values are the
/// last stream id seen on that shard.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CompositeCursor {
    /// Per-shard positions (shard_id -> stream_id).
    ///
    /// Stored as `Arc<str>` so internal copies (e.g. `cursor.clone()`
    /// inside the poll merger) bump a refcount instead of duplicating
    /// each id's bytes.
    ///
    /// A shard with no entry has no recorded position yet.
    #[serde(flatten)]
    pub positions: HashMap<u16, CursorPos>,
}
125
126impl CompositeCursor {
127    /// Create an empty cursor.
128    pub fn new() -> Self {
129        Self::default()
130    }
131
132    /// Encode the cursor as a base64 string.
133    ///
134    /// Pre-fix used `unwrap_or_default()`, which silently
135    /// produced an empty string on serialization failure. The
136    /// empty cursor then base64-encoded to an empty string and
137    /// the consumer's next poll restarted from the beginning of
138    /// the stream — silent rewind. For the current `positions`
139    /// schema (`HashMap<u16, Arc<str>>`), serialization is
140    /// infallible, so the failure path is unreachable.
141    ///
142    /// We surface that as a `ConsumerError::InvalidCursor` rather
143    /// than a panic. `poll()` is an `async fn`, and a panic that
144    /// propagates from there can abort the surrounding tokio
145    /// runtime worker. Returning `Err` lets `poll()` stay
146    /// non-panicking even if a future schema change breaks the
147    /// invariant; the caller will see a structured error and can
148    /// retry / log instead of taking down a worker.
149    pub fn encode(&self) -> Result<String, ConsumerError> {
150        let json = serde_json::to_string(&self.positions).map_err(|e| {
151            ConsumerError::InvalidCursor(format!(
152                "CompositeCursor::encode failed to serialize positions \
153                 (HashMap<u16, Arc<str>> should be infallible): {e}"
154            ))
155        })?;
156        Ok(BASE64.encode(json.as_bytes()))
157    }
158
159    /// Decode a cursor from a base64 string.
160    pub fn decode(s: &str) -> Result<Self, ConsumerError> {
161        let bytes = BASE64
162            .decode(s)
163            .map_err(|e| ConsumerError::InvalidCursor(e.to_string()))?;
164
165        // Two-pass parse so non-canonical shard-id keys (e.g.
166        // `"00"` aliasing `"0"`) are rejected explicitly. Pre-fix
167        // we deserialized straight into `HashMap<u16, _>`; serde
168        // parses each string key as u16, so `"0"` and `"00"`
169        // both produce key 0 and the second insert silently
170        // overwrites the first. The collision is benign in
171        // production (no caller emits non-canonical keys) but
172        // a malicious or buggy producer could inject a hostile
173        // cursor that ambiguates which shard's position a
174        // consumer ended up with on round-trip.
175        let raw_positions: HashMap<String, CursorPos> = serde_json::from_slice(&bytes)
176            .map_err(|e| ConsumerError::InvalidCursor(e.to_string()))?;
177        let mut positions: HashMap<u16, CursorPos> = HashMap::with_capacity(raw_positions.len());
178        for (key, val) in raw_positions {
179            let id: u16 = key.parse().map_err(|_| {
180                ConsumerError::InvalidCursor(format!("shard key {key:?} is not a valid u16"))
181            })?;
182            // Reject non-canonical stringifications. The
183            // round-trip `u16 → String` is the canonical form;
184            // any other string that parses to the same u16 is
185            // a non-canonical alias.
186            if id.to_string() != key {
187                return Err(ConsumerError::InvalidCursor(format!(
188                    "non-canonical shard key {key:?} (parses to {id}, \
189                     canonical form is {id})"
190                )));
191            }
192            if positions.insert(id, val).is_some() {
193                // Defensive: if non-canonical detection above
194                // missed something (it shouldn't), a duplicate
195                // canonical key is also a structural error.
196                return Err(ConsumerError::InvalidCursor(format!(
197                    "duplicate shard key {id} after canonicalization"
198                )));
199            }
200        }
201
202        Ok(Self { positions })
203    }
204
205    /// Get the position for a specific shard.
206    pub fn get(&self, shard_id: u16) -> Option<&str> {
207        self.positions.get(&shard_id).map(|s| s.as_ref())
208    }
209
210    /// Set the position for a specific shard.
211    ///
212    /// Accepts anything that converts into an `Arc<str>` — notably
213    /// `String`, `&str`, and `Arc<str>` itself. This lets adapters
214    /// hand us a freshly-allocated `String` (becomes a single boxed
215    /// allocation) without forcing a second copy for the cursor.
216    ///
217    /// **Unconditional**: skips the backend-format guard and the
218    /// monotonicity guard. Use [`Self::set_checked`] from production
219    /// poll paths — the documented "refuse to advance across a
220    /// JetStream → Redis cursor format change" protection lives in
221    /// the checked variant, not here. The unchecked variant is kept
222    /// public for tests that need to seed a cursor directly.
223    pub fn set(&mut self, shard_id: u16, position: impl Into<CursorPos>) {
224        self.positions.insert(shard_id, position.into());
225    }
226
227    /// Set the position for a specific shard, applying the same
228    /// backend-format mismatch guard as [`Self::update_from_events`].
229    /// Returns `true` if the write happened, `false` if it was
230    /// refused (format mismatch — error-logged so operators see the
231    /// migration). No-existing-position is treated as accept (first
232    /// write).
233    ///
234    /// Production poll paths must use this instead of [`Self::set`]
235    /// so a mid-stream backend migration (JetStream → Redis or vice
236    /// versa) is surfaced and refused at the cursor write site
237    /// rather than silently overwriting the format and stalling the
238    /// caller.
239    pub fn set_checked(&mut self, shard_id: u16, new_id: &str) -> bool {
240        match self.positions.get(&shard_id) {
241            Some(existing) => {
242                let existing_fmt = id_format(existing.as_ref());
243                let new_fmt = id_format(new_id);
244                if existing_fmt != new_fmt {
245                    tracing::error!(
246                        shard_id,
247                        existing = %existing,
248                        new = %new_id,
249                        existing_format = ?existing_fmt,
250                        new_format = ?new_fmt,
251                        "stream id format change detected — likely a \
252                         backend migration (e.g. JetStream → Redis). \
253                         Refusing to advance the cursor; operator must \
254                         explicitly reset to consume from the new \
255                         backend.",
256                    );
257                    return false;
258                }
259                self.positions.insert(shard_id, Arc::from(new_id));
260                true
261            }
262            None => {
263                self.positions.insert(shard_id, Arc::from(new_id));
264                true
265            }
266        }
267    }
268
269    /// Update positions from consumed events.
270    ///
271    /// Per-shard CAS routed through `compare_stream_ids`, which
272    /// understands the Redis (`<ms>-<seq>`) and JetStream (`<u64>`)
273    /// formats numerically and falls back to lex for opaque ids
274    /// (ULID, UUIDv7, hex digests). The cursor cannot regress and
275    /// decade-rollovers cannot freeze it. Unconditional inserts
276    /// would let whichever event for a given `shard_id` appeared
277    /// *last* in the slice win regardless of stream order; a plain
278    /// `str::cmp` CAS would wedge on the unpadded numeric ids both
279    /// built-in adapters emit (`"9" > "10"` lexicographically).
280    pub fn update_from_events(&mut self, events: &[StoredEvent]) {
281        for event in events {
282            let new_id = event.id.as_str();
283            match self.positions.get(&event.shard_id) {
284                Some(existing) => {
285                    // Detect a backend-format change before
286                    // calling the comparator. Pre-fix a cursor
287                    // at `"42"` (JetStream numeric) confronted
288                    // with a new `"1700-0"` (Redis) fell through
289                    // both structured branches of
290                    // `compare_stream_ids`, hit the lex fallback
291                    // (`'4' > '1'`), and the CAS guard refused
292                    // to update — silent stall requiring manual
293                    // cursor reset. Detect the mismatch
294                    // explicitly: surface a loud error and
295                    // refuse the update so operators see the
296                    // backend migration in logs and reset the
297                    // cursor deliberately. Keeping the existing
298                    // value (rather than blindly accepting the
299                    // new one) avoids a potential regression in
300                    // the other direction.
301                    let existing_fmt = id_format(existing.as_ref());
302                    let new_fmt = id_format(new_id);
303                    if existing_fmt != new_fmt {
304                        tracing::error!(
305                            shard_id = event.shard_id,
306                            existing = %existing,
307                            new = %new_id,
308                            existing_format = ?existing_fmt,
309                            new_format = ?new_fmt,
310                            "stream id format change detected — likely a \
311                             backend migration (e.g. JetStream → Redis). \
312                             Refusing to advance the cursor; operator must \
313                             explicitly reset to consume from the new \
314                             backend.",
315                        );
316                        continue;
317                    }
318                    if compare_stream_ids(existing.as_ref(), new_id) == CmpOrdering::Less {
319                        self.positions.insert(event.shard_id, Arc::from(new_id));
320                    }
321                    // Existing is >= new_id under the structured
322                    // comparator — don't regress.
323                }
324                None => {
325                    self.positions.insert(event.shard_id, Arc::from(new_id));
326                }
327            }
328        }
329    }
330}
331
/// Request for consuming events.
///
/// Built with [`ConsumeRequest::new`] plus the chained setters, and
/// passed to `PollMerger::poll`.
#[derive(Debug, Clone, Default)]
pub struct ConsumeRequest {
    /// Start cursor (opaque to caller). None means from the beginning.
    ///
    /// When present, `poll` decodes it with `CompositeCursor::decode`
    /// and fails the request if it is malformed.
    pub from_id: Option<String>,
    /// Maximum number of events to return.
    ///
    /// A limit of `0` makes `poll` return an empty response without
    /// touching the adapter.
    pub limit: usize,
    /// Optional filter to apply.
    pub filter: Option<Filter>,
    /// Ordering mode.
    pub ordering: Ordering,
    /// Specific shards to poll. None means all shards.
    ///
    /// `poll` sorts and de-duplicates the list, so repeated ids do not
    /// cause a shard to be fetched twice.
    pub shards: Option<Vec<u16>>,
}
346
347impl ConsumeRequest {
348    /// Create a new consume request.
349    pub fn new(limit: usize) -> Self {
350        Self {
351            limit,
352            ..Default::default()
353        }
354    }
355
356    /// Set the starting cursor.
357    pub fn from(mut self, cursor: impl Into<String>) -> Self {
358        self.from_id = Some(cursor.into());
359        self
360    }
361
362    /// Set the filter.
363    pub fn filter(mut self, filter: Filter) -> Self {
364        self.filter = Some(filter);
365        self
366    }
367
368    /// Set the ordering mode.
369    pub fn ordering(mut self, ordering: Ordering) -> Self {
370        self.ordering = ordering;
371        self
372    }
373
374    /// Set specific shards to poll.
375    pub fn shards(mut self, shards: Vec<u16>) -> Self {
376        self.shards = Some(shards);
377        self
378    }
379}
380
/// Response from consuming events.
#[derive(Debug, Clone)]
pub struct ConsumeResponse {
    /// Events matching the request.
    pub events: Vec<StoredEvent>,
    /// Cursor for the next poll. None if no events returned.
    pub next_id: Option<String>,
    /// True if there are more events available.
    pub has_more: bool,
    /// `true` if the per-shard fetch was clamped by the internal
    /// `PER_SHARD_FETCH_CAP` (10 000). Callers requesting very
    /// large `limit` values across few shards may receive fewer
    /// events than `limit` per `poll()` even when the underlying
    /// streams have more — pagination via `next_id` still works.
    ///
    /// The clamp used to be silent. The default is `false`; tools
    /// building observability around large polls can detect
    /// under-delivery via this flag.
    pub truncated_at_per_shard_cap: bool,
    /// Shards that reported `has_more=true` but contributed no
    /// events and no cursor advance to this poll. The merger
    /// suppresses the aggregate `has_more` flag for caller-
    /// protection (preventing infinite loops), but operators
    /// monitoring adapter health should know which shards are
    /// stuck.
    ///
    /// The suppression used to be logged at warn but invisible to
    /// callers. Empty on the happy path; populated only when a stall
    /// was detected and suppressed.
    pub stalled_shards: Vec<u16>,
    /// Shards whose adapter call returned an error during this
    /// poll. The merger logs each error at WARN and continues
    /// with the surviving shards, so the response's `events`
    /// can come from a strict subset of the configured shards
    /// and silently miss data the operator expected to see.
    /// Operators monitoring adapter health need to know WHICH
    /// shards failed (not just that something logged a warn) so
    /// they can correlate alerts with specific Redis / JetStream
    /// nodes.
    ///
    /// The poll path also records here any shard whose events were
    /// dropped because the cursor write was refused over a stream-id
    /// format change (see `CompositeCursor::set_checked`).
    ///
    /// This signal used to live only in the warn log; an observer
    /// parsing `ConsumeResponse` saw a clean partial-shards response
    /// with no field indicating *which* shards were missing, in
    /// contrast to `stalled_shards` which IS surfaced. Empty on the
    /// happy path; populated only when at least one shard's poll
    /// failed.
    ///
    /// **Recovery-latency note:** when a previously-failed shard
    /// returns to health, its backlog drains at the per-shard limit
    /// of the next poll (`limit / shards.len() * over_fetch_factor`).
    /// A 4-shard poll where one shard was down for 60 s and the
    /// others kept ingesting can take several normal-cadence polls
    /// to fully drain the recovered shard. No backlog-size hint is
    /// surfaced on the response; callers that need to detect a
    /// stuck-recovery condition should monitor cursor advance per
    /// shard across consecutive polls.
    pub failed_shards: Vec<u16>,
}
439
440impl ConsumeResponse {
441    /// Create an empty response.
442    pub fn empty() -> Self {
443        Self {
444            events: Vec::new(),
445            next_id: None,
446            has_more: false,
447            truncated_at_per_shard_cap: false,
448            stalled_shards: Vec::new(),
449            failed_shards: Vec::new(),
450        }
451    }
452}
453
/// Internal cap on per-shard `direct_get` / `XRANGE` fetch sizes,
/// applied in `PollMerger::poll`. Bounds the adapter's per-call
/// memory pressure for a single poll. Callers needing larger
/// effective limits should paginate.
///
/// The cap is applied after the over-fetch multiplier, i.e. to the
/// final count requested from each shard.
///
/// Marked `#[doc(hidden)]` because the value is an internal
/// tuning knob, not part of the consumer's public API. Surfacing
/// it in the docs would invite downstreams to match against it
/// and turn a silent tuning change into a breaking-change
/// negotiation. Callers that need to know whether a poll was
/// truncated should read `ConsumeResponse::truncated_at_per_shard_cap`
/// rather than comparing against this constant directly.
#[doc(hidden)]
pub const PER_SHARD_FETCH_CAP: usize = 10_000;
468
469/// Match a `StoredEvent` against a filter, surfacing parse failures.
470///
471/// Returns `true` iff the event parses as JSON AND the filter matches
472/// the parsed value. A parse failure is logged at WARN with the event's
473/// id and shard so on-disk corruption or framing bugs in upstream
474/// adapters are observable from the filtered-poll path; without this,
475/// corrupt events were silently dropped from filtered results while the
476/// unfiltered path still returned them — a confusing inconsistency.
477///
478/// Takes a [`CompiledFilter`] rather than a [`Filter`] so the dot-path
479/// split + per-segment integer parse happens once per poll, not once
480/// per event — see perf #15 / #16.
481fn event_matches_filter(event: &StoredEvent, filter: &CompiledFilter) -> bool {
482    match event.parse() {
483        Ok(value) => filter.matches(&value),
484        Err(e) => {
485            tracing::warn!(
486                event_id = %event.id,
487                shard_id = event.shard_id,
488                error = %e,
489                "dropping unparseable event from filtered poll result"
490            );
491            false
492        }
493    }
494}
495
/// Poll merger for cross-shard aggregation.
///
/// Fans a `ConsumeRequest` out to the adapter one shard at a time and
/// merges the per-shard results into a single `ConsumeResponse`.
pub struct PollMerger {
    /// Adapter for polling shards.
    adapter: Arc<dyn Adapter>,
    /// Active shard IDs to poll when the request omits an explicit
    /// `shards` list.
    ///
    /// Previously stored only `num_shards: u16` and generated
    /// `(0..num_shards)` on every default-shards poll. After a dynamic
    /// scale-down (`ShardMapper::scale_down` evicts the lowest-weight
    /// shard, not necessarily the highest id), the active id set can
    /// become sparse — e.g. `{1, 2}` after id 0 was drained — but
    /// `num_shards == 2` still produces `[0, 1]`, polling a stale or
    /// nonexistent shard 0 and skipping the live shard 2 entirely.
    /// Captured at construction; the bus replaces the merger via
    /// `ArcSwap` whenever topology changes (`add_shard`,
    /// `remove_shard_internal`).
    shard_ids: Vec<u16>,
}
515
516impl PollMerger {
517    /// Create a new poll merger.
518    ///
519    /// `shard_ids` should be the snapshot of currently-active shard IDs
520    /// (e.g. `ShardManager::shard_ids()`). Passing `0..num_shards` is
521    /// only correct when ids are guaranteed dense from 0 — i.e. the
522    /// static-shards path with no scaling.
523    pub fn new(adapter: Arc<dyn Adapter>, shard_ids: Vec<u16>) -> Self {
524        Self { adapter, shard_ids }
525    }
526
527    /// Poll events according to the request.
528    pub async fn poll(&self, request: ConsumeRequest) -> Result<ConsumeResponse, ConsumerError> {
529        if request.limit == 0 {
530            return Ok(ConsumeResponse::empty());
531        }
532
533        // Decode cursor
534        let cursor = match &request.from_id {
535            Some(s) => CompositeCursor::decode(s)?,
536            None => CompositeCursor::new(),
537        };
538
539        // Determine which shards to poll. Dedup the list so a caller
540        // that supplies a non-deduped slice (e.g. forwarded from a
541        // fan-out source) doesn't double-fetch a shard and return
542        // duplicate events in the response payload. The cursor would
543        // stay correct either way, but the duplicate events would
544        // survive filtering + ordering + the per-poll limit and reach
545        // the caller.
546        let mut shards: Vec<u16> = request
547            .shards
548            .clone()
549            .unwrap_or_else(|| self.shard_ids.clone());
550        shards.sort_unstable();
551        shards.dedup();
552
553        if shards.is_empty() {
554            return Ok(ConsumeResponse::empty());
555        }
556
557        // Calculate per-shard limit (over-fetch to account for filtering)
558        // Use ceiling division to avoid truncating to 0 when limit < shard count.
559        //
560        // Pre-fix this `min(10_000)` clamp was silent —
561        // a caller with `limit=200_000` over 10 shards expected
562        // 20 000/shard plus over-fetch but got 10 000/shard with
563        // no diagnostic. Track whether the clamp triggered and
564        // surface it on the response so callers building
565        // observability around large polls can detect under-
566        // delivery.
567        let over_fetch_factor = if request.filter.is_some() { 3 } else { 2 };
568        let unclamped_per_shard = request
569            .limit
570            .div_ceil(shards.len())
571            .max(1)
572            .saturating_mul(over_fetch_factor);
573        let per_shard_limit = unclamped_per_shard.min(PER_SHARD_FETCH_CAP);
574        let truncated_at_per_shard_cap = unclamped_per_shard > PER_SHARD_FETCH_CAP;
575
576        // Poll all shards in parallel. Each future borrows its start
577        // position directly from `cursor` (which outlives `join_all` below),
578        // avoiding a per-shard `String` allocation on every poll.
579        let poll_futures: Vec<_> = shards
580            .iter()
581            .map(|&shard_id| {
582                let adapter = self.adapter.clone();
583                let from: Option<&str> = cursor.get(shard_id);
584                async move {
585                    let result = adapter.poll_shard(shard_id, from, per_shard_limit).await;
586                    (shard_id, result)
587                }
588            })
589            .collect();
590
591        let shard_results: Vec<(u16, Result<ShardPollResult, AdapterError>)> =
592            futures::future::join_all(poll_futures).await;
593
594        // Collect results, tracking errors. Pre-allocate to the exact total
595        // event count so extend() below never reallocates.
596        let total_events: usize = shard_results
597            .iter()
598            .filter_map(|(_, r)| r.as_ref().ok().map(|sr| sr.events.len()))
599            .sum();
600        let mut all_events = Vec::with_capacity(total_events);
601        let mut any_has_more = false;
602        // Track which shards reported `has_more=true` so
603        // we can surface them on the response when the merger
604        // suppresses has_more for caller-protection. Pre-fix, an
605        // adapter stuck reporting `has_more=true` with no events
606        // and no cursor advance was logged at warn but invisible
607        // to callers — they saw a clean "no more events" and
608        // exited.
609        let mut shards_reporting_has_more: Vec<u16> = Vec::new();
610        // Per-shard adapter errors. Pre-fix these were logged at
611        // warn and then dropped on the floor; the response's
612        // `events` was a strict subset of the configured shards
613        // with no field indicating WHICH shards were missing.
614        // Surface the failed shard ids so observers can correlate
615        // alerts with specific Redis / JetStream nodes (parallel
616        // to the existing `stalled_shards` field).
617        let mut failed_shards: Vec<u16> = Vec::new();
618        // Shards whose `set_checked` cursor write was refused due to
619        // a backend-format change mid-stream. Their fetched events
620        // MUST be dropped from the response and excluded from any
621        // subsequent cursor override; otherwise the caller would
622        // receive events whose cursor never advances, re-fetch the
623        // same events on the next poll, and burn into an infinite
624        // duplicate-delivery loop. Surface them through the same
625        // `failed_shards` channel so operators see the migration.
626        let mut format_refused_shards: std::collections::HashSet<u16> =
627            std::collections::HashSet::new();
628        // `new_cursor` (fetched-position tracking) is only consulted on the
629        // filter path — building it for unfiltered polls wastes a full
630        // HashMap clone plus a `set()` per shard every poll.
631        let mut new_cursor = if request.filter.is_some() {
632            Some(cursor.clone())
633        } else {
634            None
635        };
636
637        for (shard_id, result) in shard_results {
638            match result {
639                Ok(shard_result) => {
640                    // Destructure to move `next_id` out without cloning the
641                    // String that the adapter already allocated for us.
642                    let ShardPollResult {
643                        events,
644                        next_id,
645                        has_more,
646                    } = shard_result;
647                    // Route through set_checked so a mid-stream
648                    // backend migration (JetStream → Redis cursor
649                    // format change) is refused with a loud error
650                    // log rather than silently overwriting the
651                    // format on the cursor. On refusal, the shard's
652                    // events must not enter the response — otherwise
653                    // the caller would receive events without a
654                    // matching cursor advance and re-fetch them
655                    // forever. Track the refusal and skip merging.
656                    let refused = if let (Some(nc), Some(next_id)) =
657                        (new_cursor.as_mut(), next_id.as_ref())
658                    {
659                        !nc.set_checked(shard_id, next_id)
660                    } else {
661                        false
662                    };
663                    if refused {
664                        format_refused_shards.insert(shard_id);
665                        failed_shards.push(shard_id);
666                        // Drop the shard's events for this poll.
667                        // The error log was emitted inside
668                        // `set_checked`; the operator-facing
669                        // failed_shards entry above is the
670                        // structured signal.
671                        continue;
672                    }
673                    if has_more {
674                        any_has_more = true;
675                        shards_reporting_has_more.push(shard_id);
676                    }
677                    all_events.extend(events);
678                }
679                Err(e) => {
680                    tracing::warn!(
681                        shard_id = shard_id,
682                        error = %e,
683                        "Failed to poll shard, skipping"
684                    );
685                    failed_shards.push(shard_id);
686                    // Continue with other shards
687                }
688            }
689        }
690
691        // Apply filter.
692        //
693        // IMPORTANT: Use `new_cursor` (which tracks fetched positions) as
694        // the base cursor so that shards whose events are entirely filtered
695        // out still advance past those events. Without this, filtered-out
696        // events would be re-fetched on every subsequent poll, causing an
697        // infinite loop.
698        //
699        // Parse failures: a `StoredEvent` whose `raw` bytes don't
700        // deserialize as JSON cannot match a filter, so it is dropped
701        // from the filtered result. Previously this drop was silent
702        // (`unwrap_or(false)`), making on-disk corruption or
703        // adapter-side framing bugs invisible to operators — only
704        // *unfiltered* polls would surface the bad event.
705        //
706        // The previous `Ordering::None` path had a `break` once
707        // `kept.len() >= limit + 1`, which discarded events from later
708        // shards without ever filtering them. Combined with the cursor
709        // advancing past every fetched event, that meant matching
710        // events on un-inspected shards were silently lost. The fix
711        // uses a single full `retain` pass for both ordering modes;
712        // the `lazy parse` micro-optimization is gone, but per-event
713        // filter-matching is cheap and consistent semantics with the
714        // sort path is worth more than parse-skip on over-fetches.
715        if let Some(filter) = &request.filter {
716            // Compile once per poll — pre-splits every dot-path and
717            // pre-parses each segment's integer form. Pre-fix this
718            // work happened on every event in the retain loop
719            // (perf #15 / #16).
720            let compiled = filter.compile();
721            all_events.retain(|e| event_matches_filter(e, &compiled));
722        }
723
724        // Apply ordering
725        match request.ordering {
726            Ordering::None => {
727                // Keep arbitrary order
728            }
729            Ordering::InsertionTs => {
730                // `insertion_ts` is monotonic *per shard*, not
731                // globally (see `event.rs:233`), so two events from
732                // different shards can carry the same timestamp. With
733                // a stable sort on `insertion_ts` alone, ties were
734                // broken by the input order — which depends on
735                // `futures::future::join_all`'s completion ordering
736                // and is non-deterministic across polls. Combined
737                // with `truncate(limit)` and the cursor-rollback step,
738                // the same logical event could be returned twice or
739                // skipped at the limit boundary across consecutive
740                // polls.
741                //
742                // Add `(shard_id, id)` as deterministic
743                // tiebreakers. `id` is the storage backend's
744                // identifier and is unique within a shard, so the
745                // composite is a strict total order.
746                //
747                // The id tiebreak routes through
748                // `compare_stream_ids`, which understands the
749                // unpadded numeric formats both built-in adapters
750                // emit (Redis `<ms>-<seq>`, JetStream `<u64>`) and
751                // falls back to lex for opaque ids (ULID, UUIDv7,
752                // hex digests). The `(insertion_ts, shard_id)`
753                // chain resolves the common cases first; when two
754                // events from the same shard land at the same
755                // `insertion_ts` (rare but legal at millisecond
756                // granularity), the structured id compare is
757                // correct on every adapter format.
758                all_events.sort_by(|a, b| {
759                    a.insertion_ts
760                        .cmp(&b.insertion_ts)
761                        .then(a.shard_id.cmp(&b.shard_id))
762                        .then(compare_stream_ids(&a.id, &b.id))
763                });
764            }
765        }
766
767        // Track per-shard match counts *before* truncate. After
768        // truncation, any shard whose match count shrank means matches
769        // were dropped — and those matches must be re-fetched on the
770        // next poll, otherwise they are silently lost (the cursor
771        // would otherwise advance past them via `new_cursor`).
772        let mut matched_per_shard: std::collections::HashMap<u16, usize> =
773            std::collections::HashMap::new();
774        if request.filter.is_some() {
775            for e in &all_events {
776                *matched_per_shard.entry(e.shard_id).or_insert(0) += 1;
777            }
778        }
779
780        // Truncate to requested limit
781        let had_extra = all_events.len() > request.limit;
782        all_events.truncate(request.limit);
783
784        // Build the final cursor.
785        //
786        // With filtering: start from `new_cursor` (fetched positions) so
787        // shards whose events were entirely filtered out advance past
788        // them. Then:
789        //   1. For shards that had matches truncated (returned <
790        //      total_matched), roll the cursor *back* to the original
791        //      pre-poll position. The override step then bumps it
792        //      forward to the last returned match for that shard, so
793        //      the unreturned matches re-appear on the next poll.
794        //   2. Override with the last *returned* event id per shard —
795        //      this also prevents skipping matching events that were
796        //      fetched but truncated by the limit on shards that did
797        //      land in the returned set.
798        //
799        // Without filtering: start from the original `cursor` so shards
800        // with no returned events (due to limit truncation) don't skip
801        // ahead.
802        let mut final_cursor = match new_cursor {
803            Some(nc) => nc,
804            None => cursor.clone(),
805        };
806
807        // Step 1: rollback for shards with truncated matches. We
808        // track which shards we rolled back here so Step 2 only
809        // overrides those shards (rather than every shard with a
810        // returned event, which would throw away the adapter's
811        // `next_id` advance for shards that returned all their
812        // matches).
813        let mut rolled_back: std::collections::HashSet<u16> = std::collections::HashSet::new();
814        if request.filter.is_some() && had_extra {
815            let mut returned_per_shard: std::collections::HashMap<u16, usize> =
816                std::collections::HashMap::new();
817            for e in &all_events {
818                *returned_per_shard.entry(e.shard_id).or_insert(0) += 1;
819            }
820            for (shard_id, &total_matched) in &matched_per_shard {
821                let returned = returned_per_shard.get(shard_id).copied().unwrap_or(0);
822                if returned < total_matched {
823                    // Some matches for this shard were truncated. Roll
824                    // back to the original cursor so they're re-fetched.
825                    // The override below will move us forward to the
826                    // last *returned* match (if any), so we still make
827                    // progress per poll.
828                    match cursor.positions.get(shard_id) {
829                        Some(orig) => final_cursor.set(*shard_id, orig.clone()),
830                        None => {
831                            final_cursor.positions.remove(shard_id);
832                        }
833                    }
834                    rolled_back.insert(*shard_id);
835                }
836            }
837        }
838
839        // Step 2: override to last returned event id per shard.
840        // Only the last returned event per shard matters for the
841        // cursor, so iterate in reverse and skip shards already seen.
842        // This reduces id clones from O(all_events.len()) to
843        // O(shards.len()).
844        //
845        // When the filter path is active, Step 1 has already
846        // populated `final_cursor` with the adapter's `next_id`
847        // (a position past the last *fetched* event for each
848        // shard). A blanket Step 2 override here would move the
849        // cursor back to the last *matched* event id, which is
850        // BEHIND the last fetched event for any shard with
851        // non-matched events — subsequent polls would re-fetch and
852        // re-filter those non-matches, wasting work proportional
853        // to `over_fetch_factor` on low-match-rate streams. So we
854        // only Step 2-override for shards that were actually
855        // rolled back in Step 1 (those need a forward push past
856        // the last *returned* match), OR when the filter path
857        // wasn't used at all (filter is None).
858        //
859        // For filter=None, the previous Step-2 behavior was correct
860        // — `final_cursor` started as `cursor.clone()` (no
861        // `new_cursor` advance) and the only progress signal is
862        // the last returned event id.
863        let mut seen_shards: std::collections::HashSet<u16> =
864            std::collections::HashSet::with_capacity(shards.len());
865        for event in all_events.iter().rev() {
866            if seen_shards.insert(event.shard_id) {
867                let should_override =
868                    request.filter.is_none() || rolled_back.contains(&event.shard_id);
869                if should_override {
870                    // Same backend-format guard as the Step-1
871                    // adapter-cursor write site above. Surfaces a
872                    // mid-stream backend migration deterministically
873                    // rather than overwriting silently. If the
874                    // write is refused, surface the shard in
875                    // failed_shards so the caller sees the
876                    // migration as a structured signal rather than
877                    // only as an event-mismatched cursor.
878                    if !final_cursor.set_checked(event.shard_id, &event.id)
879                        && !format_refused_shards.contains(&event.shard_id)
880                    {
881                        format_refused_shards.insert(event.shard_id);
882                        failed_shards.push(event.shard_id);
883                    }
884                }
885                // Else: the adapter's `next_id` (already in
886                // `final_cursor` via the `new_cursor` initial
887                // value) is more advanced than the last matched
888                // event — preserve it.
889            }
890        }
891
892        // Drop events for any shard whose Step-1 OR Step-2 cursor
893        // write was refused due to a backend-format mismatch.
894        // Returning these events without a matching cursor advance
895        // would burn the caller into an infinite duplicate-delivery
896        // loop on the next poll. `failed_shards` already carries the
897        // structured signal to the caller.
898        //
899        // No-op when `format_refused_shards` is empty (the common
900        // case), so the retain is a single bool check per event in
901        // the happy path.
902        if !format_refused_shards.is_empty() {
903            all_events.retain(|e| !format_refused_shards.contains(&e.shard_id));
904        }
905
906        let cursor_advanced = final_cursor.positions != cursor.positions;
907        // When filtering removed everything but we did advance past fetched
908        // events, signal has_more so the caller keeps polling forward.
909        let all_filtered = request.filter.is_some() && all_events.is_empty() && cursor_advanced;
910        // Previously `has_more = any_has_more || had_extra ||
911        // all_filtered`. If a single adapter returned
912        // `ShardPollResult { events: [], next_id: None,
913        // has_more: true }` (legal under the trait contract — nothing
914        // forbids it), then `any_has_more=true` propagated even
915        // though we made *no* progress. The caller observed
916        // `(has_more=true, next_id=None)` and re-polled from the
917        // same starting cursor indefinitely.
918        //
919        // Suppress `has_more` when the merger itself made no progress
920        // at all (no events returned AND the cursor didn't advance).
921        // The caller then sees a clean "nothing to do right now"
922        // response and must back off rather than spin.
923        let we_made_progress = !all_events.is_empty() || cursor_advanced;
924        let has_more = (any_has_more || had_extra || all_filtered) && we_made_progress;
925        // When we suppress has_more for caller-protection
926        // (an adapter stuck at has_more=true with no progress),
927        // surface the offending shard ids on the response so
928        // operators can alert. Pre-fix the suppression was warn-
929        // logged only.
930        let stalled_shards: Vec<u16> = if any_has_more && !we_made_progress {
931            tracing::warn!(
932                stalled_shards = ?shards_reporting_has_more,
933                "PollMerger: an adapter reported has_more=true with no events \
934                 and no cursor advance — suppressing to avoid caller infinite-loop"
935            );
936            shards_reporting_has_more
937        } else {
938            Vec::new()
939        };
940        // Return the cursor even when all events were filtered out, so the
941        // caller advances past the filtered region instead of re-fetching
942        // the same events forever. When the poll made no progress at
943        // all, echo back the caller's input cursor (if any) instead of
944        // returning `None` — pre-fix a stalled poll dropped the cursor,
945        // so a caller that interpreted `next_id == None` as "no events,
946        // restart from the beginning" silently regressed pagination
947        // across the stall. Echoing preserves the cursor across stalls.
948        let next_id = if we_made_progress {
949            Some(final_cursor.encode()?)
950        } else {
951            request.from_id.clone()
952        };
953
954        Ok(ConsumeResponse {
955            events: all_events,
956            next_id,
957            has_more,
958            truncated_at_per_shard_cap,
959            stalled_shards,
960            failed_shards,
961        })
962    }
963}
964
965#[cfg(test)]
966mod tests {
967    #![allow(
968        clippy::disallowed_methods,
969        reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
970    )]
971    use super::*;
972    use serde_json::json;
973
974    #[test]
975    fn test_cursor_encode_decode() {
976        let mut cursor = CompositeCursor::new();
977        cursor.set(0, "1702123456789-0".to_string());
978        cursor.set(1, "1702123456790-0".to_string());
979        cursor.set(5, "1702123456795-0".to_string());
980
981        let encoded = cursor.encode().unwrap();
982        let decoded = CompositeCursor::decode(&encoded).unwrap();
983
984        assert_eq!(decoded.get(0), Some("1702123456789-0"));
985        assert_eq!(decoded.get(1), Some("1702123456790-0"));
986        assert_eq!(decoded.get(5), Some("1702123456795-0"));
987        assert_eq!(decoded.get(2), None);
988    }
989
990    #[test]
991    fn test_cursor_update_from_events() {
992        let mut cursor = CompositeCursor::new();
993
994        let events = vec![
995            StoredEvent::from_value("100-0".to_string(), json!({}), 100, 0),
996            StoredEvent::from_value("200-0".to_string(), json!({}), 200, 1),
997            // This used to be "later event in shard 0" by
998            // virtue of being LAST in the input slice, but its id
999            // (150-0) is stream-order BEFORE 200-0 — wait, this is
1000            // for shard 0 not shard 1. shard 0 only had 100-0
1001            // before this; 150-0 > 100-0, so it advances normally.
1002            StoredEvent::from_value("150-0".to_string(), json!({}), 150, 0),
1003        ];
1004
1005        cursor.update_from_events(&events);
1006
1007        // Should have the highest id seen for each shard.
1008        assert_eq!(cursor.get(0), Some("150-0"));
1009        assert_eq!(cursor.get(1), Some("200-0"));
1010    }
1011
1012    /// Cursor must NOT regress when events arrive in a
1013    /// non-ascending order for the same shard. Pre-fix the cursor
1014    /// for shard 0 would land on `100-0` (the last item in the
1015    /// slice), regressing past `200-0`.
1016    #[test]
1017    fn cursor_does_not_regress_on_unsorted_per_shard_events() {
1018        let mut cursor = CompositeCursor::new();
1019        // For shard 0: stream-order 100-0 → 200-0, but the slice
1020        // has them reversed (consumer received 200-0 first,
1021        // then 100-0 because of merge ordering).
1022        let events = vec![
1023            StoredEvent::from_value("200-0".to_string(), json!({}), 200, 0),
1024            StoredEvent::from_value("100-0".to_string(), json!({}), 100, 0),
1025        ];
1026        cursor.update_from_events(&events);
1027        assert_eq!(
1028            cursor.get(0),
1029            Some("200-0"),
1030            "cursor must hold the highest id, not the last-in-slice id",
1031        );
1032    }
1033
1034    /// A partial overlap (advance for one shard, regression
1035    /// attempt for another shard) must keep both cursors at their
1036    /// respective max.
1037    #[test]
1038    fn cursor_compare_and_set_is_per_shard() {
1039        let mut cursor = CompositeCursor::new();
1040        cursor.update_from_events(&[
1041            StoredEvent::from_value("500-0".to_string(), json!({}), 500, 0),
1042            StoredEvent::from_value("500-0".to_string(), json!({}), 500, 1),
1043        ]);
1044        // Now "advance" with one regress attempt for shard 0 + a
1045        // legitimate advance for shard 1.
1046        cursor.update_from_events(&[
1047            StoredEvent::from_value("100-0".to_string(), json!({}), 100, 0), // regress
1048            StoredEvent::from_value("700-0".to_string(), json!({}), 700, 1), // advance
1049        ]);
1050        assert_eq!(cursor.get(0), Some("500-0"), "shard 0 must not regress");
1051        assert_eq!(cursor.get(1), Some("700-0"), "shard 1 must advance");
1052    }
1053
1054    /// CR-1: pre-fix the cursor used `str::cmp` which inverts at
1055    /// every decade rollover for unpadded numeric ids. JetStream
1056    /// emits `seq.to_string()` (unpadded) — once seq=10 lands, lex
1057    /// compare says `"10" < "9"` and the cursor freezes at "9".
1058    #[test]
1059    fn cursor_does_not_wedge_on_jetstream_decade_rollover() {
1060        let mut cursor = CompositeCursor::new();
1061        for seq in 1u64..=20 {
1062            let ev = StoredEvent::from_value(seq.to_string(), json!({}), seq, 0);
1063            cursor.update_from_events(&[ev]);
1064        }
1065        assert_eq!(
1066            cursor.get(0),
1067            Some("20"),
1068            "cursor must reach 20; lex compare would wedge at \"9\""
1069        );
1070    }
1071
1072    /// CR-1: same hazard for Redis's `<ms>-<seq>` format when seq
1073    /// rolls past a decade within a single ms.
1074    #[test]
1075    fn cursor_does_not_wedge_on_redis_seq_decade_rollover() {
1076        let mut cursor = CompositeCursor::new();
1077        // All within one ms — Redis collides on seq when many events
1078        // hit in the same millisecond.
1079        for seq in 1u64..=20 {
1080            let id = format!("1700000000000-{}", seq);
1081            let ev = StoredEvent::from_value(id, json!({}), 1700000000000, 0);
1082            cursor.update_from_events(&[ev]);
1083        }
1084        assert_eq!(
1085            cursor.get(0),
1086            Some("1700000000000-20"),
1087            "cursor must reach -20; lex compare would wedge at -9"
1088        );
1089    }
1090
1091    /// Regression: a backend migration (e.g. JetStream → Redis)
1092    /// that lands an id of a different format than the existing
1093    /// cursor must NOT silently advance OR silently stall. Pre-
1094    /// fix `compare_stream_ids` fell through both structured
1095    /// branches and hit the lex fallback: `"42" > "1700-0"` (because
1096    /// `'4' > '1'`), so the CAS guard refused to update the
1097    /// cursor. Result: the consumer kept seeing `"42"` forever
1098    /// while the new Redis backend kept emitting Redis-formatted
1099    /// ids, with no surfaced error.
1100    ///
1101    /// Post-fix: format-mismatch is detected explicitly and
1102    /// surfaced via `tracing::error!`. The cursor stays at its
1103    /// current value (so we don't regress), and an operator must
1104    /// reset the cursor deliberately to consume from the new
1105    /// backend. This test pins the "stays at existing value"
1106    /// half of the contract; the loud error is observability,
1107    /// not behavior, so it isn't asserted here.
1108    #[test]
1109    fn cursor_refuses_to_advance_across_backend_format_change() {
1110        let mut cursor = CompositeCursor::new();
1111        // Cursor starts at JetStream-style numeric "42".
1112        cursor.update_from_events(&[StoredEvent::from_value("42".to_string(), json!({}), 42, 0)]);
1113        assert_eq!(cursor.get(0), Some("42"));
1114
1115        // A new event arrives in Redis format. Pre-fix this would
1116        // hit the lex fallback and the cursor would refuse to
1117        // advance silently; post-fix the format mismatch is
1118        // detected and the cursor STILL refuses to advance, but
1119        // a `tracing::error!` is emitted so operators see the
1120        // backend migration.
1121        cursor.update_from_events(&[StoredEvent::from_value(
1122            "1700000000000-0".to_string(),
1123            json!({}),
1124            1700000000000,
1125            0,
1126        )]);
1127
1128        // Cursor must still be at the original numeric id (not
1129        // the new Redis id, which would be a regression-like
1130        // jump back in time, AND not unset, which would lose
1131        // progress).
1132        assert_eq!(
1133            cursor.get(0),
1134            Some("42"),
1135            "regression: cursor must not silently advance through a \
1136             backend-format change. The pre-fix lex fallback also \
1137             happened to keep the existing value (by `'4' > '1'`), \
1138             but only by accident; this test pins the explicit \
1139             format-mismatch refusal."
1140        );
1141
1142        // And the reverse direction: a Redis cursor confronted
1143        // with a new numeric id must also stay put.
1144        let mut cursor = CompositeCursor::new();
1145        cursor.update_from_events(&[StoredEvent::from_value(
1146            "1700000000000-0".to_string(),
1147            json!({}),
1148            1700000000000,
1149            0,
1150        )]);
1151        cursor.update_from_events(&[StoredEvent::from_value(
1152            "9000".to_string(),
1153            json!({}),
1154            9000,
1155            0,
1156        )]);
1157        assert_eq!(
1158            cursor.get(0),
1159            Some("1700000000000-0"),
1160            "regression (reverse direction): Redis cursor must not be \
1161             silently overwritten by an incoming numeric id"
1162        );
1163    }
1164
1165    /// CR-1: cross-decade compare on JetStream-style ids.
1166    #[test]
1167    fn cursor_advances_from_unpadded_9_to_unpadded_10() {
1168        let mut cursor = CompositeCursor::new();
1169        cursor.update_from_events(&[StoredEvent::from_value("9".to_string(), json!({}), 9, 0)]);
1170        cursor.update_from_events(&[StoredEvent::from_value("10".to_string(), json!({}), 10, 0)]);
1171        assert_eq!(cursor.get(0), Some("10"));
1172    }
1173
1174    /// CR-1: ULID / opaque ids must still work via lex fallback.
1175    /// ULIDs are designed to be lex-sortable and we should NOT route
1176    /// them through the numeric parsers.
1177    #[test]
1178    fn cursor_advances_on_ulid_ids_via_lex_fallback() {
1179        let mut cursor = CompositeCursor::new();
1180        // Two real ULIDs in ascending stream order. Different
1181        // timestamp prefixes ensure lex order matches stream order.
1182        let earlier = "01HZ0000000000000000000000";
1183        let later = "01HZ0000010000000000000000";
1184        cursor.update_from_events(&[StoredEvent::from_value(
1185            earlier.to_string(),
1186            json!({}),
1187            1,
1188            0,
1189        )]);
1190        cursor.update_from_events(&[StoredEvent::from_value(later.to_string(), json!({}), 2, 0)]);
1191        assert_eq!(cursor.get(0), Some(later));
1192        // Now feed the earlier id again — must NOT regress.
1193        cursor.update_from_events(&[StoredEvent::from_value(
1194            earlier.to_string(),
1195            json!({}),
1196            1,
1197            0,
1198        )]);
1199        assert_eq!(cursor.get(0), Some(later));
1200    }
1201
1202    /// CR-1: direct `compare_stream_ids` unit coverage.
1203    #[test]
1204    fn compare_stream_ids_handles_known_formats() {
1205        // JetStream-style: numeric compare, padded-or-not.
1206        assert_eq!(compare_stream_ids("9", "10"), CmpOrdering::Less);
1207        assert_eq!(compare_stream_ids("10", "9"), CmpOrdering::Greater);
1208        assert_eq!(compare_stream_ids("100", "100"), CmpOrdering::Equal);
1209        // Padding-insensitive: leading zeros do not change numeric value.
1210        assert_eq!(compare_stream_ids("00000010", "9"), CmpOrdering::Greater);
1211
1212        // Redis-style: tuple compare on (ms, seq).
1213        assert_eq!(
1214            compare_stream_ids("1700-9", "1700-10"),
1215            CmpOrdering::Less,
1216            "seq must compare numerically, not lex"
1217        );
1218        assert_eq!(
1219            compare_stream_ids("1700-9", "1701-0"),
1220            CmpOrdering::Less,
1221            "ms wins over seq"
1222        );
1223        assert_eq!(
1224            compare_stream_ids("1700-100", "1700-9"),
1225            CmpOrdering::Greater
1226        );
1227
1228        // ULID-shaped ids fall through to lex compare.
1229        let ulid_a = "01HZ0000000000000000000000";
1230        let ulid_b = "01HZ0000010000000000000000";
1231        assert_eq!(compare_stream_ids(ulid_a, ulid_b), CmpOrdering::Less);
1232
1233        // Mixed format (one Redis, one numeric) — neither structured
1234        // path applies, falls through to lex. Documented limitation.
1235        // (Adapters emit a single format, so this is pathological.)
1236        let _ = compare_stream_ids("1700-0", "9999");
1237    }
1238
1239    /// CR-1: in `Ordering::InsertionTs` mode, two events from the
1240    /// same shard with the same `insertion_ts` and unpadded numeric
1241    /// ids must sort by numeric id, not lex id.
1242    #[test]
1243    fn insertion_ts_sort_breaks_tie_on_id_numerically() {
1244        // Same shard, same ts — only the id tiebreak fires.
1245        let mut events = [
1246            StoredEvent::from_value("10".to_string(), json!({}), 1000, 0),
1247            StoredEvent::from_value("9".to_string(), json!({}), 1000, 0),
1248            StoredEvent::from_value("11".to_string(), json!({}), 1000, 0),
1249        ];
1250        events.sort_by(|a, b| {
1251            a.insertion_ts
1252                .cmp(&b.insertion_ts)
1253                .then(a.shard_id.cmp(&b.shard_id))
1254                .then(compare_stream_ids(&a.id, &b.id))
1255        });
1256        let ordered: Vec<&str> = events.iter().map(|e| e.id.as_str()).collect();
1257        assert_eq!(
1258            ordered,
1259            vec!["9", "10", "11"],
1260            "id tiebreak must be numeric, not lex"
1261        );
1262    }
1263
1264    #[test]
1265    fn test_consume_request_builder() {
1266        let request = ConsumeRequest::new(100)
1267            .from("some_cursor")
1268            .ordering(Ordering::InsertionTs)
1269            .shards(vec![0, 1, 2])
1270            .filter(Filter::eq("type", json!("token")));
1271
1272        assert_eq!(request.limit, 100);
1273        assert_eq!(request.from_id, Some("some_cursor".to_string()));
1274        assert_eq!(request.ordering, Ordering::InsertionTs);
1275        assert_eq!(request.shards, Some(vec![0, 1, 2]));
1276        assert!(request.filter.is_some());
1277    }
1278
1279    #[test]
1280    fn test_invalid_cursor() {
1281        let result = CompositeCursor::decode("not_valid_base64!!!");
1282        assert!(result.is_err());
1283
1284        // Valid base64 but not valid JSON
1285        let result = CompositeCursor::decode(&BASE64.encode(b"not json"));
1286        assert!(result.is_err());
1287    }
1288
1289    /// Regression: non-canonical shard-id keys must be rejected
1290    /// at decode time. Pre-fix `serde_json::from_slice::<HashMap<u16,
1291    /// _>>` parsed `"00"` and `"0"` both as u16 0; the second
1292    /// insert silently overwrote the first, leaving the consumer
1293    /// with whichever entry happened to come later in the JSON.
1294    /// Two distinct stringifications collapsed to one shard
1295    /// position with no surfaced error.
1296    #[test]
1297    fn cursor_decode_rejects_non_canonical_shard_keys() {
1298        // Construct a JSON cursor with `"00"` aliasing `"0"`.
1299        // Both round-trip to u16 0 under standard parsing.
1300        let hostile = br#"{"00":"id_a","1":"id_b"}"#;
1301        let encoded = BASE64.encode(hostile);
1302        let result = CompositeCursor::decode(&encoded);
1303        assert!(
1304            result.is_err(),
1305            "non-canonical shard key `\"00\"` must reject; \
1306             pre-fix this silently parsed as shard 0"
1307        );
1308
1309        // Boundary: the canonical "0" (no leading zero) decodes
1310        // normally.
1311        let canonical = br#"{"0":"id_a","1":"id_b"}"#;
1312        let encoded_ok = BASE64.encode(canonical);
1313        let cursor =
1314            CompositeCursor::decode(&encoded_ok).expect("canonical shard keys must decode cleanly");
1315        assert_eq!(cursor.get(0), Some("id_a"));
1316        assert_eq!(cursor.get(1), Some("id_b"));
1317    }
1318
1319    #[test]
1320    fn test_composite_cursor_new() {
1321        let cursor = CompositeCursor::new();
1322        assert!(cursor.positions.is_empty());
1323    }
1324
1325    #[test]
1326    fn test_composite_cursor_default() {
1327        let cursor = CompositeCursor::default();
1328        assert!(cursor.positions.is_empty());
1329    }
1330
1331    #[test]
1332    fn test_composite_cursor_get_nonexistent() {
1333        let cursor = CompositeCursor::new();
1334        assert!(cursor.get(0).is_none());
1335        assert!(cursor.get(100).is_none());
1336    }
1337
1338    #[test]
1339    fn test_composite_cursor_set_overwrites() {
1340        let mut cursor = CompositeCursor::new();
1341        cursor.set(0, "first".to_string());
1342        assert_eq!(cursor.get(0), Some("first"));
1343
1344        cursor.set(0, "second".to_string());
1345        assert_eq!(cursor.get(0), Some("second"));
1346    }
1347
1348    #[test]
1349    fn test_composite_cursor_empty_encode() {
1350        let cursor = CompositeCursor::new();
1351        let encoded = cursor.encode().unwrap();
1352        let decoded = CompositeCursor::decode(&encoded).unwrap();
1353        assert!(decoded.positions.is_empty());
1354    }
1355
1356    #[test]
1357    fn test_composite_cursor_clone() {
1358        let mut cursor = CompositeCursor::new();
1359        cursor.set(0, "pos-0".to_string());
1360        cursor.set(1, "pos-1".to_string());
1361
1362        let cloned = cursor.clone();
1363        assert_eq!(cloned.get(0), Some("pos-0"));
1364        assert_eq!(cloned.get(1), Some("pos-1"));
1365    }
1366
1367    #[test]
1368    fn test_composite_cursor_debug() {
1369        let mut cursor = CompositeCursor::new();
1370        cursor.set(0, "test".to_string());
1371        let debug = format!("{:?}", cursor);
1372        assert!(debug.contains("CompositeCursor"));
1373        assert!(debug.contains("positions"));
1374    }
1375
1376    #[test]
1377    fn test_ordering_default() {
1378        let ordering = Ordering::default();
1379        assert_eq!(ordering, Ordering::None);
1380    }
1381
1382    #[test]
1383    fn test_ordering_clone_copy() {
1384        let ordering = Ordering::InsertionTs;
1385        let cloned = ordering;
1386        assert_eq!(cloned, Ordering::InsertionTs);
1387    }
1388
1389    #[test]
1390    fn test_ordering_debug() {
1391        assert!(format!("{:?}", Ordering::None).contains("None"));
1392        assert!(format!("{:?}", Ordering::InsertionTs).contains("InsertionTs"));
1393    }
1394
1395    #[test]
1396    fn test_consume_request_new() {
1397        let request = ConsumeRequest::new(50);
1398        assert_eq!(request.limit, 50);
1399        assert!(request.from_id.is_none());
1400        assert!(request.filter.is_none());
1401        assert_eq!(request.ordering, Ordering::None);
1402        assert!(request.shards.is_none());
1403    }
1404
1405    #[test]
1406    fn test_consume_request_default() {
1407        let request = ConsumeRequest::default();
1408        assert_eq!(request.limit, 0);
1409        assert!(request.from_id.is_none());
1410        assert!(request.filter.is_none());
1411        assert_eq!(request.ordering, Ordering::None);
1412        assert!(request.shards.is_none());
1413    }
1414
1415    #[test]
1416    fn test_consume_request_from_string() {
1417        let request = ConsumeRequest::new(10).from(String::from("cursor123"));
1418        assert_eq!(request.from_id, Some("cursor123".to_string()));
1419    }
1420
1421    #[test]
1422    fn test_consume_request_clone() {
1423        let request = ConsumeRequest::new(100)
1424            .from("cursor")
1425            .ordering(Ordering::InsertionTs)
1426            .shards(vec![0, 1]);
1427
1428        let cloned = request.clone();
1429        assert_eq!(cloned.limit, 100);
1430        assert_eq!(cloned.from_id, Some("cursor".to_string()));
1431        assert_eq!(cloned.ordering, Ordering::InsertionTs);
1432        assert_eq!(cloned.shards, Some(vec![0, 1]));
1433    }
1434
1435    #[test]
1436    fn test_consume_request_debug() {
1437        let request = ConsumeRequest::new(10);
1438        let debug = format!("{:?}", request);
1439        assert!(debug.contains("ConsumeRequest"));
1440        assert!(debug.contains("limit"));
1441    }
1442
1443    #[test]
1444    fn test_consume_response_empty() {
1445        let response = ConsumeResponse::empty();
1446        assert!(response.events.is_empty());
1447        assert!(response.next_id.is_none());
1448        assert!(!response.has_more);
1449    }
1450
1451    #[test]
1452    fn test_consume_response_clone() {
1453        let mut response = ConsumeResponse::empty();
1454        response.next_id = Some("cursor".to_string());
1455        response.has_more = true;
1456
1457        let cloned = response.clone();
1458        assert_eq!(cloned.next_id, Some("cursor".to_string()));
1459        assert!(cloned.has_more);
1460    }
1461
1462    #[test]
1463    fn test_consume_response_debug() {
1464        let response = ConsumeResponse::empty();
1465        let debug = format!("{:?}", response);
1466        assert!(debug.contains("ConsumeResponse"));
1467        assert!(debug.contains("events"));
1468    }
1469
1470    #[test]
1471    fn test_cursor_update_from_empty_events() {
1472        let mut cursor = CompositeCursor::new();
1473        cursor.set(0, "original".to_string());
1474
1475        let events: Vec<StoredEvent> = vec![];
1476        cursor.update_from_events(&events);
1477
1478        // Cursor should be unchanged
1479        assert_eq!(cursor.get(0), Some("original"));
1480    }
1481
1482    #[test]
1483    fn test_cursor_many_shards() {
1484        let mut cursor = CompositeCursor::new();
1485        for i in 0..100u16 {
1486            cursor.set(i, format!("pos-{}", i));
1487        }
1488
1489        let encoded = cursor.encode().unwrap();
1490        let decoded = CompositeCursor::decode(&encoded).unwrap();
1491
1492        for i in 0..100u16 {
1493            assert_eq!(decoded.get(i), Some(format!("pos-{}", i).as_str()));
1494        }
1495    }
1496
1497    #[test]
1498    fn test_consume_request_empty_shards() {
1499        let request = ConsumeRequest::new(100).shards(vec![]);
1500        assert_eq!(request.shards, Some(vec![]));
1501    }
1502
1503    #[test]
1504    fn test_consume_request_ordering_none() {
1505        let request = ConsumeRequest::new(100).ordering(Ordering::None);
1506        assert_eq!(request.ordering, Ordering::None);
1507    }
1508
1509    #[test]
1510    fn test_ordering_equality() {
1511        assert_eq!(Ordering::None, Ordering::None);
1512        assert_eq!(Ordering::InsertionTs, Ordering::InsertionTs);
1513        assert_ne!(Ordering::None, Ordering::InsertionTs);
1514    }
1515
1516    // Mock adapter for testing PollMerger
1517    use crate::adapter::{Adapter, ShardPollResult};
1518    use crate::error::AdapterError;
1519    use crate::event::Batch;
1520    use async_trait::async_trait;
1521    use std::collections::HashMap;
1522    use std::sync::RwLock;
1523
1524    struct MockAdapter {
1525        events: RwLock<HashMap<u16, Vec<StoredEvent>>>,
1526    }
1527
1528    impl MockAdapter {
1529        fn new() -> Self {
1530            Self {
1531                events: RwLock::new(HashMap::new()),
1532            }
1533        }
1534
1535        fn add_events(&self, shard_id: u16, events: Vec<StoredEvent>) {
1536            let mut map = self.events.write().unwrap();
1537            map.entry(shard_id).or_default().extend(events);
1538        }
1539    }
1540
1541    #[async_trait]
1542    impl Adapter for MockAdapter {
1543        async fn init(&mut self) -> Result<(), AdapterError> {
1544            Ok(())
1545        }
1546
1547        async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
1548            Ok(())
1549        }
1550
1551        async fn flush(&self) -> Result<(), AdapterError> {
1552            Ok(())
1553        }
1554
1555        async fn shutdown(&self) -> Result<(), AdapterError> {
1556            Ok(())
1557        }
1558
1559        async fn poll_shard(
1560            &self,
1561            shard_id: u16,
1562            from_id: Option<&str>,
1563            limit: usize,
1564        ) -> Result<ShardPollResult, AdapterError> {
1565            let map = self.events.read().unwrap();
1566            let events = map.get(&shard_id).cloned().unwrap_or_default();
1567
1568            // Filter by from_id if provided
1569            let filtered: Vec<_> = if let Some(from) = from_id {
1570                events
1571                    .into_iter()
1572                    .skip_while(|e| e.id != from)
1573                    .skip(1) // Skip the from_id itself
1574                    .collect()
1575            } else {
1576                events
1577            };
1578
1579            let has_more = filtered.len() > limit;
1580            let events: Vec<_> = filtered.into_iter().take(limit).collect();
1581            let next_id = events.last().map(|e| e.id.clone());
1582
1583            Ok(ShardPollResult {
1584                events,
1585                next_id,
1586                has_more,
1587            })
1588        }
1589
1590        fn name(&self) -> &'static str {
1591            "mock"
1592        }
1593    }
1594
1595    #[tokio::test]
1596    async fn test_poll_merger_new() {
1597        let adapter = Arc::new(MockAdapter::new());
1598        let merger = PollMerger::new(adapter, vec![0, 1, 2, 3]);
1599        assert_eq!(merger.shard_ids, vec![0, 1, 2, 3]);
1600    }
1601
1602    /// When the active shard id set is sparse (e.g. shard 0
1603    /// was scaled down, leaving `{1, 2}`), a poll with no explicit
1604    /// `request.shards` must hit shards 1 and 2 — not generate
1605    /// `[0, 1]` from a stale count and miss the live shard 2.
1606    ///
1607    /// Pre-fix, `PollMerger` stored only `num_shards: u16` and the
1608    /// default branch generated `(0..num_shards).collect()`, so
1609    /// shard 2's events were silently invisible to default-shards
1610    /// consumers after a scale-down.
1611    #[tokio::test]
1612    async fn poll_merger_default_shards_uses_active_id_set_after_scale_down() {
1613        let adapter = Arc::new(MockAdapter::new());
1614
1615        // Shard 0 (drained / no longer active): would mis-poll
1616        // pre-fix and could return stale data on adapters that
1617        // recreate streams on demand. We don't add events here.
1618        // Shard 1 + Shard 2: active set after a scale-down that
1619        // evicted shard 0.
1620        adapter.add_events(
1621            1,
1622            vec![StoredEvent::from_value(
1623                "1-a".to_string(),
1624                json!({"shard": 1}),
1625                100,
1626                1,
1627            )],
1628        );
1629        adapter.add_events(
1630            2,
1631            vec![StoredEvent::from_value(
1632                "2-a".to_string(),
1633                json!({"shard": 2}),
1634                200,
1635                2,
1636            )],
1637        );
1638
1639        // Sparse id set — shard 0 is NOT in the active list.
1640        let merger = PollMerger::new(adapter, vec![1, 2]);
1641
1642        // Default-shards request (no `shards` override).
1643        let request = ConsumeRequest::new(100);
1644        let response = merger.poll(request).await.unwrap();
1645
1646        let returned: std::collections::HashSet<u16> =
1647            response.events.iter().map(|e| e.shard_id).collect();
1648        assert!(
1649            returned.contains(&1),
1650            "default-shards poll must include shard 1 (active)",
1651        );
1652        assert!(
1653            returned.contains(&2),
1654            "default-shards poll must include shard 2 — pre-fix this was silently \
1655             skipped because the merger generated `0..num_shards` = `[0, 1]`",
1656        );
1657        assert!(
1658            !returned.contains(&0),
1659            "default-shards poll must NOT touch shard 0 — it was evicted",
1660        );
1661        assert_eq!(response.events.len(), 2);
1662    }
1663
1664    #[tokio::test]
1665    async fn test_poll_merger_empty_limit() {
1666        let adapter = Arc::new(MockAdapter::new());
1667        let merger = PollMerger::new(adapter, vec![0, 1, 2, 3]);
1668
1669        let request = ConsumeRequest::new(0);
1670        let response = merger.poll(request).await.unwrap();
1671
1672        assert!(response.events.is_empty());
1673        assert!(response.next_id.is_none());
1674        assert!(!response.has_more);
1675    }
1676
1677    #[tokio::test]
1678    async fn test_poll_merger_empty_shards() {
1679        let adapter = Arc::new(MockAdapter::new());
1680        let merger = PollMerger::new(adapter, vec![0, 1, 2, 3]);
1681
1682        let request = ConsumeRequest::new(100).shards(vec![]);
1683        let response = merger.poll(request).await.unwrap();
1684
1685        assert!(response.events.is_empty());
1686        assert!(response.next_id.is_none());
1687        assert!(!response.has_more);
1688    }
1689
1690    /// Production poll() must route every cursor write through the
1691    /// backend-format guard. Pre-fix, both the Step-1 adapter-
1692    /// next_id write and the Step-2 last-event override called the
1693    /// unchecked `set`, so a mid-stream backend migration
1694    /// (JetStream numeric ids → Redis stream ids of the form
1695    /// `<ms>-<seq>`) would silently overwrite the format and the
1696    /// documented "refuse to advance, operator must reset" guard
1697    /// from `update_from_events` never fired in production.
1698    #[test]
1699    fn composite_cursor_set_checked_refuses_format_change() {
1700        let mut cursor = CompositeCursor::new();
1701        cursor.set(0, "42".to_string()); // JetStream numeric
1702                                         // Same shard, new id in Redis stream format.
1703        let accepted = cursor.set_checked(0, "1700000000000-0");
1704        assert!(
1705            !accepted,
1706            "format change must be refused — set_checked must return false"
1707        );
1708        // Cursor must NOT have advanced.
1709        assert_eq!(cursor.get(0), Some("42"));
1710    }
1711
1712    #[test]
1713    fn composite_cursor_set_checked_accepts_same_format_advance() {
1714        let mut cursor = CompositeCursor::new();
1715        cursor.set(0, "42".to_string());
1716        // Newer JetStream id — same format, monotonically forward.
1717        assert!(cursor.set_checked(0, "100"));
1718        assert_eq!(cursor.get(0), Some("100"));
1719    }
1720
1721    #[test]
1722    fn composite_cursor_set_checked_accepts_first_write() {
1723        let mut cursor = CompositeCursor::new();
1724        assert!(cursor.set_checked(3, "1700000000000-0"));
1725        assert_eq!(cursor.get(3), Some("1700000000000-0"));
1726    }
1727
1728    /// A caller that forwards a non-deduplicated shard list — e.g.
1729    /// `vec![0, 0, 1]` — must not cause `poll()` to fetch the same
1730    /// shard twice and return duplicate events. Pre-fix, the merger
1731    /// consumed `shards` verbatim and issued one `poll_shard(0, …)`
1732    /// per occurrence; both returned the same events; the duplicates
1733    /// survived the per-shard limit and reached the caller.
1734    #[tokio::test]
1735    async fn poll_dedupes_duplicate_shard_ids_in_request() {
1736        let adapter = Arc::new(MockAdapter::new());
1737        adapter.add_events(
1738            0,
1739            vec![
1740                StoredEvent::from_value("s0-a".to_string(), json!({"v": 1}), 1, 0),
1741                StoredEvent::from_value("s0-b".to_string(), json!({"v": 2}), 2, 0),
1742            ],
1743        );
1744        adapter.add_events(
1745            1,
1746            vec![StoredEvent::from_value(
1747                "s1-a".to_string(),
1748                json!({"v": 3}),
1749                3,
1750                1,
1751            )],
1752        );
1753
1754        let merger = PollMerger::new(adapter, vec![0, 1]);
1755        let response = merger
1756            .poll(ConsumeRequest::new(100).shards(vec![0, 0, 1]))
1757            .await
1758            .unwrap();
1759
1760        // 2 events from shard 0 (not 4 — i.e. not fetched twice) plus
1761        // 1 event from shard 1 = 3 total. Pre-fix this was 5.
1762        assert_eq!(response.events.len(), 3);
1763        let s0_count = response.events.iter().filter(|e| e.shard_id == 0).count();
1764        assert_eq!(s0_count, 2, "shard 0 events must not be duplicated");
1765    }
1766
1767    /// Regression: per-shard adapter errors must surface on
1768    /// `ConsumeResponse.failed_shards`, not silently disappear
1769    /// after a `tracing::warn!`. Pre-fix the merger logged the
1770    /// error and continued; the response carried events from
1771    /// the surviving shards with no field indicating WHICH
1772    /// shards failed (in contrast to `stalled_shards` which IS
1773    /// surfaced). Operators correlating alerts with specific
1774    /// Redis / JetStream nodes had to grep logs instead of
1775    /// reading a structured response field.
1776    #[tokio::test]
1777    async fn poll_response_surfaces_failed_shard_ids() {
1778        // Mock that fails on a specific shard id.
1779        struct FailingShardMock {
1780            inner: MockAdapter,
1781            fail_shard: u16,
1782        }
1783
1784        #[async_trait]
1785        impl Adapter for FailingShardMock {
1786            async fn init(&mut self) -> Result<(), AdapterError> {
1787                Ok(())
1788            }
1789            async fn on_batch(&self, _b: Arc<Batch>) -> Result<(), AdapterError> {
1790                Ok(())
1791            }
1792            async fn flush(&self) -> Result<(), AdapterError> {
1793                Ok(())
1794            }
1795            async fn shutdown(&self) -> Result<(), AdapterError> {
1796                Ok(())
1797            }
1798            async fn poll_shard(
1799                &self,
1800                shard_id: u16,
1801                from_id: Option<&str>,
1802                limit: usize,
1803            ) -> Result<ShardPollResult, AdapterError> {
1804                if shard_id == self.fail_shard {
1805                    return Err(AdapterError::Transient(format!(
1806                        "synthetic failure on shard {shard_id}"
1807                    )));
1808                }
1809                self.inner.poll_shard(shard_id, from_id, limit).await
1810            }
1811            fn name(&self) -> &'static str {
1812                "failing-mock"
1813            }
1814        }
1815
1816        let inner = MockAdapter::new();
1817        // Shard 0 has events, shard 1 will fail, shard 2 has events.
1818        inner.add_events(
1819            0,
1820            vec![StoredEvent::from_value(
1821                "0-1".to_string(),
1822                json!({"shard": 0}),
1823                100,
1824                0,
1825            )],
1826        );
1827        inner.add_events(
1828            2,
1829            vec![StoredEvent::from_value(
1830                "2-1".to_string(),
1831                json!({"shard": 2}),
1832                100,
1833                2,
1834            )],
1835        );
1836
1837        let adapter = Arc::new(FailingShardMock {
1838            inner,
1839            fail_shard: 1,
1840        });
1841        let merger = PollMerger::new(adapter, vec![0, 1, 2]);
1842
1843        let response = merger.poll(ConsumeRequest::new(100)).await.unwrap();
1844
1845        // Surviving shards' events still come through.
1846        assert_eq!(
1847            response.events.len(),
1848            2,
1849            "events from non-failing shards must still be returned"
1850        );
1851
1852        // The failed shard id is surfaced on the response.
1853        assert_eq!(
1854            response.failed_shards,
1855            vec![1],
1856            "regression: failed_shards must list the shard whose adapter \
1857             errored. Pre-fix this list didn't exist; observers couldn't \
1858             tell which shard was missing without log scraping."
1859        );
1860    }
1861
1862    #[tokio::test]
1863    async fn test_poll_merger_with_events() {
1864        let adapter = Arc::new(MockAdapter::new());
1865
1866        // Add events to shard 0
1867        adapter.add_events(
1868            0,
1869            vec![
1870                StoredEvent::from_value("0-1".to_string(), json!({"type": "a"}), 100, 0),
1871                StoredEvent::from_value("0-2".to_string(), json!({"type": "b"}), 200, 0),
1872            ],
1873        );
1874
1875        // Add events to shard 1
1876        adapter.add_events(
1877            1,
1878            vec![StoredEvent::from_value(
1879                "1-1".to_string(),
1880                json!({"type": "c"}),
1881                150,
1882                1,
1883            )],
1884        );
1885
1886        let merger = PollMerger::new(adapter, vec![0, 1]);
1887
1888        let request = ConsumeRequest::new(100);
1889        let response = merger.poll(request).await.unwrap();
1890
1891        assert_eq!(response.events.len(), 3);
1892        assert!(response.next_id.is_some());
1893    }
1894
1895    #[tokio::test]
1896    async fn test_poll_merger_with_ordering() {
1897        let adapter = Arc::new(MockAdapter::new());
1898
1899        // Add events with different timestamps
1900        adapter.add_events(
1901            0,
1902            vec![
1903                StoredEvent::from_value("0-1".to_string(), json!({}), 300, 0),
1904                StoredEvent::from_value("0-2".to_string(), json!({}), 100, 0),
1905            ],
1906        );
1907        adapter.add_events(
1908            1,
1909            vec![StoredEvent::from_value(
1910                "1-1".to_string(),
1911                json!({}),
1912                200,
1913                1,
1914            )],
1915        );
1916
1917        let merger = PollMerger::new(adapter, vec![0, 1]);
1918
1919        let request = ConsumeRequest::new(100).ordering(Ordering::InsertionTs);
1920        let response = merger.poll(request).await.unwrap();
1921
1922        // Events should be sorted by insertion_ts
1923        assert_eq!(response.events.len(), 3);
1924        assert_eq!(response.events[0].insertion_ts, 100);
1925        assert_eq!(response.events[1].insertion_ts, 200);
1926        assert_eq!(response.events[2].insertion_ts, 300);
1927    }
1928
1929    #[tokio::test]
1930    async fn test_poll_merger_with_filter() {
1931        let adapter = Arc::new(MockAdapter::new());
1932
1933        adapter.add_events(
1934            0,
1935            vec![
1936                StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
1937                StoredEvent::from_value("0-2".to_string(), json!({"type": "message"}), 200, 0),
1938                StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 300, 0),
1939            ],
1940        );
1941
1942        let merger = PollMerger::new(adapter, vec![0]);
1943
1944        let request = ConsumeRequest::new(100).filter(Filter::eq("type", json!("token")));
1945        let response = merger.poll(request).await.unwrap();
1946
1947        assert_eq!(response.events.len(), 2);
1948        for event in &response.events {
1949            assert!(event.raw_str().unwrap().contains("token"));
1950        }
1951    }
1952
1953    #[tokio::test]
1954    async fn test_poll_merger_with_limit() {
1955        let adapter = Arc::new(MockAdapter::new());
1956
1957        adapter.add_events(
1958            0,
1959            vec![
1960                StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
1961                StoredEvent::from_value("0-2".to_string(), json!({}), 200, 0),
1962                StoredEvent::from_value("0-3".to_string(), json!({}), 300, 0),
1963            ],
1964        );
1965
1966        let merger = PollMerger::new(adapter, vec![0]);
1967
1968        let request = ConsumeRequest::new(2);
1969        let response = merger.poll(request).await.unwrap();
1970
1971        assert_eq!(response.events.len(), 2);
1972        assert!(response.has_more);
1973    }
1974
1975    #[tokio::test]
1976    async fn test_poll_merger_specific_shards() {
1977        let adapter = Arc::new(MockAdapter::new());
1978
1979        adapter.add_events(
1980            0,
1981            vec![StoredEvent::from_value(
1982                "0-1".to_string(),
1983                json!({"shard": 0}),
1984                100,
1985                0,
1986            )],
1987        );
1988        adapter.add_events(
1989            1,
1990            vec![StoredEvent::from_value(
1991                "1-1".to_string(),
1992                json!({"shard": 1}),
1993                100,
1994                1,
1995            )],
1996        );
1997        adapter.add_events(
1998            2,
1999            vec![StoredEvent::from_value(
2000                "2-1".to_string(),
2001                json!({"shard": 2}),
2002                100,
2003                2,
2004            )],
2005        );
2006
2007        let merger = PollMerger::new(adapter, vec![0, 1, 2]);
2008
2009        // Only poll shard 0 and 2
2010        let request = ConsumeRequest::new(100).shards(vec![0, 2]);
2011        let response = merger.poll(request).await.unwrap();
2012
2013        assert_eq!(response.events.len(), 2);
2014        let shard_ids: Vec<_> = response.events.iter().map(|e| e.shard_id).collect();
2015        assert!(shard_ids.contains(&0));
2016        assert!(shard_ids.contains(&2));
2017        assert!(!shard_ids.contains(&1));
2018    }
2019
2020    #[tokio::test]
2021    async fn test_poll_merger_with_cursor() {
2022        let adapter = Arc::new(MockAdapter::new());
2023
2024        adapter.add_events(
2025            0,
2026            vec![
2027                StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
2028                StoredEvent::from_value("0-2".to_string(), json!({}), 200, 0),
2029                StoredEvent::from_value("0-3".to_string(), json!({}), 300, 0),
2030            ],
2031        );
2032
2033        let merger = PollMerger::new(adapter, vec![0]);
2034
2035        // First poll
2036        let request = ConsumeRequest::new(2);
2037        let response1 = merger.poll(request).await.unwrap();
2038        assert_eq!(response1.events.len(), 2);
2039
2040        // Second poll with cursor
2041        let cursor = response1.next_id.unwrap();
2042        let request2 = ConsumeRequest::new(10).from(cursor);
2043        let response2 = merger.poll(request2).await.unwrap();
2044
2045        assert_eq!(response2.events.len(), 1);
2046        assert_eq!(response2.events[0].id, "0-3");
2047    }
2048
2049    #[tokio::test]
2050    async fn test_poll_merger_pagination_multi_shard() {
2051        // Test that pagination across multiple shards doesn't skip events
2052        let adapter = Arc::new(MockAdapter::new());
2053
2054        // Shard 0: 10 events
2055        let shard0_events: Vec<_> = (1..=10)
2056            .map(|i| {
2057                StoredEvent::from_value(
2058                    format!("0-{}", i),
2059                    json!({"shard": 0, "idx": i}),
2060                    i as u64 * 10,
2061                    0,
2062                )
2063            })
2064            .collect();
2065        adapter.add_events(0, shard0_events);
2066
2067        // Shard 1: 15 events
2068        let shard1_events: Vec<_> = (1..=15)
2069            .map(|i| {
2070                StoredEvent::from_value(
2071                    format!("1-{}", i),
2072                    json!({"shard": 1, "idx": i}),
2073                    i as u64 * 10 + 5,
2074                    1,
2075                )
2076            })
2077            .collect();
2078        adapter.add_events(1, shard1_events);
2079
2080        let merger = PollMerger::new(adapter, vec![0, 1]);
2081
2082        // Poll in pages of 10 and collect all events
2083        let mut all_events = Vec::new();
2084        let mut cursor: Option<String> = None;
2085        let mut iterations = 0;
2086
2087        loop {
2088            iterations += 1;
2089            let request = match &cursor {
2090                Some(c) => ConsumeRequest::new(10).from(c.clone()),
2091                None => ConsumeRequest::new(10),
2092            };
2093
2094            let response = merger.poll(request).await.unwrap();
2095            all_events.extend(response.events);
2096
2097            if !response.has_more {
2098                break;
2099            }
2100            cursor = response.next_id;
2101
2102            // Safety: prevent infinite loop
2103            if iterations > 10 {
2104                panic!("Too many iterations");
2105            }
2106        }
2107
2108        // Should get all 25 events (10 from shard 0 + 15 from shard 1)
2109        assert_eq!(
2110            all_events.len(),
2111            25,
2112            "Expected 25 events, got {}. Iterations: {}",
2113            all_events.len(),
2114            iterations
2115        );
2116
2117        // Verify we got events from both shards
2118        let shard0_count = all_events.iter().filter(|e| e.shard_id == 0).count();
2119        let shard1_count = all_events.iter().filter(|e| e.shard_id == 1).count();
2120        assert_eq!(shard0_count, 10, "Expected 10 events from shard 0");
2121        assert_eq!(shard1_count, 15, "Expected 15 events from shard 1");
2122    }
2123
2124    #[tokio::test]
2125    async fn test_poll_merger_pagination_no_duplicates() {
2126        // Test that pagination doesn't return duplicate events
2127        let adapter = Arc::new(MockAdapter::new());
2128
2129        // Add events to both shards
2130        for shard_id in 0..2u16 {
2131            let events: Vec<_> = (1..=20)
2132                .map(|i| {
2133                    StoredEvent::from_value(
2134                        format!("{}-{}", shard_id, i),
2135                        json!({"shard": shard_id, "idx": i}),
2136                        i as u64 * 10,
2137                        shard_id,
2138                    )
2139                })
2140                .collect();
2141            adapter.add_events(shard_id, events);
2142        }
2143
2144        let merger = PollMerger::new(adapter, vec![0, 1]);
2145
2146        // Poll in small pages
2147        let mut all_event_ids = Vec::new();
2148        let mut cursor: Option<String> = None;
2149
2150        for _ in 0..20 {
2151            let request = match &cursor {
2152                Some(c) => ConsumeRequest::new(5).from(c.clone()),
2153                None => ConsumeRequest::new(5),
2154            };
2155
2156            let response = merger.poll(request).await.unwrap();
2157            all_event_ids.extend(response.events.iter().map(|e| e.id.clone()));
2158
2159            if !response.has_more {
2160                break;
2161            }
2162            cursor = response.next_id;
2163        }
2164
2165        // Check for duplicates
2166        let unique_count = {
2167            let mut ids = all_event_ids.clone();
2168            ids.sort();
2169            ids.dedup();
2170            ids.len()
2171        };
2172
2173        assert_eq!(
2174            unique_count,
2175            all_event_ids.len(),
2176            "Found duplicate events! Total: {}, Unique: {}",
2177            all_event_ids.len(),
2178            unique_count
2179        );
2180
2181        // Should have all 40 events
2182        assert_eq!(all_event_ids.len(), 40);
2183    }
2184
2185    #[tokio::test]
2186    async fn test_poll_merger_pagination_with_ordering() {
2187        // Test pagination with timestamp ordering
2188        let adapter = Arc::new(MockAdapter::new());
2189
2190        // Add events with interleaved timestamps across shards
2191        adapter.add_events(
2192            0,
2193            vec![
2194                StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
2195                StoredEvent::from_value("0-2".to_string(), json!({}), 300, 0),
2196                StoredEvent::from_value("0-3".to_string(), json!({}), 500, 0),
2197            ],
2198        );
2199        adapter.add_events(
2200            1,
2201            vec![
2202                StoredEvent::from_value("1-1".to_string(), json!({}), 200, 1),
2203                StoredEvent::from_value("1-2".to_string(), json!({}), 400, 1),
2204            ],
2205        );
2206
2207        let merger = PollMerger::new(adapter, vec![0, 1]);
2208
2209        // Poll with ordering, page size 2
2210        let mut all_events = Vec::new();
2211        let mut cursor: Option<String> = None;
2212
2213        for _ in 0..5 {
2214            let mut request = ConsumeRequest::new(2).ordering(Ordering::InsertionTs);
2215            if let Some(c) = &cursor {
2216                request = request.from(c.clone());
2217            }
2218
2219            let response = merger.poll(request).await.unwrap();
2220            all_events.extend(response.events);
2221
2222            if !response.has_more {
2223                break;
2224            }
2225            cursor = response.next_id;
2226        }
2227
2228        // Should get all 5 events
2229        assert_eq!(all_events.len(), 5);
2230
2231        // Verify ordering is maintained
2232        let timestamps: Vec<_> = all_events.iter().map(|e| e.insertion_ts).collect();
2233        let mut sorted = timestamps.clone();
2234        sorted.sort();
2235        assert_eq!(timestamps, sorted, "Events should be sorted by timestamp");
2236    }
2237
2238    #[tokio::test]
2239    async fn test_poll_merger_cursor_tracks_returned_events_only() {
2240        // Test that cursor tracks position based on returned events, not fetched events
2241        let adapter = Arc::new(MockAdapter::new());
2242
2243        // Shard 0: 3 events
2244        adapter.add_events(
2245            0,
2246            vec![
2247                StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
2248                StoredEvent::from_value("0-2".to_string(), json!({}), 200, 0),
2249                StoredEvent::from_value("0-3".to_string(), json!({}), 300, 0),
2250            ],
2251        );
2252
2253        // Shard 1: 3 events
2254        adapter.add_events(
2255            1,
2256            vec![
2257                StoredEvent::from_value("1-1".to_string(), json!({}), 150, 1),
2258                StoredEvent::from_value("1-2".to_string(), json!({}), 250, 1),
2259                StoredEvent::from_value("1-3".to_string(), json!({}), 350, 1),
2260            ],
2261        );
2262
2263        let merger = PollMerger::new(adapter, vec![0, 1]);
2264
2265        // First poll with limit 2 - should get 2 events and cursor should reflect only those 2
2266        let response1 = merger.poll(ConsumeRequest::new(2)).await.unwrap();
2267        assert_eq!(response1.events.len(), 2);
2268        assert!(response1.has_more);
2269
2270        // Decode cursor to verify it tracks returned events
2271        let next_id = response1.next_id.clone().unwrap();
2272        let cursor = CompositeCursor::decode(&next_id).unwrap();
2273
2274        // Cursor should only have positions for shards that had events in the returned set
2275        let returned_shard_ids: std::collections::HashSet<_> =
2276            response1.events.iter().map(|e| e.shard_id).collect();
2277
2278        for shard_id in 0..2u16 {
2279            if returned_shard_ids.contains(&shard_id) {
2280                // Shard had returned events, cursor should have position
2281                assert!(
2282                    cursor.get(shard_id).is_some(),
2283                    "Cursor should have position for shard {} which had returned events",
2284                    shard_id
2285                );
2286            }
2287        }
2288
2289        // Second poll should continue from where we left off
2290        let response2 = merger
2291            .poll(ConsumeRequest::new(10).from(next_id))
2292            .await
2293            .unwrap();
2294
2295        // Should get remaining 4 events
2296        assert_eq!(response2.events.len(), 4, "Should get remaining 4 events");
2297    }
2298
2299    /// When the per-shard fetch hits the
2300    /// PER_SHARD_FETCH_CAP clamp, the response must surface
2301    /// `truncated_at_per_shard_cap = true` so callers can detect
2302    /// the silent under-delivery.
2303    #[tokio::test]
2304    async fn poll_merger_surfaces_per_shard_cap_truncation() {
2305        let adapter = Arc::new(MockAdapter::new());
2306        // Single shard — over-fetch factor 2 — request limit
2307        // 50 000 → unclamped per_shard would be 100 000 → clamped
2308        // to PER_SHARD_FETCH_CAP (10 000).
2309        adapter.add_events(
2310            0,
2311            (0..1)
2312                .map(|i| StoredEvent::from_value(format!("0-{}", i), json!({}), 100, 0))
2313                .collect(),
2314        );
2315
2316        let merger = PollMerger::new(adapter, vec![0]);
2317        let response = merger.poll(ConsumeRequest::new(50_000)).await.unwrap();
2318        assert!(
2319            response.truncated_at_per_shard_cap,
2320            "large limit must flag the per-shard cap clamp",
2321        );
2322    }
2323
2324    /// A single-shard request with a filter where every fetched
2325    /// event matches AND the per-shard cap clamps the fetch must
2326    /// not stall — each poll must advance the cursor by
2327    /// `request.limit` matches and the loop must drain in
2328    /// `ceil(total / limit)` iterations.
2329    ///
2330    /// Without the rollback+override pairing, the rollback step
2331    /// would roll the cursor back to the original position with
2332    /// no matching forward override, so the cursor never
2333    /// advances — re-fetching the same events on every poll. The
2334    /// current logic resolves this: Step 1 rolls back for shards
2335    /// with truncated matches; Step 2 overrides to the last
2336    /// *returned* match's id so each poll makes `request.limit`
2337    /// worth of forward progress.
2338    #[tokio::test]
2339    async fn poll_merger_does_not_stall_on_single_shard_filter_under_cap() {
2340        // Adapter holds 50 events on shard 0. With a filter that
2341        // matches every event, request.limit = 10 → per_shard_limit
2342        // could be 30 (limit×3 over_fetch_factor) without the cap;
2343        // here we keep numbers small so the test is deterministic.
2344        let adapter = Arc::new(MockAdapter::new());
2345        let mut events: Vec<StoredEvent> = (0..50)
2346            .map(|i| {
2347                StoredEvent::from_value(
2348                    format!("0-{}", i),
2349                    json!({"keep": true}),
2350                    (i + 1) as u64,
2351                    0,
2352                )
2353            })
2354            .collect();
2355        adapter.add_events(0, events.split_off(0));
2356
2357        let merger = PollMerger::new(adapter.clone(), vec![0]);
2358        // Filter that matches everything ('keep': true on all events).
2359        let make_request = |from_id: Option<String>| ConsumeRequest {
2360            limit: 10,
2361            from_id,
2362            shards: None,
2363            filter: Some(Filter::eq("keep", json!(true))),
2364            ordering: Ordering::None,
2365        };
2366
2367        let mut cursor: Option<String> = None;
2368        let mut total = 0;
2369        let mut polls = 0;
2370        loop {
2371            polls += 1;
2372            assert!(
2373                polls < 20,
2374                "poll loop must terminate; got {} polls without draining \
2375                 (cursor={:?}, total={})",
2376                polls,
2377                cursor,
2378                total,
2379            );
2380            let response = merger.poll(make_request(cursor.clone())).await.unwrap();
2381            total += response.events.len();
2382            let new_cursor = response.next_id.clone();
2383            assert!(
2384                new_cursor.is_some(),
2385                "response must surface a cursor on every progress poll \
2386                 (poll={}, returned={})",
2387                polls,
2388                response.events.len(),
2389            );
2390            // The cursor MUST advance OR has_more must be false.
2391            // Pre-fix the cursor would echo back unchanged on a
2392            // stall; we'd hit the polls<20 watchdog above.
2393            if cursor == new_cursor {
2394                assert!(
2395                    !response.has_more,
2396                    "cursor stuck at {:?} but has_more=true → stall",
2397                    cursor,
2398                );
2399                break;
2400            }
2401            cursor = new_cursor;
2402            if !response.has_more && response.events.is_empty() {
2403                break;
2404            }
2405        }
2406        assert_eq!(
2407            total, 50,
2408            "full draining of 50 events must succeed without stall \
2409             (got {} in {} polls)",
2410            total, polls,
2411        );
2412    }
2413
2414    /// Multi-shard variant. The rollback+override logic must
2415    /// drain every match across multiple shards when each poll
2416    /// truncates matches from BOTH shards (so both shards land
2417    /// in the `rolled_back` set on the same poll). Without it,
2418    /// a stall on either shard would leave matches stranded;
2419    /// this exercises the dual-rollback path the single-shard
2420    /// test doesn't.
2421    #[tokio::test]
2422    async fn poll_merger_does_not_stall_on_multi_shard_filter_truncation() {
2423        let adapter = Arc::new(MockAdapter::new());
2424        // Two shards, 30 matching events each. Limit=10 means
2425        // every poll truncates matches from both shards
2426        // simultaneously (the global sort interleaves them, and
2427        // 10 < 2*30).
2428        for shard_id in 0..2u16 {
2429            let events: Vec<StoredEvent> = (0..30)
2430                .map(|i| {
2431                    StoredEvent::from_value(
2432                        format!("{}-{}", shard_id, i),
2433                        json!({"keep": true}),
2434                        // Stagger timestamps so the global sort
2435                        // interleaves shards: shard 0 ts even,
2436                        // shard 1 ts odd.
2437                        (i * 2 + shard_id as usize) as u64 + 1,
2438                        shard_id,
2439                    )
2440                })
2441                .collect();
2442            adapter.add_events(shard_id, events);
2443        }
2444
2445        let merger = PollMerger::new(adapter, vec![0, 1]);
2446        let make_request = |from_id: Option<String>| ConsumeRequest {
2447            limit: 10,
2448            from_id,
2449            shards: None,
2450            filter: Some(Filter::eq("keep", json!(true))),
2451            ordering: Ordering::None,
2452        };
2453
2454        let mut cursor: Option<String> = None;
2455        let mut returned: std::collections::HashSet<String> = std::collections::HashSet::new();
2456        let mut polls = 0;
2457        loop {
2458            polls += 1;
2459            assert!(
2460                polls < 30,
2461                "multi-shard: poll loop must terminate; \
2462                 got {} polls without draining (cursor={:?}, returned={})",
2463                polls,
2464                cursor,
2465                returned.len(),
2466            );
2467            let response = merger.poll(make_request(cursor.clone())).await.unwrap();
2468            for e in &response.events {
2469                returned.insert(e.id.clone());
2470            }
2471            let new_cursor = response.next_id.clone();
2472            // Same stall guard as the single-shard test: the
2473            // cursor must advance OR has_more must be false on
2474            // any poll that returned events.
2475            if cursor == new_cursor {
2476                assert!(
2477                    !response.has_more,
2478                    "multi-shard: cursor stuck at {:?} but \
2479                     has_more=true → stall",
2480                    cursor,
2481                );
2482                break;
2483            }
2484            cursor = new_cursor;
2485            if !response.has_more && response.events.is_empty() {
2486                break;
2487            }
2488        }
2489        assert_eq!(
2490            returned.len(),
2491            60,
2492            "multi-shard: every match across both shards must \
2493             surface exactly once (got {} unique in {} polls)",
2494            returned.len(),
2495            polls,
2496        );
2497    }
2498
2499    /// Corollary: a small request that fits well below
2500    /// the cap must NOT flag truncation.
2501    #[tokio::test]
2502    async fn poll_merger_does_not_flag_truncation_on_small_limit() {
2503        let adapter = Arc::new(MockAdapter::new());
2504        adapter.add_events(
2505            0,
2506            vec![StoredEvent::from_value(
2507                "0-1".to_string(),
2508                json!({}),
2509                100,
2510                0,
2511            )],
2512        );
2513        let merger = PollMerger::new(adapter, vec![0]);
2514        let response = merger.poll(ConsumeRequest::new(100)).await.unwrap();
2515        assert!(
2516            !response.truncated_at_per_shard_cap,
2517            "small limits must not flag the cap",
2518        );
2519    }
2520
2521    #[tokio::test]
2522    async fn test_poll_merger_small_limit_many_shards() {
2523        // Regression: limit < shard count caused integer division truncation to 0,
2524        // making per-shard fetch too small. Now uses ceiling division.
2525        let adapter = Arc::new(MockAdapter::new());
2526        let num_shards = 8u16;
2527
2528        for shard_id in 0..num_shards {
2529            adapter.add_events(
2530                shard_id,
2531                vec![StoredEvent::from_value(
2532                    format!("{}-1", shard_id),
2533                    json!({"shard": shard_id}),
2534                    100,
2535                    shard_id,
2536                )],
2537            );
2538        }
2539
2540        let merger = PollMerger::new(adapter, (0..num_shards).collect());
2541
2542        // Request fewer events than shards — should still work
2543        let request = ConsumeRequest::new(3);
2544        let response = merger.poll(request).await.unwrap();
2545
2546        assert_eq!(response.events.len(), 3);
2547        assert!(response.has_more);
2548    }
2549
2550    #[tokio::test]
2551    async fn test_regression_filtered_shards_cursor_advances() {
2552        // Bug 3: "Cursor never advances for filtered-out shards"
2553        //
2554        // When shard 1's events are entirely filtered out, the cursor for shard 1
2555        // must still advance past those events. Otherwise, subsequent polls will
2556        // re-fetch the same filtered-out events forever.
2557        let adapter = Arc::new(MockAdapter::new());
2558
2559        // Shard 0: events matching the filter
2560        adapter.add_events(
2561            0,
2562            vec![
2563                StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
2564                StoredEvent::from_value("0-2".to_string(), json!({"type": "token"}), 200, 0),
2565            ],
2566        );
2567
2568        // Shard 1: events that will be filtered out
2569        adapter.add_events(
2570            1,
2571            vec![
2572                StoredEvent::from_value("1-1".to_string(), json!({"type": "message"}), 150, 1),
2573                StoredEvent::from_value("1-2".to_string(), json!({"type": "message"}), 250, 1),
2574            ],
2575        );
2576
2577        let merger = PollMerger::new(adapter, vec![0, 1]);
2578        let filter = Filter::eq("type", json!("token"));
2579
2580        // First poll: should return only the "token" events from shard 0
2581        let response1 = merger
2582            .poll(ConsumeRequest::new(100).filter(filter.clone()))
2583            .await
2584            .unwrap();
2585
2586        assert_eq!(response1.events.len(), 2, "Should get 2 token events");
2587        for event in &response1.events {
2588            assert_eq!(
2589                event.shard_id, 0,
2590                "All returned events should be from shard 0"
2591            );
2592        }
2593
2594        let cursor1 = response1
2595            .next_id
2596            .expect("Should have a cursor after first poll");
2597
2598        // Verify the cursor advanced for shard 1 even though its events were filtered out
2599        let decoded = CompositeCursor::decode(&cursor1).unwrap();
2600        assert!(
2601            decoded.get(1).is_some(),
2602            "Cursor must advance for shard 1 even though all its events were filtered out"
2603        );
2604        assert_eq!(
2605            decoded.get(1),
2606            Some("1-2"),
2607            "Shard 1 cursor should point to its last fetched event"
2608        );
2609
2610        // Second poll with the cursor: should NOT re-fetch shard 1's events
2611        let response2 = merger
2612            .poll(ConsumeRequest::new(100).filter(filter).from(cursor1))
2613            .await
2614            .unwrap();
2615
2616        assert!(
2617            response2.events.is_empty(),
2618            "Second poll should return no events (all events already consumed or filtered)"
2619        );
2620    }
2621
2622    #[tokio::test]
2623    async fn test_regression_poll_merger_filter_does_not_infinite_loop() {
2624        // Regression: when one shard has events matching the filter and another
2625        // shard has events that are all filtered out, polling in pages must
2626        // terminate and return all matching events without looping forever.
2627        let adapter = Arc::new(MockAdapter::new());
2628
2629        // Shard 0: 100 events all matching filter
2630        let shard0_events: Vec<_> = (1..=100)
2631            .map(|i| {
2632                StoredEvent::from_value(
2633                    format!("0-{}", i),
2634                    json!({"type": "token", "idx": i}),
2635                    i as u64 * 10,
2636                    0,
2637                )
2638            })
2639            .collect();
2640        adapter.add_events(0, shard0_events);
2641
2642        // Shard 1: 100 events none matching filter
2643        let shard1_events: Vec<_> = (1..=100)
2644            .map(|i| {
2645                StoredEvent::from_value(
2646                    format!("1-{}", i),
2647                    json!({"type": "message", "idx": i}),
2648                    i as u64 * 10 + 5,
2649                    1,
2650                )
2651            })
2652            .collect();
2653        adapter.add_events(1, shard1_events);
2654
2655        let merger = PollMerger::new(adapter, vec![0, 1]);
2656        let filter = Filter::eq("type", json!("token"));
2657
2658        let mut all_events = Vec::new();
2659        let mut cursor: Option<String> = None;
2660        let max_iterations = 50;
2661        let mut iterations = 0;
2662
2663        loop {
2664            iterations += 1;
2665            if iterations > max_iterations {
2666                panic!(
2667                    "Infinite loop detected after {} iterations! Collected {} events so far.",
2668                    max_iterations,
2669                    all_events.len()
2670                );
2671            }
2672
2673            let mut request = ConsumeRequest::new(50).filter(filter.clone());
2674            if let Some(c) = &cursor {
2675                request = request.from(c.clone());
2676            }
2677
2678            let response = merger.poll(request).await.unwrap();
2679            all_events.extend(response.events);
2680
2681            if !response.has_more {
2682                break;
2683            }
2684            cursor = response.next_id;
2685        }
2686
2687        // Should have collected exactly 100 matching events from shard 0
2688        assert_eq!(
2689            all_events.len(),
2690            100,
2691            "Expected 100 matching events, got {}. Iterations: {}",
2692            all_events.len(),
2693            iterations
2694        );
2695
2696        // All events should be from shard 0 (the "token" shard)
2697        for event in &all_events {
2698            assert_eq!(
2699                event.shard_id, 0,
2700                "All matching events should come from shard 0"
2701            );
2702        }
2703
2704        // Verify no duplicates
2705        let mut ids: Vec<_> = all_events.iter().map(|e| e.id.clone()).collect();
2706        ids.sort();
2707        ids.dedup();
2708        assert_eq!(ids.len(), 100, "Should have no duplicate events");
2709    }
2710
2711    #[tokio::test]
2712    async fn test_regression_all_events_filtered_returns_cursor() {
2713        // Regression: when every fetched event was filtered out, next_id was
2714        // None, leaving the caller stuck re-fetching the same events forever.
2715        let adapter = Arc::new(MockAdapter::new());
2716
2717        // Only non-matching events
2718        adapter.add_events(
2719            0,
2720            vec![
2721                StoredEvent::from_value("0-1".to_string(), json!({"type": "noise"}), 100, 0),
2722                StoredEvent::from_value("0-2".to_string(), json!({"type": "noise"}), 200, 0),
2723            ],
2724        );
2725
2726        let merger = PollMerger::new(adapter, vec![0]);
2727        let filter = Filter::eq("type", json!("signal"));
2728
2729        let response = merger
2730            .poll(ConsumeRequest::new(100).filter(filter))
2731            .await
2732            .unwrap();
2733
2734        // No events match, but cursor must still advance
2735        assert!(response.events.is_empty());
2736        assert!(
2737            response.next_id.is_some(),
2738            "cursor must advance past filtered events even when none match"
2739        );
2740    }
2741
2742    /// Exercises the non-lazy filter branch: when `Ordering::InsertionTs`
2743    /// is requested we can't short-circuit at `limit + 1` matches (the
2744    /// sort needs every event first), so the code falls through to
2745    /// `retain` → sort → truncate. This test pins:
2746    /// - Results are globally sorted by `insertion_ts` (not input order).
2747    /// - Only filter-matching events come through.
2748    /// - Truncation picks the `limit` *earliest* matches by ts.
2749    /// - `has_more` is set when matches exceed `limit`.
2750    #[tokio::test]
2751    async fn test_poll_merger_filter_insertion_ts_truncates_after_sort() {
2752        let adapter = Arc::new(MockAdapter::new());
2753
2754        // Interleave shards with out-of-order timestamps and a mix of
2755        // matching / non-matching events. Matching timestamps: 120, 200,
2756        // 260, 400. Non-matching timestamps: 100, 300.
2757        adapter.add_events(
2758            0,
2759            vec![
2760                StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 400, 0),
2761                StoredEvent::from_value("0-2".to_string(), json!({"type": "noise"}), 100, 0),
2762                StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 200, 0),
2763            ],
2764        );
2765        adapter.add_events(
2766            1,
2767            vec![
2768                StoredEvent::from_value("1-1".to_string(), json!({"type": "token"}), 260, 1),
2769                StoredEvent::from_value("1-2".to_string(), json!({"type": "noise"}), 300, 1),
2770                StoredEvent::from_value("1-3".to_string(), json!({"type": "token"}), 120, 1),
2771            ],
2772        );
2773
2774        let merger = PollMerger::new(adapter, vec![0, 1]);
2775        let filter = Filter::eq("type", json!("token"));
2776
2777        // 4 matches exist; asking for 2 must yield the two earliest
2778        // after a full sort (120, 200) and signal has_more.
2779        let response = merger
2780            .poll(
2781                ConsumeRequest::new(2)
2782                    .filter(filter)
2783                    .ordering(Ordering::InsertionTs),
2784            )
2785            .await
2786            .unwrap();
2787
2788        assert_eq!(response.events.len(), 2);
2789        assert_eq!(
2790            response.events[0].insertion_ts, 120,
2791            "earliest match must come first"
2792        );
2793        assert_eq!(response.events[1].insertion_ts, 200);
2794        assert!(
2795            response.has_more,
2796            "two more matching events remain past the limit"
2797        );
2798    }
2799
2800    #[tokio::test]
2801    async fn test_regression_corrupt_event_filter_drop_is_consistent_and_logged() {
2802        // Regression: corrupt events (raw bytes that don't deserialize as
2803        // JSON) used to be silently dropped from the filtered poll path
2804        // via `event.parse().map(...).unwrap_or(false)`, while the
2805        // unfiltered path returned them as-is. That inconsistency hid
2806        // upstream framing/storage corruption from anyone running with a
2807        // filter (i.e. most consumers).
2808        //
2809        // The fix routes parse failures through `event_matches_filter`,
2810        // which emits `tracing::warn!` per dropped event. We don't have
2811        // a tracing-test subscriber wired up so we don't assert on the
2812        // log line itself; instead we pin the behavioral surface so a
2813        // future regression that re-silences corruption (e.g., dropping
2814        // the helper) shows up in code review:
2815        //   - filtered poll: corrupt event is dropped, valid event kept
2816        //   - unfiltered poll: corrupt event flows through unchanged
2817        //
2818        // If the helper is removed or the warn! is downgraded to debug!,
2819        // this test still passes — but the helper's doc-comment names
2820        // the inconsistency and is the artifact that protects the
2821        // observability requirement.
2822        let adapter = Arc::new(MockAdapter::new());
2823        adapter.add_events(
2824            0,
2825            vec![
2826                StoredEvent::from_value("0-1".to_string(), json!({"type": "ok"}), 100, 0),
2827                // Raw bytes that don't parse as JSON — a torn write or
2828                // upstream framing bug surface.
2829                StoredEvent::new(
2830                    "0-2".to_string(),
2831                    bytes::Bytes::from_static(b"\xff\xff not json \xff"),
2832                    200,
2833                    0,
2834                ),
2835            ],
2836        );
2837
2838        let merger = PollMerger::new(adapter, vec![0]);
2839
2840        // Filtered: corrupt event must be dropped, valid event kept.
2841        let filtered = merger
2842            .poll(ConsumeRequest::new(100).filter(Filter::eq("type", json!("ok"))))
2843            .await
2844            .unwrap();
2845        assert_eq!(
2846            filtered.events.len(),
2847            1,
2848            "filtered poll must drop the corrupt event"
2849        );
2850        assert_eq!(filtered.events[0].id, "0-1");
2851
2852        // Unfiltered: corrupt event flows through. Documenting that the
2853        // unfiltered path is the *only* way an operator currently sees
2854        // the corrupt bytes — without the warn! the filtered path is
2855        // a black hole.
2856        let unfiltered = merger.poll(ConsumeRequest::new(100)).await.unwrap();
2857        assert_eq!(
2858            unfiltered.events.len(),
2859            2,
2860            "unfiltered poll must surface the corrupt event verbatim"
2861        );
2862        let ids: Vec<_> = unfiltered.events.iter().map(|e| e.id.as_str()).collect();
2863        assert!(ids.contains(&"0-1"));
2864        assert!(ids.contains(&"0-2"));
2865    }
2866
2867    /// Regression: BUG_REPORT.md #2 — `Ordering::None` filter previously
2868    /// broke out of the drain loop once `kept.len() >= limit + 1`,
2869    /// which silently discarded events from later shards without
2870    /// checking the filter. Combined with `new_cursor` advancing for
2871    /// every polled shard, that meant matching events on un-inspected
2872    /// shards were lost forever.
2873    ///
2874    /// Setup: shard 0 has matches followed by shard 1 with matches.
2875    /// With `limit=2`, shard 0's first three events (two matches plus
2876    /// one extra to trigger has_more) used to satisfy the early break,
2877    /// silently dropping shard 1's matches AND advancing past them.
2878    /// The fix runs a full `retain` pass over every fetched event,
2879    /// then rolls back the cursor for shards whose matches were
2880    /// truncated so they're re-fetched on the next poll.
2881    #[tokio::test]
2882    async fn test_regression_ordering_none_filter_does_not_strand_later_shards() {
2883        let adapter = Arc::new(MockAdapter::new());
2884
2885        // Shard 0: 3 matching events.
2886        adapter.add_events(
2887            0,
2888            vec![
2889                StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
2890                StoredEvent::from_value("0-2".to_string(), json!({"type": "token"}), 110, 0),
2891                StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 120, 0),
2892            ],
2893        );
2894        // Shard 1: 3 matching events.
2895        adapter.add_events(
2896            1,
2897            vec![
2898                StoredEvent::from_value("1-1".to_string(), json!({"type": "token"}), 200, 1),
2899                StoredEvent::from_value("1-2".to_string(), json!({"type": "token"}), 210, 1),
2900                StoredEvent::from_value("1-3".to_string(), json!({"type": "token"}), 220, 1),
2901            ],
2902        );
2903
2904        let merger = PollMerger::new(adapter, vec![0, 1]);
2905        let filter = Filter::eq("type", json!("token"));
2906
2907        // Page through with a small limit — over many polls every
2908        // matching event must surface exactly once. Bound iterations
2909        // to detect either a stall or an explosion.
2910        let mut all_returned: Vec<String> = Vec::new();
2911        let mut cursor: Option<String> = None;
2912        for _ in 0..20 {
2913            let mut req = ConsumeRequest::new(2).filter(filter.clone());
2914            if let Some(c) = &cursor {
2915                req = req.from(c.clone());
2916            }
2917            let resp = merger.poll(req).await.unwrap();
2918            for e in &resp.events {
2919                all_returned.push(e.id.clone());
2920            }
2921            if !resp.has_more {
2922                break;
2923            }
2924            cursor = resp.next_id;
2925        }
2926
2927        all_returned.sort();
2928        all_returned.dedup();
2929        assert_eq!(
2930            all_returned,
2931            vec!["0-1", "0-2", "0-3", "1-1", "1-2", "1-3"],
2932            "every matching event from every shard must be returned exactly once"
2933        );
2934    }
2935
2936    /// Regression: BUG_REPORT.md #23 — `Ordering::InsertionTs` filter
2937    /// previously stranded matches on shards whose matching events
2938    /// all sorted later than `limit` matches from other shards. The
2939    /// global sort+truncate dropped them, the cursor-override only
2940    /// fired for shards present in the *returned* set, and so the
2941    /// cursor for the un-returned shard advanced to its fetched
2942    /// position via `new_cursor` — silently skipping the matches.
2943    ///
2944    /// Setup: shard 0 has 3 early-ts matches and shard 1 has 3
2945    /// late-ts matches. With `limit=2` and `InsertionTs` ordering,
2946    /// the first poll returns the two earliest from shard 0;
2947    /// shard 1's matches must NOT be lost. The fix detects that
2948    /// shard 1 had matches truncated and rolls its cursor back so
2949    /// they're re-fetched on the next poll.
2950    #[tokio::test]
2951    async fn test_regression_insertion_ts_filter_does_not_strand_late_shard() {
2952        let adapter = Arc::new(MockAdapter::new());
2953
2954        adapter.add_events(
2955            0,
2956            vec![
2957                StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
2958                StoredEvent::from_value("0-2".to_string(), json!({"type": "token"}), 110, 0),
2959                StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 120, 0),
2960            ],
2961        );
2962        adapter.add_events(
2963            1,
2964            vec![
2965                StoredEvent::from_value("1-1".to_string(), json!({"type": "token"}), 1000, 1),
2966                StoredEvent::from_value("1-2".to_string(), json!({"type": "token"}), 1010, 1),
2967                StoredEvent::from_value("1-3".to_string(), json!({"type": "token"}), 1020, 1),
2968            ],
2969        );
2970
2971        let merger = PollMerger::new(adapter, vec![0, 1]);
2972        let filter = Filter::eq("type", json!("token"));
2973
2974        let mut all_returned: Vec<String> = Vec::new();
2975        let mut cursor: Option<String> = None;
2976        for _ in 0..20 {
2977            let mut req = ConsumeRequest::new(2)
2978                .filter(filter.clone())
2979                .ordering(Ordering::InsertionTs);
2980            if let Some(c) = &cursor {
2981                req = req.from(c.clone());
2982            }
2983            let resp = merger.poll(req).await.unwrap();
2984            for e in &resp.events {
2985                all_returned.push(e.id.clone());
2986            }
2987            if !resp.has_more {
2988                break;
2989            }
2990            cursor = resp.next_id;
2991        }
2992
2993        all_returned.sort();
2994        all_returned.dedup();
2995        assert_eq!(
2996            all_returned,
2997            vec!["0-1", "0-2", "0-3", "1-1", "1-2", "1-3"],
2998            "matches from the late-ts shard must not be lost to truncation"
2999        );
3000    }
3001
3002    /// Regression: BUG_REPORT.md #50 — if any adapter returns
3003    /// `has_more: true` with no events and no `next_id`, the merger
3004    /// previously forwarded that as `(has_more=true, next_id=None)`,
3005    /// causing the caller to re-poll from the same starting cursor
3006    /// indefinitely. The fix suppresses `has_more` whenever the
3007    /// merger itself made no observable progress (no events AND no
3008    /// cursor advance).
3009    #[tokio::test]
3010    async fn has_more_is_suppressed_when_no_progress() {
3011        struct LiarAdapter;
3012
3013        #[async_trait]
3014        impl Adapter for LiarAdapter {
3015            async fn init(&mut self) -> Result<(), AdapterError> {
3016                Ok(())
3017            }
3018            async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3019                Ok(())
3020            }
3021            async fn flush(&self) -> Result<(), AdapterError> {
3022                Ok(())
3023            }
3024            async fn shutdown(&self) -> Result<(), AdapterError> {
3025                Ok(())
3026            }
3027            async fn poll_shard(
3028                &self,
3029                _shard_id: u16,
3030                _from_id: Option<&str>,
3031                _limit: usize,
3032            ) -> Result<ShardPollResult, AdapterError> {
3033                // The pathological case: claim has_more without
3034                // returning events or advancing the cursor.
3035                Ok(ShardPollResult {
3036                    events: Vec::new(),
3037                    next_id: None,
3038                    has_more: true,
3039                })
3040            }
3041            fn name(&self) -> &'static str {
3042                "liar"
3043            }
3044        }
3045
3046        let adapter: Arc<dyn Adapter> = Arc::new(LiarAdapter);
3047        let merger = PollMerger::new(adapter, vec![0, 1]);
3048        let response = merger.poll(ConsumeRequest::new(100)).await.unwrap();
3049
3050        assert!(
3051            response.events.is_empty(),
3052            "no events were emitted, but merger returned {}",
3053            response.events.len()
3054        );
3055        // The whole point of this fix: don't let a misbehaving
3056        // adapter trick the caller into an infinite re-poll.
3057        assert!(
3058            !response.has_more,
3059            "has_more must be suppressed when merger made no progress (#50)"
3060        );
3061        assert!(
3062            response.next_id.is_none(),
3063            "next_id must remain None when no progress was made (#50)"
3064        );
3065    }
3066
3067    /// Pin: a stalled poll (no events, no cursor advance) that
3068    /// was given an input cursor must echo the cursor back to the
3069    /// caller. Pre-fix the merger returned `next_id = None` on no
3070    /// progress, so a caller that interpreted None as "no events
3071    /// — restart from the beginning" silently re-fetched from the
3072    /// stream's start across the stall, losing pagination
3073    /// continuity.
3074    #[tokio::test]
3075    async fn stalled_poll_echoes_caller_cursor_back() {
3076        struct EmptyAdapter;
3077
3078        #[async_trait]
3079        impl Adapter for EmptyAdapter {
3080            async fn init(&mut self) -> Result<(), AdapterError> {
3081                Ok(())
3082            }
3083            async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3084                Ok(())
3085            }
3086            async fn flush(&self) -> Result<(), AdapterError> {
3087                Ok(())
3088            }
3089            async fn shutdown(&self) -> Result<(), AdapterError> {
3090                Ok(())
3091            }
3092            async fn poll_shard(
3093                &self,
3094                _shard_id: u16,
3095                _from_id: Option<&str>,
3096                _limit: usize,
3097            ) -> Result<ShardPollResult, AdapterError> {
3098                Ok(ShardPollResult {
3099                    events: Vec::new(),
3100                    next_id: None,
3101                    has_more: false,
3102                })
3103            }
3104            fn name(&self) -> &'static str {
3105                "empty"
3106            }
3107        }
3108
3109        let adapter: Arc<dyn Adapter> = Arc::new(EmptyAdapter);
3110        let merger = PollMerger::new(adapter, vec![0, 1]);
3111
3112        // Build a real composite cursor for the request to echo.
3113        let mut cursor = CompositeCursor::new();
3114        cursor.set(0, "1702-0".to_string());
3115        cursor.set(1, "1703-0".to_string());
3116        let encoded = cursor.encode().unwrap();
3117
3118        let mut req = ConsumeRequest::new(100);
3119        req.from_id = Some(encoded.clone());
3120
3121        let response = merger.poll(req).await.unwrap();
3122
3123        assert!(response.events.is_empty());
3124        assert_eq!(
3125            response.next_id.as_deref(),
3126            Some(encoded.as_str()),
3127            "stalled poll with input cursor must echo cursor back \
3128             (got {:?}); pre-fix this was None and callers paged \
3129             back to the stream's start",
3130            response.next_id,
3131        );
3132
3133        // Without an input cursor, no progress + no input → still
3134        // None (preserving the prior behavior of #50).
3135        let response_no_cursor = merger.poll(ConsumeRequest::new(100)).await.unwrap();
3136        assert!(response_no_cursor.next_id.is_none());
3137    }
3138
3139    /// Regression: a shard whose `set_checked` cursor write is refused
3140    /// because of a backend-format mismatch (e.g. a JetStream → Redis
3141    /// migration mid-stream) must NOT have its events delivered to
3142    /// the caller. Pre-fix `nc.set_checked(...)`'s `bool` return was
3143    /// discarded — events were merged into the response while the
3144    /// cursor stayed pinned at the old format, so every subsequent
3145    /// poll re-fetched and re-delivered the same events forever.
3146    ///
3147    /// Post-fix:
3148    ///   - the refused shard's events are dropped from the response,
3149    ///   - its id is surfaced in `failed_shards` so the caller sees
3150    ///     the migration as a structured signal, and
3151    ///   - a shard whose write was accepted is unaffected (events
3152    ///     still flow through, cursor still advances).
3153    #[tokio::test]
3154    async fn refused_set_checked_drops_shard_events_and_surfaces_failure() {
3155        // Adapter that ignores `from_id` and always emits the same
3156        // events with the same next_id. The test seeds an input
3157        // cursor in a different format so set_checked refuses.
3158        struct FormatAdapter {
3159            events_by_shard: HashMap<u16, Vec<StoredEvent>>,
3160        }
3161        #[async_trait]
3162        impl Adapter for FormatAdapter {
3163            async fn init(&mut self) -> Result<(), AdapterError> {
3164                Ok(())
3165            }
3166            async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3167                Ok(())
3168            }
3169            async fn flush(&self) -> Result<(), AdapterError> {
3170                Ok(())
3171            }
3172            async fn shutdown(&self) -> Result<(), AdapterError> {
3173                Ok(())
3174            }
3175            async fn poll_shard(
3176                &self,
3177                shard_id: u16,
3178                _from_id: Option<&str>,
3179                _limit: usize,
3180            ) -> Result<ShardPollResult, AdapterError> {
3181                let events = self
3182                    .events_by_shard
3183                    .get(&shard_id)
3184                    .cloned()
3185                    .unwrap_or_default();
3186                let next_id = events.last().map(|e| e.id.clone());
3187                Ok(ShardPollResult {
3188                    events,
3189                    next_id,
3190                    has_more: false,
3191                })
3192            }
3193            fn name(&self) -> &'static str {
3194                "format-adapter"
3195            }
3196        }
3197
3198        let mut events_by_shard: HashMap<u16, Vec<StoredEvent>> = HashMap::new();
3199        // shard 0 emits Redis-format ids
3200        events_by_shard.insert(
3201            0,
3202            vec![
3203                StoredEvent::from_value("1700-0".to_string(), json!({"shard": 0}), 100, 0),
3204                StoredEvent::from_value("1701-0".to_string(), json!({"shard": 0}), 101, 0),
3205            ],
3206        );
3207        // shard 1 emits numeric (JetStream-format) ids
3208        events_by_shard.insert(
3209            1,
3210            vec![StoredEvent::from_value(
3211                "42".to_string(),
3212                json!({"shard": 1}),
3213                100,
3214                1,
3215            )],
3216        );
3217
3218        let adapter: Arc<dyn Adapter> = Arc::new(FormatAdapter { events_by_shard });
3219        let merger = PollMerger::new(adapter, vec![0, 1]);
3220
3221        // Seed the input cursor in the OPPOSITE format for each
3222        // shard so set_checked is guaranteed to refuse both writes.
3223        // - shard 0 cursor in Numeric format; adapter wants Redis  → refused
3224        // - shard 1 cursor in Redis format;   adapter wants Numeric → refused
3225        let mut cursor = CompositeCursor::new();
3226        cursor.set(0, "41".to_string());
3227        cursor.set(1, "1600-0".to_string());
3228        let encoded = cursor.encode().unwrap();
3229
3230        // Use a filter so `new_cursor` is populated (set_checked
3231        // path #1 only fires on the filter branch). The filter
3232        // matches every event so it doesn't drop them on its own.
3233        let mut req = ConsumeRequest::new(10);
3234        req.from_id = Some(encoded.clone());
3235        // NOT(empty-AND) is a tautology — `Filter::and(vec![])`
3236        // matches nothing, so the negation matches everything.
3237        // We only need filter=Some(...) so the `new_cursor` clone
3238        // is built and the Step-1 set_checked path runs.
3239        req.filter = Some(crate::consumer::Filter::not(crate::consumer::Filter::and(
3240            Vec::new(),
3241        )));
3242
3243        let response = merger.poll(req).await.unwrap();
3244
3245        assert!(
3246            response.events.is_empty(),
3247            "events from refused shards must be dropped; got {} \
3248             event(s) — pre-fix this is where the duplicate-delivery \
3249             loop began",
3250            response.events.len(),
3251        );
3252        let mut failed = response.failed_shards.clone();
3253        failed.sort_unstable();
3254        assert_eq!(
3255            failed,
3256            vec![0u16, 1],
3257            "both shards must surface in failed_shards so the caller \
3258             sees the backend-format mismatch as a structured signal",
3259        );
3260    }
3261
3262    /// Companion: when set_checked is refused for ONE shard but
3263    /// accepted for another, only the refused shard is dropped — the
3264    /// accepted shard's events flow through normally.
3265    #[tokio::test]
3266    async fn refused_set_checked_does_not_drop_unaffected_shards() {
3267        struct FormatAdapter {
3268            events_by_shard: HashMap<u16, Vec<StoredEvent>>,
3269        }
3270        #[async_trait]
3271        impl Adapter for FormatAdapter {
3272            async fn init(&mut self) -> Result<(), AdapterError> {
3273                Ok(())
3274            }
3275            async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3276                Ok(())
3277            }
3278            async fn flush(&self) -> Result<(), AdapterError> {
3279                Ok(())
3280            }
3281            async fn shutdown(&self) -> Result<(), AdapterError> {
3282                Ok(())
3283            }
3284            async fn poll_shard(
3285                &self,
3286                shard_id: u16,
3287                _from_id: Option<&str>,
3288                _limit: usize,
3289            ) -> Result<ShardPollResult, AdapterError> {
3290                let events = self
3291                    .events_by_shard
3292                    .get(&shard_id)
3293                    .cloned()
3294                    .unwrap_or_default();
3295                let next_id = events.last().map(|e| e.id.clone());
3296                Ok(ShardPollResult {
3297                    events,
3298                    next_id,
3299                    has_more: false,
3300                })
3301            }
3302            fn name(&self) -> &'static str {
3303                "format-adapter-partial"
3304            }
3305        }
3306
3307        let mut events_by_shard: HashMap<u16, Vec<StoredEvent>> = HashMap::new();
3308        // Both shards emit Redis-format ids on the adapter side.
3309        events_by_shard.insert(
3310            0,
3311            vec![StoredEvent::from_value(
3312                "1700-0".to_string(),
3313                json!({"shard": 0}),
3314                100,
3315                0,
3316            )],
3317        );
3318        events_by_shard.insert(
3319            1,
3320            vec![StoredEvent::from_value(
3321                "1700-0".to_string(),
3322                json!({"shard": 1}),
3323                100,
3324                1,
3325            )],
3326        );
3327
3328        // Seed cursor: shard 0 in Numeric format (will be refused),
3329        // shard 1 in Redis format (will be accepted).
3330        let mut cursor = CompositeCursor::new();
3331        cursor.set(0, "41".to_string());
3332        cursor.set(1, "1600-0".to_string());
3333        let encoded = cursor.encode().unwrap();
3334
3335        let adapter: Arc<dyn Adapter> = Arc::new(FormatAdapter { events_by_shard });
3336        let merger = PollMerger::new(adapter, vec![0, 1]);
3337        let mut req = ConsumeRequest::new(10);
3338        req.from_id = Some(encoded);
3339        // NOT(empty-AND) is a tautology — `Filter::and(vec![])`
3340        // matches nothing, so the negation matches everything.
3341        // We only need filter=Some(...) so the `new_cursor` clone
3342        // is built and the Step-1 set_checked path runs.
3343        req.filter = Some(crate::consumer::Filter::not(crate::consumer::Filter::and(
3344            Vec::new(),
3345        )));
3346
3347        let response = merger.poll(req).await.unwrap();
3348
3349        assert_eq!(
3350            response.events.len(),
3351            1,
3352            "shard 1's event must still be delivered; shard 0's must be dropped",
3353        );
3354        assert_eq!(response.events[0].shard_id, 1);
3355        assert_eq!(response.failed_shards, vec![0u16]);
3356    }
3357
3358    /// Regression: BUG_REPORT.md #52 — `sort_by_key(|e| e.insertion_ts)`
3359    /// is stable but ties across shards depend on `join_all`'s
3360    /// completion order, which is non-deterministic. Combined with
3361    /// `truncate(limit)`, this could drop or duplicate events at the
3362    /// limit boundary across consecutive polls. The fix breaks ties
3363    /// deterministically on `(shard_id, id)`.
3364    #[tokio::test]
3365    async fn sort_breaks_ties_deterministically_across_shards() {
3366        // Two shards with events that share `insertion_ts` so the
3367        // tiebreaker controls the order.
3368        let adapter = Arc::new(MockAdapter::new());
3369        adapter.add_events(
3370            0,
3371            vec![
3372                StoredEvent::from_value("0-a".to_string(), json!({}), 100, 0),
3373                StoredEvent::from_value("0-b".to_string(), json!({}), 100, 0),
3374            ],
3375        );
3376        adapter.add_events(
3377            1,
3378            vec![
3379                StoredEvent::from_value("1-a".to_string(), json!({}), 100, 1),
3380                StoredEvent::from_value("1-b".to_string(), json!({}), 100, 1),
3381            ],
3382        );
3383
3384        // Poll many times; the order must be stable.
3385        let merger = PollMerger::new(adapter, vec![0, 1]);
3386        let mut prior_order: Option<Vec<String>> = None;
3387        for iter in 0..20 {
3388            let r = merger
3389                .poll(ConsumeRequest::new(10).ordering(Ordering::InsertionTs))
3390                .await
3391                .unwrap();
3392            let ids: Vec<String> = r.events.iter().map(|e| e.id.clone()).collect();
3393            if let Some(prev) = &prior_order {
3394                assert_eq!(
3395                    &ids, prev,
3396                    "iter {iter}: order is non-deterministic — sort tie-break failed (#52)"
3397                );
3398            }
3399            prior_order = Some(ids);
3400        }
3401
3402        // And the order must match `(shard_id, id)`.
3403        let r = merger
3404            .poll(ConsumeRequest::new(10).ordering(Ordering::InsertionTs))
3405            .await
3406            .unwrap();
3407        let ids: Vec<String> = r.events.iter().map(|e| e.id.clone()).collect();
3408        assert_eq!(ids, vec!["0-a", "0-b", "1-a", "1-b"]);
3409    }
3410}