Skip to main content

net/consumer/
merge.rs

//! Cross-shard poll merge layer.
//!
//! This module polls multiple shards and merges the results into a single
//! stream, managing a per-shard cursor along the way.
//!
//! # Composite Cursor
//!
//! When polling multiple shards, the position in each shard is tracked with
//! a composite cursor: a JSON object mapping shard id to stream id, encoded
//! as base64:
//!
//! ```json
//! {"0": "1702123456789-0", "1": "1702123456790-0", ...}
//! ```
14
15use std::cmp::Ordering as CmpOrdering;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
20use serde::{Deserialize, Serialize};
21
22use crate::adapter::{Adapter, ShardPollResult};
23use crate::consumer::filter::Filter;
24use crate::error::{AdapterError, ConsumerError};
25use crate::event::StoredEvent;
26
27/// Compare two adapter-emitted stream ids using numeric semantics for
28/// the formats both built-in adapters produce, with a lex fallback for
29/// already-lex-comparable opaque ids (ULID, UUIDv7, fixed-width hex).
30///
31/// A raw `str::cmp` would invert ordering on the unpadded numeric
32/// ids the JetStream adapter emits (`seq.to_string()`) and the
33/// Redis Streams server-side `{ms}-{seq}` format — `"9" > "10"`
34/// lexicographically would wedge the cursor at every decade
35/// boundary. The structured comparator below handles both formats
36/// numerically. Mixed-padding comparisons across upgrades still
37/// compare correctly because parse-then-compare ignores leading
38/// zeros.
39///
40/// Order of attempts:
41/// 1. Both ids parse as `<u64>-<u64>` (Redis Streams).
42/// 2. Both ids parse as `<u128>` (raw numeric, padded or unpadded).
43/// 3. Lex compare (ULID, UUID, hex digests, etc.).
44///
45/// We deliberately do not try to mix formats — if one side is `123`
46/// and the other is `456-0`, the lex fallback kicks in. In practice
47/// a single adapter emits a single format, so this only matters for
48/// pathological mixed-source streams.
49pub(crate) fn compare_stream_ids(a: &str, b: &str) -> CmpOrdering {
50    // Redis Streams `<ms>-<seq>`.
51    if let (Some((a_ms, a_seq)), Some((b_ms, b_seq))) = (split_redis_id(a), split_redis_id(b)) {
52        return (a_ms, a_seq).cmp(&(b_ms, b_seq));
53    }
54    // Plain numeric (JetStream `seq.to_string()` or future zero-padded form).
55    if let (Ok(an), Ok(bn)) = (a.parse::<u128>(), b.parse::<u128>()) {
56        return an.cmp(&bn);
57    }
58    // Opaque id — assumed already lex-comparable.
59    a.cmp(b)
60}
61
62fn split_redis_id(s: &str) -> Option<(u64, u64)> {
63    let (ms, seq) = s.split_once('-')?;
64    Some((ms.parse().ok()?, seq.parse().ok()?))
65}
66
67/// Coarse classifier for a stream id. Two ids of the same
68/// format compare safely via `compare_stream_ids`; two ids of
69/// different formats fall through to a lex compare that may
70/// wedge the cursor (see `update_from_events`).
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub(crate) enum IdFormat {
73    /// Redis Streams `<ms>-<seq>`.
74    Redis,
75    /// Plain numeric (JetStream `seq.to_string()`).
76    Numeric,
77    /// Anything else — assumed lex-comparable.
78    Opaque,
79}
80
81pub(crate) fn id_format(s: &str) -> IdFormat {
82    if split_redis_id(s).is_some() {
83        IdFormat::Redis
84    } else if s.parse::<u128>().is_ok() {
85        IdFormat::Numeric
86    } else {
87        IdFormat::Opaque
88    }
89}
90
91/// Backing type for per-shard cursor positions. `Arc<str>` makes
92/// cursor clones (and internal copies during poll merging) cheap by
93/// reference-counting the id bytes rather than copying them.
94type CursorPos = Arc<str>;
95
96/// Ordering mode for consumed events.
97#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
98pub enum Ordering {
99    /// Return events in arbitrary order (fastest).
100    #[default]
101    None,
102    /// Sort events by insertion timestamp (cross-shard ordering).
103    InsertionTs,
104}
105
106/// Composite cursor tracking position across multiple shards.
107#[derive(Debug, Clone, Default, Serialize, Deserialize)]
108pub struct CompositeCursor {
109    /// Per-shard positions (shard_id -> stream_id).
110    ///
111    /// Stored as `Arc<str>` so internal copies (e.g. `cursor.clone()`
112    /// inside the poll merger) bump a refcount instead of duplicating
113    /// each id's bytes.
114    #[serde(flatten)]
115    pub positions: HashMap<u16, CursorPos>,
116}
117
118impl CompositeCursor {
119    /// Create an empty cursor.
120    pub fn new() -> Self {
121        Self::default()
122    }
123
124    /// Encode the cursor as a base64 string.
125    ///
126    /// Pre-fix used `unwrap_or_default()`, which silently
127    /// produced an empty string on serialization failure. The
128    /// empty cursor then base64-encoded to an empty string and
129    /// the consumer's next poll restarted from the beginning of
130    /// the stream — silent rewind. For the current `positions`
131    /// schema (`HashMap<u16, Arc<str>>`), serialization is
132    /// infallible, so the failure path is unreachable.
133    ///
134    /// We surface that as a `ConsumerError::InvalidCursor` rather
135    /// than a panic. `poll()` is an `async fn`, and a panic that
136    /// propagates from there can abort the surrounding tokio
137    /// runtime worker. Returning `Err` lets `poll()` stay
138    /// non-panicking even if a future schema change breaks the
139    /// invariant; the caller will see a structured error and can
140    /// retry / log instead of taking down a worker.
141    pub fn encode(&self) -> Result<String, ConsumerError> {
142        let json = serde_json::to_string(&self.positions).map_err(|e| {
143            ConsumerError::InvalidCursor(format!(
144                "CompositeCursor::encode failed to serialize positions \
145                 (HashMap<u16, Arc<str>> should be infallible): {e}"
146            ))
147        })?;
148        Ok(BASE64.encode(json.as_bytes()))
149    }
150
151    /// Decode a cursor from a base64 string.
152    pub fn decode(s: &str) -> Result<Self, ConsumerError> {
153        let bytes = BASE64
154            .decode(s)
155            .map_err(|e| ConsumerError::InvalidCursor(e.to_string()))?;
156
157        // Two-pass parse so non-canonical shard-id keys (e.g.
158        // `"00"` aliasing `"0"`) are rejected explicitly. Pre-fix
159        // we deserialized straight into `HashMap<u16, _>`; serde
160        // parses each string key as u16, so `"0"` and `"00"`
161        // both produce key 0 and the second insert silently
162        // overwrites the first. The collision is benign in
163        // production (no caller emits non-canonical keys) but
164        // a malicious or buggy producer could inject a hostile
165        // cursor that ambiguates which shard's position a
166        // consumer ended up with on round-trip.
167        let raw_positions: HashMap<String, CursorPos> = serde_json::from_slice(&bytes)
168            .map_err(|e| ConsumerError::InvalidCursor(e.to_string()))?;
169        let mut positions: HashMap<u16, CursorPos> = HashMap::with_capacity(raw_positions.len());
170        for (key, val) in raw_positions {
171            let id: u16 = key.parse().map_err(|_| {
172                ConsumerError::InvalidCursor(format!("shard key {key:?} is not a valid u16"))
173            })?;
174            // Reject non-canonical stringifications. The
175            // round-trip `u16 → String` is the canonical form;
176            // any other string that parses to the same u16 is
177            // a non-canonical alias.
178            if id.to_string() != key {
179                return Err(ConsumerError::InvalidCursor(format!(
180                    "non-canonical shard key {key:?} (parses to {id}, \
181                     canonical form is {id})"
182                )));
183            }
184            if positions.insert(id, val).is_some() {
185                // Defensive: if non-canonical detection above
186                // missed something (it shouldn't), a duplicate
187                // canonical key is also a structural error.
188                return Err(ConsumerError::InvalidCursor(format!(
189                    "duplicate shard key {id} after canonicalization"
190                )));
191            }
192        }
193
194        Ok(Self { positions })
195    }
196
197    /// Get the position for a specific shard.
198    pub fn get(&self, shard_id: u16) -> Option<&str> {
199        self.positions.get(&shard_id).map(|s| s.as_ref())
200    }
201
202    /// Set the position for a specific shard.
203    ///
204    /// Accepts anything that converts into an `Arc<str>` — notably
205    /// `String`, `&str`, and `Arc<str>` itself. This lets adapters
206    /// hand us a freshly-allocated `String` (becomes a single boxed
207    /// allocation) without forcing a second copy for the cursor.
208    pub fn set(&mut self, shard_id: u16, position: impl Into<CursorPos>) {
209        self.positions.insert(shard_id, position.into());
210    }
211
212    /// Update positions from consumed events.
213    ///
214    /// Per-shard CAS routed through `compare_stream_ids`, which
215    /// understands the Redis (`<ms>-<seq>`) and JetStream (`<u64>`)
216    /// formats numerically and falls back to lex for opaque ids
217    /// (ULID, UUIDv7, hex digests). The cursor cannot regress and
218    /// decade-rollovers cannot freeze it. Unconditional inserts
219    /// would let whichever event for a given `shard_id` appeared
220    /// *last* in the slice win regardless of stream order; a plain
221    /// `str::cmp` CAS would wedge on the unpadded numeric ids both
222    /// built-in adapters emit (`"9" > "10"` lexicographically).
223    pub fn update_from_events(&mut self, events: &[StoredEvent]) {
224        for event in events {
225            let new_id = event.id.as_str();
226            match self.positions.get(&event.shard_id) {
227                Some(existing) => {
228                    // Detect a backend-format change before
229                    // calling the comparator. Pre-fix a cursor
230                    // at `"42"` (JetStream numeric) confronted
231                    // with a new `"1700-0"` (Redis) fell through
232                    // both structured branches of
233                    // `compare_stream_ids`, hit the lex fallback
234                    // (`'4' > '1'`), and the CAS guard refused
235                    // to update — silent stall requiring manual
236                    // cursor reset. Detect the mismatch
237                    // explicitly: surface a loud error and
238                    // refuse the update so operators see the
239                    // backend migration in logs and reset the
240                    // cursor deliberately. Keeping the existing
241                    // value (rather than blindly accepting the
242                    // new one) avoids a potential regression in
243                    // the other direction.
244                    let existing_fmt = id_format(existing.as_ref());
245                    let new_fmt = id_format(new_id);
246                    if existing_fmt != new_fmt {
247                        tracing::error!(
248                            shard_id = event.shard_id,
249                            existing = %existing,
250                            new = %new_id,
251                            existing_format = ?existing_fmt,
252                            new_format = ?new_fmt,
253                            "stream id format change detected — likely a \
254                             backend migration (e.g. JetStream → Redis). \
255                             Refusing to advance the cursor; operator must \
256                             explicitly reset to consume from the new \
257                             backend.",
258                        );
259                        continue;
260                    }
261                    if compare_stream_ids(existing.as_ref(), new_id) == CmpOrdering::Less {
262                        self.positions.insert(event.shard_id, Arc::from(new_id));
263                    }
264                    // Existing is >= new_id under the structured
265                    // comparator — don't regress.
266                }
267                None => {
268                    self.positions.insert(event.shard_id, Arc::from(new_id));
269                }
270            }
271        }
272    }
273}
274
275/// Request for consuming events.
276#[derive(Debug, Clone, Default)]
277pub struct ConsumeRequest {
278    /// Start cursor (opaque to caller). None means from the beginning.
279    pub from_id: Option<String>,
280    /// Maximum number of events to return.
281    pub limit: usize,
282    /// Optional filter to apply.
283    pub filter: Option<Filter>,
284    /// Ordering mode.
285    pub ordering: Ordering,
286    /// Specific shards to poll. None means all shards.
287    pub shards: Option<Vec<u16>>,
288}
289
290impl ConsumeRequest {
291    /// Create a new consume request.
292    pub fn new(limit: usize) -> Self {
293        Self {
294            limit,
295            ..Default::default()
296        }
297    }
298
299    /// Set the starting cursor.
300    pub fn from(mut self, cursor: impl Into<String>) -> Self {
301        self.from_id = Some(cursor.into());
302        self
303    }
304
305    /// Set the filter.
306    pub fn filter(mut self, filter: Filter) -> Self {
307        self.filter = Some(filter);
308        self
309    }
310
311    /// Set the ordering mode.
312    pub fn ordering(mut self, ordering: Ordering) -> Self {
313        self.ordering = ordering;
314        self
315    }
316
317    /// Set specific shards to poll.
318    pub fn shards(mut self, shards: Vec<u16>) -> Self {
319        self.shards = Some(shards);
320        self
321    }
322}
323
324/// Response from consuming events.
325#[derive(Debug, Clone)]
326pub struct ConsumeResponse {
327    /// Events matching the request.
328    pub events: Vec<StoredEvent>,
329    /// Cursor for the next poll. None if no events returned.
330    pub next_id: Option<String>,
331    /// True if there are more events available.
332    pub has_more: bool,
333    /// `true` if the per-shard fetch was clamped by the internal
334    /// `PER_SHARD_FETCH_CAP` (10 000). Callers requesting very
335    /// large `limit` values across few shards may receive fewer
336    /// events than `limit` per `poll()` even when the underlying
337    /// streams have more — pagination via `next_id` still works.
338    ///
339    /// Pre-fix this clamp was silent. The default is
340    /// `false`; tools building observability around large polls
341    /// can detect under-delivery via this flag.
342    pub truncated_at_per_shard_cap: bool,
343    /// Shards that reported `has_more=true` but contributed no
344    /// events and no cursor advance to this poll. The merger
345    /// suppresses the aggregate `has_more` flag for caller-
346    /// protection (preventing infinite loops), but operators
347    /// monitoring adapter health should know which shards are
348    /// stuck.
349    ///
350    /// Pre-fix the suppression was logged at warn but
351    /// invisible to callers. Empty on the happy path; populated
352    /// only when a stall was detected and suppressed.
353    pub stalled_shards: Vec<u16>,
354    /// Shards whose adapter call returned an error during this
355    /// poll. The merger logs each error at WARN and continues
356    /// with the surviving shards, so the response's `events`
357    /// can come from a strict subset of the configured shards
358    /// and silently miss data the operator expected to see.
359    /// Operators monitoring adapter health need to know WHICH
360    /// shards failed (not just that something logged a warn) so
361    /// they can correlate alerts with specific Redis / JetStream
362    /// nodes.
363    ///
364    /// Pre-fix this signal lived only in the warn log; an
365    /// observer parsing `ConsumeResponse` saw a clean partial-
366    /// shards response with no field indicating *which* shards
367    /// were missing, in contrast to `stalled_shards` which IS
368    /// surfaced. Empty on the happy path; populated only when at
369    /// least one shard's poll errored.
370    pub failed_shards: Vec<u16>,
371}
372
373impl ConsumeResponse {
374    /// Create an empty response.
375    pub fn empty() -> Self {
376        Self {
377            events: Vec::new(),
378            next_id: None,
379            has_more: false,
380            truncated_at_per_shard_cap: false,
381            stalled_shards: Vec::new(),
382            failed_shards: Vec::new(),
383        }
384    }
385}
386
387/// Internal cap on per-shard `direct_get` / `XRANGE` fetch sizes,
388/// applied in `PollMerger::poll`. Bounds the adapter's per-call
389/// memory pressure for a single poll. Callers needing larger
390/// effective limits should paginate.
391///
392/// Marked `#[doc(hidden)]` because the value is an internal
393/// tuning knob, not part of the consumer's public API. Surfacing
394/// it on the docs would invite downstreams to match against it
395/// and turn a silent tuning change into a breaking-change
396/// negotiation. Callers that need to know whether a poll was
397/// truncated should read `ConsumeResponse::truncated_at_per_shard_cap`
398/// rather than comparing against this constant directly.
399#[doc(hidden)]
400pub const PER_SHARD_FETCH_CAP: usize = 10_000;
401
402/// Match a `StoredEvent` against a filter, surfacing parse failures.
403///
404/// Returns `true` iff the event parses as JSON AND the filter matches
405/// the parsed value. A parse failure is logged at WARN with the event's
406/// id and shard so on-disk corruption or framing bugs in upstream
407/// adapters are observable from the filtered-poll path; without this,
408/// corrupt events were silently dropped from filtered results while the
409/// unfiltered path still returned them — a confusing inconsistency.
410fn event_matches_filter(event: &StoredEvent, filter: &Filter) -> bool {
411    match event.parse() {
412        Ok(value) => filter.matches(&value),
413        Err(e) => {
414            tracing::warn!(
415                event_id = %event.id,
416                shard_id = event.shard_id,
417                error = %e,
418                "dropping unparseable event from filtered poll result"
419            );
420            false
421        }
422    }
423}
424
425/// Poll merger for cross-shard aggregation.
426pub struct PollMerger {
427    /// Adapter for polling shards.
428    adapter: Arc<dyn Adapter>,
429    /// Active shard IDs to poll when the request omits an explicit
430    /// `shards` list.
431    ///
432    /// Previously stored only `num_shards: u16` and generated
433    /// `(0..num_shards)` on every default-shards poll. After a dynamic
434    /// scale-down (`ShardMapper::scale_down` evicts the lowest-weight
435    /// shard, not necessarily the highest id), the active id set can
436    /// become sparse — e.g. `{1, 2}` after id 0 was drained — but
437    /// `num_shards == 2` still produces `[0, 1]`, polling a stale or
438    /// nonexistent shard 0 and skipping the live shard 2 entirely.
439    /// Captured at construction; the bus replaces the merger via
440    /// `ArcSwap` whenever topology changes (`add_shard`,
441    /// `remove_shard_internal`).
442    shard_ids: Vec<u16>,
443}
444
445impl PollMerger {
446    /// Create a new poll merger.
447    ///
448    /// `shard_ids` should be the snapshot of currently-active shard IDs
449    /// (e.g. `ShardManager::shard_ids()`). Passing `0..num_shards` is
450    /// only correct when ids are guaranteed dense from 0 — i.e. the
451    /// static-shards path with no scaling.
452    pub fn new(adapter: Arc<dyn Adapter>, shard_ids: Vec<u16>) -> Self {
453        Self { adapter, shard_ids }
454    }
455
456    /// Poll events according to the request.
457    pub async fn poll(&self, request: ConsumeRequest) -> Result<ConsumeResponse, ConsumerError> {
458        if request.limit == 0 {
459            return Ok(ConsumeResponse::empty());
460        }
461
462        // Decode cursor
463        let cursor = match &request.from_id {
464            Some(s) => CompositeCursor::decode(s)?,
465            None => CompositeCursor::new(),
466        };
467
468        // Determine which shards to poll
469        let shards: Vec<u16> = request
470            .shards
471            .clone()
472            .unwrap_or_else(|| self.shard_ids.clone());
473
474        if shards.is_empty() {
475            return Ok(ConsumeResponse::empty());
476        }
477
478        // Calculate per-shard limit (over-fetch to account for filtering)
479        // Use ceiling division to avoid truncating to 0 when limit < shard count.
480        //
481        // Pre-fix this `min(10_000)` clamp was silent —
482        // a caller with `limit=200_000` over 10 shards expected
483        // 20 000/shard plus over-fetch but got 10 000/shard with
484        // no diagnostic. Track whether the clamp triggered and
485        // surface it on the response so callers building
486        // observability around large polls can detect under-
487        // delivery.
488        let over_fetch_factor = if request.filter.is_some() { 3 } else { 2 };
489        let unclamped_per_shard = request
490            .limit
491            .div_ceil(shards.len())
492            .max(1)
493            .saturating_mul(over_fetch_factor);
494        let per_shard_limit = unclamped_per_shard.min(PER_SHARD_FETCH_CAP);
495        let truncated_at_per_shard_cap = unclamped_per_shard > PER_SHARD_FETCH_CAP;
496
497        // Poll all shards in parallel. Each future borrows its start
498        // position directly from `cursor` (which outlives `join_all` below),
499        // avoiding a per-shard `String` allocation on every poll.
500        let poll_futures: Vec<_> = shards
501            .iter()
502            .map(|&shard_id| {
503                let adapter = self.adapter.clone();
504                let from: Option<&str> = cursor.get(shard_id);
505                async move {
506                    let result = adapter.poll_shard(shard_id, from, per_shard_limit).await;
507                    (shard_id, result)
508                }
509            })
510            .collect();
511
512        let shard_results: Vec<(u16, Result<ShardPollResult, AdapterError>)> =
513            futures::future::join_all(poll_futures).await;
514
515        // Collect results, tracking errors. Pre-allocate to the exact total
516        // event count so extend() below never reallocates.
517        let total_events: usize = shard_results
518            .iter()
519            .filter_map(|(_, r)| r.as_ref().ok().map(|sr| sr.events.len()))
520            .sum();
521        let mut all_events = Vec::with_capacity(total_events);
522        let mut any_has_more = false;
523        // Track which shards reported `has_more=true` so
524        // we can surface them on the response when the merger
525        // suppresses has_more for caller-protection. Pre-fix, an
526        // adapter stuck reporting `has_more=true` with no events
527        // and no cursor advance was logged at warn but invisible
528        // to callers — they saw a clean "no more events" and
529        // exited.
530        let mut shards_reporting_has_more: Vec<u16> = Vec::new();
531        // Per-shard adapter errors. Pre-fix these were logged at
532        // warn and then dropped on the floor; the response's
533        // `events` was a strict subset of the configured shards
534        // with no field indicating WHICH shards were missing.
535        // Surface the failed shard ids so observers can correlate
536        // alerts with specific Redis / JetStream nodes (parallel
537        // to the existing `stalled_shards` field).
538        let mut failed_shards: Vec<u16> = Vec::new();
539        // `new_cursor` (fetched-position tracking) is only consulted on the
540        // filter path — building it for unfiltered polls wastes a full
541        // HashMap clone plus a `set()` per shard every poll.
542        let mut new_cursor = if request.filter.is_some() {
543            Some(cursor.clone())
544        } else {
545            None
546        };
547
548        for (shard_id, result) in shard_results {
549            match result {
550                Ok(shard_result) => {
551                    // Destructure to move `next_id` out without cloning the
552                    // String that the adapter already allocated for us.
553                    let ShardPollResult {
554                        events,
555                        next_id,
556                        has_more,
557                    } = shard_result;
558                    if let (Some(nc), Some(next_id)) = (new_cursor.as_mut(), next_id) {
559                        nc.set(shard_id, next_id);
560                    }
561                    if has_more {
562                        any_has_more = true;
563                        shards_reporting_has_more.push(shard_id);
564                    }
565                    all_events.extend(events);
566                }
567                Err(e) => {
568                    tracing::warn!(
569                        shard_id = shard_id,
570                        error = %e,
571                        "Failed to poll shard, skipping"
572                    );
573                    failed_shards.push(shard_id);
574                    // Continue with other shards
575                }
576            }
577        }
578
579        // Apply filter.
580        //
581        // IMPORTANT: Use `new_cursor` (which tracks fetched positions) as
582        // the base cursor so that shards whose events are entirely filtered
583        // out still advance past those events. Without this, filtered-out
584        // events would be re-fetched on every subsequent poll, causing an
585        // infinite loop.
586        //
587        // Parse failures: a `StoredEvent` whose `raw` bytes don't
588        // deserialize as JSON cannot match a filter, so it is dropped
589        // from the filtered result. Previously this drop was silent
590        // (`unwrap_or(false)`), making on-disk corruption or
591        // adapter-side framing bugs invisible to operators — only
592        // *unfiltered* polls would surface the bad event.
593        //
594        // The previous `Ordering::None` path had a `break` once
595        // `kept.len() >= limit + 1`, which discarded events from later
596        // shards without ever filtering them. Combined with the cursor
597        // advancing past every fetched event, that meant matching
598        // events on un-inspected shards were silently lost. The fix
599        // uses a single full `retain` pass for both ordering modes;
600        // the `lazy parse` micro-optimization is gone, but per-event
601        // filter-matching is cheap and consistent semantics with the
602        // sort path is worth more than parse-skip on over-fetches.
603        if let Some(filter) = &request.filter {
604            all_events.retain(|e| event_matches_filter(e, filter));
605        }
606
607        // Apply ordering
608        match request.ordering {
609            Ordering::None => {
610                // Keep arbitrary order
611            }
612            Ordering::InsertionTs => {
613                // `insertion_ts` is monotonic *per shard*, not
614                // globally (see `event.rs:233`), so two events from
615                // different shards can carry the same timestamp. With
616                // a stable sort on `insertion_ts` alone, ties were
617                // broken by the input order — which depends on
618                // `futures::future::join_all`'s completion ordering
619                // and is non-deterministic across polls. Combined
620                // with `truncate(limit)` and the cursor-rollback step,
621                // the same logical event could be returned twice or
622                // skipped at the limit boundary across consecutive
623                // polls.
624                //
625                // Add `(shard_id, id)` as deterministic
626                // tiebreakers. `id` is the storage backend's
627                // identifier and is unique within a shard, so the
628                // composite is a strict total order.
629                //
630                // The id tiebreak routes through
631                // `compare_stream_ids`, which understands the
632                // unpadded numeric formats both built-in adapters
633                // emit (Redis `<ms>-<seq>`, JetStream `<u64>`) and
634                // falls back to lex for opaque ids (ULID, UUIDv7,
635                // hex digests). The `(insertion_ts, shard_id)`
636                // chain resolves the common cases first; when two
637                // events from the same shard land at the same
638                // `insertion_ts` (rare but legal at millisecond
639                // granularity), the structured id compare is
640                // correct on every adapter format.
641                all_events.sort_by(|a, b| {
642                    a.insertion_ts
643                        .cmp(&b.insertion_ts)
644                        .then(a.shard_id.cmp(&b.shard_id))
645                        .then(compare_stream_ids(&a.id, &b.id))
646                });
647            }
648        }
649
650        // Track per-shard match counts *before* truncate. After
651        // truncation, any shard whose match count shrank means matches
652        // were dropped — and those matches must be re-fetched on the
653        // next poll, otherwise they are silently lost (the cursor
654        // would otherwise advance past them via `new_cursor`).
655        let mut matched_per_shard: std::collections::HashMap<u16, usize> =
656            std::collections::HashMap::new();
657        if request.filter.is_some() {
658            for e in &all_events {
659                *matched_per_shard.entry(e.shard_id).or_insert(0) += 1;
660            }
661        }
662
663        // Truncate to requested limit
664        let had_extra = all_events.len() > request.limit;
665        all_events.truncate(request.limit);
666
667        // Build the final cursor.
668        //
669        // With filtering: start from `new_cursor` (fetched positions) so
670        // shards whose events were entirely filtered out advance past
671        // them. Then:
672        //   1. For shards that had matches truncated (returned <
673        //      total_matched), roll the cursor *back* to the original
674        //      pre-poll position. The override step then bumps it
675        //      forward to the last returned match for that shard, so
676        //      the unreturned matches re-appear on the next poll.
677        //   2. Override with the last *returned* event id per shard —
678        //      this also prevents skipping matching events that were
679        //      fetched but truncated by the limit on shards that did
680        //      land in the returned set.
681        //
682        // Without filtering: start from the original `cursor` so shards
683        // with no returned events (due to limit truncation) don't skip
684        // ahead.
685        let mut final_cursor = match new_cursor {
686            Some(nc) => nc,
687            None => cursor.clone(),
688        };
689
690        // Step 1: rollback for shards with truncated matches. We
691        // track which shards we rolled back here so Step 2 only
692        // overrides those shards (rather than every shard with a
693        // returned event, which would throw away the adapter's
694        // `next_id` advance for shards that returned all their
695        // matches).
696        let mut rolled_back: std::collections::HashSet<u16> = std::collections::HashSet::new();
697        if request.filter.is_some() && had_extra {
698            let mut returned_per_shard: std::collections::HashMap<u16, usize> =
699                std::collections::HashMap::new();
700            for e in &all_events {
701                *returned_per_shard.entry(e.shard_id).or_insert(0) += 1;
702            }
703            for (shard_id, &total_matched) in &matched_per_shard {
704                let returned = returned_per_shard.get(shard_id).copied().unwrap_or(0);
705                if returned < total_matched {
706                    // Some matches for this shard were truncated. Roll
707                    // back to the original cursor so they're re-fetched.
708                    // The override below will move us forward to the
709                    // last *returned* match (if any), so we still make
710                    // progress per poll.
711                    match cursor.positions.get(shard_id) {
712                        Some(orig) => final_cursor.set(*shard_id, orig.clone()),
713                        None => {
714                            final_cursor.positions.remove(shard_id);
715                        }
716                    }
717                    rolled_back.insert(*shard_id);
718                }
719            }
720        }
721
722        // Step 2: override to last returned event id per shard.
723        // Only the last returned event per shard matters for the
724        // cursor, so iterate in reverse and skip shards already seen.
725        // This reduces id clones from O(all_events.len()) to
726        // O(shards.len()).
727        //
728        // When the filter path is active, Step 1 has already
729        // populated `final_cursor` with the adapter's `next_id`
730        // (a position past the last *fetched* event for each
731        // shard). A blanket Step 2 override here would move the
732        // cursor back to the last *matched* event id, which is
733        // BEHIND the last fetched event for any shard with
734        // non-matched events — subsequent polls would re-fetch and
735        // re-filter those non-matches, wasting work proportional
736        // to `over_fetch_factor` on low-match-rate streams. So we
737        // only Step 2-override for shards that were actually
738        // rolled back in Step 1 (those need a forward push past
739        // the last *returned* match), OR when the filter path
740        // wasn't used at all (filter is None).
741        //
742        // For filter=None, the previous Step-2 behavior was correct
743        // — `final_cursor` started as `cursor.clone()` (no
744        // `new_cursor` advance) and the only progress signal is
745        // the last returned event id.
746        let mut seen_shards: std::collections::HashSet<u16> =
747            std::collections::HashSet::with_capacity(shards.len());
748        for event in all_events.iter().rev() {
749            if seen_shards.insert(event.shard_id) {
750                let should_override =
751                    request.filter.is_none() || rolled_back.contains(&event.shard_id);
752                if should_override {
753                    final_cursor.set(event.shard_id, event.id.clone());
754                }
755                // Else: the adapter's `next_id` (already in
756                // `final_cursor` via the `new_cursor` initial
757                // value) is more advanced than the last matched
758                // event — preserve it.
759            }
760        }
761
762        let cursor_advanced = final_cursor.positions != cursor.positions;
763        // When filtering removed everything but we did advance past fetched
764        // events, signal has_more so the caller keeps polling forward.
765        let all_filtered = request.filter.is_some() && all_events.is_empty() && cursor_advanced;
766        // Previously `has_more = any_has_more || had_extra ||
767        // all_filtered`. If a single adapter returned
768        // `ShardPollResult { events: [], next_id: None,
769        // has_more: true }` (legal under the trait contract — nothing
770        // forbids it), then `any_has_more=true` propagated even
771        // though we made *no* progress. The caller observed
772        // `(has_more=true, next_id=None)` and re-polled from the
773        // same starting cursor indefinitely.
774        //
775        // Suppress `has_more` when the merger itself made no progress
776        // at all (no events returned AND the cursor didn't advance).
777        // The caller then sees a clean "nothing to do right now"
778        // response and must back off rather than spin.
779        let we_made_progress = !all_events.is_empty() || cursor_advanced;
780        let has_more = (any_has_more || had_extra || all_filtered) && we_made_progress;
781        // When we suppress has_more for caller-protection
782        // (an adapter stuck at has_more=true with no progress),
783        // surface the offending shard ids on the response so
784        // operators can alert. Pre-fix the suppression was warn-
785        // logged only.
786        let stalled_shards: Vec<u16> = if any_has_more && !we_made_progress {
787            tracing::warn!(
788                stalled_shards = ?shards_reporting_has_more,
789                "PollMerger: an adapter reported has_more=true with no events \
790                 and no cursor advance — suppressing to avoid caller infinite-loop"
791            );
792            shards_reporting_has_more
793        } else {
794            Vec::new()
795        };
796        // Return the cursor even when all events were filtered out, so the
797        // caller advances past the filtered region instead of re-fetching
798        // the same events forever. When the poll made no progress at
799        // all, echo back the caller's input cursor (if any) instead of
800        // returning `None` — pre-fix a stalled poll dropped the cursor,
801        // so a caller that interpreted `next_id == None` as "no events,
802        // restart from the beginning" silently regressed pagination
803        // across the stall. Echoing preserves the cursor across stalls.
804        let next_id = if we_made_progress {
805            Some(final_cursor.encode()?)
806        } else {
807            request.from_id.clone()
808        };
809
810        Ok(ConsumeResponse {
811            events: all_events,
812            next_id,
813            has_more,
814            truncated_at_per_shard_cap,
815            stalled_shards,
816            failed_shards,
817        })
818    }
819}
820
821#[cfg(test)]
822mod tests {
823    use super::*;
824    use serde_json::json;
825
826    #[test]
827    fn test_cursor_encode_decode() {
828        let mut cursor = CompositeCursor::new();
829        cursor.set(0, "1702123456789-0".to_string());
830        cursor.set(1, "1702123456790-0".to_string());
831        cursor.set(5, "1702123456795-0".to_string());
832
833        let encoded = cursor.encode().unwrap();
834        let decoded = CompositeCursor::decode(&encoded).unwrap();
835
836        assert_eq!(decoded.get(0), Some("1702123456789-0"));
837        assert_eq!(decoded.get(1), Some("1702123456790-0"));
838        assert_eq!(decoded.get(5), Some("1702123456795-0"));
839        assert_eq!(decoded.get(2), None);
840    }
841
842    #[test]
843    fn test_cursor_update_from_events() {
844        let mut cursor = CompositeCursor::new();
845
846        let events = vec![
847            StoredEvent::from_value("100-0".to_string(), json!({}), 100, 0),
848            StoredEvent::from_value("200-0".to_string(), json!({}), 200, 1),
849            // This used to be "later event in shard 0" by
850            // virtue of being LAST in the input slice, but its id
851            // (150-0) is stream-order BEFORE 200-0 — wait, this is
852            // for shard 0 not shard 1. shard 0 only had 100-0
853            // before this; 150-0 > 100-0, so it advances normally.
854            StoredEvent::from_value("150-0".to_string(), json!({}), 150, 0),
855        ];
856
857        cursor.update_from_events(&events);
858
859        // Should have the highest id seen for each shard.
860        assert_eq!(cursor.get(0), Some("150-0"));
861        assert_eq!(cursor.get(1), Some("200-0"));
862    }
863
864    /// Cursor must NOT regress when events arrive in a
865    /// non-ascending order for the same shard. Pre-fix the cursor
866    /// for shard 0 would land on `100-0` (the last item in the
867    /// slice), regressing past `200-0`.
868    #[test]
869    fn cursor_does_not_regress_on_unsorted_per_shard_events() {
870        let mut cursor = CompositeCursor::new();
871        // For shard 0: stream-order 100-0 → 200-0, but the slice
872        // has them reversed (consumer received 200-0 first,
873        // then 100-0 because of merge ordering).
874        let events = vec![
875            StoredEvent::from_value("200-0".to_string(), json!({}), 200, 0),
876            StoredEvent::from_value("100-0".to_string(), json!({}), 100, 0),
877        ];
878        cursor.update_from_events(&events);
879        assert_eq!(
880            cursor.get(0),
881            Some("200-0"),
882            "cursor must hold the highest id, not the last-in-slice id",
883        );
884    }
885
886    /// A partial overlap (advance for one shard, regression
887    /// attempt for another shard) must keep both cursors at their
888    /// respective max.
889    #[test]
890    fn cursor_compare_and_set_is_per_shard() {
891        let mut cursor = CompositeCursor::new();
892        cursor.update_from_events(&[
893            StoredEvent::from_value("500-0".to_string(), json!({}), 500, 0),
894            StoredEvent::from_value("500-0".to_string(), json!({}), 500, 1),
895        ]);
896        // Now "advance" with one regress attempt for shard 0 + a
897        // legitimate advance for shard 1.
898        cursor.update_from_events(&[
899            StoredEvent::from_value("100-0".to_string(), json!({}), 100, 0), // regress
900            StoredEvent::from_value("700-0".to_string(), json!({}), 700, 1), // advance
901        ]);
902        assert_eq!(cursor.get(0), Some("500-0"), "shard 0 must not regress");
903        assert_eq!(cursor.get(1), Some("700-0"), "shard 1 must advance");
904    }
905
906    /// CR-1: pre-fix the cursor used `str::cmp` which inverts at
907    /// every decade rollover for unpadded numeric ids. JetStream
908    /// emits `seq.to_string()` (unpadded) — once seq=10 lands, lex
909    /// compare says `"10" < "9"` and the cursor freezes at "9".
910    #[test]
911    fn cursor_does_not_wedge_on_jetstream_decade_rollover() {
912        let mut cursor = CompositeCursor::new();
913        for seq in 1u64..=20 {
914            let ev = StoredEvent::from_value(seq.to_string(), json!({}), seq, 0);
915            cursor.update_from_events(&[ev]);
916        }
917        assert_eq!(
918            cursor.get(0),
919            Some("20"),
920            "cursor must reach 20; lex compare would wedge at \"9\""
921        );
922    }
923
924    /// CR-1: same hazard for Redis's `<ms>-<seq>` format when seq
925    /// rolls past a decade within a single ms.
926    #[test]
927    fn cursor_does_not_wedge_on_redis_seq_decade_rollover() {
928        let mut cursor = CompositeCursor::new();
929        // All within one ms — Redis collides on seq when many events
930        // hit in the same millisecond.
931        for seq in 1u64..=20 {
932            let id = format!("1700000000000-{}", seq);
933            let ev = StoredEvent::from_value(id, json!({}), 1700000000000, 0);
934            cursor.update_from_events(&[ev]);
935        }
936        assert_eq!(
937            cursor.get(0),
938            Some("1700000000000-20"),
939            "cursor must reach -20; lex compare would wedge at -9"
940        );
941    }
942
943    /// Regression: a backend migration (e.g. JetStream → Redis)
944    /// that lands an id of a different format than the existing
945    /// cursor must NOT silently advance OR silently stall. Pre-
946    /// fix `compare_stream_ids` fell through both structured
947    /// branches and hit the lex fallback: `"42" > "1700-0"` (because
948    /// `'4' > '1'`), so the CAS guard refused to update the
949    /// cursor. Result: the consumer kept seeing `"42"` forever
950    /// while the new Redis backend kept emitting Redis-formatted
951    /// ids, with no surfaced error.
952    ///
953    /// Post-fix: format-mismatch is detected explicitly and
954    /// surfaced via `tracing::error!`. The cursor stays at its
955    /// current value (so we don't regress), and an operator must
956    /// reset the cursor deliberately to consume from the new
957    /// backend. This test pins the "stays at existing value"
958    /// half of the contract; the loud error is observability,
959    /// not behavior, so it isn't asserted here.
960    #[test]
961    fn cursor_refuses_to_advance_across_backend_format_change() {
962        let mut cursor = CompositeCursor::new();
963        // Cursor starts at JetStream-style numeric "42".
964        cursor.update_from_events(&[StoredEvent::from_value("42".to_string(), json!({}), 42, 0)]);
965        assert_eq!(cursor.get(0), Some("42"));
966
967        // A new event arrives in Redis format. Pre-fix this would
968        // hit the lex fallback and the cursor would refuse to
969        // advance silently; post-fix the format mismatch is
970        // detected and the cursor STILL refuses to advance, but
971        // a `tracing::error!` is emitted so operators see the
972        // backend migration.
973        cursor.update_from_events(&[StoredEvent::from_value(
974            "1700000000000-0".to_string(),
975            json!({}),
976            1700000000000,
977            0,
978        )]);
979
980        // Cursor must still be at the original numeric id (not
981        // the new Redis id, which would be a regression-like
982        // jump back in time, AND not unset, which would lose
983        // progress).
984        assert_eq!(
985            cursor.get(0),
986            Some("42"),
987            "regression: cursor must not silently advance through a \
988             backend-format change. The pre-fix lex fallback also \
989             happened to keep the existing value (by `'4' > '1'`), \
990             but only by accident; this test pins the explicit \
991             format-mismatch refusal."
992        );
993
994        // And the reverse direction: a Redis cursor confronted
995        // with a new numeric id must also stay put.
996        let mut cursor = CompositeCursor::new();
997        cursor.update_from_events(&[StoredEvent::from_value(
998            "1700000000000-0".to_string(),
999            json!({}),
1000            1700000000000,
1001            0,
1002        )]);
1003        cursor.update_from_events(&[StoredEvent::from_value(
1004            "9000".to_string(),
1005            json!({}),
1006            9000,
1007            0,
1008        )]);
1009        assert_eq!(
1010            cursor.get(0),
1011            Some("1700000000000-0"),
1012            "regression (reverse direction): Redis cursor must not be \
1013             silently overwritten by an incoming numeric id"
1014        );
1015    }
1016
1017    /// CR-1: cross-decade compare on JetStream-style ids.
1018    #[test]
1019    fn cursor_advances_from_unpadded_9_to_unpadded_10() {
1020        let mut cursor = CompositeCursor::new();
1021        cursor.update_from_events(&[StoredEvent::from_value("9".to_string(), json!({}), 9, 0)]);
1022        cursor.update_from_events(&[StoredEvent::from_value("10".to_string(), json!({}), 10, 0)]);
1023        assert_eq!(cursor.get(0), Some("10"));
1024    }
1025
1026    /// CR-1: ULID / opaque ids must still work via lex fallback.
1027    /// ULIDs are designed to be lex-sortable and we should NOT route
1028    /// them through the numeric parsers.
1029    #[test]
1030    fn cursor_advances_on_ulid_ids_via_lex_fallback() {
1031        let mut cursor = CompositeCursor::new();
1032        // Two real ULIDs in ascending stream order. Different
1033        // timestamp prefixes ensure lex order matches stream order.
1034        let earlier = "01HZ0000000000000000000000";
1035        let later = "01HZ0000010000000000000000";
1036        cursor.update_from_events(&[StoredEvent::from_value(
1037            earlier.to_string(),
1038            json!({}),
1039            1,
1040            0,
1041        )]);
1042        cursor.update_from_events(&[StoredEvent::from_value(later.to_string(), json!({}), 2, 0)]);
1043        assert_eq!(cursor.get(0), Some(later));
1044        // Now feed the earlier id again — must NOT regress.
1045        cursor.update_from_events(&[StoredEvent::from_value(
1046            earlier.to_string(),
1047            json!({}),
1048            1,
1049            0,
1050        )]);
1051        assert_eq!(cursor.get(0), Some(later));
1052    }
1053
1054    /// CR-1: direct `compare_stream_ids` unit coverage.
1055    #[test]
1056    fn compare_stream_ids_handles_known_formats() {
1057        // JetStream-style: numeric compare, padded-or-not.
1058        assert_eq!(compare_stream_ids("9", "10"), CmpOrdering::Less);
1059        assert_eq!(compare_stream_ids("10", "9"), CmpOrdering::Greater);
1060        assert_eq!(compare_stream_ids("100", "100"), CmpOrdering::Equal);
1061        // Padding-insensitive: leading zeros do not change numeric value.
1062        assert_eq!(compare_stream_ids("00000010", "9"), CmpOrdering::Greater);
1063
1064        // Redis-style: tuple compare on (ms, seq).
1065        assert_eq!(
1066            compare_stream_ids("1700-9", "1700-10"),
1067            CmpOrdering::Less,
1068            "seq must compare numerically, not lex"
1069        );
1070        assert_eq!(
1071            compare_stream_ids("1700-9", "1701-0"),
1072            CmpOrdering::Less,
1073            "ms wins over seq"
1074        );
1075        assert_eq!(
1076            compare_stream_ids("1700-100", "1700-9"),
1077            CmpOrdering::Greater
1078        );
1079
1080        // ULID-shaped ids fall through to lex compare.
1081        let ulid_a = "01HZ0000000000000000000000";
1082        let ulid_b = "01HZ0000010000000000000000";
1083        assert_eq!(compare_stream_ids(ulid_a, ulid_b), CmpOrdering::Less);
1084
1085        // Mixed format (one Redis, one numeric) — neither structured
1086        // path applies, falls through to lex. Documented limitation.
1087        // (Adapters emit a single format, so this is pathological.)
1088        let _ = compare_stream_ids("1700-0", "9999");
1089    }
1090
1091    /// CR-1: in `Ordering::InsertionTs` mode, two events from the
1092    /// same shard with the same `insertion_ts` and unpadded numeric
1093    /// ids must sort by numeric id, not lex id.
1094    #[test]
1095    fn insertion_ts_sort_breaks_tie_on_id_numerically() {
1096        // Same shard, same ts — only the id tiebreak fires.
1097        let mut events = [
1098            StoredEvent::from_value("10".to_string(), json!({}), 1000, 0),
1099            StoredEvent::from_value("9".to_string(), json!({}), 1000, 0),
1100            StoredEvent::from_value("11".to_string(), json!({}), 1000, 0),
1101        ];
1102        events.sort_by(|a, b| {
1103            a.insertion_ts
1104                .cmp(&b.insertion_ts)
1105                .then(a.shard_id.cmp(&b.shard_id))
1106                .then(compare_stream_ids(&a.id, &b.id))
1107        });
1108        let ordered: Vec<&str> = events.iter().map(|e| e.id.as_str()).collect();
1109        assert_eq!(
1110            ordered,
1111            vec!["9", "10", "11"],
1112            "id tiebreak must be numeric, not lex"
1113        );
1114    }
1115
1116    #[test]
1117    fn test_consume_request_builder() {
1118        let request = ConsumeRequest::new(100)
1119            .from("some_cursor")
1120            .ordering(Ordering::InsertionTs)
1121            .shards(vec![0, 1, 2])
1122            .filter(Filter::eq("type", json!("token")));
1123
1124        assert_eq!(request.limit, 100);
1125        assert_eq!(request.from_id, Some("some_cursor".to_string()));
1126        assert_eq!(request.ordering, Ordering::InsertionTs);
1127        assert_eq!(request.shards, Some(vec![0, 1, 2]));
1128        assert!(request.filter.is_some());
1129    }
1130
1131    #[test]
1132    fn test_invalid_cursor() {
1133        let result = CompositeCursor::decode("not_valid_base64!!!");
1134        assert!(result.is_err());
1135
1136        // Valid base64 but not valid JSON
1137        let result = CompositeCursor::decode(&BASE64.encode(b"not json"));
1138        assert!(result.is_err());
1139    }
1140
1141    /// Regression: non-canonical shard-id keys must be rejected
1142    /// at decode time. Pre-fix `serde_json::from_slice::<HashMap<u16,
1143    /// _>>` parsed `"00"` and `"0"` both as u16 0; the second
1144    /// insert silently overwrote the first, leaving the consumer
1145    /// with whichever entry happened to come later in the JSON.
1146    /// Two distinct stringifications collapsed to one shard
1147    /// position with no surfaced error.
1148    #[test]
1149    fn cursor_decode_rejects_non_canonical_shard_keys() {
1150        // Construct a JSON cursor with `"00"` aliasing `"0"`.
1151        // Both round-trip to u16 0 under standard parsing.
1152        let hostile = br#"{"00":"id_a","1":"id_b"}"#;
1153        let encoded = BASE64.encode(hostile);
1154        let result = CompositeCursor::decode(&encoded);
1155        assert!(
1156            result.is_err(),
1157            "non-canonical shard key `\"00\"` must reject; \
1158             pre-fix this silently parsed as shard 0"
1159        );
1160
1161        // Boundary: the canonical "0" (no leading zero) decodes
1162        // normally.
1163        let canonical = br#"{"0":"id_a","1":"id_b"}"#;
1164        let encoded_ok = BASE64.encode(canonical);
1165        let cursor =
1166            CompositeCursor::decode(&encoded_ok).expect("canonical shard keys must decode cleanly");
1167        assert_eq!(cursor.get(0), Some("id_a"));
1168        assert_eq!(cursor.get(1), Some("id_b"));
1169    }
1170
1171    #[test]
1172    fn test_composite_cursor_new() {
1173        let cursor = CompositeCursor::new();
1174        assert!(cursor.positions.is_empty());
1175    }
1176
1177    #[test]
1178    fn test_composite_cursor_default() {
1179        let cursor = CompositeCursor::default();
1180        assert!(cursor.positions.is_empty());
1181    }
1182
1183    #[test]
1184    fn test_composite_cursor_get_nonexistent() {
1185        let cursor = CompositeCursor::new();
1186        assert!(cursor.get(0).is_none());
1187        assert!(cursor.get(100).is_none());
1188    }
1189
1190    #[test]
1191    fn test_composite_cursor_set_overwrites() {
1192        let mut cursor = CompositeCursor::new();
1193        cursor.set(0, "first".to_string());
1194        assert_eq!(cursor.get(0), Some("first"));
1195
1196        cursor.set(0, "second".to_string());
1197        assert_eq!(cursor.get(0), Some("second"));
1198    }
1199
1200    #[test]
1201    fn test_composite_cursor_empty_encode() {
1202        let cursor = CompositeCursor::new();
1203        let encoded = cursor.encode().unwrap();
1204        let decoded = CompositeCursor::decode(&encoded).unwrap();
1205        assert!(decoded.positions.is_empty());
1206    }
1207
1208    #[test]
1209    fn test_composite_cursor_clone() {
1210        let mut cursor = CompositeCursor::new();
1211        cursor.set(0, "pos-0".to_string());
1212        cursor.set(1, "pos-1".to_string());
1213
1214        let cloned = cursor.clone();
1215        assert_eq!(cloned.get(0), Some("pos-0"));
1216        assert_eq!(cloned.get(1), Some("pos-1"));
1217    }
1218
1219    #[test]
1220    fn test_composite_cursor_debug() {
1221        let mut cursor = CompositeCursor::new();
1222        cursor.set(0, "test".to_string());
1223        let debug = format!("{:?}", cursor);
1224        assert!(debug.contains("CompositeCursor"));
1225        assert!(debug.contains("positions"));
1226    }
1227
1228    #[test]
1229    fn test_ordering_default() {
1230        let ordering = Ordering::default();
1231        assert_eq!(ordering, Ordering::None);
1232    }
1233
1234    #[test]
1235    fn test_ordering_clone_copy() {
1236        let ordering = Ordering::InsertionTs;
1237        let cloned = ordering;
1238        assert_eq!(cloned, Ordering::InsertionTs);
1239    }
1240
1241    #[test]
1242    fn test_ordering_debug() {
1243        assert!(format!("{:?}", Ordering::None).contains("None"));
1244        assert!(format!("{:?}", Ordering::InsertionTs).contains("InsertionTs"));
1245    }
1246
1247    #[test]
1248    fn test_consume_request_new() {
1249        let request = ConsumeRequest::new(50);
1250        assert_eq!(request.limit, 50);
1251        assert!(request.from_id.is_none());
1252        assert!(request.filter.is_none());
1253        assert_eq!(request.ordering, Ordering::None);
1254        assert!(request.shards.is_none());
1255    }
1256
1257    #[test]
1258    fn test_consume_request_default() {
1259        let request = ConsumeRequest::default();
1260        assert_eq!(request.limit, 0);
1261        assert!(request.from_id.is_none());
1262        assert!(request.filter.is_none());
1263        assert_eq!(request.ordering, Ordering::None);
1264        assert!(request.shards.is_none());
1265    }
1266
1267    #[test]
1268    fn test_consume_request_from_string() {
1269        let request = ConsumeRequest::new(10).from(String::from("cursor123"));
1270        assert_eq!(request.from_id, Some("cursor123".to_string()));
1271    }
1272
1273    #[test]
1274    fn test_consume_request_clone() {
1275        let request = ConsumeRequest::new(100)
1276            .from("cursor")
1277            .ordering(Ordering::InsertionTs)
1278            .shards(vec![0, 1]);
1279
1280        let cloned = request.clone();
1281        assert_eq!(cloned.limit, 100);
1282        assert_eq!(cloned.from_id, Some("cursor".to_string()));
1283        assert_eq!(cloned.ordering, Ordering::InsertionTs);
1284        assert_eq!(cloned.shards, Some(vec![0, 1]));
1285    }
1286
1287    #[test]
1288    fn test_consume_request_debug() {
1289        let request = ConsumeRequest::new(10);
1290        let debug = format!("{:?}", request);
1291        assert!(debug.contains("ConsumeRequest"));
1292        assert!(debug.contains("limit"));
1293    }
1294
1295    #[test]
1296    fn test_consume_response_empty() {
1297        let response = ConsumeResponse::empty();
1298        assert!(response.events.is_empty());
1299        assert!(response.next_id.is_none());
1300        assert!(!response.has_more);
1301    }
1302
1303    #[test]
1304    fn test_consume_response_clone() {
1305        let mut response = ConsumeResponse::empty();
1306        response.next_id = Some("cursor".to_string());
1307        response.has_more = true;
1308
1309        let cloned = response.clone();
1310        assert_eq!(cloned.next_id, Some("cursor".to_string()));
1311        assert!(cloned.has_more);
1312    }
1313
1314    #[test]
1315    fn test_consume_response_debug() {
1316        let response = ConsumeResponse::empty();
1317        let debug = format!("{:?}", response);
1318        assert!(debug.contains("ConsumeResponse"));
1319        assert!(debug.contains("events"));
1320    }
1321
1322    #[test]
1323    fn test_cursor_update_from_empty_events() {
1324        let mut cursor = CompositeCursor::new();
1325        cursor.set(0, "original".to_string());
1326
1327        let events: Vec<StoredEvent> = vec![];
1328        cursor.update_from_events(&events);
1329
1330        // Cursor should be unchanged
1331        assert_eq!(cursor.get(0), Some("original"));
1332    }
1333
1334    #[test]
1335    fn test_cursor_many_shards() {
1336        let mut cursor = CompositeCursor::new();
1337        for i in 0..100u16 {
1338            cursor.set(i, format!("pos-{}", i));
1339        }
1340
1341        let encoded = cursor.encode().unwrap();
1342        let decoded = CompositeCursor::decode(&encoded).unwrap();
1343
1344        for i in 0..100u16 {
1345            assert_eq!(decoded.get(i), Some(format!("pos-{}", i).as_str()));
1346        }
1347    }
1348
1349    #[test]
1350    fn test_consume_request_empty_shards() {
1351        let request = ConsumeRequest::new(100).shards(vec![]);
1352        assert_eq!(request.shards, Some(vec![]));
1353    }
1354
1355    #[test]
1356    fn test_consume_request_ordering_none() {
1357        let request = ConsumeRequest::new(100).ordering(Ordering::None);
1358        assert_eq!(request.ordering, Ordering::None);
1359    }
1360
1361    #[test]
1362    fn test_ordering_equality() {
1363        assert_eq!(Ordering::None, Ordering::None);
1364        assert_eq!(Ordering::InsertionTs, Ordering::InsertionTs);
1365        assert_ne!(Ordering::None, Ordering::InsertionTs);
1366    }
1367
1368    // Mock adapter for testing PollMerger
1369    use crate::adapter::{Adapter, ShardPollResult};
1370    use crate::error::AdapterError;
1371    use crate::event::Batch;
1372    use async_trait::async_trait;
1373    use std::collections::HashMap;
1374    use std::sync::RwLock;
1375
    /// In-memory `Adapter` stand-in used by the `PollMerger` tests below.
    struct MockAdapter {
        /// Per-shard event lists keyed by shard id. `add_events` appends to
        /// the end, so each list is in the order events were added.
        events: RwLock<HashMap<u16, Vec<StoredEvent>>>,
    }
1379
1380    impl MockAdapter {
1381        fn new() -> Self {
1382            Self {
1383                events: RwLock::new(HashMap::new()),
1384            }
1385        }
1386
1387        fn add_events(&self, shard_id: u16, events: Vec<StoredEvent>) {
1388            let mut map = self.events.write().unwrap();
1389            map.entry(shard_id).or_default().extend(events);
1390        }
1391    }
1392
1393    #[async_trait]
1394    impl Adapter for MockAdapter {
1395        async fn init(&mut self) -> Result<(), AdapterError> {
1396            Ok(())
1397        }
1398
1399        async fn on_batch(&self, _batch: Batch) -> Result<(), AdapterError> {
1400            Ok(())
1401        }
1402
1403        async fn flush(&self) -> Result<(), AdapterError> {
1404            Ok(())
1405        }
1406
1407        async fn shutdown(&self) -> Result<(), AdapterError> {
1408            Ok(())
1409        }
1410
1411        async fn poll_shard(
1412            &self,
1413            shard_id: u16,
1414            from_id: Option<&str>,
1415            limit: usize,
1416        ) -> Result<ShardPollResult, AdapterError> {
1417            let map = self.events.read().unwrap();
1418            let events = map.get(&shard_id).cloned().unwrap_or_default();
1419
1420            // Filter by from_id if provided
1421            let filtered: Vec<_> = if let Some(from) = from_id {
1422                events
1423                    .into_iter()
1424                    .skip_while(|e| e.id != from)
1425                    .skip(1) // Skip the from_id itself
1426                    .collect()
1427            } else {
1428                events
1429            };
1430
1431            let has_more = filtered.len() > limit;
1432            let events: Vec<_> = filtered.into_iter().take(limit).collect();
1433            let next_id = events.last().map(|e| e.id.clone());
1434
1435            Ok(ShardPollResult {
1436                events,
1437                next_id,
1438                has_more,
1439            })
1440        }
1441
1442        fn name(&self) -> &'static str {
1443            "mock"
1444        }
1445    }
1446
1447    #[tokio::test]
1448    async fn test_poll_merger_new() {
1449        let adapter = Arc::new(MockAdapter::new());
1450        let merger = PollMerger::new(adapter, vec![0, 1, 2, 3]);
1451        assert_eq!(merger.shard_ids, vec![0, 1, 2, 3]);
1452    }
1453
1454    /// When the active shard id set is sparse (e.g. shard 0
1455    /// was scaled down, leaving `{1, 2}`), a poll with no explicit
1456    /// `request.shards` must hit shards 1 and 2 — not generate
1457    /// `[0, 1]` from a stale count and miss the live shard 2.
1458    ///
1459    /// Pre-fix, `PollMerger` stored only `num_shards: u16` and the
1460    /// default branch generated `(0..num_shards).collect()`, so
1461    /// shard 2's events were silently invisible to default-shards
1462    /// consumers after a scale-down.
1463    #[tokio::test]
1464    async fn poll_merger_default_shards_uses_active_id_set_after_scale_down() {
1465        let adapter = Arc::new(MockAdapter::new());
1466
1467        // Shard 0 (drained / no longer active): would mis-poll
1468        // pre-fix and could return stale data on adapters that
1469        // recreate streams on demand. We don't add events here.
1470        // Shard 1 + Shard 2: active set after a scale-down that
1471        // evicted shard 0.
1472        adapter.add_events(
1473            1,
1474            vec![StoredEvent::from_value(
1475                "1-a".to_string(),
1476                json!({"shard": 1}),
1477                100,
1478                1,
1479            )],
1480        );
1481        adapter.add_events(
1482            2,
1483            vec![StoredEvent::from_value(
1484                "2-a".to_string(),
1485                json!({"shard": 2}),
1486                200,
1487                2,
1488            )],
1489        );
1490
1491        // Sparse id set — shard 0 is NOT in the active list.
1492        let merger = PollMerger::new(adapter, vec![1, 2]);
1493
1494        // Default-shards request (no `shards` override).
1495        let request = ConsumeRequest::new(100);
1496        let response = merger.poll(request).await.unwrap();
1497
1498        let returned: std::collections::HashSet<u16> =
1499            response.events.iter().map(|e| e.shard_id).collect();
1500        assert!(
1501            returned.contains(&1),
1502            "default-shards poll must include shard 1 (active)",
1503        );
1504        assert!(
1505            returned.contains(&2),
1506            "default-shards poll must include shard 2 — pre-fix this was silently \
1507             skipped because the merger generated `0..num_shards` = `[0, 1]`",
1508        );
1509        assert!(
1510            !returned.contains(&0),
1511            "default-shards poll must NOT touch shard 0 — it was evicted",
1512        );
1513        assert_eq!(response.events.len(), 2);
1514    }
1515
1516    #[tokio::test]
1517    async fn test_poll_merger_empty_limit() {
1518        let adapter = Arc::new(MockAdapter::new());
1519        let merger = PollMerger::new(adapter, vec![0, 1, 2, 3]);
1520
1521        let request = ConsumeRequest::new(0);
1522        let response = merger.poll(request).await.unwrap();
1523
1524        assert!(response.events.is_empty());
1525        assert!(response.next_id.is_none());
1526        assert!(!response.has_more);
1527    }
1528
1529    #[tokio::test]
1530    async fn test_poll_merger_empty_shards() {
1531        let adapter = Arc::new(MockAdapter::new());
1532        let merger = PollMerger::new(adapter, vec![0, 1, 2, 3]);
1533
1534        let request = ConsumeRequest::new(100).shards(vec![]);
1535        let response = merger.poll(request).await.unwrap();
1536
1537        assert!(response.events.is_empty());
1538        assert!(response.next_id.is_none());
1539        assert!(!response.has_more);
1540    }
1541
1542    /// Regression: per-shard adapter errors must surface on
1543    /// `ConsumeResponse.failed_shards`, not silently disappear
1544    /// after a `tracing::warn!`. Pre-fix the merger logged the
1545    /// error and continued; the response carried events from
1546    /// the surviving shards with no field indicating WHICH
1547    /// shards failed (in contrast to `stalled_shards` which IS
1548    /// surfaced). Operators correlating alerts with specific
1549    /// Redis / JetStream nodes had to grep logs instead of
1550    /// reading a structured response field.
1551    #[tokio::test]
1552    async fn poll_response_surfaces_failed_shard_ids() {
1553        // Mock that fails on a specific shard id.
1554        struct FailingShardMock {
1555            inner: MockAdapter,
1556            fail_shard: u16,
1557        }
1558
1559        #[async_trait]
1560        impl Adapter for FailingShardMock {
1561            async fn init(&mut self) -> Result<(), AdapterError> {
1562                Ok(())
1563            }
1564            async fn on_batch(&self, _b: Batch) -> Result<(), AdapterError> {
1565                Ok(())
1566            }
1567            async fn flush(&self) -> Result<(), AdapterError> {
1568                Ok(())
1569            }
1570            async fn shutdown(&self) -> Result<(), AdapterError> {
1571                Ok(())
1572            }
1573            async fn poll_shard(
1574                &self,
1575                shard_id: u16,
1576                from_id: Option<&str>,
1577                limit: usize,
1578            ) -> Result<ShardPollResult, AdapterError> {
1579                if shard_id == self.fail_shard {
1580                    return Err(AdapterError::Transient(format!(
1581                        "synthetic failure on shard {shard_id}"
1582                    )));
1583                }
1584                self.inner.poll_shard(shard_id, from_id, limit).await
1585            }
1586            fn name(&self) -> &'static str {
1587                "failing-mock"
1588            }
1589        }
1590
1591        let inner = MockAdapter::new();
1592        // Shard 0 has events, shard 1 will fail, shard 2 has events.
1593        inner.add_events(
1594            0,
1595            vec![StoredEvent::from_value(
1596                "0-1".to_string(),
1597                json!({"shard": 0}),
1598                100,
1599                0,
1600            )],
1601        );
1602        inner.add_events(
1603            2,
1604            vec![StoredEvent::from_value(
1605                "2-1".to_string(),
1606                json!({"shard": 2}),
1607                100,
1608                2,
1609            )],
1610        );
1611
1612        let adapter = Arc::new(FailingShardMock {
1613            inner,
1614            fail_shard: 1,
1615        });
1616        let merger = PollMerger::new(adapter, vec![0, 1, 2]);
1617
1618        let response = merger.poll(ConsumeRequest::new(100)).await.unwrap();
1619
1620        // Surviving shards' events still come through.
1621        assert_eq!(
1622            response.events.len(),
1623            2,
1624            "events from non-failing shards must still be returned"
1625        );
1626
1627        // The failed shard id is surfaced on the response.
1628        assert_eq!(
1629            response.failed_shards,
1630            vec![1],
1631            "regression: failed_shards must list the shard whose adapter \
1632             errored. Pre-fix this list didn't exist; observers couldn't \
1633             tell which shard was missing without log scraping."
1634        );
1635    }
1636
1637    #[tokio::test]
1638    async fn test_poll_merger_with_events() {
1639        let adapter = Arc::new(MockAdapter::new());
1640
1641        // Add events to shard 0
1642        adapter.add_events(
1643            0,
1644            vec![
1645                StoredEvent::from_value("0-1".to_string(), json!({"type": "a"}), 100, 0),
1646                StoredEvent::from_value("0-2".to_string(), json!({"type": "b"}), 200, 0),
1647            ],
1648        );
1649
1650        // Add events to shard 1
1651        adapter.add_events(
1652            1,
1653            vec![StoredEvent::from_value(
1654                "1-1".to_string(),
1655                json!({"type": "c"}),
1656                150,
1657                1,
1658            )],
1659        );
1660
1661        let merger = PollMerger::new(adapter, vec![0, 1]);
1662
1663        let request = ConsumeRequest::new(100);
1664        let response = merger.poll(request).await.unwrap();
1665
1666        assert_eq!(response.events.len(), 3);
1667        assert!(response.next_id.is_some());
1668    }
1669
1670    #[tokio::test]
1671    async fn test_poll_merger_with_ordering() {
1672        let adapter = Arc::new(MockAdapter::new());
1673
1674        // Add events with different timestamps
1675        adapter.add_events(
1676            0,
1677            vec![
1678                StoredEvent::from_value("0-1".to_string(), json!({}), 300, 0),
1679                StoredEvent::from_value("0-2".to_string(), json!({}), 100, 0),
1680            ],
1681        );
1682        adapter.add_events(
1683            1,
1684            vec![StoredEvent::from_value(
1685                "1-1".to_string(),
1686                json!({}),
1687                200,
1688                1,
1689            )],
1690        );
1691
1692        let merger = PollMerger::new(adapter, vec![0, 1]);
1693
1694        let request = ConsumeRequest::new(100).ordering(Ordering::InsertionTs);
1695        let response = merger.poll(request).await.unwrap();
1696
1697        // Events should be sorted by insertion_ts
1698        assert_eq!(response.events.len(), 3);
1699        assert_eq!(response.events[0].insertion_ts, 100);
1700        assert_eq!(response.events[1].insertion_ts, 200);
1701        assert_eq!(response.events[2].insertion_ts, 300);
1702    }
1703
1704    #[tokio::test]
1705    async fn test_poll_merger_with_filter() {
1706        let adapter = Arc::new(MockAdapter::new());
1707
1708        adapter.add_events(
1709            0,
1710            vec![
1711                StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
1712                StoredEvent::from_value("0-2".to_string(), json!({"type": "message"}), 200, 0),
1713                StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 300, 0),
1714            ],
1715        );
1716
1717        let merger = PollMerger::new(adapter, vec![0]);
1718
1719        let request = ConsumeRequest::new(100).filter(Filter::eq("type", json!("token")));
1720        let response = merger.poll(request).await.unwrap();
1721
1722        assert_eq!(response.events.len(), 2);
1723        for event in &response.events {
1724            assert!(event.raw_str().unwrap().contains("token"));
1725        }
1726    }
1727
1728    #[tokio::test]
1729    async fn test_poll_merger_with_limit() {
1730        let adapter = Arc::new(MockAdapter::new());
1731
1732        adapter.add_events(
1733            0,
1734            vec![
1735                StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
1736                StoredEvent::from_value("0-2".to_string(), json!({}), 200, 0),
1737                StoredEvent::from_value("0-3".to_string(), json!({}), 300, 0),
1738            ],
1739        );
1740
1741        let merger = PollMerger::new(adapter, vec![0]);
1742
1743        let request = ConsumeRequest::new(2);
1744        let response = merger.poll(request).await.unwrap();
1745
1746        assert_eq!(response.events.len(), 2);
1747        assert!(response.has_more);
1748    }
1749
1750    #[tokio::test]
1751    async fn test_poll_merger_specific_shards() {
1752        let adapter = Arc::new(MockAdapter::new());
1753
1754        adapter.add_events(
1755            0,
1756            vec![StoredEvent::from_value(
1757                "0-1".to_string(),
1758                json!({"shard": 0}),
1759                100,
1760                0,
1761            )],
1762        );
1763        adapter.add_events(
1764            1,
1765            vec![StoredEvent::from_value(
1766                "1-1".to_string(),
1767                json!({"shard": 1}),
1768                100,
1769                1,
1770            )],
1771        );
1772        adapter.add_events(
1773            2,
1774            vec![StoredEvent::from_value(
1775                "2-1".to_string(),
1776                json!({"shard": 2}),
1777                100,
1778                2,
1779            )],
1780        );
1781
1782        let merger = PollMerger::new(adapter, vec![0, 1, 2]);
1783
1784        // Only poll shard 0 and 2
1785        let request = ConsumeRequest::new(100).shards(vec![0, 2]);
1786        let response = merger.poll(request).await.unwrap();
1787
1788        assert_eq!(response.events.len(), 2);
1789        let shard_ids: Vec<_> = response.events.iter().map(|e| e.shard_id).collect();
1790        assert!(shard_ids.contains(&0));
1791        assert!(shard_ids.contains(&2));
1792        assert!(!shard_ids.contains(&1));
1793    }
1794
1795    #[tokio::test]
1796    async fn test_poll_merger_with_cursor() {
1797        let adapter = Arc::new(MockAdapter::new());
1798
1799        adapter.add_events(
1800            0,
1801            vec![
1802                StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
1803                StoredEvent::from_value("0-2".to_string(), json!({}), 200, 0),
1804                StoredEvent::from_value("0-3".to_string(), json!({}), 300, 0),
1805            ],
1806        );
1807
1808        let merger = PollMerger::new(adapter, vec![0]);
1809
1810        // First poll
1811        let request = ConsumeRequest::new(2);
1812        let response1 = merger.poll(request).await.unwrap();
1813        assert_eq!(response1.events.len(), 2);
1814
1815        // Second poll with cursor
1816        let cursor = response1.next_id.unwrap();
1817        let request2 = ConsumeRequest::new(10).from(cursor);
1818        let response2 = merger.poll(request2).await.unwrap();
1819
1820        assert_eq!(response2.events.len(), 1);
1821        assert_eq!(response2.events[0].id, "0-3");
1822    }
1823
1824    #[tokio::test]
1825    async fn test_poll_merger_pagination_multi_shard() {
1826        // Test that pagination across multiple shards doesn't skip events
1827        let adapter = Arc::new(MockAdapter::new());
1828
1829        // Shard 0: 10 events
1830        let shard0_events: Vec<_> = (1..=10)
1831            .map(|i| {
1832                StoredEvent::from_value(
1833                    format!("0-{}", i),
1834                    json!({"shard": 0, "idx": i}),
1835                    i as u64 * 10,
1836                    0,
1837                )
1838            })
1839            .collect();
1840        adapter.add_events(0, shard0_events);
1841
1842        // Shard 1: 15 events
1843        let shard1_events: Vec<_> = (1..=15)
1844            .map(|i| {
1845                StoredEvent::from_value(
1846                    format!("1-{}", i),
1847                    json!({"shard": 1, "idx": i}),
1848                    i as u64 * 10 + 5,
1849                    1,
1850                )
1851            })
1852            .collect();
1853        adapter.add_events(1, shard1_events);
1854
1855        let merger = PollMerger::new(adapter, vec![0, 1]);
1856
1857        // Poll in pages of 10 and collect all events
1858        let mut all_events = Vec::new();
1859        let mut cursor: Option<String> = None;
1860        let mut iterations = 0;
1861
1862        loop {
1863            iterations += 1;
1864            let request = match &cursor {
1865                Some(c) => ConsumeRequest::new(10).from(c.clone()),
1866                None => ConsumeRequest::new(10),
1867            };
1868
1869            let response = merger.poll(request).await.unwrap();
1870            all_events.extend(response.events);
1871
1872            if !response.has_more {
1873                break;
1874            }
1875            cursor = response.next_id;
1876
1877            // Safety: prevent infinite loop
1878            if iterations > 10 {
1879                panic!("Too many iterations");
1880            }
1881        }
1882
1883        // Should get all 25 events (10 from shard 0 + 15 from shard 1)
1884        assert_eq!(
1885            all_events.len(),
1886            25,
1887            "Expected 25 events, got {}. Iterations: {}",
1888            all_events.len(),
1889            iterations
1890        );
1891
1892        // Verify we got events from both shards
1893        let shard0_count = all_events.iter().filter(|e| e.shard_id == 0).count();
1894        let shard1_count = all_events.iter().filter(|e| e.shard_id == 1).count();
1895        assert_eq!(shard0_count, 10, "Expected 10 events from shard 0");
1896        assert_eq!(shard1_count, 15, "Expected 15 events from shard 1");
1897    }
1898
1899    #[tokio::test]
1900    async fn test_poll_merger_pagination_no_duplicates() {
1901        // Test that pagination doesn't return duplicate events
1902        let adapter = Arc::new(MockAdapter::new());
1903
1904        // Add events to both shards
1905        for shard_id in 0..2u16 {
1906            let events: Vec<_> = (1..=20)
1907                .map(|i| {
1908                    StoredEvent::from_value(
1909                        format!("{}-{}", shard_id, i),
1910                        json!({"shard": shard_id, "idx": i}),
1911                        i as u64 * 10,
1912                        shard_id,
1913                    )
1914                })
1915                .collect();
1916            adapter.add_events(shard_id, events);
1917        }
1918
1919        let merger = PollMerger::new(adapter, vec![0, 1]);
1920
1921        // Poll in small pages
1922        let mut all_event_ids = Vec::new();
1923        let mut cursor: Option<String> = None;
1924
1925        for _ in 0..20 {
1926            let request = match &cursor {
1927                Some(c) => ConsumeRequest::new(5).from(c.clone()),
1928                None => ConsumeRequest::new(5),
1929            };
1930
1931            let response = merger.poll(request).await.unwrap();
1932            all_event_ids.extend(response.events.iter().map(|e| e.id.clone()));
1933
1934            if !response.has_more {
1935                break;
1936            }
1937            cursor = response.next_id;
1938        }
1939
1940        // Check for duplicates
1941        let unique_count = {
1942            let mut ids = all_event_ids.clone();
1943            ids.sort();
1944            ids.dedup();
1945            ids.len()
1946        };
1947
1948        assert_eq!(
1949            unique_count,
1950            all_event_ids.len(),
1951            "Found duplicate events! Total: {}, Unique: {}",
1952            all_event_ids.len(),
1953            unique_count
1954        );
1955
1956        // Should have all 40 events
1957        assert_eq!(all_event_ids.len(), 40);
1958    }
1959
1960    #[tokio::test]
1961    async fn test_poll_merger_pagination_with_ordering() {
1962        // Test pagination with timestamp ordering
1963        let adapter = Arc::new(MockAdapter::new());
1964
1965        // Add events with interleaved timestamps across shards
1966        adapter.add_events(
1967            0,
1968            vec![
1969                StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
1970                StoredEvent::from_value("0-2".to_string(), json!({}), 300, 0),
1971                StoredEvent::from_value("0-3".to_string(), json!({}), 500, 0),
1972            ],
1973        );
1974        adapter.add_events(
1975            1,
1976            vec![
1977                StoredEvent::from_value("1-1".to_string(), json!({}), 200, 1),
1978                StoredEvent::from_value("1-2".to_string(), json!({}), 400, 1),
1979            ],
1980        );
1981
1982        let merger = PollMerger::new(adapter, vec![0, 1]);
1983
1984        // Poll with ordering, page size 2
1985        let mut all_events = Vec::new();
1986        let mut cursor: Option<String> = None;
1987
1988        for _ in 0..5 {
1989            let mut request = ConsumeRequest::new(2).ordering(Ordering::InsertionTs);
1990            if let Some(c) = &cursor {
1991                request = request.from(c.clone());
1992            }
1993
1994            let response = merger.poll(request).await.unwrap();
1995            all_events.extend(response.events);
1996
1997            if !response.has_more {
1998                break;
1999            }
2000            cursor = response.next_id;
2001        }
2002
2003        // Should get all 5 events
2004        assert_eq!(all_events.len(), 5);
2005
2006        // Verify ordering is maintained
2007        let timestamps: Vec<_> = all_events.iter().map(|e| e.insertion_ts).collect();
2008        let mut sorted = timestamps.clone();
2009        sorted.sort();
2010        assert_eq!(timestamps, sorted, "Events should be sorted by timestamp");
2011    }
2012
2013    #[tokio::test]
2014    async fn test_poll_merger_cursor_tracks_returned_events_only() {
2015        // Test that cursor tracks position based on returned events, not fetched events
2016        let adapter = Arc::new(MockAdapter::new());
2017
2018        // Shard 0: 3 events
2019        adapter.add_events(
2020            0,
2021            vec![
2022                StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
2023                StoredEvent::from_value("0-2".to_string(), json!({}), 200, 0),
2024                StoredEvent::from_value("0-3".to_string(), json!({}), 300, 0),
2025            ],
2026        );
2027
2028        // Shard 1: 3 events
2029        adapter.add_events(
2030            1,
2031            vec![
2032                StoredEvent::from_value("1-1".to_string(), json!({}), 150, 1),
2033                StoredEvent::from_value("1-2".to_string(), json!({}), 250, 1),
2034                StoredEvent::from_value("1-3".to_string(), json!({}), 350, 1),
2035            ],
2036        );
2037
2038        let merger = PollMerger::new(adapter, vec![0, 1]);
2039
2040        // First poll with limit 2 - should get 2 events and cursor should reflect only those 2
2041        let response1 = merger.poll(ConsumeRequest::new(2)).await.unwrap();
2042        assert_eq!(response1.events.len(), 2);
2043        assert!(response1.has_more);
2044
2045        // Decode cursor to verify it tracks returned events
2046        let next_id = response1.next_id.clone().unwrap();
2047        let cursor = CompositeCursor::decode(&next_id).unwrap();
2048
2049        // Cursor should only have positions for shards that had events in the returned set
2050        let returned_shard_ids: std::collections::HashSet<_> =
2051            response1.events.iter().map(|e| e.shard_id).collect();
2052
2053        for shard_id in 0..2u16 {
2054            if returned_shard_ids.contains(&shard_id) {
2055                // Shard had returned events, cursor should have position
2056                assert!(
2057                    cursor.get(shard_id).is_some(),
2058                    "Cursor should have position for shard {} which had returned events",
2059                    shard_id
2060                );
2061            }
2062        }
2063
2064        // Second poll should continue from where we left off
2065        let response2 = merger
2066            .poll(ConsumeRequest::new(10).from(next_id))
2067            .await
2068            .unwrap();
2069
2070        // Should get remaining 4 events
2071        assert_eq!(response2.events.len(), 4, "Should get remaining 4 events");
2072    }
2073
2074    /// When the per-shard fetch hits the
2075    /// PER_SHARD_FETCH_CAP clamp, the response must surface
2076    /// `truncated_at_per_shard_cap = true` so callers can detect
2077    /// the silent under-delivery.
2078    #[tokio::test]
2079    async fn poll_merger_surfaces_per_shard_cap_truncation() {
2080        let adapter = Arc::new(MockAdapter::new());
2081        // Single shard — over-fetch factor 2 — request limit
2082        // 50 000 → unclamped per_shard would be 100 000 → clamped
2083        // to PER_SHARD_FETCH_CAP (10 000).
2084        adapter.add_events(
2085            0,
2086            (0..1)
2087                .map(|i| StoredEvent::from_value(format!("0-{}", i), json!({}), 100, 0))
2088                .collect(),
2089        );
2090
2091        let merger = PollMerger::new(adapter, vec![0]);
2092        let response = merger.poll(ConsumeRequest::new(50_000)).await.unwrap();
2093        assert!(
2094            response.truncated_at_per_shard_cap,
2095            "large limit must flag the per-shard cap clamp",
2096        );
2097    }
2098
2099    /// A single-shard request with a filter where every fetched
2100    /// event matches AND the per-shard cap clamps the fetch must
2101    /// not stall — each poll must advance the cursor by
2102    /// `request.limit` matches and the loop must drain in
2103    /// `ceil(total / limit)` iterations.
2104    ///
2105    /// Without the rollback+override pairing, the rollback step
2106    /// would roll the cursor back to the original position with
2107    /// no matching forward override, so the cursor never
2108    /// advances — re-fetching the same events on every poll. The
2109    /// current logic resolves this: Step 1 rolls back for shards
2110    /// with truncated matches; Step 2 overrides to the last
2111    /// *returned* match's id so each poll makes `request.limit`
2112    /// worth of forward progress.
2113    #[tokio::test]
2114    async fn poll_merger_does_not_stall_on_single_shard_filter_under_cap() {
2115        // Adapter holds 50 events on shard 0. With a filter that
2116        // matches every event, request.limit = 10 → per_shard_limit
2117        // could be 30 (limit×3 over_fetch_factor) without the cap;
2118        // here we keep numbers small so the test is deterministic.
2119        let adapter = Arc::new(MockAdapter::new());
2120        let mut events: Vec<StoredEvent> = (0..50)
2121            .map(|i| {
2122                StoredEvent::from_value(
2123                    format!("0-{}", i),
2124                    json!({"keep": true}),
2125                    (i + 1) as u64,
2126                    0,
2127                )
2128            })
2129            .collect();
2130        adapter.add_events(0, events.split_off(0));
2131
2132        let merger = PollMerger::new(adapter.clone(), vec![0]);
2133        // Filter that matches everything ('keep': true on all events).
2134        let make_request = |from_id: Option<String>| ConsumeRequest {
2135            limit: 10,
2136            from_id,
2137            shards: None,
2138            filter: Some(Filter::eq("keep", json!(true))),
2139            ordering: Ordering::None,
2140        };
2141
2142        let mut cursor: Option<String> = None;
2143        let mut total = 0;
2144        let mut polls = 0;
2145        loop {
2146            polls += 1;
2147            assert!(
2148                polls < 20,
2149                "poll loop must terminate; got {} polls without draining \
2150                 (cursor={:?}, total={})",
2151                polls,
2152                cursor,
2153                total,
2154            );
2155            let response = merger.poll(make_request(cursor.clone())).await.unwrap();
2156            total += response.events.len();
2157            let new_cursor = response.next_id.clone();
2158            assert!(
2159                new_cursor.is_some(),
2160                "response must surface a cursor on every progress poll \
2161                 (poll={}, returned={})",
2162                polls,
2163                response.events.len(),
2164            );
2165            // The cursor MUST advance OR has_more must be false.
2166            // Pre-fix the cursor would echo back unchanged on a
2167            // stall; we'd hit the polls<20 watchdog above.
2168            if cursor == new_cursor {
2169                assert!(
2170                    !response.has_more,
2171                    "cursor stuck at {:?} but has_more=true → stall",
2172                    cursor,
2173                );
2174                break;
2175            }
2176            cursor = new_cursor;
2177            if !response.has_more && response.events.is_empty() {
2178                break;
2179            }
2180        }
2181        assert_eq!(
2182            total, 50,
2183            "full draining of 50 events must succeed without stall \
2184             (got {} in {} polls)",
2185            total, polls,
2186        );
2187    }
2188
2189    /// Multi-shard variant. The rollback+override logic must
2190    /// drain every match across multiple shards when each poll
2191    /// truncates matches from BOTH shards (so both shards land
2192    /// in the `rolled_back` set on the same poll). Without it,
2193    /// a stall on either shard would leave matches stranded;
2194    /// this exercises the dual-rollback path the single-shard
2195    /// test doesn't.
2196    #[tokio::test]
2197    async fn poll_merger_does_not_stall_on_multi_shard_filter_truncation() {
2198        let adapter = Arc::new(MockAdapter::new());
2199        // Two shards, 30 matching events each. Limit=10 means
2200        // every poll truncates matches from both shards
2201        // simultaneously (the global sort interleaves them, and
2202        // 10 < 2*30).
2203        for shard_id in 0..2u16 {
2204            let events: Vec<StoredEvent> = (0..30)
2205                .map(|i| {
2206                    StoredEvent::from_value(
2207                        format!("{}-{}", shard_id, i),
2208                        json!({"keep": true}),
2209                        // Stagger timestamps so the global sort
2210                        // interleaves shards: shard 0 ts even,
2211                        // shard 1 ts odd.
2212                        (i * 2 + shard_id as usize) as u64 + 1,
2213                        shard_id,
2214                    )
2215                })
2216                .collect();
2217            adapter.add_events(shard_id, events);
2218        }
2219
2220        let merger = PollMerger::new(adapter, vec![0, 1]);
2221        let make_request = |from_id: Option<String>| ConsumeRequest {
2222            limit: 10,
2223            from_id,
2224            shards: None,
2225            filter: Some(Filter::eq("keep", json!(true))),
2226            ordering: Ordering::None,
2227        };
2228
2229        let mut cursor: Option<String> = None;
2230        let mut returned: std::collections::HashSet<String> = std::collections::HashSet::new();
2231        let mut polls = 0;
2232        loop {
2233            polls += 1;
2234            assert!(
2235                polls < 30,
2236                "multi-shard: poll loop must terminate; \
2237                 got {} polls without draining (cursor={:?}, returned={})",
2238                polls,
2239                cursor,
2240                returned.len(),
2241            );
2242            let response = merger.poll(make_request(cursor.clone())).await.unwrap();
2243            for e in &response.events {
2244                returned.insert(e.id.clone());
2245            }
2246            let new_cursor = response.next_id.clone();
2247            // Same stall guard as the single-shard test: the
2248            // cursor must advance OR has_more must be false on
2249            // any poll that returned events.
2250            if cursor == new_cursor {
2251                assert!(
2252                    !response.has_more,
2253                    "multi-shard: cursor stuck at {:?} but \
2254                     has_more=true → stall",
2255                    cursor,
2256                );
2257                break;
2258            }
2259            cursor = new_cursor;
2260            if !response.has_more && response.events.is_empty() {
2261                break;
2262            }
2263        }
2264        assert_eq!(
2265            returned.len(),
2266            60,
2267            "multi-shard: every match across both shards must \
2268             surface exactly once (got {} unique in {} polls)",
2269            returned.len(),
2270            polls,
2271        );
2272    }
2273
2274    /// Corollary: a small request that fits well below
2275    /// the cap must NOT flag truncation.
2276    #[tokio::test]
2277    async fn poll_merger_does_not_flag_truncation_on_small_limit() {
2278        let adapter = Arc::new(MockAdapter::new());
2279        adapter.add_events(
2280            0,
2281            vec![StoredEvent::from_value(
2282                "0-1".to_string(),
2283                json!({}),
2284                100,
2285                0,
2286            )],
2287        );
2288        let merger = PollMerger::new(adapter, vec![0]);
2289        let response = merger.poll(ConsumeRequest::new(100)).await.unwrap();
2290        assert!(
2291            !response.truncated_at_per_shard_cap,
2292            "small limits must not flag the cap",
2293        );
2294    }
2295
2296    #[tokio::test]
2297    async fn test_poll_merger_small_limit_many_shards() {
2298        // Regression: limit < shard count caused integer division truncation to 0,
2299        // making per-shard fetch too small. Now uses ceiling division.
2300        let adapter = Arc::new(MockAdapter::new());
2301        let num_shards = 8u16;
2302
2303        for shard_id in 0..num_shards {
2304            adapter.add_events(
2305                shard_id,
2306                vec![StoredEvent::from_value(
2307                    format!("{}-1", shard_id),
2308                    json!({"shard": shard_id}),
2309                    100,
2310                    shard_id,
2311                )],
2312            );
2313        }
2314
2315        let merger = PollMerger::new(adapter, (0..num_shards).collect());
2316
2317        // Request fewer events than shards — should still work
2318        let request = ConsumeRequest::new(3);
2319        let response = merger.poll(request).await.unwrap();
2320
2321        assert_eq!(response.events.len(), 3);
2322        assert!(response.has_more);
2323    }
2324
2325    #[tokio::test]
2326    async fn test_regression_filtered_shards_cursor_advances() {
2327        // Bug 3: "Cursor never advances for filtered-out shards"
2328        //
2329        // When shard 1's events are entirely filtered out, the cursor for shard 1
2330        // must still advance past those events. Otherwise, subsequent polls will
2331        // re-fetch the same filtered-out events forever.
2332        let adapter = Arc::new(MockAdapter::new());
2333
2334        // Shard 0: events matching the filter
2335        adapter.add_events(
2336            0,
2337            vec![
2338                StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
2339                StoredEvent::from_value("0-2".to_string(), json!({"type": "token"}), 200, 0),
2340            ],
2341        );
2342
2343        // Shard 1: events that will be filtered out
2344        adapter.add_events(
2345            1,
2346            vec![
2347                StoredEvent::from_value("1-1".to_string(), json!({"type": "message"}), 150, 1),
2348                StoredEvent::from_value("1-2".to_string(), json!({"type": "message"}), 250, 1),
2349            ],
2350        );
2351
2352        let merger = PollMerger::new(adapter, vec![0, 1]);
2353        let filter = Filter::eq("type", json!("token"));
2354
2355        // First poll: should return only the "token" events from shard 0
2356        let response1 = merger
2357            .poll(ConsumeRequest::new(100).filter(filter.clone()))
2358            .await
2359            .unwrap();
2360
2361        assert_eq!(response1.events.len(), 2, "Should get 2 token events");
2362        for event in &response1.events {
2363            assert_eq!(
2364                event.shard_id, 0,
2365                "All returned events should be from shard 0"
2366            );
2367        }
2368
2369        let cursor1 = response1
2370            .next_id
2371            .expect("Should have a cursor after first poll");
2372
2373        // Verify the cursor advanced for shard 1 even though its events were filtered out
2374        let decoded = CompositeCursor::decode(&cursor1).unwrap();
2375        assert!(
2376            decoded.get(1).is_some(),
2377            "Cursor must advance for shard 1 even though all its events were filtered out"
2378        );
2379        assert_eq!(
2380            decoded.get(1),
2381            Some("1-2"),
2382            "Shard 1 cursor should point to its last fetched event"
2383        );
2384
2385        // Second poll with the cursor: should NOT re-fetch shard 1's events
2386        let response2 = merger
2387            .poll(ConsumeRequest::new(100).filter(filter).from(cursor1))
2388            .await
2389            .unwrap();
2390
2391        assert!(
2392            response2.events.is_empty(),
2393            "Second poll should return no events (all events already consumed or filtered)"
2394        );
2395    }
2396
2397    #[tokio::test]
2398    async fn test_regression_poll_merger_filter_does_not_infinite_loop() {
2399        // Regression: when one shard has events matching the filter and another
2400        // shard has events that are all filtered out, polling in pages must
2401        // terminate and return all matching events without looping forever.
2402        let adapter = Arc::new(MockAdapter::new());
2403
2404        // Shard 0: 100 events all matching filter
2405        let shard0_events: Vec<_> = (1..=100)
2406            .map(|i| {
2407                StoredEvent::from_value(
2408                    format!("0-{}", i),
2409                    json!({"type": "token", "idx": i}),
2410                    i as u64 * 10,
2411                    0,
2412                )
2413            })
2414            .collect();
2415        adapter.add_events(0, shard0_events);
2416
2417        // Shard 1: 100 events none matching filter
2418        let shard1_events: Vec<_> = (1..=100)
2419            .map(|i| {
2420                StoredEvent::from_value(
2421                    format!("1-{}", i),
2422                    json!({"type": "message", "idx": i}),
2423                    i as u64 * 10 + 5,
2424                    1,
2425                )
2426            })
2427            .collect();
2428        adapter.add_events(1, shard1_events);
2429
2430        let merger = PollMerger::new(adapter, vec![0, 1]);
2431        let filter = Filter::eq("type", json!("token"));
2432
2433        let mut all_events = Vec::new();
2434        let mut cursor: Option<String> = None;
2435        let max_iterations = 50;
2436        let mut iterations = 0;
2437
2438        loop {
2439            iterations += 1;
2440            if iterations > max_iterations {
2441                panic!(
2442                    "Infinite loop detected after {} iterations! Collected {} events so far.",
2443                    max_iterations,
2444                    all_events.len()
2445                );
2446            }
2447
2448            let mut request = ConsumeRequest::new(50).filter(filter.clone());
2449            if let Some(c) = &cursor {
2450                request = request.from(c.clone());
2451            }
2452
2453            let response = merger.poll(request).await.unwrap();
2454            all_events.extend(response.events);
2455
2456            if !response.has_more {
2457                break;
2458            }
2459            cursor = response.next_id;
2460        }
2461
2462        // Should have collected exactly 100 matching events from shard 0
2463        assert_eq!(
2464            all_events.len(),
2465            100,
2466            "Expected 100 matching events, got {}. Iterations: {}",
2467            all_events.len(),
2468            iterations
2469        );
2470
2471        // All events should be from shard 0 (the "token" shard)
2472        for event in &all_events {
2473            assert_eq!(
2474                event.shard_id, 0,
2475                "All matching events should come from shard 0"
2476            );
2477        }
2478
2479        // Verify no duplicates
2480        let mut ids: Vec<_> = all_events.iter().map(|e| e.id.clone()).collect();
2481        ids.sort();
2482        ids.dedup();
2483        assert_eq!(ids.len(), 100, "Should have no duplicate events");
2484    }
2485
2486    #[tokio::test]
2487    async fn test_regression_all_events_filtered_returns_cursor() {
2488        // Regression: when every fetched event was filtered out, next_id was
2489        // None, leaving the caller stuck re-fetching the same events forever.
2490        let adapter = Arc::new(MockAdapter::new());
2491
2492        // Only non-matching events
2493        adapter.add_events(
2494            0,
2495            vec![
2496                StoredEvent::from_value("0-1".to_string(), json!({"type": "noise"}), 100, 0),
2497                StoredEvent::from_value("0-2".to_string(), json!({"type": "noise"}), 200, 0),
2498            ],
2499        );
2500
2501        let merger = PollMerger::new(adapter, vec![0]);
2502        let filter = Filter::eq("type", json!("signal"));
2503
2504        let response = merger
2505            .poll(ConsumeRequest::new(100).filter(filter))
2506            .await
2507            .unwrap();
2508
2509        // No events match, but cursor must still advance
2510        assert!(response.events.is_empty());
2511        assert!(
2512            response.next_id.is_some(),
2513            "cursor must advance past filtered events even when none match"
2514        );
2515    }
2516
2517    /// Exercises the non-lazy filter branch: when `Ordering::InsertionTs`
2518    /// is requested we can't short-circuit at `limit + 1` matches (the
2519    /// sort needs every event first), so the code falls through to
2520    /// `retain` → sort → truncate. This test pins:
2521    /// - Results are globally sorted by `insertion_ts` (not input order).
2522    /// - Only filter-matching events come through.
2523    /// - Truncation picks the `limit` *earliest* matches by ts.
2524    /// - `has_more` is set when matches exceed `limit`.
2525    #[tokio::test]
2526    async fn test_poll_merger_filter_insertion_ts_truncates_after_sort() {
2527        let adapter = Arc::new(MockAdapter::new());
2528
2529        // Interleave shards with out-of-order timestamps and a mix of
2530        // matching / non-matching events. Matching timestamps: 120, 200,
2531        // 260, 400. Non-matching timestamps: 100, 300.
2532        adapter.add_events(
2533            0,
2534            vec![
2535                StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 400, 0),
2536                StoredEvent::from_value("0-2".to_string(), json!({"type": "noise"}), 100, 0),
2537                StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 200, 0),
2538            ],
2539        );
2540        adapter.add_events(
2541            1,
2542            vec![
2543                StoredEvent::from_value("1-1".to_string(), json!({"type": "token"}), 260, 1),
2544                StoredEvent::from_value("1-2".to_string(), json!({"type": "noise"}), 300, 1),
2545                StoredEvent::from_value("1-3".to_string(), json!({"type": "token"}), 120, 1),
2546            ],
2547        );
2548
2549        let merger = PollMerger::new(adapter, vec![0, 1]);
2550        let filter = Filter::eq("type", json!("token"));
2551
2552        // 4 matches exist; asking for 2 must yield the two earliest
2553        // after a full sort (120, 200) and signal has_more.
2554        let response = merger
2555            .poll(
2556                ConsumeRequest::new(2)
2557                    .filter(filter)
2558                    .ordering(Ordering::InsertionTs),
2559            )
2560            .await
2561            .unwrap();
2562
2563        assert_eq!(response.events.len(), 2);
2564        assert_eq!(
2565            response.events[0].insertion_ts, 120,
2566            "earliest match must come first"
2567        );
2568        assert_eq!(response.events[1].insertion_ts, 200);
2569        assert!(
2570            response.has_more,
2571            "two more matching events remain past the limit"
2572        );
2573    }
2574
2575    #[tokio::test]
2576    async fn test_regression_corrupt_event_filter_drop_is_consistent_and_logged() {
2577        // Regression: corrupt events (raw bytes that don't deserialize as
2578        // JSON) used to be silently dropped from the filtered poll path
2579        // via `event.parse().map(...).unwrap_or(false)`, while the
2580        // unfiltered path returned them as-is. That inconsistency hid
2581        // upstream framing/storage corruption from anyone running with a
2582        // filter (i.e. most consumers).
2583        //
2584        // The fix routes parse failures through `event_matches_filter`,
2585        // which emits `tracing::warn!` per dropped event. We don't have
2586        // a tracing-test subscriber wired up so we don't assert on the
2587        // log line itself; instead we pin the behavioral surface so a
2588        // future regression that re-silences corruption (e.g., dropping
2589        // the helper) shows up in code review:
2590        //   - filtered poll: corrupt event is dropped, valid event kept
2591        //   - unfiltered poll: corrupt event flows through unchanged
2592        //
2593        // If the helper is removed or the warn! is downgraded to debug!,
2594        // this test still passes — but the helper's doc-comment names
2595        // the inconsistency and is the artifact that protects the
2596        // observability requirement.
2597        let adapter = Arc::new(MockAdapter::new());
2598        adapter.add_events(
2599            0,
2600            vec![
2601                StoredEvent::from_value("0-1".to_string(), json!({"type": "ok"}), 100, 0),
2602                // Raw bytes that don't parse as JSON — a torn write or
2603                // upstream framing bug surface.
2604                StoredEvent::new(
2605                    "0-2".to_string(),
2606                    bytes::Bytes::from_static(b"\xff\xff not json \xff"),
2607                    200,
2608                    0,
2609                ),
2610            ],
2611        );
2612
2613        let merger = PollMerger::new(adapter, vec![0]);
2614
2615        // Filtered: corrupt event must be dropped, valid event kept.
2616        let filtered = merger
2617            .poll(ConsumeRequest::new(100).filter(Filter::eq("type", json!("ok"))))
2618            .await
2619            .unwrap();
2620        assert_eq!(
2621            filtered.events.len(),
2622            1,
2623            "filtered poll must drop the corrupt event"
2624        );
2625        assert_eq!(filtered.events[0].id, "0-1");
2626
2627        // Unfiltered: corrupt event flows through. Documenting that the
2628        // unfiltered path is the *only* way an operator currently sees
2629        // the corrupt bytes — without the warn! the filtered path is
2630        // a black hole.
2631        let unfiltered = merger.poll(ConsumeRequest::new(100)).await.unwrap();
2632        assert_eq!(
2633            unfiltered.events.len(),
2634            2,
2635            "unfiltered poll must surface the corrupt event verbatim"
2636        );
2637        let ids: Vec<_> = unfiltered.events.iter().map(|e| e.id.as_str()).collect();
2638        assert!(ids.contains(&"0-1"));
2639        assert!(ids.contains(&"0-2"));
2640    }
2641
2642    /// Regression: BUG_REPORT.md #2 — `Ordering::None` filter previously
2643    /// broke out of the drain loop once `kept.len() >= limit + 1`,
2644    /// which silently discarded events from later shards without
2645    /// checking the filter. Combined with `new_cursor` advancing for
2646    /// every polled shard, that meant matching events on un-inspected
2647    /// shards were lost forever.
2648    ///
2649    /// Setup: shard 0 has matches followed by shard 1 with matches.
2650    /// With `limit=2`, shard 0's first three events (two matches plus
2651    /// one extra to trigger has_more) used to satisfy the early break,
2652    /// silently dropping shard 1's matches AND advancing past them.
2653    /// The fix runs a full `retain` pass over every fetched event,
2654    /// then rolls back the cursor for shards whose matches were
2655    /// truncated so they're re-fetched on the next poll.
2656    #[tokio::test]
2657    async fn test_regression_ordering_none_filter_does_not_strand_later_shards() {
2658        let adapter = Arc::new(MockAdapter::new());
2659
2660        // Shard 0: 3 matching events.
2661        adapter.add_events(
2662            0,
2663            vec![
2664                StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
2665                StoredEvent::from_value("0-2".to_string(), json!({"type": "token"}), 110, 0),
2666                StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 120, 0),
2667            ],
2668        );
2669        // Shard 1: 3 matching events.
2670        adapter.add_events(
2671            1,
2672            vec![
2673                StoredEvent::from_value("1-1".to_string(), json!({"type": "token"}), 200, 1),
2674                StoredEvent::from_value("1-2".to_string(), json!({"type": "token"}), 210, 1),
2675                StoredEvent::from_value("1-3".to_string(), json!({"type": "token"}), 220, 1),
2676            ],
2677        );
2678
2679        let merger = PollMerger::new(adapter, vec![0, 1]);
2680        let filter = Filter::eq("type", json!("token"));
2681
2682        // Page through with a small limit — over many polls every
2683        // matching event must surface exactly once. Bound iterations
2684        // to detect either a stall or an explosion.
2685        let mut all_returned: Vec<String> = Vec::new();
2686        let mut cursor: Option<String> = None;
2687        for _ in 0..20 {
2688            let mut req = ConsumeRequest::new(2).filter(filter.clone());
2689            if let Some(c) = &cursor {
2690                req = req.from(c.clone());
2691            }
2692            let resp = merger.poll(req).await.unwrap();
2693            for e in &resp.events {
2694                all_returned.push(e.id.clone());
2695            }
2696            if !resp.has_more {
2697                break;
2698            }
2699            cursor = resp.next_id;
2700        }
2701
2702        all_returned.sort();
2703        all_returned.dedup();
2704        assert_eq!(
2705            all_returned,
2706            vec!["0-1", "0-2", "0-3", "1-1", "1-2", "1-3"],
2707            "every matching event from every shard must be returned exactly once"
2708        );
2709    }
2710
2711    /// Regression: BUG_REPORT.md #23 — `Ordering::InsertionTs` filter
2712    /// previously stranded matches on shards whose matching events
2713    /// all sorted later than `limit` matches from other shards. The
2714    /// global sort+truncate dropped them, the cursor-override only
2715    /// fired for shards present in the *returned* set, and so the
2716    /// cursor for the un-returned shard advanced to its fetched
2717    /// position via `new_cursor` — silently skipping the matches.
2718    ///
2719    /// Setup: shard 0 has 3 early-ts matches and shard 1 has 3
2720    /// late-ts matches. With `limit=2` and `InsertionTs` ordering,
2721    /// the first poll returns the two earliest from shard 0;
2722    /// shard 1's matches must NOT be lost. The fix detects that
2723    /// shard 1 had matches truncated and rolls its cursor back so
2724    /// they're re-fetched on the next poll.
2725    #[tokio::test]
2726    async fn test_regression_insertion_ts_filter_does_not_strand_late_shard() {
2727        let adapter = Arc::new(MockAdapter::new());
2728
2729        adapter.add_events(
2730            0,
2731            vec![
2732                StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
2733                StoredEvent::from_value("0-2".to_string(), json!({"type": "token"}), 110, 0),
2734                StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 120, 0),
2735            ],
2736        );
2737        adapter.add_events(
2738            1,
2739            vec![
2740                StoredEvent::from_value("1-1".to_string(), json!({"type": "token"}), 1000, 1),
2741                StoredEvent::from_value("1-2".to_string(), json!({"type": "token"}), 1010, 1),
2742                StoredEvent::from_value("1-3".to_string(), json!({"type": "token"}), 1020, 1),
2743            ],
2744        );
2745
2746        let merger = PollMerger::new(adapter, vec![0, 1]);
2747        let filter = Filter::eq("type", json!("token"));
2748
2749        let mut all_returned: Vec<String> = Vec::new();
2750        let mut cursor: Option<String> = None;
2751        for _ in 0..20 {
2752            let mut req = ConsumeRequest::new(2)
2753                .filter(filter.clone())
2754                .ordering(Ordering::InsertionTs);
2755            if let Some(c) = &cursor {
2756                req = req.from(c.clone());
2757            }
2758            let resp = merger.poll(req).await.unwrap();
2759            for e in &resp.events {
2760                all_returned.push(e.id.clone());
2761            }
2762            if !resp.has_more {
2763                break;
2764            }
2765            cursor = resp.next_id;
2766        }
2767
2768        all_returned.sort();
2769        all_returned.dedup();
2770        assert_eq!(
2771            all_returned,
2772            vec!["0-1", "0-2", "0-3", "1-1", "1-2", "1-3"],
2773            "matches from the late-ts shard must not be lost to truncation"
2774        );
2775    }
2776
2777    /// Regression: BUG_REPORT.md #50 — if any adapter returns
2778    /// `has_more: true` with no events and no `next_id`, the merger
2779    /// previously forwarded that as `(has_more=true, next_id=None)`,
2780    /// causing the caller to re-poll from the same starting cursor
2781    /// indefinitely. The fix suppresses `has_more` whenever the
2782    /// merger itself made no observable progress (no events AND no
2783    /// cursor advance).
2784    #[tokio::test]
2785    async fn has_more_is_suppressed_when_no_progress() {
2786        struct LiarAdapter;
2787
2788        #[async_trait]
2789        impl Adapter for LiarAdapter {
2790            async fn init(&mut self) -> Result<(), AdapterError> {
2791                Ok(())
2792            }
2793            async fn on_batch(&self, _batch: Batch) -> Result<(), AdapterError> {
2794                Ok(())
2795            }
2796            async fn flush(&self) -> Result<(), AdapterError> {
2797                Ok(())
2798            }
2799            async fn shutdown(&self) -> Result<(), AdapterError> {
2800                Ok(())
2801            }
2802            async fn poll_shard(
2803                &self,
2804                _shard_id: u16,
2805                _from_id: Option<&str>,
2806                _limit: usize,
2807            ) -> Result<ShardPollResult, AdapterError> {
2808                // The pathological case: claim has_more without
2809                // returning events or advancing the cursor.
2810                Ok(ShardPollResult {
2811                    events: Vec::new(),
2812                    next_id: None,
2813                    has_more: true,
2814                })
2815            }
2816            fn name(&self) -> &'static str {
2817                "liar"
2818            }
2819        }
2820
2821        let adapter: Arc<dyn Adapter> = Arc::new(LiarAdapter);
2822        let merger = PollMerger::new(adapter, vec![0, 1]);
2823        let response = merger.poll(ConsumeRequest::new(100)).await.unwrap();
2824
2825        assert!(
2826            response.events.is_empty(),
2827            "no events were emitted, but merger returned {}",
2828            response.events.len()
2829        );
2830        // The whole point of this fix: don't let a misbehaving
2831        // adapter trick the caller into an infinite re-poll.
2832        assert!(
2833            !response.has_more,
2834            "has_more must be suppressed when merger made no progress (#50)"
2835        );
2836        assert!(
2837            response.next_id.is_none(),
2838            "next_id must remain None when no progress was made (#50)"
2839        );
2840    }
2841
2842    /// Pin: a stalled poll (no events, no cursor advance) that
2843    /// was given an input cursor must echo the cursor back to the
2844    /// caller. Pre-fix the merger returned `next_id = None` on no
2845    /// progress, so a caller that interpreted None as "no events
2846    /// — restart from the beginning" silently re-fetched from the
2847    /// stream's start across the stall, losing pagination
2848    /// continuity.
2849    #[tokio::test]
2850    async fn stalled_poll_echoes_caller_cursor_back() {
2851        struct EmptyAdapter;
2852
2853        #[async_trait]
2854        impl Adapter for EmptyAdapter {
2855            async fn init(&mut self) -> Result<(), AdapterError> {
2856                Ok(())
2857            }
2858            async fn on_batch(&self, _batch: Batch) -> Result<(), AdapterError> {
2859                Ok(())
2860            }
2861            async fn flush(&self) -> Result<(), AdapterError> {
2862                Ok(())
2863            }
2864            async fn shutdown(&self) -> Result<(), AdapterError> {
2865                Ok(())
2866            }
2867            async fn poll_shard(
2868                &self,
2869                _shard_id: u16,
2870                _from_id: Option<&str>,
2871                _limit: usize,
2872            ) -> Result<ShardPollResult, AdapterError> {
2873                Ok(ShardPollResult {
2874                    events: Vec::new(),
2875                    next_id: None,
2876                    has_more: false,
2877                })
2878            }
2879            fn name(&self) -> &'static str {
2880                "empty"
2881            }
2882        }
2883
2884        let adapter: Arc<dyn Adapter> = Arc::new(EmptyAdapter);
2885        let merger = PollMerger::new(adapter, vec![0, 1]);
2886
2887        // Build a real composite cursor for the request to echo.
2888        let mut cursor = CompositeCursor::new();
2889        cursor.set(0, "1702-0".to_string());
2890        cursor.set(1, "1703-0".to_string());
2891        let encoded = cursor.encode().unwrap();
2892
2893        let mut req = ConsumeRequest::new(100);
2894        req.from_id = Some(encoded.clone());
2895
2896        let response = merger.poll(req).await.unwrap();
2897
2898        assert!(response.events.is_empty());
2899        assert_eq!(
2900            response.next_id.as_deref(),
2901            Some(encoded.as_str()),
2902            "stalled poll with input cursor must echo cursor back \
2903             (got {:?}); pre-fix this was None and callers paged \
2904             back to the stream's start",
2905            response.next_id,
2906        );
2907
2908        // Without an input cursor, no progress + no input → still
2909        // None (preserving the prior behavior of #50).
2910        let response_no_cursor = merger.poll(ConsumeRequest::new(100)).await.unwrap();
2911        assert!(response_no_cursor.next_id.is_none());
2912    }
2913
2914    /// Regression: BUG_REPORT.md #52 — `sort_by_key(|e| e.insertion_ts)`
2915    /// is stable but ties across shards depend on `join_all`'s
2916    /// completion order, which is non-deterministic. Combined with
2917    /// `truncate(limit)`, this could drop or duplicate events at the
2918    /// limit boundary across consecutive polls. The fix breaks ties
2919    /// deterministically on `(shard_id, id)`.
2920    #[tokio::test]
2921    async fn sort_breaks_ties_deterministically_across_shards() {
2922        // Two shards with events that share `insertion_ts` so the
2923        // tiebreaker controls the order.
2924        let adapter = Arc::new(MockAdapter::new());
2925        adapter.add_events(
2926            0,
2927            vec![
2928                StoredEvent::from_value("0-a".to_string(), json!({}), 100, 0),
2929                StoredEvent::from_value("0-b".to_string(), json!({}), 100, 0),
2930            ],
2931        );
2932        adapter.add_events(
2933            1,
2934            vec![
2935                StoredEvent::from_value("1-a".to_string(), json!({}), 100, 1),
2936                StoredEvent::from_value("1-b".to_string(), json!({}), 100, 1),
2937            ],
2938        );
2939
2940        // Poll many times; the order must be stable.
2941        let merger = PollMerger::new(adapter, vec![0, 1]);
2942        let mut prior_order: Option<Vec<String>> = None;
2943        for iter in 0..20 {
2944            let r = merger
2945                .poll(ConsumeRequest::new(10).ordering(Ordering::InsertionTs))
2946                .await
2947                .unwrap();
2948            let ids: Vec<String> = r.events.iter().map(|e| e.id.clone()).collect();
2949            if let Some(prev) = &prior_order {
2950                assert_eq!(
2951                    &ids, prev,
2952                    "iter {iter}: order is non-deterministic — sort tie-break failed (#52)"
2953                );
2954            }
2955            prior_order = Some(ids);
2956        }
2957
2958        // And the order must match `(shard_id, id)`.
2959        let r = merger
2960            .poll(ConsumeRequest::new(10).ordering(Ordering::InsertionTs))
2961            .await
2962            .unwrap();
2963        let ids: Vec<String> = r.events.iter().map(|e| e.id.clone()).collect();
2964        assert_eq!(ids, vec!["0-a", "0-b", "1-a", "1-b"]);
2965    }
2966}