1use std::sync::Arc;
2use std::time::{Instant, SystemTime, UNIX_EPOCH};
3
4use bytes::Bytes;
5use slatedb::object_store::ObjectStore;
6use slatedb::object_store::path::Path;
7use tokio_util::sync::CancellationToken;
8
9use crate::config::ConsumerConfig;
10use crate::error::{Error, Result};
11use crate::gc::GarbageCollector;
12use crate::metric_names as m;
13use crate::model::decode_batch;
14use crate::queue::{Metadata, QueueConsumer};
15
/// Number of acknowledgements between the dequeue calls issued by `Consumer::ack`:
/// `ack` only calls `dequeue` when its running ack count is a multiple of this value.
const DEQUEUE_INTERVAL: u64 = 100;
17
/// A batch that has been read from the object store and decoded into entries.
#[derive(Debug)]
pub struct ConsumedBatch {
    /// Entries produced by `decode_batch` from the stored object.
    pub entries: Vec<Bytes>,
    /// Queue sequence number copied from the descriptor the batch was fetched with.
    pub sequence: u64,
    /// Object-store location the batch was read from.
    pub location: String,
    /// Metadata copied from the descriptor the batch was fetched with.
    pub metadata: Vec<Metadata>,
}
30
/// Describes a queued batch without its payload; pass it to
/// `ConsumerFetchHandle::fetch` or `Consumer::fetch_descriptor` to load the data.
#[derive(Debug, Clone, PartialEq)]
pub struct BatchDescriptor {
    /// Queue sequence number of the batch.
    pub sequence: u64,
    /// Object-store location holding the encoded batch.
    pub location: String,
    /// Metadata recorded on the queue entry.
    pub metadata: Vec<Metadata>,
}
45
/// Cloneable handle that fetches and decodes batches through a shared object store.
/// Obtained from `Consumer::fetch_handle`; fetching only needs `&self`.
#[derive(Clone)]
pub struct ConsumerFetchHandle {
    // Store the batch objects are read from.
    object_store: Arc<dyn ObjectStore>,
    // Manifest path of the consumer that created the handle; shown in `Debug` output.
    manifest_path: String,
}
62
63impl std::fmt::Debug for ConsumerFetchHandle {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 f.debug_struct("ConsumerFetchHandle")
66 .field("manifest_path", &self.manifest_path)
67 .finish_non_exhaustive()
68 }
69}
70
71impl ConsumerFetchHandle {
72 pub async fn fetch(&self, descriptor: BatchDescriptor) -> Result<ConsumedBatch> {
84 let _ = &self.manifest_path; let start = Instant::now();
86 let path = Path::from(descriptor.location.as_str());
87 let data = self
88 .object_store
89 .get(&path)
90 .await
91 .map_err(|e| Error::Storage(e.to_string()))?
92 .bytes()
93 .await
94 .map_err(|e| Error::Storage(e.to_string()))?;
95
96 let data_len = data.len() as u64;
97 let entries = decode_batch(data)?;
98
99 metrics::counter!(m::BATCHES_COLLECTED).increment(1);
100 metrics::counter!(m::ENTRIES_COLLECTED).increment(entries.len() as u64);
101 metrics::counter!(m::BYTES_COLLECTED).increment(data_len);
102 metrics::histogram!(m::FETCH_DURATION_SECONDS).record(start.elapsed().as_secs_f64());
103
104 Ok(ConsumedBatch {
105 entries,
106 sequence: descriptor.sequence,
107 location: descriptor.location,
108 metadata: descriptor.metadata,
109 })
110 }
111}
112
/// Queue consumer that lists batch descriptors, fetches batch payloads, tracks
/// acknowledgements, and owns a background garbage-collection task.
pub struct Consumer {
    // Underlying queue consumer used to list descriptors and to dequeue.
    consumer: QueueConsumer,
    // Store the batch objects are read from; shared with fetch handles.
    object_store: Arc<dyn ObjectStore>,
    // Manifest path taken from the configuration; copied into fetch handles.
    manifest_path: String,
    // Cancelled in `close` to stop the garbage-collection task.
    gc_shutdown: CancellationToken,
    // Join handle of the spawned garbage-collection task; awaited in `close`.
    gc_handle: tokio::task::JoinHandle<()>,
    // Running count of acknowledgements; `ack` dequeues when it hits a multiple
    // of `DEQUEUE_INTERVAL`.
    ack_count: u64,
    // Highest acknowledged sequence, or the value supplied at construction.
    last_acked_sequence: Option<u64>,
    // Highest sequence fetched through `fetch_descriptor`.
    last_fetched_sequence: Option<u64>,
    // Cursor: sequence of the last descriptor/batch handed out to the caller.
    last_handed_out_sequence: Option<u64>,
}
135
136impl Consumer {
137 pub async fn new(config: ConsumerConfig, last_acked_sequence: Option<u64>) -> Result<Self> {
144 let object_store = common::storage::factory::create_object_store(&config.object_store)
145 .map_err(|e| Error::Storage(e.to_string()))?;
146 Self::with_object_store(config, object_store, last_acked_sequence).await
147 }
148
149 pub async fn with_object_store(
150 config: ConsumerConfig,
151 object_store: Arc<dyn ObjectStore>,
152 last_acked_sequence: Option<u64>,
153 ) -> Result<Self> {
154 crate::metric_names::describe_consumer_metrics();
155 let manifest_path = config.manifest_path.clone();
156 let consumer =
157 QueueConsumer::with_object_store(manifest_path.clone(), object_store.clone());
158
159 consumer.initialize().await?;
161 if let Some(seq) = last_acked_sequence {
162 consumer.dequeue(seq).await?;
163 }
164
165 let gc_shutdown = CancellationToken::new();
166 let gc = GarbageCollector::new(
167 config.manifest_path,
168 config.data_path_prefix,
169 config.gc_interval,
170 config.gc_grace_period,
171 object_store.clone(),
172 );
173 let gc_handle = tokio::spawn(gc.collect(gc_shutdown.clone()));
174
175 Ok(Self {
176 consumer,
177 object_store,
178 manifest_path,
179 ack_count: 0,
180 last_acked_sequence,
181 gc_shutdown,
182 gc_handle,
183 last_fetched_sequence: last_acked_sequence,
184 last_handed_out_sequence: last_acked_sequence,
185 })
186 }
187
188 pub async fn next_batch(&mut self) -> Result<Option<ConsumedBatch>> {
211 let entries = self
212 .consumer
213 .descriptors_after(self.last_handed_out_sequence, 1)
214 .await?;
215 metrics::gauge!(m::QUEUE_LENGTH).set(self.consumer.len() as f64);
216 let Some(entry) = entries.into_iter().next() else {
217 return Ok(None);
218 };
219 let descriptor = BatchDescriptor {
220 sequence: entry.sequence,
221 location: entry.location,
222 metadata: entry.metadata,
223 };
224 let batch = self.fetch_descriptor(descriptor).await?;
230 self.last_handed_out_sequence = Some(batch.sequence);
231 Ok(Some(batch))
232 }
233
234 pub async fn next_descriptors(&mut self, max: usize) -> Result<Vec<BatchDescriptor>> {
253 if max == 0 {
254 return Ok(Vec::new());
255 }
256 let entries = self
257 .consumer
258 .descriptors_after(self.last_handed_out_sequence, max)
259 .await?;
260 metrics::gauge!(m::QUEUE_LENGTH).set(self.consumer.len() as f64);
261 if let Some(last) = entries.last() {
262 self.last_handed_out_sequence = Some(last.sequence);
263 }
264 let count = entries.len() as u64;
265 if count > 0 {
266 metrics::counter!(m::DESCRIPTORS_HANDED_OUT).increment(count);
267 }
268 Ok(entries
269 .into_iter()
270 .map(|e| BatchDescriptor {
271 sequence: e.sequence,
272 location: e.location,
273 metadata: e.metadata,
274 })
275 .collect())
276 }
277
278 pub fn fetch_handle(&self) -> ConsumerFetchHandle {
282 ConsumerFetchHandle {
283 object_store: self.object_store.clone(),
284 manifest_path: self.manifest_path.clone(),
285 }
286 }
287
288 pub async fn fetch_descriptor(&mut self, descriptor: BatchDescriptor) -> Result<ConsumedBatch> {
298 let handle = self.fetch_handle();
299 let batch = handle.fetch(descriptor).await?;
300 self.last_fetched_sequence = match self.last_fetched_sequence {
301 Some(prev) => Some(prev.max(batch.sequence)),
302 None => Some(batch.sequence),
303 };
304 if let Some(last_meta) = batch.metadata.last() {
305 let now_ms = SystemTime::now()
306 .duration_since(UNIX_EPOCH)
307 .unwrap_or_default()
308 .as_millis() as i64;
309 let lag_s = (now_ms - last_meta.ingestion_time_ms) as f64 / 1000.0;
310 metrics::gauge!(m::CONSUMER_LAG_SECONDS).set(lag_s.max(0.0));
311 }
312 Ok(batch)
313 }
314
315 pub async fn ack(&mut self, sequence: u64) -> Result<()> {
322 if let Some(last) = self.last_acked_sequence
323 && sequence != last + 1
324 {
325 return Err(Error::Storage(format!(
326 "out-of-order ack: expected sequence {}, got {}",
327 last + 1,
328 sequence
329 )));
330 }
331 self.last_acked_sequence = Some(sequence);
332 self.ack_count += 1;
333 metrics::counter!(m::ACKS).increment(1);
334 if self.ack_count.is_multiple_of(DEQUEUE_INTERVAL) {
335 self.consumer.dequeue(sequence).await?;
336 }
337 Ok(())
338 }
339
340 pub async fn ack_through(&mut self, sequence: u64) -> Result<()> {
351 if let Some(last) = self.last_acked_sequence
352 && sequence <= last
353 {
354 return Err(Error::Storage(format!(
355 "non-monotonic ack_through: last_acked={last}, requested={sequence}"
356 )));
357 }
358 let count_advanced = match self.last_acked_sequence {
359 None => sequence + 1,
360 Some(last) => sequence - last,
361 };
362
363 self.consumer.dequeue(sequence).await?;
366
367 self.last_acked_sequence = Some(sequence);
369 self.ack_count = self.ack_count.wrapping_add(count_advanced);
370 metrics::counter!(m::ACKS).increment(count_advanced);
371 Ok(())
372 }
373
374 pub async fn flush(&mut self) -> Result<()> {
376 if let Some(seq) = self.last_acked_sequence {
377 self.consumer.dequeue(seq).await?;
378 }
379 Ok(())
380 }
381
382 pub async fn close(mut self) -> Result<()> {
384 self.flush().await?;
385 self.gc_shutdown.cancel();
386 let _ = self.gc_handle.await;
387 Ok(())
388 }
389
/// Returns the length reported by the underlying queue consumer.
pub fn len(&self) -> usize {
    self.consumer.len()
}
394
/// Returns `true` when [`Consumer::len`] is zero.
pub fn is_empty(&self) -> bool {
    self.len() == 0
}
399
/// Returns the conflict rate reported by the underlying queue consumer.
pub fn conflict_rate(&self) -> f64 {
    self.consumer.conflict_rate()
}
404}
405
406#[cfg(test)]
407mod tests {
408 use super::*;
409 use crate::config::ConsumerConfig;
410 use crate::model::{CompressionType, encode_batch};
411 use crate::queue::{Metadata, QueueProducer};
412 use bytes::Bytes;
413 use common::ObjectStoreConfig;
414 use slatedb::object_store::PutPayload;
415 use slatedb::object_store::memory::InMemory;
416 use std::time::Duration;
417
/// Manifest path shared by the test producer and the test consumer configuration.
const TEST_MANIFEST_PATH: &str = "test/manifest";
419
420 fn test_collector_config() -> ConsumerConfig {
421 ConsumerConfig {
422 object_store: ObjectStoreConfig::InMemory,
423 manifest_path: TEST_MANIFEST_PATH.to_string(),
424 data_path_prefix: "ingest".to_string(),
425 gc_interval: Duration::from_secs(300),
426 gc_grace_period: Duration::from_secs(600),
427 }
428 }
429
430 fn test_entries() -> Vec<Bytes> {
431 vec![Bytes::from("data1"), Bytes::from("data2")]
432 }
433
434 async fn write_batch(store: &Arc<dyn ObjectStore>, location: &str, entries: &[Bytes]) {
435 let payload = encode_batch(entries, CompressionType::None).unwrap();
436 let path = Path::from(location);
437 store.put(&path, PutPayload::from(payload)).await.unwrap();
438 }
439
440 async fn make_collector(
441 store: &Arc<dyn ObjectStore>,
442 config: ConsumerConfig,
443 last_acked_sequence: Option<u64>,
444 ) -> (QueueProducer, Consumer) {
445 let producer =
446 QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
447 let collector = Consumer::with_object_store(config, store.clone(), last_acked_sequence)
448 .await
449 .unwrap();
450 (producer, collector)
451 }
452
/// Enqueues one two-entry batch and checks that `next_batch` returns its entries
/// and location.
#[tokio::test]
async fn should_collect_enqueued_batch() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    let location = "batches/batch-001";
    write_batch(&store, location, &entries).await;
    producer
        .enqueue(location.to_string(), vec![])
        .await
        .unwrap();

    let batch = collector.next_batch().await.unwrap().unwrap();
    assert_eq!(batch.entries.len(), 2);
    assert_eq!(batch.entries[0], Bytes::from("data1"));
    assert_eq!(batch.entries[1], Bytes::from("data2"));
    assert_eq!(batch.location, location);
}
472
/// Metadata attached at enqueue time must come back unchanged on the consumed batch.
#[tokio::test]
async fn should_collect_metadata_from_queue_entry() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    let location = "batches/batch-meta";
    write_batch(&store, location, &entries).await;

    let metadata = vec![Metadata {
        start_index: 0,
        ingestion_time_ms: 1_700_000_000_000,
        payload: Bytes::from(r#"{"topic":"events"}"#),
    }];
    producer
        .enqueue(location.to_string(), metadata)
        .await
        .unwrap();

    let batch = collector.next_batch().await.unwrap().unwrap();
    assert_eq!(batch.metadata.len(), 1);
    assert_eq!(batch.metadata[0].start_index, 0);
    assert_eq!(batch.metadata[0].ingestion_time_ms, 1_700_000_000_000);
    assert_eq!(
        batch.metadata[0].payload,
        Bytes::from(r#"{"topic":"events"}"#)
    );
}
501
/// With nothing enqueued, `next_batch` returns `Ok(None)`.
#[tokio::test]
async fn should_return_none_when_queue_empty() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (_producer, mut collector) =
        make_collector(&store, test_collector_config(), None).await;

    let result = collector.next_batch().await.unwrap();
    assert!(result.is_none());
}
511
/// After consuming, acking and flushing the only batch, `next_batch` returns `None`.
#[tokio::test]
async fn should_ack_batch() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    let location = "batches/batch-002";
    write_batch(&store, location, &entries).await;
    producer
        .enqueue(location.to_string(), vec![])
        .await
        .unwrap();

    let batch = collector.next_batch().await.unwrap().unwrap();
    collector.ack(batch.sequence).await.unwrap();
    collector.flush().await.unwrap();

    let next = collector.next_batch().await.unwrap();

    assert!(next.is_none());
}
533
/// After the first of two batches is consumed and acked, `next_batch` returns
/// the second one (sequence 1).
#[tokio::test]
async fn should_next_batch_return_batch_after_last_acked() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    write_batch(&store, "batches/first", &entries).await;
    write_batch(&store, "batches/second", &entries).await;
    producer
        .enqueue("batches/first".to_string(), vec![])
        .await
        .unwrap();
    producer
        .enqueue("batches/second".to_string(), vec![])
        .await
        .unwrap();

    let first = collector.next_batch().await.unwrap().unwrap();
    collector.ack(first.sequence).await.unwrap();

    let batch = collector.next_batch().await.unwrap().unwrap();
    assert_eq!(batch.location, "batches/second");
    assert_eq!(batch.sequence, 1);
    assert_eq!(batch.entries.len(), 2);
}
559
/// Two consecutive `next_batch` calls with no ack in between return sequences
/// 0 and 1: handing out batches does not wait for acknowledgement.
#[tokio::test]
async fn should_next_batch_advance_before_previous_batch_is_acked() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    write_batch(&store, "batches/first", &entries).await;
    write_batch(&store, "batches/second", &entries).await;
    producer
        .enqueue("batches/first".to_string(), vec![])
        .await
        .unwrap();
    producer
        .enqueue("batches/second".to_string(), vec![])
        .await
        .unwrap();

    let first = collector.next_batch().await.unwrap().unwrap();
    let second = collector.next_batch().await.unwrap().unwrap();

    assert_eq!(first.location, "batches/first");
    assert_eq!(first.sequence, 0);
    assert_eq!(second.location, "batches/second");
    assert_eq!(second.sequence, 1);
}
588
/// Once the only batch has been consumed and acked, `next_batch` returns `None`
/// even without an explicit flush.
#[tokio::test]
async fn should_next_batch_return_none_when_no_more_entries() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    write_batch(&store, "batches/first", &entries).await;
    producer
        .enqueue("batches/first".to_string(), vec![])
        .await
        .unwrap();

    let first = collector.next_batch().await.unwrap().unwrap();
    collector.ack(first.sequence).await.unwrap();

    let result = collector.next_batch().await.unwrap();
    assert!(result.is_none());
}
607
/// A consumer constructed with `last_acked_sequence = Some(0)` skips sequence 0
/// and hands out sequence 1 first.
#[tokio::test]
async fn should_resume_from_last_acked_sequence() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) =
        make_collector(&store, test_collector_config(), Some(0)).await;

    let entries = test_entries();
    write_batch(&store, "batches/first", &entries).await;
    write_batch(&store, "batches/second", &entries).await;
    producer
        .enqueue("batches/first".to_string(), vec![])
        .await
        .unwrap();
    producer
        .enqueue("batches/second".to_string(), vec![])
        .await
        .unwrap();

    let batch = collector.next_batch().await.unwrap().unwrap();
    assert_eq!(batch.location, "batches/second");
    assert_eq!(batch.sequence, 1);
}
631
/// After acking sequence 0, acking sequence 5 is rejected with an
/// "out-of-order ack" storage error.
#[tokio::test]
async fn should_reject_out_of_order_ack() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    write_batch(&store, "batches/first", &entries).await;
    write_batch(&store, "batches/second", &entries).await;
    producer
        .enqueue("batches/first".to_string(), vec![])
        .await
        .unwrap();
    producer
        .enqueue("batches/second".to_string(), vec![])
        .await
        .unwrap();

    let first = collector.next_batch().await.unwrap().unwrap();
    collector.ack(first.sequence).await.unwrap();

    let result = collector.ack(5).await;
    assert!(matches!(result, Err(Error::Storage(msg)) if msg.contains("out-of-order ack")));
}
656
/// Acks `DEQUEUE_INTERVAL + 1` batches one by one and checks that exactly one
/// entry is still queued before `flush`, and that nothing is left to consume
/// afterwards.
#[tokio::test]
async fn should_batch_dequeue_calls() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    let count = DEQUEUE_INTERVAL + 1;
    for i in 0..count {
        let location = format!("batches/batch-{:04}", i);
        write_batch(&store, &location, &entries).await;
        producer.enqueue(location, vec![]).await.unwrap();
    }

    for i in 0..count {
        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.sequence, i);
        collector.ack(batch.sequence).await.unwrap();
    }

    // Only the ack that reached the interval dequeued; the final ack is still pending.
    assert_eq!(collector.len(), 1);

    collector.flush().await.unwrap();

    let result = collector.next_batch().await.unwrap();
    assert!(result.is_none());
}
688
/// A single ack does not shrink the queue on its own (`len` stays 2); `flush`
/// then removes the acked entry (`len` becomes 1).
#[tokio::test]
async fn should_flush_pending_acks() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    write_batch(&store, "batches/first", &entries).await;
    write_batch(&store, "batches/second", &entries).await;
    producer
        .enqueue("batches/first".to_string(), vec![])
        .await
        .unwrap();
    producer
        .enqueue("batches/second".to_string(), vec![])
        .await
        .unwrap();

    let batch = collector.next_batch().await.unwrap().unwrap();
    collector.ack(batch.sequence).await.unwrap();

    assert_eq!(collector.len(), 2);

    collector.flush().await.unwrap();
    assert_eq!(collector.len(), 1);
}
717
/// `close` flushes the pending ack: a fresh consumer on the same store finds
/// nothing left to consume.
#[tokio::test]
async fn should_close_flush_and_consume() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    write_batch(&store, "batches/first", &entries).await;
    producer
        .enqueue("batches/first".to_string(), vec![])
        .await
        .unwrap();

    let batch = collector.next_batch().await.unwrap().unwrap();
    collector.ack(batch.sequence).await.unwrap();
    collector.close().await.unwrap();

    let (_, mut collector2) = make_collector(&store, test_collector_config(), None).await;
    let result = collector2.next_batch().await.unwrap();
    assert!(result.is_none());
}
739
/// Creating a second consumer on the same manifest makes the first consumer's
/// `next_batch` fail with `Error::Fenced`.
#[tokio::test]
async fn should_fence_previous_collector() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector1) =
        make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    write_batch(&store, "batches/first", &entries).await;
    producer
        .enqueue("batches/first".to_string(), vec![])
        .await
        .unwrap();

    let (_, _collector2) = make_collector(&store, test_collector_config(), None).await;

    let result = collector1.next_batch().await;
    assert!(matches!(result, Err(Error::Fenced)));
}
760
/// Three batches are consumed and acked in enqueue order with sequences 0, 1, 2,
/// after which `next_batch` returns `None`.
#[tokio::test]
async fn should_iterate_multiple_sequential_batches() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    let locations = ["batches/a", "batches/b", "batches/c"];
    for loc in &locations {
        write_batch(&store, loc, &entries).await;
        producer.enqueue(loc.to_string(), vec![]).await.unwrap();
    }

    for (i, expected_loc) in locations.iter().enumerate() {
        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.sequence, i as u64);
        assert_eq!(batch.location, *expected_loc);
        collector.ack(batch.sequence).await.unwrap();
    }

    let result = collector.next_batch().await.unwrap();
    assert!(result.is_none());
}
783
/// A consumer started with `None` on a queue whose first five entries were
/// already consumed and closed out picks up the next entry, which carries
/// sequence 5.
#[tokio::test]
async fn should_initialize_none_with_pre_existing_entries() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, _) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    // Advance the queue's sequence numbers by producing and fully consuming five batches.
    for i in 0..5 {
        let loc = format!("batches/placeholder-{}", i);
        write_batch(&store, &loc, &entries).await;
        producer.enqueue(loc, vec![]).await.unwrap();
    }
    let (_, mut tmp_collector) = make_collector(&store, test_collector_config(), None).await;
    for _ in 0..5 {
        let batch = tmp_collector.next_batch().await.unwrap().unwrap();
        tmp_collector.ack(batch.sequence).await.unwrap();
    }
    tmp_collector.close().await.unwrap();

    write_batch(&store, "batches/pre-existing", &entries).await;
    producer
        .enqueue("batches/pre-existing".to_string(), vec![])
        .await
        .unwrap();

    let (_, mut collector) = make_collector(&store, test_collector_config(), None).await;

    let batch = collector.next_batch().await.unwrap().unwrap();
    assert_eq!(batch.location, "batches/pre-existing");
    assert_eq!(batch.sequence, 5);
}
818
/// Constructing a consumer with `Some(0)` on a two-entry queue dequeues entry 0
/// during construction: `len` is 1 and the first batch handed out is sequence 1.
#[tokio::test]
async fn should_initialize_with_sequence_dequeue_already_processed() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, _) = make_collector(&store, test_collector_config(), None).await;

    let entries = test_entries();
    write_batch(&store, "batches/first", &entries).await;
    write_batch(&store, "batches/second", &entries).await;
    producer
        .enqueue("batches/first".to_string(), vec![])
        .await
        .unwrap();
    producer
        .enqueue("batches/second".to_string(), vec![])
        .await
        .unwrap();

    let (_, mut collector) = make_collector(&store, test_collector_config(), Some(0)).await;

    assert_eq!(collector.len(), 1);

    let batch = collector.next_batch().await.unwrap().unwrap();
    assert_eq!(batch.location, "batches/second");
    assert_eq!(batch.sequence, 1);
}
846
847 async fn enqueue_n(
852 store: &Arc<dyn ObjectStore>,
853 producer: &QueueProducer,
854 n: usize,
855 ) -> Vec<String> {
856 let entries = test_entries();
857 let mut locations = Vec::with_capacity(n);
858 for i in 0..n {
859 let loc = format!("batches/seq-{i:06}");
860 write_batch(store, &loc, &entries).await;
861 producer.enqueue(loc.clone(), vec![]).await.unwrap();
862 locations.push(loc);
863 }
864 locations
865 }
866
/// `next_descriptors(0)` returns an empty vector and leaves the cursor alone:
/// a following call with a larger limit still returns all three descriptors.
/// NOTE(review): the test does not directly observe whether the manifest was
/// read; it only checks the returned values.
#[tokio::test]
async fn should_next_descriptors_max_zero_return_empty_without_manifest_read() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
    enqueue_n(&store, &producer, 3).await;

    let v = collector.next_descriptors(0).await.unwrap();
    assert!(v.is_empty());

    let v = collector.next_descriptors(10).await.unwrap();
    assert_eq!(v.len(), 3);
}
881
/// Successive `next_descriptors` calls return consecutive, non-overlapping runs
/// (0..=2, then 3..=4) and finally an empty vector.
#[tokio::test]
async fn should_next_descriptors_return_contiguous_run_when_available() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
    enqueue_n(&store, &producer, 5).await;

    let v = collector.next_descriptors(3).await.unwrap();
    assert_eq!(v.len(), 3);
    assert_eq!(v[0].sequence, 0);
    assert_eq!(v[1].sequence, 1);
    assert_eq!(v[2].sequence, 2);

    let v2 = collector.next_descriptors(10).await.unwrap();
    assert_eq!(v2.len(), 2);
    assert_eq!(v2[0].sequence, 3);
    assert_eq!(v2[1].sequence, 4);

    let v3 = collector.next_descriptors(10).await.unwrap();
    assert!(v3.is_empty());
}
904
/// Asking for more descriptors than are queued returns only the ones available.
#[tokio::test]
async fn should_next_descriptors_return_fewer_than_max_on_short_manifest() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
    enqueue_n(&store, &producer, 2).await;

    let v = collector.next_descriptors(10).await.unwrap();
    assert_eq!(v.len(), 2);
}
914
/// Fetching the same descriptor through a `ConsumerFetchHandle` and through
/// `Consumer::fetch_descriptor` yields identical sequence, location, metadata
/// and entries.
#[tokio::test]
async fn should_fetch_handle_return_same_consumed_batch_as_fetch_descriptor() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
    let entries = test_entries();
    let location = "batches/handle-vs-wrapper";
    write_batch(&store, location, &entries).await;
    let metadata = vec![Metadata {
        start_index: 0,
        ingestion_time_ms: 1_700_000_000_000,
        payload: Bytes::from(r#"{"k":"v"}"#),
    }];
    producer
        .enqueue(location.to_string(), metadata.clone())
        .await
        .unwrap();

    let descriptors = collector.next_descriptors(1).await.unwrap();
    assert_eq!(descriptors.len(), 1);
    let d = descriptors[0].clone();

    let from_handle = collector.fetch_handle().fetch(d.clone()).await.unwrap();
    let from_wrapper = collector.fetch_descriptor(d).await.unwrap();

    assert_eq!(from_handle.sequence, from_wrapper.sequence);
    assert_eq!(from_handle.location, from_wrapper.location);
    assert_eq!(from_handle.metadata, from_wrapper.metadata);
    assert_eq!(from_handle.entries, from_wrapper.entries);
    assert_eq!(from_handle.location, location);
    assert_eq!(from_handle.entries.len(), 2);
    assert_eq!(from_handle.metadata, metadata);
}
950
/// When the object behind the next descriptor is missing, `next_batch` fails
/// with a storage error and does not advance: once the object is restored the
/// same batch (sequence 0) is returned, followed by sequence 1.
#[tokio::test]
async fn should_next_batch_not_advance_cursor_on_fetch_failure() {
    use slatedb::object_store::path::Path;
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
    let entries = test_entries();
    let location_a = "batches/a-fails-then-succeeds";
    let location_b = "batches/b";
    write_batch(&store, location_a, &entries).await;
    write_batch(&store, location_b, &entries).await;
    producer
        .enqueue(location_a.to_string(), vec![])
        .await
        .unwrap();
    producer
        .enqueue(location_b.to_string(), vec![])
        .await
        .unwrap();

    // Remove the first batch's object so the fetch fails while the queue entry remains.
    store.delete(&Path::from(location_a)).await.unwrap();

    let r = collector.next_batch().await;
    assert!(
        matches!(r, Err(Error::Storage(_))),
        "expected Storage error, got {r:?}"
    );

    // Restore the object; the retry must return the same batch.
    write_batch(&store, location_a, &entries).await;

    let b = collector.next_batch().await.unwrap().unwrap();
    assert_eq!(b.sequence, 0);
    assert_eq!(b.location, location_a);
    collector.ack(b.sequence).await.unwrap();
    let b2 = collector.next_batch().await.unwrap().unwrap();
    assert_eq!(b2.sequence, 1);
    assert_eq!(b2.location, location_b);
}
998
/// Creating a `next_batch` future and dropping it without polling leaves the
/// hand-out cursor at `None`; a later awaited call returns sequence 0.
#[tokio::test]
async fn should_next_batch_not_advance_cursor_when_future_is_dropped() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
    let entries = test_entries();
    write_batch(&store, "batches/cancel-a", &entries).await;
    producer
        .enqueue("batches/cancel-a".to_string(), vec![])
        .await
        .unwrap();

    assert_eq!(collector.last_handed_out_sequence, None);

    {
        // The future is never polled, so none of its body runs.
        let fut = collector.next_batch();
        drop(fut);
    }
    assert_eq!(
        collector.last_handed_out_sequence, None,
        "dropped next_batch future must not advance the cursor"
    );

    let b = collector.next_batch().await.unwrap().unwrap();
    assert_eq!(b.sequence, 0);
    assert_eq!(b.location, "batches/cancel-a");
    assert_eq!(collector.last_handed_out_sequence, Some(0));
}
1034
/// Each successful `next_batch` moves the hand-out cursor to the sequence of
/// the batch it returned.
#[tokio::test]
async fn should_next_batch_advance_cursor_on_successful_fetch() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
    let entries = test_entries();
    write_batch(&store, "batches/ok-a", &entries).await;
    write_batch(&store, "batches/ok-b", &entries).await;
    producer
        .enqueue("batches/ok-a".to_string(), vec![])
        .await
        .unwrap();
    producer
        .enqueue("batches/ok-b".to_string(), vec![])
        .await
        .unwrap();

    assert_eq!(collector.last_handed_out_sequence, None);

    let a = collector.next_batch().await.unwrap().unwrap();
    assert_eq!(a.sequence, 0);
    assert_eq!(collector.last_handed_out_sequence, Some(0));

    let b = collector.next_batch().await.unwrap().unwrap();
    assert_eq!(b.sequence, 1);
    assert_eq!(collector.last_handed_out_sequence, Some(1));
}
1065
/// `ConsumerFetchHandle` is `Send + Sync + 'static`, its clones can fetch from
/// separately spawned tasks, and the owning consumer can still `ack_through`
/// afterwards.
#[tokio::test]
async fn should_fetch_handle_clones_fetch_concurrently_without_blocking_owner() {
    // Compile-time check of the handle's auto traits.
    fn assert_send_sync_static<T: Send + Sync + 'static>() {}
    assert_send_sync_static::<ConsumerFetchHandle>();

    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
    enqueue_n(&store, &producer, 4).await;
    let descriptors = collector.next_descriptors(4).await.unwrap();

    let handle = collector.fetch_handle();
    let h1 = handle.clone();
    let h2 = handle.clone();
    let d0 = descriptors[0].clone();
    let d1 = descriptors[1].clone();
    let t1 = tokio::spawn(async move { h1.fetch(d0).await.unwrap() });
    let t2 = tokio::spawn(async move { h2.fetch(d1).await.unwrap() });
    let r1 = t1.await.unwrap();
    let r2 = t2.await.unwrap();
    assert_eq!(r1.sequence, 0);
    assert_eq!(r2.sequence, 1);

    collector.ack_through(1).await.unwrap();
}
1101
/// `ack_through(3)` on a five-entry queue removes sequences 0..=3 in one call,
/// leaving a single entry.
#[tokio::test]
async fn should_ack_through_advance_frontier_in_one_call() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
    enqueue_n(&store, &producer, 5).await;

    let _v = collector.next_descriptors(5).await.unwrap();

    collector.ack_through(3).await.unwrap();
    assert_eq!(collector.len(), 1);
}
1115
/// Repeating `ack_through` with the same sequence is rejected with a
/// "non-monotonic ack_through" storage error and leaves the acked sequence
/// unchanged.
#[tokio::test]
async fn should_ack_through_reject_non_monotonic_input() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
    enqueue_n(&store, &producer, 5).await;
    collector.next_descriptors(5).await.unwrap();

    collector.ack_through(2).await.unwrap();
    let err = collector.ack_through(2).await.unwrap_err();
    match err {
        Error::Storage(msg) => assert!(msg.contains("non-monotonic ack_through")),
        other => panic!("expected Error::Storage, got {other:?}"),
    }
    assert_eq!(collector.last_acked_sequence, Some(2));
}
1132
/// When a second consumer has taken over, `ack_through` on the first consumer
/// fails with `Error::Fenced` and its acked sequence stays at the previous value.
#[tokio::test]
async fn should_ack_through_leave_state_unchanged_on_fence() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector1) =
        make_collector(&store, test_collector_config(), None).await;
    enqueue_n(&store, &producer, 5).await;
    collector1.next_descriptors(5).await.unwrap();
    collector1.ack_through(0).await.unwrap();
    assert_eq!(collector1.last_acked_sequence, Some(0));

    let (_, _collector2) = make_collector(&store, test_collector_config(), None).await;

    let err = collector1.ack_through(2).await.unwrap_err();
    assert!(matches!(err, Error::Fenced));
    assert_eq!(collector1.last_acked_sequence, Some(0));
}
1152
/// After a second consumer is created on the same manifest, the first
/// consumer's `next_descriptors` fails with `Error::Fenced`.
#[tokio::test]
async fn should_next_descriptors_surface_fence() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector1) =
        make_collector(&store, test_collector_config(), None).await;
    enqueue_n(&store, &producer, 3).await;

    let (_, _collector2) = make_collector(&store, test_collector_config(), None).await;

    let result = collector1.next_descriptors(10).await;
    assert!(matches!(result, Err(Error::Fenced)));
}
1165
/// Checks the batch returned by `next_batch` for a single enqueued entry
/// (sequence 0, expected location, two entries).
/// NOTE(review): despite the name, only the `next_batch` path is exercised here;
/// the descriptor-plus-fetch path is not compared in this test.
#[tokio::test]
async fn should_next_batch_be_equivalent_to_descriptor_plus_fetch() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
    let entries = test_entries();
    let location = "batches/equiv";
    write_batch(&store, location, &entries).await;
    producer
        .enqueue(location.to_string(), vec![])
        .await
        .unwrap();

    let via_wrapper = collector.next_batch().await.unwrap().unwrap();
    assert_eq!(via_wrapper.sequence, 0);
    assert_eq!(via_wrapper.location, location);
    assert_eq!(via_wrapper.entries.len(), 2);
}
1184
/// Descriptors that were handed out and then dropped are not handed out again
/// by the same consumer. Because they were never acked, a new consumer created
/// after `close` sees all three entries again, starting at sequence 0.
#[tokio::test]
async fn should_descriptor_handout_contract_not_reissue_lost_descriptors() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
    enqueue_n(&store, &producer, 3).await;

    let lost = collector.next_descriptors(2).await.unwrap();
    assert_eq!(lost.len(), 2);
    drop(lost);

    let next = collector.next_descriptors(10).await.unwrap();
    assert_eq!(next.len(), 1);
    assert_eq!(next[0].sequence, 2);

    collector.close().await.unwrap();
    let (_, mut recovered) = make_collector(&store, test_collector_config(), None).await;
    let again = recovered.next_descriptors(10).await.unwrap();
    assert_eq!(again.len(), 3);
    assert_eq!(again[0].sequence, 0);
}
1213}