1use std::cmp::Ordering as CmpOrdering;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
20use serde::{Deserialize, Serialize};
21
22use crate::adapter::{Adapter, ShardPollResult};
23use crate::consumer::filter::Filter;
24use crate::error::{AdapterError, ConsumerError};
25use crate::event::StoredEvent;
26
27pub(crate) fn compare_stream_ids(a: &str, b: &str) -> CmpOrdering {
50 if let (Some((a_ms, a_seq)), Some((b_ms, b_seq))) = (split_redis_id(a), split_redis_id(b)) {
52 return (a_ms, a_seq).cmp(&(b_ms, b_seq));
53 }
54 if let (Ok(an), Ok(bn)) = (a.parse::<u128>(), b.parse::<u128>()) {
56 return an.cmp(&bn);
57 }
58 a.cmp(b)
60}
61
62fn split_redis_id(s: &str) -> Option<(u64, u64)> {
63 let (ms, seq) = s.split_once('-')?;
64 Some((ms.parse().ok()?, seq.parse().ok()?))
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub(crate) enum IdFormat {
73 Redis,
75 Numeric,
77 Opaque,
79}
80
81pub(crate) fn id_format(s: &str) -> IdFormat {
82 if split_redis_id(s).is_some() {
83 IdFormat::Redis
84 } else if s.parse::<u128>().is_ok() {
85 IdFormat::Numeric
86 } else {
87 IdFormat::Opaque
88 }
89}
90
91type CursorPos = Arc<str>;
95
96#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
98pub enum Ordering {
99 #[default]
101 None,
102 InsertionTs,
104}
105
106#[derive(Debug, Clone, Default, Serialize, Deserialize)]
108pub struct CompositeCursor {
109 #[serde(flatten)]
115 pub positions: HashMap<u16, CursorPos>,
116}
117
118impl CompositeCursor {
119 pub fn new() -> Self {
121 Self::default()
122 }
123
124 pub fn encode(&self) -> Result<String, ConsumerError> {
142 let json = serde_json::to_string(&self.positions).map_err(|e| {
143 ConsumerError::InvalidCursor(format!(
144 "CompositeCursor::encode failed to serialize positions \
145 (HashMap<u16, Arc<str>> should be infallible): {e}"
146 ))
147 })?;
148 Ok(BASE64.encode(json.as_bytes()))
149 }
150
151 pub fn decode(s: &str) -> Result<Self, ConsumerError> {
153 let bytes = BASE64
154 .decode(s)
155 .map_err(|e| ConsumerError::InvalidCursor(e.to_string()))?;
156
157 let raw_positions: HashMap<String, CursorPos> = serde_json::from_slice(&bytes)
168 .map_err(|e| ConsumerError::InvalidCursor(e.to_string()))?;
169 let mut positions: HashMap<u16, CursorPos> = HashMap::with_capacity(raw_positions.len());
170 for (key, val) in raw_positions {
171 let id: u16 = key.parse().map_err(|_| {
172 ConsumerError::InvalidCursor(format!("shard key {key:?} is not a valid u16"))
173 })?;
174 if id.to_string() != key {
179 return Err(ConsumerError::InvalidCursor(format!(
180 "non-canonical shard key {key:?} (parses to {id}, \
181 canonical form is {id})"
182 )));
183 }
184 if positions.insert(id, val).is_some() {
185 return Err(ConsumerError::InvalidCursor(format!(
189 "duplicate shard key {id} after canonicalization"
190 )));
191 }
192 }
193
194 Ok(Self { positions })
195 }
196
197 pub fn get(&self, shard_id: u16) -> Option<&str> {
199 self.positions.get(&shard_id).map(|s| s.as_ref())
200 }
201
202 pub fn set(&mut self, shard_id: u16, position: impl Into<CursorPos>) {
209 self.positions.insert(shard_id, position.into());
210 }
211
212 pub fn update_from_events(&mut self, events: &[StoredEvent]) {
224 for event in events {
225 let new_id = event.id.as_str();
226 match self.positions.get(&event.shard_id) {
227 Some(existing) => {
228 let existing_fmt = id_format(existing.as_ref());
245 let new_fmt = id_format(new_id);
246 if existing_fmt != new_fmt {
247 tracing::error!(
248 shard_id = event.shard_id,
249 existing = %existing,
250 new = %new_id,
251 existing_format = ?existing_fmt,
252 new_format = ?new_fmt,
253 "stream id format change detected — likely a \
254 backend migration (e.g. JetStream → Redis). \
255 Refusing to advance the cursor; operator must \
256 explicitly reset to consume from the new \
257 backend.",
258 );
259 continue;
260 }
261 if compare_stream_ids(existing.as_ref(), new_id) == CmpOrdering::Less {
262 self.positions.insert(event.shard_id, Arc::from(new_id));
263 }
264 }
267 None => {
268 self.positions.insert(event.shard_id, Arc::from(new_id));
269 }
270 }
271 }
272 }
273}
274
275#[derive(Debug, Clone, Default)]
277pub struct ConsumeRequest {
278 pub from_id: Option<String>,
280 pub limit: usize,
282 pub filter: Option<Filter>,
284 pub ordering: Ordering,
286 pub shards: Option<Vec<u16>>,
288}
289
290impl ConsumeRequest {
291 pub fn new(limit: usize) -> Self {
293 Self {
294 limit,
295 ..Default::default()
296 }
297 }
298
299 pub fn from(mut self, cursor: impl Into<String>) -> Self {
301 self.from_id = Some(cursor.into());
302 self
303 }
304
305 pub fn filter(mut self, filter: Filter) -> Self {
307 self.filter = Some(filter);
308 self
309 }
310
311 pub fn ordering(mut self, ordering: Ordering) -> Self {
313 self.ordering = ordering;
314 self
315 }
316
317 pub fn shards(mut self, shards: Vec<u16>) -> Self {
319 self.shards = Some(shards);
320 self
321 }
322}
323
324#[derive(Debug, Clone)]
326pub struct ConsumeResponse {
327 pub events: Vec<StoredEvent>,
329 pub next_id: Option<String>,
331 pub has_more: bool,
333 pub truncated_at_per_shard_cap: bool,
343 pub stalled_shards: Vec<u16>,
354 pub failed_shards: Vec<u16>,
371}
372
373impl ConsumeResponse {
374 pub fn empty() -> Self {
376 Self {
377 events: Vec::new(),
378 next_id: None,
379 has_more: false,
380 truncated_at_per_shard_cap: false,
381 stalled_shards: Vec::new(),
382 failed_shards: Vec::new(),
383 }
384 }
385}
386
387#[doc(hidden)]
400pub const PER_SHARD_FETCH_CAP: usize = 10_000;
401
402fn event_matches_filter(event: &StoredEvent, filter: &Filter) -> bool {
411 match event.parse() {
412 Ok(value) => filter.matches(&value),
413 Err(e) => {
414 tracing::warn!(
415 event_id = %event.id,
416 shard_id = event.shard_id,
417 error = %e,
418 "dropping unparseable event from filtered poll result"
419 );
420 false
421 }
422 }
423}
424
425pub struct PollMerger {
427 adapter: Arc<dyn Adapter>,
429 shard_ids: Vec<u16>,
443}
444
445impl PollMerger {
446 pub fn new(adapter: Arc<dyn Adapter>, shard_ids: Vec<u16>) -> Self {
453 Self { adapter, shard_ids }
454 }
455
456 pub async fn poll(&self, request: ConsumeRequest) -> Result<ConsumeResponse, ConsumerError> {
458 if request.limit == 0 {
459 return Ok(ConsumeResponse::empty());
460 }
461
462 let cursor = match &request.from_id {
464 Some(s) => CompositeCursor::decode(s)?,
465 None => CompositeCursor::new(),
466 };
467
468 let shards: Vec<u16> = request
470 .shards
471 .clone()
472 .unwrap_or_else(|| self.shard_ids.clone());
473
474 if shards.is_empty() {
475 return Ok(ConsumeResponse::empty());
476 }
477
478 let over_fetch_factor = if request.filter.is_some() { 3 } else { 2 };
489 let unclamped_per_shard = request
490 .limit
491 .div_ceil(shards.len())
492 .max(1)
493 .saturating_mul(over_fetch_factor);
494 let per_shard_limit = unclamped_per_shard.min(PER_SHARD_FETCH_CAP);
495 let truncated_at_per_shard_cap = unclamped_per_shard > PER_SHARD_FETCH_CAP;
496
497 let poll_futures: Vec<_> = shards
501 .iter()
502 .map(|&shard_id| {
503 let adapter = self.adapter.clone();
504 let from: Option<&str> = cursor.get(shard_id);
505 async move {
506 let result = adapter.poll_shard(shard_id, from, per_shard_limit).await;
507 (shard_id, result)
508 }
509 })
510 .collect();
511
512 let shard_results: Vec<(u16, Result<ShardPollResult, AdapterError>)> =
513 futures::future::join_all(poll_futures).await;
514
515 let total_events: usize = shard_results
518 .iter()
519 .filter_map(|(_, r)| r.as_ref().ok().map(|sr| sr.events.len()))
520 .sum();
521 let mut all_events = Vec::with_capacity(total_events);
522 let mut any_has_more = false;
523 let mut shards_reporting_has_more: Vec<u16> = Vec::new();
531 let mut failed_shards: Vec<u16> = Vec::new();
539 let mut new_cursor = if request.filter.is_some() {
543 Some(cursor.clone())
544 } else {
545 None
546 };
547
548 for (shard_id, result) in shard_results {
549 match result {
550 Ok(shard_result) => {
551 let ShardPollResult {
554 events,
555 next_id,
556 has_more,
557 } = shard_result;
558 if let (Some(nc), Some(next_id)) = (new_cursor.as_mut(), next_id) {
559 nc.set(shard_id, next_id);
560 }
561 if has_more {
562 any_has_more = true;
563 shards_reporting_has_more.push(shard_id);
564 }
565 all_events.extend(events);
566 }
567 Err(e) => {
568 tracing::warn!(
569 shard_id = shard_id,
570 error = %e,
571 "Failed to poll shard, skipping"
572 );
573 failed_shards.push(shard_id);
574 }
576 }
577 }
578
579 if let Some(filter) = &request.filter {
604 all_events.retain(|e| event_matches_filter(e, filter));
605 }
606
607 match request.ordering {
609 Ordering::None => {
610 }
612 Ordering::InsertionTs => {
613 all_events.sort_by(|a, b| {
642 a.insertion_ts
643 .cmp(&b.insertion_ts)
644 .then(a.shard_id.cmp(&b.shard_id))
645 .then(compare_stream_ids(&a.id, &b.id))
646 });
647 }
648 }
649
650 let mut matched_per_shard: std::collections::HashMap<u16, usize> =
656 std::collections::HashMap::new();
657 if request.filter.is_some() {
658 for e in &all_events {
659 *matched_per_shard.entry(e.shard_id).or_insert(0) += 1;
660 }
661 }
662
663 let had_extra = all_events.len() > request.limit;
665 all_events.truncate(request.limit);
666
667 let mut final_cursor = match new_cursor {
686 Some(nc) => nc,
687 None => cursor.clone(),
688 };
689
690 let mut rolled_back: std::collections::HashSet<u16> = std::collections::HashSet::new();
697 if request.filter.is_some() && had_extra {
698 let mut returned_per_shard: std::collections::HashMap<u16, usize> =
699 std::collections::HashMap::new();
700 for e in &all_events {
701 *returned_per_shard.entry(e.shard_id).or_insert(0) += 1;
702 }
703 for (shard_id, &total_matched) in &matched_per_shard {
704 let returned = returned_per_shard.get(shard_id).copied().unwrap_or(0);
705 if returned < total_matched {
706 match cursor.positions.get(shard_id) {
712 Some(orig) => final_cursor.set(*shard_id, orig.clone()),
713 None => {
714 final_cursor.positions.remove(shard_id);
715 }
716 }
717 rolled_back.insert(*shard_id);
718 }
719 }
720 }
721
722 let mut seen_shards: std::collections::HashSet<u16> =
747 std::collections::HashSet::with_capacity(shards.len());
748 for event in all_events.iter().rev() {
749 if seen_shards.insert(event.shard_id) {
750 let should_override =
751 request.filter.is_none() || rolled_back.contains(&event.shard_id);
752 if should_override {
753 final_cursor.set(event.shard_id, event.id.clone());
754 }
755 }
760 }
761
762 let cursor_advanced = final_cursor.positions != cursor.positions;
763 let all_filtered = request.filter.is_some() && all_events.is_empty() && cursor_advanced;
766 let we_made_progress = !all_events.is_empty() || cursor_advanced;
780 let has_more = (any_has_more || had_extra || all_filtered) && we_made_progress;
781 let stalled_shards: Vec<u16> = if any_has_more && !we_made_progress {
787 tracing::warn!(
788 stalled_shards = ?shards_reporting_has_more,
789 "PollMerger: an adapter reported has_more=true with no events \
790 and no cursor advance — suppressing to avoid caller infinite-loop"
791 );
792 shards_reporting_has_more
793 } else {
794 Vec::new()
795 };
796 let next_id = if we_made_progress {
805 Some(final_cursor.encode()?)
806 } else {
807 request.from_id.clone()
808 };
809
810 Ok(ConsumeResponse {
811 events: all_events,
812 next_id,
813 has_more,
814 truncated_at_per_shard_cap,
815 stalled_shards,
816 failed_shards,
817 })
818 }
819}
820
821#[cfg(test)]
822mod tests {
823 use super::*;
824 use serde_json::json;
825
826 #[test]
827 fn test_cursor_encode_decode() {
828 let mut cursor = CompositeCursor::new();
829 cursor.set(0, "1702123456789-0".to_string());
830 cursor.set(1, "1702123456790-0".to_string());
831 cursor.set(5, "1702123456795-0".to_string());
832
833 let encoded = cursor.encode().unwrap();
834 let decoded = CompositeCursor::decode(&encoded).unwrap();
835
836 assert_eq!(decoded.get(0), Some("1702123456789-0"));
837 assert_eq!(decoded.get(1), Some("1702123456790-0"));
838 assert_eq!(decoded.get(5), Some("1702123456795-0"));
839 assert_eq!(decoded.get(2), None);
840 }
841
842 #[test]
843 fn test_cursor_update_from_events() {
844 let mut cursor = CompositeCursor::new();
845
846 let events = vec![
847 StoredEvent::from_value("100-0".to_string(), json!({}), 100, 0),
848 StoredEvent::from_value("200-0".to_string(), json!({}), 200, 1),
849 StoredEvent::from_value("150-0".to_string(), json!({}), 150, 0),
855 ];
856
857 cursor.update_from_events(&events);
858
859 assert_eq!(cursor.get(0), Some("150-0"));
861 assert_eq!(cursor.get(1), Some("200-0"));
862 }
863
864 #[test]
869 fn cursor_does_not_regress_on_unsorted_per_shard_events() {
870 let mut cursor = CompositeCursor::new();
871 let events = vec![
875 StoredEvent::from_value("200-0".to_string(), json!({}), 200, 0),
876 StoredEvent::from_value("100-0".to_string(), json!({}), 100, 0),
877 ];
878 cursor.update_from_events(&events);
879 assert_eq!(
880 cursor.get(0),
881 Some("200-0"),
882 "cursor must hold the highest id, not the last-in-slice id",
883 );
884 }
885
886 #[test]
890 fn cursor_compare_and_set_is_per_shard() {
891 let mut cursor = CompositeCursor::new();
892 cursor.update_from_events(&[
893 StoredEvent::from_value("500-0".to_string(), json!({}), 500, 0),
894 StoredEvent::from_value("500-0".to_string(), json!({}), 500, 1),
895 ]);
896 cursor.update_from_events(&[
899 StoredEvent::from_value("100-0".to_string(), json!({}), 100, 0), StoredEvent::from_value("700-0".to_string(), json!({}), 700, 1), ]);
902 assert_eq!(cursor.get(0), Some("500-0"), "shard 0 must not regress");
903 assert_eq!(cursor.get(1), Some("700-0"), "shard 1 must advance");
904 }
905
906 #[test]
911 fn cursor_does_not_wedge_on_jetstream_decade_rollover() {
912 let mut cursor = CompositeCursor::new();
913 for seq in 1u64..=20 {
914 let ev = StoredEvent::from_value(seq.to_string(), json!({}), seq, 0);
915 cursor.update_from_events(&[ev]);
916 }
917 assert_eq!(
918 cursor.get(0),
919 Some("20"),
920 "cursor must reach 20; lex compare would wedge at \"9\""
921 );
922 }
923
924 #[test]
927 fn cursor_does_not_wedge_on_redis_seq_decade_rollover() {
928 let mut cursor = CompositeCursor::new();
929 for seq in 1u64..=20 {
932 let id = format!("1700000000000-{}", seq);
933 let ev = StoredEvent::from_value(id, json!({}), 1700000000000, 0);
934 cursor.update_from_events(&[ev]);
935 }
936 assert_eq!(
937 cursor.get(0),
938 Some("1700000000000-20"),
939 "cursor must reach -20; lex compare would wedge at -9"
940 );
941 }
942
943 #[test]
961 fn cursor_refuses_to_advance_across_backend_format_change() {
962 let mut cursor = CompositeCursor::new();
963 cursor.update_from_events(&[StoredEvent::from_value("42".to_string(), json!({}), 42, 0)]);
965 assert_eq!(cursor.get(0), Some("42"));
966
967 cursor.update_from_events(&[StoredEvent::from_value(
974 "1700000000000-0".to_string(),
975 json!({}),
976 1700000000000,
977 0,
978 )]);
979
980 assert_eq!(
985 cursor.get(0),
986 Some("42"),
987 "regression: cursor must not silently advance through a \
988 backend-format change. The pre-fix lex fallback also \
989 happened to keep the existing value (by `'4' > '1'`), \
990 but only by accident; this test pins the explicit \
991 format-mismatch refusal."
992 );
993
994 let mut cursor = CompositeCursor::new();
997 cursor.update_from_events(&[StoredEvent::from_value(
998 "1700000000000-0".to_string(),
999 json!({}),
1000 1700000000000,
1001 0,
1002 )]);
1003 cursor.update_from_events(&[StoredEvent::from_value(
1004 "9000".to_string(),
1005 json!({}),
1006 9000,
1007 0,
1008 )]);
1009 assert_eq!(
1010 cursor.get(0),
1011 Some("1700000000000-0"),
1012 "regression (reverse direction): Redis cursor must not be \
1013 silently overwritten by an incoming numeric id"
1014 );
1015 }
1016
1017 #[test]
1019 fn cursor_advances_from_unpadded_9_to_unpadded_10() {
1020 let mut cursor = CompositeCursor::new();
1021 cursor.update_from_events(&[StoredEvent::from_value("9".to_string(), json!({}), 9, 0)]);
1022 cursor.update_from_events(&[StoredEvent::from_value("10".to_string(), json!({}), 10, 0)]);
1023 assert_eq!(cursor.get(0), Some("10"));
1024 }
1025
1026 #[test]
1030 fn cursor_advances_on_ulid_ids_via_lex_fallback() {
1031 let mut cursor = CompositeCursor::new();
1032 let earlier = "01HZ0000000000000000000000";
1035 let later = "01HZ0000010000000000000000";
1036 cursor.update_from_events(&[StoredEvent::from_value(
1037 earlier.to_string(),
1038 json!({}),
1039 1,
1040 0,
1041 )]);
1042 cursor.update_from_events(&[StoredEvent::from_value(later.to_string(), json!({}), 2, 0)]);
1043 assert_eq!(cursor.get(0), Some(later));
1044 cursor.update_from_events(&[StoredEvent::from_value(
1046 earlier.to_string(),
1047 json!({}),
1048 1,
1049 0,
1050 )]);
1051 assert_eq!(cursor.get(0), Some(later));
1052 }
1053
1054 #[test]
1056 fn compare_stream_ids_handles_known_formats() {
1057 assert_eq!(compare_stream_ids("9", "10"), CmpOrdering::Less);
1059 assert_eq!(compare_stream_ids("10", "9"), CmpOrdering::Greater);
1060 assert_eq!(compare_stream_ids("100", "100"), CmpOrdering::Equal);
1061 assert_eq!(compare_stream_ids("00000010", "9"), CmpOrdering::Greater);
1063
1064 assert_eq!(
1066 compare_stream_ids("1700-9", "1700-10"),
1067 CmpOrdering::Less,
1068 "seq must compare numerically, not lex"
1069 );
1070 assert_eq!(
1071 compare_stream_ids("1700-9", "1701-0"),
1072 CmpOrdering::Less,
1073 "ms wins over seq"
1074 );
1075 assert_eq!(
1076 compare_stream_ids("1700-100", "1700-9"),
1077 CmpOrdering::Greater
1078 );
1079
1080 let ulid_a = "01HZ0000000000000000000000";
1082 let ulid_b = "01HZ0000010000000000000000";
1083 assert_eq!(compare_stream_ids(ulid_a, ulid_b), CmpOrdering::Less);
1084
1085 let _ = compare_stream_ids("1700-0", "9999");
1089 }
1090
1091 #[test]
1095 fn insertion_ts_sort_breaks_tie_on_id_numerically() {
1096 let mut events = [
1098 StoredEvent::from_value("10".to_string(), json!({}), 1000, 0),
1099 StoredEvent::from_value("9".to_string(), json!({}), 1000, 0),
1100 StoredEvent::from_value("11".to_string(), json!({}), 1000, 0),
1101 ];
1102 events.sort_by(|a, b| {
1103 a.insertion_ts
1104 .cmp(&b.insertion_ts)
1105 .then(a.shard_id.cmp(&b.shard_id))
1106 .then(compare_stream_ids(&a.id, &b.id))
1107 });
1108 let ordered: Vec<&str> = events.iter().map(|e| e.id.as_str()).collect();
1109 assert_eq!(
1110 ordered,
1111 vec!["9", "10", "11"],
1112 "id tiebreak must be numeric, not lex"
1113 );
1114 }
1115
1116 #[test]
1117 fn test_consume_request_builder() {
1118 let request = ConsumeRequest::new(100)
1119 .from("some_cursor")
1120 .ordering(Ordering::InsertionTs)
1121 .shards(vec![0, 1, 2])
1122 .filter(Filter::eq("type", json!("token")));
1123
1124 assert_eq!(request.limit, 100);
1125 assert_eq!(request.from_id, Some("some_cursor".to_string()));
1126 assert_eq!(request.ordering, Ordering::InsertionTs);
1127 assert_eq!(request.shards, Some(vec![0, 1, 2]));
1128 assert!(request.filter.is_some());
1129 }
1130
1131 #[test]
1132 fn test_invalid_cursor() {
1133 let result = CompositeCursor::decode("not_valid_base64!!!");
1134 assert!(result.is_err());
1135
1136 let result = CompositeCursor::decode(&BASE64.encode(b"not json"));
1138 assert!(result.is_err());
1139 }
1140
1141 #[test]
1149 fn cursor_decode_rejects_non_canonical_shard_keys() {
1150 let hostile = br#"{"00":"id_a","1":"id_b"}"#;
1153 let encoded = BASE64.encode(hostile);
1154 let result = CompositeCursor::decode(&encoded);
1155 assert!(
1156 result.is_err(),
1157 "non-canonical shard key `\"00\"` must reject; \
1158 pre-fix this silently parsed as shard 0"
1159 );
1160
1161 let canonical = br#"{"0":"id_a","1":"id_b"}"#;
1164 let encoded_ok = BASE64.encode(canonical);
1165 let cursor =
1166 CompositeCursor::decode(&encoded_ok).expect("canonical shard keys must decode cleanly");
1167 assert_eq!(cursor.get(0), Some("id_a"));
1168 assert_eq!(cursor.get(1), Some("id_b"));
1169 }
1170
1171 #[test]
1172 fn test_composite_cursor_new() {
1173 let cursor = CompositeCursor::new();
1174 assert!(cursor.positions.is_empty());
1175 }
1176
1177 #[test]
1178 fn test_composite_cursor_default() {
1179 let cursor = CompositeCursor::default();
1180 assert!(cursor.positions.is_empty());
1181 }
1182
1183 #[test]
1184 fn test_composite_cursor_get_nonexistent() {
1185 let cursor = CompositeCursor::new();
1186 assert!(cursor.get(0).is_none());
1187 assert!(cursor.get(100).is_none());
1188 }
1189
1190 #[test]
1191 fn test_composite_cursor_set_overwrites() {
1192 let mut cursor = CompositeCursor::new();
1193 cursor.set(0, "first".to_string());
1194 assert_eq!(cursor.get(0), Some("first"));
1195
1196 cursor.set(0, "second".to_string());
1197 assert_eq!(cursor.get(0), Some("second"));
1198 }
1199
1200 #[test]
1201 fn test_composite_cursor_empty_encode() {
1202 let cursor = CompositeCursor::new();
1203 let encoded = cursor.encode().unwrap();
1204 let decoded = CompositeCursor::decode(&encoded).unwrap();
1205 assert!(decoded.positions.is_empty());
1206 }
1207
1208 #[test]
1209 fn test_composite_cursor_clone() {
1210 let mut cursor = CompositeCursor::new();
1211 cursor.set(0, "pos-0".to_string());
1212 cursor.set(1, "pos-1".to_string());
1213
1214 let cloned = cursor.clone();
1215 assert_eq!(cloned.get(0), Some("pos-0"));
1216 assert_eq!(cloned.get(1), Some("pos-1"));
1217 }
1218
1219 #[test]
1220 fn test_composite_cursor_debug() {
1221 let mut cursor = CompositeCursor::new();
1222 cursor.set(0, "test".to_string());
1223 let debug = format!("{:?}", cursor);
1224 assert!(debug.contains("CompositeCursor"));
1225 assert!(debug.contains("positions"));
1226 }
1227
1228 #[test]
1229 fn test_ordering_default() {
1230 let ordering = Ordering::default();
1231 assert_eq!(ordering, Ordering::None);
1232 }
1233
1234 #[test]
1235 fn test_ordering_clone_copy() {
1236 let ordering = Ordering::InsertionTs;
1237 let cloned = ordering;
1238 assert_eq!(cloned, Ordering::InsertionTs);
1239 }
1240
1241 #[test]
1242 fn test_ordering_debug() {
1243 assert!(format!("{:?}", Ordering::None).contains("None"));
1244 assert!(format!("{:?}", Ordering::InsertionTs).contains("InsertionTs"));
1245 }
1246
1247 #[test]
1248 fn test_consume_request_new() {
1249 let request = ConsumeRequest::new(50);
1250 assert_eq!(request.limit, 50);
1251 assert!(request.from_id.is_none());
1252 assert!(request.filter.is_none());
1253 assert_eq!(request.ordering, Ordering::None);
1254 assert!(request.shards.is_none());
1255 }
1256
1257 #[test]
1258 fn test_consume_request_default() {
1259 let request = ConsumeRequest::default();
1260 assert_eq!(request.limit, 0);
1261 assert!(request.from_id.is_none());
1262 assert!(request.filter.is_none());
1263 assert_eq!(request.ordering, Ordering::None);
1264 assert!(request.shards.is_none());
1265 }
1266
1267 #[test]
1268 fn test_consume_request_from_string() {
1269 let request = ConsumeRequest::new(10).from(String::from("cursor123"));
1270 assert_eq!(request.from_id, Some("cursor123".to_string()));
1271 }
1272
1273 #[test]
1274 fn test_consume_request_clone() {
1275 let request = ConsumeRequest::new(100)
1276 .from("cursor")
1277 .ordering(Ordering::InsertionTs)
1278 .shards(vec![0, 1]);
1279
1280 let cloned = request.clone();
1281 assert_eq!(cloned.limit, 100);
1282 assert_eq!(cloned.from_id, Some("cursor".to_string()));
1283 assert_eq!(cloned.ordering, Ordering::InsertionTs);
1284 assert_eq!(cloned.shards, Some(vec![0, 1]));
1285 }
1286
1287 #[test]
1288 fn test_consume_request_debug() {
1289 let request = ConsumeRequest::new(10);
1290 let debug = format!("{:?}", request);
1291 assert!(debug.contains("ConsumeRequest"));
1292 assert!(debug.contains("limit"));
1293 }
1294
1295 #[test]
1296 fn test_consume_response_empty() {
1297 let response = ConsumeResponse::empty();
1298 assert!(response.events.is_empty());
1299 assert!(response.next_id.is_none());
1300 assert!(!response.has_more);
1301 }
1302
1303 #[test]
1304 fn test_consume_response_clone() {
1305 let mut response = ConsumeResponse::empty();
1306 response.next_id = Some("cursor".to_string());
1307 response.has_more = true;
1308
1309 let cloned = response.clone();
1310 assert_eq!(cloned.next_id, Some("cursor".to_string()));
1311 assert!(cloned.has_more);
1312 }
1313
1314 #[test]
1315 fn test_consume_response_debug() {
1316 let response = ConsumeResponse::empty();
1317 let debug = format!("{:?}", response);
1318 assert!(debug.contains("ConsumeResponse"));
1319 assert!(debug.contains("events"));
1320 }
1321
1322 #[test]
1323 fn test_cursor_update_from_empty_events() {
1324 let mut cursor = CompositeCursor::new();
1325 cursor.set(0, "original".to_string());
1326
1327 let events: Vec<StoredEvent> = vec![];
1328 cursor.update_from_events(&events);
1329
1330 assert_eq!(cursor.get(0), Some("original"));
1332 }
1333
1334 #[test]
1335 fn test_cursor_many_shards() {
1336 let mut cursor = CompositeCursor::new();
1337 for i in 0..100u16 {
1338 cursor.set(i, format!("pos-{}", i));
1339 }
1340
1341 let encoded = cursor.encode().unwrap();
1342 let decoded = CompositeCursor::decode(&encoded).unwrap();
1343
1344 for i in 0..100u16 {
1345 assert_eq!(decoded.get(i), Some(format!("pos-{}", i).as_str()));
1346 }
1347 }
1348
1349 #[test]
1350 fn test_consume_request_empty_shards() {
1351 let request = ConsumeRequest::new(100).shards(vec![]);
1352 assert_eq!(request.shards, Some(vec![]));
1353 }
1354
1355 #[test]
1356 fn test_consume_request_ordering_none() {
1357 let request = ConsumeRequest::new(100).ordering(Ordering::None);
1358 assert_eq!(request.ordering, Ordering::None);
1359 }
1360
1361 #[test]
1362 fn test_ordering_equality() {
1363 assert_eq!(Ordering::None, Ordering::None);
1364 assert_eq!(Ordering::InsertionTs, Ordering::InsertionTs);
1365 assert_ne!(Ordering::None, Ordering::InsertionTs);
1366 }
1367
1368 use crate::adapter::{Adapter, ShardPollResult};
1370 use crate::error::AdapterError;
1371 use crate::event::Batch;
1372 use async_trait::async_trait;
1373 use std::collections::HashMap;
1374 use std::sync::RwLock;
1375
1376 struct MockAdapter {
1377 events: RwLock<HashMap<u16, Vec<StoredEvent>>>,
1378 }
1379
1380 impl MockAdapter {
1381 fn new() -> Self {
1382 Self {
1383 events: RwLock::new(HashMap::new()),
1384 }
1385 }
1386
1387 fn add_events(&self, shard_id: u16, events: Vec<StoredEvent>) {
1388 let mut map = self.events.write().unwrap();
1389 map.entry(shard_id).or_default().extend(events);
1390 }
1391 }
1392
1393 #[async_trait]
1394 impl Adapter for MockAdapter {
1395 async fn init(&mut self) -> Result<(), AdapterError> {
1396 Ok(())
1397 }
1398
1399 async fn on_batch(&self, _batch: Batch) -> Result<(), AdapterError> {
1400 Ok(())
1401 }
1402
1403 async fn flush(&self) -> Result<(), AdapterError> {
1404 Ok(())
1405 }
1406
1407 async fn shutdown(&self) -> Result<(), AdapterError> {
1408 Ok(())
1409 }
1410
1411 async fn poll_shard(
1412 &self,
1413 shard_id: u16,
1414 from_id: Option<&str>,
1415 limit: usize,
1416 ) -> Result<ShardPollResult, AdapterError> {
1417 let map = self.events.read().unwrap();
1418 let events = map.get(&shard_id).cloned().unwrap_or_default();
1419
1420 let filtered: Vec<_> = if let Some(from) = from_id {
1422 events
1423 .into_iter()
1424 .skip_while(|e| e.id != from)
1425 .skip(1) .collect()
1427 } else {
1428 events
1429 };
1430
1431 let has_more = filtered.len() > limit;
1432 let events: Vec<_> = filtered.into_iter().take(limit).collect();
1433 let next_id = events.last().map(|e| e.id.clone());
1434
1435 Ok(ShardPollResult {
1436 events,
1437 next_id,
1438 has_more,
1439 })
1440 }
1441
1442 fn name(&self) -> &'static str {
1443 "mock"
1444 }
1445 }
1446
1447 #[tokio::test]
1448 async fn test_poll_merger_new() {
1449 let adapter = Arc::new(MockAdapter::new());
1450 let merger = PollMerger::new(adapter, vec![0, 1, 2, 3]);
1451 assert_eq!(merger.shard_ids, vec![0, 1, 2, 3]);
1452 }
1453
1454 #[tokio::test]
1464 async fn poll_merger_default_shards_uses_active_id_set_after_scale_down() {
1465 let adapter = Arc::new(MockAdapter::new());
1466
1467 adapter.add_events(
1473 1,
1474 vec![StoredEvent::from_value(
1475 "1-a".to_string(),
1476 json!({"shard": 1}),
1477 100,
1478 1,
1479 )],
1480 );
1481 adapter.add_events(
1482 2,
1483 vec![StoredEvent::from_value(
1484 "2-a".to_string(),
1485 json!({"shard": 2}),
1486 200,
1487 2,
1488 )],
1489 );
1490
1491 let merger = PollMerger::new(adapter, vec![1, 2]);
1493
1494 let request = ConsumeRequest::new(100);
1496 let response = merger.poll(request).await.unwrap();
1497
1498 let returned: std::collections::HashSet<u16> =
1499 response.events.iter().map(|e| e.shard_id).collect();
1500 assert!(
1501 returned.contains(&1),
1502 "default-shards poll must include shard 1 (active)",
1503 );
1504 assert!(
1505 returned.contains(&2),
1506 "default-shards poll must include shard 2 — pre-fix this was silently \
1507 skipped because the merger generated `0..num_shards` = `[0, 1]`",
1508 );
1509 assert!(
1510 !returned.contains(&0),
1511 "default-shards poll must NOT touch shard 0 — it was evicted",
1512 );
1513 assert_eq!(response.events.len(), 2);
1514 }
1515
1516 #[tokio::test]
1517 async fn test_poll_merger_empty_limit() {
1518 let adapter = Arc::new(MockAdapter::new());
1519 let merger = PollMerger::new(adapter, vec![0, 1, 2, 3]);
1520
1521 let request = ConsumeRequest::new(0);
1522 let response = merger.poll(request).await.unwrap();
1523
1524 assert!(response.events.is_empty());
1525 assert!(response.next_id.is_none());
1526 assert!(!response.has_more);
1527 }
1528
1529 #[tokio::test]
1530 async fn test_poll_merger_empty_shards() {
1531 let adapter = Arc::new(MockAdapter::new());
1532 let merger = PollMerger::new(adapter, vec![0, 1, 2, 3]);
1533
1534 let request = ConsumeRequest::new(100).shards(vec![]);
1535 let response = merger.poll(request).await.unwrap();
1536
1537 assert!(response.events.is_empty());
1538 assert!(response.next_id.is_none());
1539 assert!(!response.has_more);
1540 }
1541
1542 #[tokio::test]
1552 async fn poll_response_surfaces_failed_shard_ids() {
1553 struct FailingShardMock {
1555 inner: MockAdapter,
1556 fail_shard: u16,
1557 }
1558
1559 #[async_trait]
1560 impl Adapter for FailingShardMock {
1561 async fn init(&mut self) -> Result<(), AdapterError> {
1562 Ok(())
1563 }
1564 async fn on_batch(&self, _b: Batch) -> Result<(), AdapterError> {
1565 Ok(())
1566 }
1567 async fn flush(&self) -> Result<(), AdapterError> {
1568 Ok(())
1569 }
1570 async fn shutdown(&self) -> Result<(), AdapterError> {
1571 Ok(())
1572 }
1573 async fn poll_shard(
1574 &self,
1575 shard_id: u16,
1576 from_id: Option<&str>,
1577 limit: usize,
1578 ) -> Result<ShardPollResult, AdapterError> {
1579 if shard_id == self.fail_shard {
1580 return Err(AdapterError::Transient(format!(
1581 "synthetic failure on shard {shard_id}"
1582 )));
1583 }
1584 self.inner.poll_shard(shard_id, from_id, limit).await
1585 }
1586 fn name(&self) -> &'static str {
1587 "failing-mock"
1588 }
1589 }
1590
1591 let inner = MockAdapter::new();
1592 inner.add_events(
1594 0,
1595 vec![StoredEvent::from_value(
1596 "0-1".to_string(),
1597 json!({"shard": 0}),
1598 100,
1599 0,
1600 )],
1601 );
1602 inner.add_events(
1603 2,
1604 vec![StoredEvent::from_value(
1605 "2-1".to_string(),
1606 json!({"shard": 2}),
1607 100,
1608 2,
1609 )],
1610 );
1611
1612 let adapter = Arc::new(FailingShardMock {
1613 inner,
1614 fail_shard: 1,
1615 });
1616 let merger = PollMerger::new(adapter, vec![0, 1, 2]);
1617
1618 let response = merger.poll(ConsumeRequest::new(100)).await.unwrap();
1619
1620 assert_eq!(
1622 response.events.len(),
1623 2,
1624 "events from non-failing shards must still be returned"
1625 );
1626
1627 assert_eq!(
1629 response.failed_shards,
1630 vec![1],
1631 "regression: failed_shards must list the shard whose adapter \
1632 errored. Pre-fix this list didn't exist; observers couldn't \
1633 tell which shard was missing without log scraping."
1634 );
1635 }
1636
1637 #[tokio::test]
1638 async fn test_poll_merger_with_events() {
1639 let adapter = Arc::new(MockAdapter::new());
1640
1641 adapter.add_events(
1643 0,
1644 vec![
1645 StoredEvent::from_value("0-1".to_string(), json!({"type": "a"}), 100, 0),
1646 StoredEvent::from_value("0-2".to_string(), json!({"type": "b"}), 200, 0),
1647 ],
1648 );
1649
1650 adapter.add_events(
1652 1,
1653 vec![StoredEvent::from_value(
1654 "1-1".to_string(),
1655 json!({"type": "c"}),
1656 150,
1657 1,
1658 )],
1659 );
1660
1661 let merger = PollMerger::new(adapter, vec![0, 1]);
1662
1663 let request = ConsumeRequest::new(100);
1664 let response = merger.poll(request).await.unwrap();
1665
1666 assert_eq!(response.events.len(), 3);
1667 assert!(response.next_id.is_some());
1668 }
1669
1670 #[tokio::test]
1671 async fn test_poll_merger_with_ordering() {
1672 let adapter = Arc::new(MockAdapter::new());
1673
1674 adapter.add_events(
1676 0,
1677 vec![
1678 StoredEvent::from_value("0-1".to_string(), json!({}), 300, 0),
1679 StoredEvent::from_value("0-2".to_string(), json!({}), 100, 0),
1680 ],
1681 );
1682 adapter.add_events(
1683 1,
1684 vec![StoredEvent::from_value(
1685 "1-1".to_string(),
1686 json!({}),
1687 200,
1688 1,
1689 )],
1690 );
1691
1692 let merger = PollMerger::new(adapter, vec![0, 1]);
1693
1694 let request = ConsumeRequest::new(100).ordering(Ordering::InsertionTs);
1695 let response = merger.poll(request).await.unwrap();
1696
1697 assert_eq!(response.events.len(), 3);
1699 assert_eq!(response.events[0].insertion_ts, 100);
1700 assert_eq!(response.events[1].insertion_ts, 200);
1701 assert_eq!(response.events[2].insertion_ts, 300);
1702 }
1703
1704 #[tokio::test]
1705 async fn test_poll_merger_with_filter() {
1706 let adapter = Arc::new(MockAdapter::new());
1707
1708 adapter.add_events(
1709 0,
1710 vec![
1711 StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
1712 StoredEvent::from_value("0-2".to_string(), json!({"type": "message"}), 200, 0),
1713 StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 300, 0),
1714 ],
1715 );
1716
1717 let merger = PollMerger::new(adapter, vec![0]);
1718
1719 let request = ConsumeRequest::new(100).filter(Filter::eq("type", json!("token")));
1720 let response = merger.poll(request).await.unwrap();
1721
1722 assert_eq!(response.events.len(), 2);
1723 for event in &response.events {
1724 assert!(event.raw_str().unwrap().contains("token"));
1725 }
1726 }
1727
1728 #[tokio::test]
1729 async fn test_poll_merger_with_limit() {
1730 let adapter = Arc::new(MockAdapter::new());
1731
1732 adapter.add_events(
1733 0,
1734 vec![
1735 StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
1736 StoredEvent::from_value("0-2".to_string(), json!({}), 200, 0),
1737 StoredEvent::from_value("0-3".to_string(), json!({}), 300, 0),
1738 ],
1739 );
1740
1741 let merger = PollMerger::new(adapter, vec![0]);
1742
1743 let request = ConsumeRequest::new(2);
1744 let response = merger.poll(request).await.unwrap();
1745
1746 assert_eq!(response.events.len(), 2);
1747 assert!(response.has_more);
1748 }
1749
1750 #[tokio::test]
1751 async fn test_poll_merger_specific_shards() {
1752 let adapter = Arc::new(MockAdapter::new());
1753
1754 adapter.add_events(
1755 0,
1756 vec![StoredEvent::from_value(
1757 "0-1".to_string(),
1758 json!({"shard": 0}),
1759 100,
1760 0,
1761 )],
1762 );
1763 adapter.add_events(
1764 1,
1765 vec![StoredEvent::from_value(
1766 "1-1".to_string(),
1767 json!({"shard": 1}),
1768 100,
1769 1,
1770 )],
1771 );
1772 adapter.add_events(
1773 2,
1774 vec![StoredEvent::from_value(
1775 "2-1".to_string(),
1776 json!({"shard": 2}),
1777 100,
1778 2,
1779 )],
1780 );
1781
1782 let merger = PollMerger::new(adapter, vec![0, 1, 2]);
1783
1784 let request = ConsumeRequest::new(100).shards(vec![0, 2]);
1786 let response = merger.poll(request).await.unwrap();
1787
1788 assert_eq!(response.events.len(), 2);
1789 let shard_ids: Vec<_> = response.events.iter().map(|e| e.shard_id).collect();
1790 assert!(shard_ids.contains(&0));
1791 assert!(shard_ids.contains(&2));
1792 assert!(!shard_ids.contains(&1));
1793 }
1794
1795 #[tokio::test]
1796 async fn test_poll_merger_with_cursor() {
1797 let adapter = Arc::new(MockAdapter::new());
1798
1799 adapter.add_events(
1800 0,
1801 vec![
1802 StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
1803 StoredEvent::from_value("0-2".to_string(), json!({}), 200, 0),
1804 StoredEvent::from_value("0-3".to_string(), json!({}), 300, 0),
1805 ],
1806 );
1807
1808 let merger = PollMerger::new(adapter, vec![0]);
1809
1810 let request = ConsumeRequest::new(2);
1812 let response1 = merger.poll(request).await.unwrap();
1813 assert_eq!(response1.events.len(), 2);
1814
1815 let cursor = response1.next_id.unwrap();
1817 let request2 = ConsumeRequest::new(10).from(cursor);
1818 let response2 = merger.poll(request2).await.unwrap();
1819
1820 assert_eq!(response2.events.len(), 1);
1821 assert_eq!(response2.events[0].id, "0-3");
1822 }
1823
1824 #[tokio::test]
1825 async fn test_poll_merger_pagination_multi_shard() {
1826 let adapter = Arc::new(MockAdapter::new());
1828
1829 let shard0_events: Vec<_> = (1..=10)
1831 .map(|i| {
1832 StoredEvent::from_value(
1833 format!("0-{}", i),
1834 json!({"shard": 0, "idx": i}),
1835 i as u64 * 10,
1836 0,
1837 )
1838 })
1839 .collect();
1840 adapter.add_events(0, shard0_events);
1841
1842 let shard1_events: Vec<_> = (1..=15)
1844 .map(|i| {
1845 StoredEvent::from_value(
1846 format!("1-{}", i),
1847 json!({"shard": 1, "idx": i}),
1848 i as u64 * 10 + 5,
1849 1,
1850 )
1851 })
1852 .collect();
1853 adapter.add_events(1, shard1_events);
1854
1855 let merger = PollMerger::new(adapter, vec![0, 1]);
1856
1857 let mut all_events = Vec::new();
1859 let mut cursor: Option<String> = None;
1860 let mut iterations = 0;
1861
1862 loop {
1863 iterations += 1;
1864 let request = match &cursor {
1865 Some(c) => ConsumeRequest::new(10).from(c.clone()),
1866 None => ConsumeRequest::new(10),
1867 };
1868
1869 let response = merger.poll(request).await.unwrap();
1870 all_events.extend(response.events);
1871
1872 if !response.has_more {
1873 break;
1874 }
1875 cursor = response.next_id;
1876
1877 if iterations > 10 {
1879 panic!("Too many iterations");
1880 }
1881 }
1882
1883 assert_eq!(
1885 all_events.len(),
1886 25,
1887 "Expected 25 events, got {}. Iterations: {}",
1888 all_events.len(),
1889 iterations
1890 );
1891
1892 let shard0_count = all_events.iter().filter(|e| e.shard_id == 0).count();
1894 let shard1_count = all_events.iter().filter(|e| e.shard_id == 1).count();
1895 assert_eq!(shard0_count, 10, "Expected 10 events from shard 0");
1896 assert_eq!(shard1_count, 15, "Expected 15 events from shard 1");
1897 }
1898
1899 #[tokio::test]
1900 async fn test_poll_merger_pagination_no_duplicates() {
1901 let adapter = Arc::new(MockAdapter::new());
1903
1904 for shard_id in 0..2u16 {
1906 let events: Vec<_> = (1..=20)
1907 .map(|i| {
1908 StoredEvent::from_value(
1909 format!("{}-{}", shard_id, i),
1910 json!({"shard": shard_id, "idx": i}),
1911 i as u64 * 10,
1912 shard_id,
1913 )
1914 })
1915 .collect();
1916 adapter.add_events(shard_id, events);
1917 }
1918
1919 let merger = PollMerger::new(adapter, vec![0, 1]);
1920
1921 let mut all_event_ids = Vec::new();
1923 let mut cursor: Option<String> = None;
1924
1925 for _ in 0..20 {
1926 let request = match &cursor {
1927 Some(c) => ConsumeRequest::new(5).from(c.clone()),
1928 None => ConsumeRequest::new(5),
1929 };
1930
1931 let response = merger.poll(request).await.unwrap();
1932 all_event_ids.extend(response.events.iter().map(|e| e.id.clone()));
1933
1934 if !response.has_more {
1935 break;
1936 }
1937 cursor = response.next_id;
1938 }
1939
1940 let unique_count = {
1942 let mut ids = all_event_ids.clone();
1943 ids.sort();
1944 ids.dedup();
1945 ids.len()
1946 };
1947
1948 assert_eq!(
1949 unique_count,
1950 all_event_ids.len(),
1951 "Found duplicate events! Total: {}, Unique: {}",
1952 all_event_ids.len(),
1953 unique_count
1954 );
1955
1956 assert_eq!(all_event_ids.len(), 40);
1958 }
1959
1960 #[tokio::test]
1961 async fn test_poll_merger_pagination_with_ordering() {
1962 let adapter = Arc::new(MockAdapter::new());
1964
1965 adapter.add_events(
1967 0,
1968 vec![
1969 StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
1970 StoredEvent::from_value("0-2".to_string(), json!({}), 300, 0),
1971 StoredEvent::from_value("0-3".to_string(), json!({}), 500, 0),
1972 ],
1973 );
1974 adapter.add_events(
1975 1,
1976 vec![
1977 StoredEvent::from_value("1-1".to_string(), json!({}), 200, 1),
1978 StoredEvent::from_value("1-2".to_string(), json!({}), 400, 1),
1979 ],
1980 );
1981
1982 let merger = PollMerger::new(adapter, vec![0, 1]);
1983
1984 let mut all_events = Vec::new();
1986 let mut cursor: Option<String> = None;
1987
1988 for _ in 0..5 {
1989 let mut request = ConsumeRequest::new(2).ordering(Ordering::InsertionTs);
1990 if let Some(c) = &cursor {
1991 request = request.from(c.clone());
1992 }
1993
1994 let response = merger.poll(request).await.unwrap();
1995 all_events.extend(response.events);
1996
1997 if !response.has_more {
1998 break;
1999 }
2000 cursor = response.next_id;
2001 }
2002
2003 assert_eq!(all_events.len(), 5);
2005
2006 let timestamps: Vec<_> = all_events.iter().map(|e| e.insertion_ts).collect();
2008 let mut sorted = timestamps.clone();
2009 sorted.sort();
2010 assert_eq!(timestamps, sorted, "Events should be sorted by timestamp");
2011 }
2012
2013 #[tokio::test]
2014 async fn test_poll_merger_cursor_tracks_returned_events_only() {
2015 let adapter = Arc::new(MockAdapter::new());
2017
2018 adapter.add_events(
2020 0,
2021 vec![
2022 StoredEvent::from_value("0-1".to_string(), json!({}), 100, 0),
2023 StoredEvent::from_value("0-2".to_string(), json!({}), 200, 0),
2024 StoredEvent::from_value("0-3".to_string(), json!({}), 300, 0),
2025 ],
2026 );
2027
2028 adapter.add_events(
2030 1,
2031 vec![
2032 StoredEvent::from_value("1-1".to_string(), json!({}), 150, 1),
2033 StoredEvent::from_value("1-2".to_string(), json!({}), 250, 1),
2034 StoredEvent::from_value("1-3".to_string(), json!({}), 350, 1),
2035 ],
2036 );
2037
2038 let merger = PollMerger::new(adapter, vec![0, 1]);
2039
2040 let response1 = merger.poll(ConsumeRequest::new(2)).await.unwrap();
2042 assert_eq!(response1.events.len(), 2);
2043 assert!(response1.has_more);
2044
2045 let next_id = response1.next_id.clone().unwrap();
2047 let cursor = CompositeCursor::decode(&next_id).unwrap();
2048
2049 let returned_shard_ids: std::collections::HashSet<_> =
2051 response1.events.iter().map(|e| e.shard_id).collect();
2052
2053 for shard_id in 0..2u16 {
2054 if returned_shard_ids.contains(&shard_id) {
2055 assert!(
2057 cursor.get(shard_id).is_some(),
2058 "Cursor should have position for shard {} which had returned events",
2059 shard_id
2060 );
2061 }
2062 }
2063
2064 let response2 = merger
2066 .poll(ConsumeRequest::new(10).from(next_id))
2067 .await
2068 .unwrap();
2069
2070 assert_eq!(response2.events.len(), 4, "Should get remaining 4 events");
2072 }
2073
2074 #[tokio::test]
2079 async fn poll_merger_surfaces_per_shard_cap_truncation() {
2080 let adapter = Arc::new(MockAdapter::new());
2081 adapter.add_events(
2085 0,
2086 (0..1)
2087 .map(|i| StoredEvent::from_value(format!("0-{}", i), json!({}), 100, 0))
2088 .collect(),
2089 );
2090
2091 let merger = PollMerger::new(adapter, vec![0]);
2092 let response = merger.poll(ConsumeRequest::new(50_000)).await.unwrap();
2093 assert!(
2094 response.truncated_at_per_shard_cap,
2095 "large limit must flag the per-shard cap clamp",
2096 );
2097 }
2098
2099 #[tokio::test]
2114 async fn poll_merger_does_not_stall_on_single_shard_filter_under_cap() {
2115 let adapter = Arc::new(MockAdapter::new());
2120 let mut events: Vec<StoredEvent> = (0..50)
2121 .map(|i| {
2122 StoredEvent::from_value(
2123 format!("0-{}", i),
2124 json!({"keep": true}),
2125 (i + 1) as u64,
2126 0,
2127 )
2128 })
2129 .collect();
2130 adapter.add_events(0, events.split_off(0));
2131
2132 let merger = PollMerger::new(adapter.clone(), vec![0]);
2133 let make_request = |from_id: Option<String>| ConsumeRequest {
2135 limit: 10,
2136 from_id,
2137 shards: None,
2138 filter: Some(Filter::eq("keep", json!(true))),
2139 ordering: Ordering::None,
2140 };
2141
2142 let mut cursor: Option<String> = None;
2143 let mut total = 0;
2144 let mut polls = 0;
2145 loop {
2146 polls += 1;
2147 assert!(
2148 polls < 20,
2149 "poll loop must terminate; got {} polls without draining \
2150 (cursor={:?}, total={})",
2151 polls,
2152 cursor,
2153 total,
2154 );
2155 let response = merger.poll(make_request(cursor.clone())).await.unwrap();
2156 total += response.events.len();
2157 let new_cursor = response.next_id.clone();
2158 assert!(
2159 new_cursor.is_some(),
2160 "response must surface a cursor on every progress poll \
2161 (poll={}, returned={})",
2162 polls,
2163 response.events.len(),
2164 );
2165 if cursor == new_cursor {
2169 assert!(
2170 !response.has_more,
2171 "cursor stuck at {:?} but has_more=true → stall",
2172 cursor,
2173 );
2174 break;
2175 }
2176 cursor = new_cursor;
2177 if !response.has_more && response.events.is_empty() {
2178 break;
2179 }
2180 }
2181 assert_eq!(
2182 total, 50,
2183 "full draining of 50 events must succeed without stall \
2184 (got {} in {} polls)",
2185 total, polls,
2186 );
2187 }
2188
2189 #[tokio::test]
2197 async fn poll_merger_does_not_stall_on_multi_shard_filter_truncation() {
2198 let adapter = Arc::new(MockAdapter::new());
2199 for shard_id in 0..2u16 {
2204 let events: Vec<StoredEvent> = (0..30)
2205 .map(|i| {
2206 StoredEvent::from_value(
2207 format!("{}-{}", shard_id, i),
2208 json!({"keep": true}),
2209 (i * 2 + shard_id as usize) as u64 + 1,
2213 shard_id,
2214 )
2215 })
2216 .collect();
2217 adapter.add_events(shard_id, events);
2218 }
2219
2220 let merger = PollMerger::new(adapter, vec![0, 1]);
2221 let make_request = |from_id: Option<String>| ConsumeRequest {
2222 limit: 10,
2223 from_id,
2224 shards: None,
2225 filter: Some(Filter::eq("keep", json!(true))),
2226 ordering: Ordering::None,
2227 };
2228
2229 let mut cursor: Option<String> = None;
2230 let mut returned: std::collections::HashSet<String> = std::collections::HashSet::new();
2231 let mut polls = 0;
2232 loop {
2233 polls += 1;
2234 assert!(
2235 polls < 30,
2236 "multi-shard: poll loop must terminate; \
2237 got {} polls without draining (cursor={:?}, returned={})",
2238 polls,
2239 cursor,
2240 returned.len(),
2241 );
2242 let response = merger.poll(make_request(cursor.clone())).await.unwrap();
2243 for e in &response.events {
2244 returned.insert(e.id.clone());
2245 }
2246 let new_cursor = response.next_id.clone();
2247 if cursor == new_cursor {
2251 assert!(
2252 !response.has_more,
2253 "multi-shard: cursor stuck at {:?} but \
2254 has_more=true → stall",
2255 cursor,
2256 );
2257 break;
2258 }
2259 cursor = new_cursor;
2260 if !response.has_more && response.events.is_empty() {
2261 break;
2262 }
2263 }
2264 assert_eq!(
2265 returned.len(),
2266 60,
2267 "multi-shard: every match across both shards must \
2268 surface exactly once (got {} unique in {} polls)",
2269 returned.len(),
2270 polls,
2271 );
2272 }
2273
2274 #[tokio::test]
2277 async fn poll_merger_does_not_flag_truncation_on_small_limit() {
2278 let adapter = Arc::new(MockAdapter::new());
2279 adapter.add_events(
2280 0,
2281 vec![StoredEvent::from_value(
2282 "0-1".to_string(),
2283 json!({}),
2284 100,
2285 0,
2286 )],
2287 );
2288 let merger = PollMerger::new(adapter, vec![0]);
2289 let response = merger.poll(ConsumeRequest::new(100)).await.unwrap();
2290 assert!(
2291 !response.truncated_at_per_shard_cap,
2292 "small limits must not flag the cap",
2293 );
2294 }
2295
2296 #[tokio::test]
2297 async fn test_poll_merger_small_limit_many_shards() {
2298 let adapter = Arc::new(MockAdapter::new());
2301 let num_shards = 8u16;
2302
2303 for shard_id in 0..num_shards {
2304 adapter.add_events(
2305 shard_id,
2306 vec![StoredEvent::from_value(
2307 format!("{}-1", shard_id),
2308 json!({"shard": shard_id}),
2309 100,
2310 shard_id,
2311 )],
2312 );
2313 }
2314
2315 let merger = PollMerger::new(adapter, (0..num_shards).collect());
2316
2317 let request = ConsumeRequest::new(3);
2319 let response = merger.poll(request).await.unwrap();
2320
2321 assert_eq!(response.events.len(), 3);
2322 assert!(response.has_more);
2323 }
2324
2325 #[tokio::test]
2326 async fn test_regression_filtered_shards_cursor_advances() {
2327 let adapter = Arc::new(MockAdapter::new());
2333
2334 adapter.add_events(
2336 0,
2337 vec![
2338 StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
2339 StoredEvent::from_value("0-2".to_string(), json!({"type": "token"}), 200, 0),
2340 ],
2341 );
2342
2343 adapter.add_events(
2345 1,
2346 vec![
2347 StoredEvent::from_value("1-1".to_string(), json!({"type": "message"}), 150, 1),
2348 StoredEvent::from_value("1-2".to_string(), json!({"type": "message"}), 250, 1),
2349 ],
2350 );
2351
2352 let merger = PollMerger::new(adapter, vec![0, 1]);
2353 let filter = Filter::eq("type", json!("token"));
2354
2355 let response1 = merger
2357 .poll(ConsumeRequest::new(100).filter(filter.clone()))
2358 .await
2359 .unwrap();
2360
2361 assert_eq!(response1.events.len(), 2, "Should get 2 token events");
2362 for event in &response1.events {
2363 assert_eq!(
2364 event.shard_id, 0,
2365 "All returned events should be from shard 0"
2366 );
2367 }
2368
2369 let cursor1 = response1
2370 .next_id
2371 .expect("Should have a cursor after first poll");
2372
2373 let decoded = CompositeCursor::decode(&cursor1).unwrap();
2375 assert!(
2376 decoded.get(1).is_some(),
2377 "Cursor must advance for shard 1 even though all its events were filtered out"
2378 );
2379 assert_eq!(
2380 decoded.get(1),
2381 Some("1-2"),
2382 "Shard 1 cursor should point to its last fetched event"
2383 );
2384
2385 let response2 = merger
2387 .poll(ConsumeRequest::new(100).filter(filter).from(cursor1))
2388 .await
2389 .unwrap();
2390
2391 assert!(
2392 response2.events.is_empty(),
2393 "Second poll should return no events (all events already consumed or filtered)"
2394 );
2395 }
2396
2397 #[tokio::test]
2398 async fn test_regression_poll_merger_filter_does_not_infinite_loop() {
2399 let adapter = Arc::new(MockAdapter::new());
2403
2404 let shard0_events: Vec<_> = (1..=100)
2406 .map(|i| {
2407 StoredEvent::from_value(
2408 format!("0-{}", i),
2409 json!({"type": "token", "idx": i}),
2410 i as u64 * 10,
2411 0,
2412 )
2413 })
2414 .collect();
2415 adapter.add_events(0, shard0_events);
2416
2417 let shard1_events: Vec<_> = (1..=100)
2419 .map(|i| {
2420 StoredEvent::from_value(
2421 format!("1-{}", i),
2422 json!({"type": "message", "idx": i}),
2423 i as u64 * 10 + 5,
2424 1,
2425 )
2426 })
2427 .collect();
2428 adapter.add_events(1, shard1_events);
2429
2430 let merger = PollMerger::new(adapter, vec![0, 1]);
2431 let filter = Filter::eq("type", json!("token"));
2432
2433 let mut all_events = Vec::new();
2434 let mut cursor: Option<String> = None;
2435 let max_iterations = 50;
2436 let mut iterations = 0;
2437
2438 loop {
2439 iterations += 1;
2440 if iterations > max_iterations {
2441 panic!(
2442 "Infinite loop detected after {} iterations! Collected {} events so far.",
2443 max_iterations,
2444 all_events.len()
2445 );
2446 }
2447
2448 let mut request = ConsumeRequest::new(50).filter(filter.clone());
2449 if let Some(c) = &cursor {
2450 request = request.from(c.clone());
2451 }
2452
2453 let response = merger.poll(request).await.unwrap();
2454 all_events.extend(response.events);
2455
2456 if !response.has_more {
2457 break;
2458 }
2459 cursor = response.next_id;
2460 }
2461
2462 assert_eq!(
2464 all_events.len(),
2465 100,
2466 "Expected 100 matching events, got {}. Iterations: {}",
2467 all_events.len(),
2468 iterations
2469 );
2470
2471 for event in &all_events {
2473 assert_eq!(
2474 event.shard_id, 0,
2475 "All matching events should come from shard 0"
2476 );
2477 }
2478
2479 let mut ids: Vec<_> = all_events.iter().map(|e| e.id.clone()).collect();
2481 ids.sort();
2482 ids.dedup();
2483 assert_eq!(ids.len(), 100, "Should have no duplicate events");
2484 }
2485
2486 #[tokio::test]
2487 async fn test_regression_all_events_filtered_returns_cursor() {
2488 let adapter = Arc::new(MockAdapter::new());
2491
2492 adapter.add_events(
2494 0,
2495 vec![
2496 StoredEvent::from_value("0-1".to_string(), json!({"type": "noise"}), 100, 0),
2497 StoredEvent::from_value("0-2".to_string(), json!({"type": "noise"}), 200, 0),
2498 ],
2499 );
2500
2501 let merger = PollMerger::new(adapter, vec![0]);
2502 let filter = Filter::eq("type", json!("signal"));
2503
2504 let response = merger
2505 .poll(ConsumeRequest::new(100).filter(filter))
2506 .await
2507 .unwrap();
2508
2509 assert!(response.events.is_empty());
2511 assert!(
2512 response.next_id.is_some(),
2513 "cursor must advance past filtered events even when none match"
2514 );
2515 }
2516
2517 #[tokio::test]
2526 async fn test_poll_merger_filter_insertion_ts_truncates_after_sort() {
2527 let adapter = Arc::new(MockAdapter::new());
2528
2529 adapter.add_events(
2533 0,
2534 vec![
2535 StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 400, 0),
2536 StoredEvent::from_value("0-2".to_string(), json!({"type": "noise"}), 100, 0),
2537 StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 200, 0),
2538 ],
2539 );
2540 adapter.add_events(
2541 1,
2542 vec![
2543 StoredEvent::from_value("1-1".to_string(), json!({"type": "token"}), 260, 1),
2544 StoredEvent::from_value("1-2".to_string(), json!({"type": "noise"}), 300, 1),
2545 StoredEvent::from_value("1-3".to_string(), json!({"type": "token"}), 120, 1),
2546 ],
2547 );
2548
2549 let merger = PollMerger::new(adapter, vec![0, 1]);
2550 let filter = Filter::eq("type", json!("token"));
2551
2552 let response = merger
2555 .poll(
2556 ConsumeRequest::new(2)
2557 .filter(filter)
2558 .ordering(Ordering::InsertionTs),
2559 )
2560 .await
2561 .unwrap();
2562
2563 assert_eq!(response.events.len(), 2);
2564 assert_eq!(
2565 response.events[0].insertion_ts, 120,
2566 "earliest match must come first"
2567 );
2568 assert_eq!(response.events[1].insertion_ts, 200);
2569 assert!(
2570 response.has_more,
2571 "two more matching events remain past the limit"
2572 );
2573 }
2574
2575 #[tokio::test]
2576 async fn test_regression_corrupt_event_filter_drop_is_consistent_and_logged() {
2577 let adapter = Arc::new(MockAdapter::new());
2598 adapter.add_events(
2599 0,
2600 vec![
2601 StoredEvent::from_value("0-1".to_string(), json!({"type": "ok"}), 100, 0),
2602 StoredEvent::new(
2605 "0-2".to_string(),
2606 bytes::Bytes::from_static(b"\xff\xff not json \xff"),
2607 200,
2608 0,
2609 ),
2610 ],
2611 );
2612
2613 let merger = PollMerger::new(adapter, vec![0]);
2614
2615 let filtered = merger
2617 .poll(ConsumeRequest::new(100).filter(Filter::eq("type", json!("ok"))))
2618 .await
2619 .unwrap();
2620 assert_eq!(
2621 filtered.events.len(),
2622 1,
2623 "filtered poll must drop the corrupt event"
2624 );
2625 assert_eq!(filtered.events[0].id, "0-1");
2626
2627 let unfiltered = merger.poll(ConsumeRequest::new(100)).await.unwrap();
2632 assert_eq!(
2633 unfiltered.events.len(),
2634 2,
2635 "unfiltered poll must surface the corrupt event verbatim"
2636 );
2637 let ids: Vec<_> = unfiltered.events.iter().map(|e| e.id.as_str()).collect();
2638 assert!(ids.contains(&"0-1"));
2639 assert!(ids.contains(&"0-2"));
2640 }
2641
2642 #[tokio::test]
2657 async fn test_regression_ordering_none_filter_does_not_strand_later_shards() {
2658 let adapter = Arc::new(MockAdapter::new());
2659
2660 adapter.add_events(
2662 0,
2663 vec![
2664 StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
2665 StoredEvent::from_value("0-2".to_string(), json!({"type": "token"}), 110, 0),
2666 StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 120, 0),
2667 ],
2668 );
2669 adapter.add_events(
2671 1,
2672 vec![
2673 StoredEvent::from_value("1-1".to_string(), json!({"type": "token"}), 200, 1),
2674 StoredEvent::from_value("1-2".to_string(), json!({"type": "token"}), 210, 1),
2675 StoredEvent::from_value("1-3".to_string(), json!({"type": "token"}), 220, 1),
2676 ],
2677 );
2678
2679 let merger = PollMerger::new(adapter, vec![0, 1]);
2680 let filter = Filter::eq("type", json!("token"));
2681
2682 let mut all_returned: Vec<String> = Vec::new();
2686 let mut cursor: Option<String> = None;
2687 for _ in 0..20 {
2688 let mut req = ConsumeRequest::new(2).filter(filter.clone());
2689 if let Some(c) = &cursor {
2690 req = req.from(c.clone());
2691 }
2692 let resp = merger.poll(req).await.unwrap();
2693 for e in &resp.events {
2694 all_returned.push(e.id.clone());
2695 }
2696 if !resp.has_more {
2697 break;
2698 }
2699 cursor = resp.next_id;
2700 }
2701
2702 all_returned.sort();
2703 all_returned.dedup();
2704 assert_eq!(
2705 all_returned,
2706 vec!["0-1", "0-2", "0-3", "1-1", "1-2", "1-3"],
2707 "every matching event from every shard must be returned exactly once"
2708 );
2709 }
2710
2711 #[tokio::test]
2726 async fn test_regression_insertion_ts_filter_does_not_strand_late_shard() {
2727 let adapter = Arc::new(MockAdapter::new());
2728
2729 adapter.add_events(
2730 0,
2731 vec![
2732 StoredEvent::from_value("0-1".to_string(), json!({"type": "token"}), 100, 0),
2733 StoredEvent::from_value("0-2".to_string(), json!({"type": "token"}), 110, 0),
2734 StoredEvent::from_value("0-3".to_string(), json!({"type": "token"}), 120, 0),
2735 ],
2736 );
2737 adapter.add_events(
2738 1,
2739 vec![
2740 StoredEvent::from_value("1-1".to_string(), json!({"type": "token"}), 1000, 1),
2741 StoredEvent::from_value("1-2".to_string(), json!({"type": "token"}), 1010, 1),
2742 StoredEvent::from_value("1-3".to_string(), json!({"type": "token"}), 1020, 1),
2743 ],
2744 );
2745
2746 let merger = PollMerger::new(adapter, vec![0, 1]);
2747 let filter = Filter::eq("type", json!("token"));
2748
2749 let mut all_returned: Vec<String> = Vec::new();
2750 let mut cursor: Option<String> = None;
2751 for _ in 0..20 {
2752 let mut req = ConsumeRequest::new(2)
2753 .filter(filter.clone())
2754 .ordering(Ordering::InsertionTs);
2755 if let Some(c) = &cursor {
2756 req = req.from(c.clone());
2757 }
2758 let resp = merger.poll(req).await.unwrap();
2759 for e in &resp.events {
2760 all_returned.push(e.id.clone());
2761 }
2762 if !resp.has_more {
2763 break;
2764 }
2765 cursor = resp.next_id;
2766 }
2767
2768 all_returned.sort();
2769 all_returned.dedup();
2770 assert_eq!(
2771 all_returned,
2772 vec!["0-1", "0-2", "0-3", "1-1", "1-2", "1-3"],
2773 "matches from the late-ts shard must not be lost to truncation"
2774 );
2775 }
2776
2777 #[tokio::test]
2785 async fn has_more_is_suppressed_when_no_progress() {
2786 struct LiarAdapter;
2787
2788 #[async_trait]
2789 impl Adapter for LiarAdapter {
2790 async fn init(&mut self) -> Result<(), AdapterError> {
2791 Ok(())
2792 }
2793 async fn on_batch(&self, _batch: Batch) -> Result<(), AdapterError> {
2794 Ok(())
2795 }
2796 async fn flush(&self) -> Result<(), AdapterError> {
2797 Ok(())
2798 }
2799 async fn shutdown(&self) -> Result<(), AdapterError> {
2800 Ok(())
2801 }
2802 async fn poll_shard(
2803 &self,
2804 _shard_id: u16,
2805 _from_id: Option<&str>,
2806 _limit: usize,
2807 ) -> Result<ShardPollResult, AdapterError> {
2808 Ok(ShardPollResult {
2811 events: Vec::new(),
2812 next_id: None,
2813 has_more: true,
2814 })
2815 }
2816 fn name(&self) -> &'static str {
2817 "liar"
2818 }
2819 }
2820
2821 let adapter: Arc<dyn Adapter> = Arc::new(LiarAdapter);
2822 let merger = PollMerger::new(adapter, vec![0, 1]);
2823 let response = merger.poll(ConsumeRequest::new(100)).await.unwrap();
2824
2825 assert!(
2826 response.events.is_empty(),
2827 "no events were emitted, but merger returned {}",
2828 response.events.len()
2829 );
2830 assert!(
2833 !response.has_more,
2834 "has_more must be suppressed when merger made no progress (#50)"
2835 );
2836 assert!(
2837 response.next_id.is_none(),
2838 "next_id must remain None when no progress was made (#50)"
2839 );
2840 }
2841
2842 #[tokio::test]
2850 async fn stalled_poll_echoes_caller_cursor_back() {
2851 struct EmptyAdapter;
2852
2853 #[async_trait]
2854 impl Adapter for EmptyAdapter {
2855 async fn init(&mut self) -> Result<(), AdapterError> {
2856 Ok(())
2857 }
2858 async fn on_batch(&self, _batch: Batch) -> Result<(), AdapterError> {
2859 Ok(())
2860 }
2861 async fn flush(&self) -> Result<(), AdapterError> {
2862 Ok(())
2863 }
2864 async fn shutdown(&self) -> Result<(), AdapterError> {
2865 Ok(())
2866 }
2867 async fn poll_shard(
2868 &self,
2869 _shard_id: u16,
2870 _from_id: Option<&str>,
2871 _limit: usize,
2872 ) -> Result<ShardPollResult, AdapterError> {
2873 Ok(ShardPollResult {
2874 events: Vec::new(),
2875 next_id: None,
2876 has_more: false,
2877 })
2878 }
2879 fn name(&self) -> &'static str {
2880 "empty"
2881 }
2882 }
2883
2884 let adapter: Arc<dyn Adapter> = Arc::new(EmptyAdapter);
2885 let merger = PollMerger::new(adapter, vec![0, 1]);
2886
2887 let mut cursor = CompositeCursor::new();
2889 cursor.set(0, "1702-0".to_string());
2890 cursor.set(1, "1703-0".to_string());
2891 let encoded = cursor.encode().unwrap();
2892
2893 let mut req = ConsumeRequest::new(100);
2894 req.from_id = Some(encoded.clone());
2895
2896 let response = merger.poll(req).await.unwrap();
2897
2898 assert!(response.events.is_empty());
2899 assert_eq!(
2900 response.next_id.as_deref(),
2901 Some(encoded.as_str()),
2902 "stalled poll with input cursor must echo cursor back \
2903 (got {:?}); pre-fix this was None and callers paged \
2904 back to the stream's start",
2905 response.next_id,
2906 );
2907
2908 let response_no_cursor = merger.poll(ConsumeRequest::new(100)).await.unwrap();
2911 assert!(response_no_cursor.next_id.is_none());
2912 }
2913
2914 #[tokio::test]
2921 async fn sort_breaks_ties_deterministically_across_shards() {
2922 let adapter = Arc::new(MockAdapter::new());
2925 adapter.add_events(
2926 0,
2927 vec![
2928 StoredEvent::from_value("0-a".to_string(), json!({}), 100, 0),
2929 StoredEvent::from_value("0-b".to_string(), json!({}), 100, 0),
2930 ],
2931 );
2932 adapter.add_events(
2933 1,
2934 vec![
2935 StoredEvent::from_value("1-a".to_string(), json!({}), 100, 1),
2936 StoredEvent::from_value("1-b".to_string(), json!({}), 100, 1),
2937 ],
2938 );
2939
2940 let merger = PollMerger::new(adapter, vec![0, 1]);
2942 let mut prior_order: Option<Vec<String>> = None;
2943 for iter in 0..20 {
2944 let r = merger
2945 .poll(ConsumeRequest::new(10).ordering(Ordering::InsertionTs))
2946 .await
2947 .unwrap();
2948 let ids: Vec<String> = r.events.iter().map(|e| e.id.clone()).collect();
2949 if let Some(prev) = &prior_order {
2950 assert_eq!(
2951 &ids, prev,
2952 "iter {iter}: order is non-deterministic — sort tie-break failed (#52)"
2953 );
2954 }
2955 prior_order = Some(ids);
2956 }
2957
2958 let r = merger
2960 .poll(ConsumeRequest::new(10).ordering(Ordering::InsertionTs))
2961 .await
2962 .unwrap();
2963 let ids: Vec<String> = r.events.iter().map(|e| e.id.clone()).collect();
2964 assert_eq!(ids, vec!["0-a", "0-b", "1-a", "1-b"]);
2965 }
2966}