Skip to main content

ingest/
collector.rs

1use std::cell::Cell;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use futures::StreamExt;
6use futures::stream;
7use slatedb::object_store::ObjectStore;
8use slatedb::object_store::path::Path;
9
10use crate::config::CollectorConfig;
11use crate::error::{Error, Result};
12use crate::model::decode_batch;
13use crate::queue::{Metadata, QueueConsumer, QueueEntry};
14
15const DEQUEUE_INTERVAL: u64 = 100;
16
17/// A batch of entries read from object storage by the [`Collector`].
18pub struct CollectedBatch {
19    /// The deserialized opaque byte entries from the data batch.
20    pub entries: Vec<Bytes>,
21    /// The queue sequence number of this batch.
22    pub sequence: u64,
23    /// The object storage path of the data batch.
24    pub location: String,
25    /// Metadata ranges attached by the ingestor(s) that contributed to this batch.
26    pub metadata: Vec<Metadata>,
27}
28
29/// Reads batches of ingested entries from object storage via a queue consumer.
30///
31/// The collector iterates over entries in the queue manifest in ingestion order,
32/// fetches the corresponding data batches from object storage, and makes them
33/// available to the caller. Epoch-based fencing ensures only a single active
34/// collector processes entries at any time.
35pub struct Collector {
36    consumer: QueueConsumer,
37    object_store: Arc<dyn ObjectStore>,
38    ack_count: Cell<u64>,
39    last_acked_sequence: Cell<Option<u64>>,
40}
41
42impl Collector {
43    /// Create a new collector from the given configuration.
44    pub fn new(config: CollectorConfig) -> Result<Self> {
45        let object_store = common::storage::factory::create_object_store(&config.object_store)
46            .map_err(|e| Error::Storage(e.to_string()))?;
47        Ok(Self::with_object_store(config, object_store))
48    }
49
50    fn with_object_store(config: CollectorConfig, object_store: Arc<dyn ObjectStore>) -> Self {
51        let consumer = QueueConsumer::with_object_store(config.manifest_path, object_store.clone());
52        Self {
53            consumer,
54            object_store,
55            ack_count: Cell::new(0),
56            last_acked_sequence: Cell::new(None),
57        }
58    }
59
60    /// Initialize the consumer by fencing any previous consumer instance.
61    ///
62    /// If `last_acked_sequence` is `Some(seq)`, the collector resumes after that
63    /// sequence — the next call to [`next_batch`](Self::next_batch) will read
64    /// sequence `seq + 1`. If `None`, the collector peeks the queue to discover
65    /// the first available entry and positions itself just before it.
66    pub async fn initialize(&self, last_acked_sequence: Option<u64>) -> Result<()> {
67        self.consumer.initialize().await?;
68        if let Some(seq) = last_acked_sequence {
69            self.last_acked_sequence.set(Some(seq));
70            self.flush().await?;
71        }
72        Ok(())
73    }
74
75    /// Read the next data batch from object storage.
76    ///
77    /// If no batch has been acked yet, peeks the earliest entry in the queue.
78    /// Otherwise, reads the entry with sequence `last_acked_sequence + 1`.
79    /// Returns `None` if no matching entry is available.
80    pub async fn next_batch(&self) -> Result<Option<CollectedBatch>> {
81        let queue_entry = match self.last_acked_sequence.get() {
82            Some(seq) => self.consumer.read(seq.wrapping_add(1)).await?,
83            None => self.consumer.peek().await?,
84        };
85        match queue_entry {
86            Some(entry) => self.fetch_batch(entry).await,
87            None => Ok(None),
88        }
89    }
90
91    async fn fetch_batch(
92        &self,
93        queue_entry: crate::queue::QueueEntry,
94    ) -> Result<Option<CollectedBatch>> {
95        let path = Path::from(queue_entry.location.as_str());
96        let data = self
97            .object_store
98            .get(&path)
99            .await
100            .map_err(|e| Error::Storage(e.to_string()))?
101            .bytes()
102            .await
103            .map_err(|e| Error::Storage(e.to_string()))?;
104
105        let entries = decode_batch(data)?;
106
107        Ok(Some(CollectedBatch {
108            entries,
109            sequence: queue_entry.sequence,
110            location: queue_entry.location,
111            metadata: queue_entry.metadata,
112        }))
113    }
114
115    /// Acknowledge that the batch with the given sequence number has been processed.
116    ///
117    /// Acks must be in order — the sequence must immediately follow the last acked
118    /// sequence, otherwise an error is returned. To amortize manifest writes, the
119    /// collector only calls `dequeue()` on the queue consumer every
120    /// 100 acks.
121    pub async fn ack(&self, sequence: u64) -> Result<()> {
122        if let Some(last) = self.last_acked_sequence.get()
123            && sequence != last + 1
124        {
125            return Err(Error::Storage(format!(
126                "out-of-order ack: expected sequence {}, got {}",
127                last + 1,
128                sequence
129            )));
130        }
131        self.last_acked_sequence.set(Some(sequence));
132        let count = self.ack_count.get() + 1;
133        self.ack_count.set(count);
134        if count.is_multiple_of(DEQUEUE_INTERVAL) {
135            let dequeued = self.consumer.dequeue(sequence).await?;
136            delete_dequeued_batches(self.object_store.clone(), dequeued);
137        }
138        Ok(())
139    }
140
141    /// Flush any pending acks by dequeueing up to the last acked sequence.
142    pub async fn flush(&self) -> Result<()> {
143        if let Some(seq) = self.last_acked_sequence.get() {
144            let dequeued = self.consumer.dequeue(seq).await?;
145            delete_dequeued_batches(self.object_store.clone(), dequeued);
146        }
147        Ok(())
148    }
149
150    /// Flush pending acks and consume the collector.
151    pub async fn close(self) -> Result<()> {
152        self.flush().await
153    }
154
155    /// Return the number of entries in the queue as of the last manifest read or write.
156    pub fn len(&self) -> usize {
157        self.consumer.len()
158    }
159
160    /// Return `true` if the queue had no entries as of the last manifest read or write.
161    pub fn is_empty(&self) -> bool {
162        self.len() == 0
163    }
164
165    /// Return the percentage of manifest writes that encountered a conflict.
166    pub fn conflict_rate(&self) -> f64 {
167        self.consumer.conflict_rate()
168    }
169}
170
171fn delete_dequeued_batches(object_store: Arc<dyn ObjectStore>, entries: Vec<QueueEntry>) {
172    if entries.is_empty() {
173        return;
174    }
175    tokio::spawn(async move {
176        let locations = stream::iter(entries.iter().map(|e| Ok(Path::from(e.location.as_str()))));
177        let mut results = object_store.delete_stream(locations.boxed());
178        let mut i = 0;
179        while let Some(result) = results.next().await {
180            if let Err(e) = result {
181                tracing::warn!(
182                    path = entries[i].location,
183                    error = %e,
184                    "failed to delete ingested data batch"
185                );
186            }
187            i += 1;
188        }
189    });
190}
191
192#[cfg(test)]
193mod tests {
194    use super::*;
195    use crate::config::CollectorConfig;
196    use crate::model::{CompressionType, encode_batch};
197    use crate::queue::{Metadata, QueueProducer};
198    use bytes::Bytes;
199    use common::ObjectStoreConfig;
200    use slatedb::object_store::PutPayload;
201    use slatedb::object_store::memory::InMemory;
202
203    const TEST_MANIFEST_PATH: &str = "test/manifest";
204
205    fn test_collector_config() -> CollectorConfig {
206        CollectorConfig {
207            object_store: ObjectStoreConfig::InMemory,
208            manifest_path: TEST_MANIFEST_PATH.to_string(),
209        }
210    }
211
212    fn test_entries() -> Vec<Bytes> {
213        vec![Bytes::from("data1"), Bytes::from("data2")]
214    }
215
216    async fn write_batch(store: &Arc<dyn ObjectStore>, location: &str, entries: &[Bytes]) {
217        let payload = encode_batch(entries, CompressionType::None).unwrap();
218        let path = Path::from(location);
219        store.put(&path, PutPayload::from(payload)).await.unwrap();
220    }
221
222    fn make_collector(
223        store: &Arc<dyn ObjectStore>,
224        config: CollectorConfig,
225    ) -> (QueueProducer, Collector) {
226        let producer =
227            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
228        let collector = Collector::with_object_store(config, store.clone());
229        (producer, collector)
230    }
231
232    #[tokio::test]
233    async fn should_collect_enqueued_batch() {
234        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
235        let (producer, collector) = make_collector(&store, test_collector_config());
236        collector.initialize(None).await.unwrap();
237
238        let entries = test_entries();
239        let location = "batches/batch-001";
240        write_batch(&store, location, &entries).await;
241        producer
242            .enqueue(location.to_string(), vec![])
243            .await
244            .unwrap();
245
246        let batch = collector.next_batch().await.unwrap().unwrap();
247        assert_eq!(batch.entries.len(), 2);
248        assert_eq!(batch.entries[0], Bytes::from("data1"));
249        assert_eq!(batch.entries[1], Bytes::from("data2"));
250        assert_eq!(batch.location, location);
251    }
252
253    #[tokio::test]
254    async fn should_collect_metadata_from_queue_entry() {
255        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
256        let (producer, collector) = make_collector(&store, test_collector_config());
257        collector.initialize(None).await.unwrap();
258
259        let entries = test_entries();
260        let location = "batches/batch-meta";
261        write_batch(&store, location, &entries).await;
262
263        let metadata = vec![Metadata {
264            start_index: 0,
265            ingestion_time_ms: 1_700_000_000_000,
266            payload: Bytes::from(r#"{"topic":"events"}"#),
267        }];
268        producer
269            .enqueue(location.to_string(), metadata)
270            .await
271            .unwrap();
272
273        let batch = collector.next_batch().await.unwrap().unwrap();
274        assert_eq!(batch.metadata.len(), 1);
275        assert_eq!(batch.metadata[0].start_index, 0);
276        assert_eq!(batch.metadata[0].ingestion_time_ms, 1_700_000_000_000);
277        assert_eq!(
278            batch.metadata[0].payload,
279            Bytes::from(r#"{"topic":"events"}"#)
280        );
281    }
282
283    #[tokio::test]
284    async fn should_return_none_when_queue_empty() {
285        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
286        let (_producer, collector) = make_collector(&store, test_collector_config());
287        collector.initialize(None).await.unwrap();
288
289        let result = collector.next_batch().await.unwrap();
290        assert!(result.is_none());
291    }
292
293    #[tokio::test]
294    async fn should_ack_batch() {
295        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
296        let (producer, collector) = make_collector(&store, test_collector_config());
297        collector.initialize(None).await.unwrap();
298
299        let entries = test_entries();
300        let location = "batches/batch-002";
301        write_batch(&store, location, &entries).await;
302        producer
303            .enqueue(location.to_string(), vec![])
304            .await
305            .unwrap();
306
307        let batch = collector.next_batch().await.unwrap().unwrap();
308        collector.ack(batch.sequence).await.unwrap();
309        collector.flush().await.unwrap();
310
311        // After flush, entry is dequeued
312        let next = collector.next_batch().await.unwrap();
313        assert!(next.is_none());
314    }
315
316    #[tokio::test]
317    async fn should_next_batch_return_batch_after_last_acked() {
318        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
319        let (producer, collector) = make_collector(&store, test_collector_config());
320        collector.initialize(None).await.unwrap();
321
322        let entries = test_entries();
323        write_batch(&store, "batches/first", &entries).await;
324        write_batch(&store, "batches/second", &entries).await;
325        producer
326            .enqueue("batches/first".to_string(), vec![])
327            .await
328            .unwrap();
329        producer
330            .enqueue("batches/second".to_string(), vec![])
331            .await
332            .unwrap();
333
334        let first = collector.next_batch().await.unwrap().unwrap();
335        collector.ack(first.sequence).await.unwrap();
336
337        let batch = collector.next_batch().await.unwrap().unwrap();
338        assert_eq!(batch.location, "batches/second");
339        assert_eq!(batch.sequence, 1);
340        assert_eq!(batch.entries.len(), 2);
341    }
342
343    #[tokio::test]
344    async fn should_next_batch_return_none_when_no_more_entries() {
345        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
346        let (producer, collector) = make_collector(&store, test_collector_config());
347        collector.initialize(None).await.unwrap();
348
349        let entries = test_entries();
350        write_batch(&store, "batches/first", &entries).await;
351        producer
352            .enqueue("batches/first".to_string(), vec![])
353            .await
354            .unwrap();
355
356        let first = collector.next_batch().await.unwrap().unwrap();
357        collector.ack(first.sequence).await.unwrap();
358
359        let result = collector.next_batch().await.unwrap();
360        assert!(result.is_none());
361    }
362
363    #[tokio::test]
364    async fn should_resume_from_last_acked_sequence() {
365        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
366        let (producer, collector) = make_collector(&store, test_collector_config());
367        collector.initialize(Some(0)).await.unwrap();
368
369        let entries = test_entries();
370        write_batch(&store, "batches/first", &entries).await;
371        write_batch(&store, "batches/second", &entries).await;
372        producer
373            .enqueue("batches/first".to_string(), vec![])
374            .await
375            .unwrap();
376        producer
377            .enqueue("batches/second".to_string(), vec![])
378            .await
379            .unwrap();
380
381        // Initialized with last_acked_sequence=0, so next_batch reads sequence 1
382        let batch = collector.next_batch().await.unwrap().unwrap();
383        assert_eq!(batch.location, "batches/second");
384        assert_eq!(batch.sequence, 1);
385    }
386
387    #[tokio::test]
388    async fn should_reject_out_of_order_ack() {
389        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
390        let (producer, collector) = make_collector(&store, test_collector_config());
391        collector.initialize(None).await.unwrap();
392
393        let entries = test_entries();
394        write_batch(&store, "batches/first", &entries).await;
395        write_batch(&store, "batches/second", &entries).await;
396        producer
397            .enqueue("batches/first".to_string(), vec![])
398            .await
399            .unwrap();
400        producer
401            .enqueue("batches/second".to_string(), vec![])
402            .await
403            .unwrap();
404
405        let first = collector.next_batch().await.unwrap().unwrap();
406        collector.ack(first.sequence).await.unwrap();
407
408        // Acking sequence 5 when last acked was 0 should fail
409        let result = collector.ack(5).await;
410        assert!(matches!(result, Err(Error::Storage(msg)) if msg.contains("out-of-order ack")));
411    }
412
413    #[tokio::test]
414    async fn should_batch_dequeue_calls() {
415        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
416        let (producer, collector) = make_collector(&store, test_collector_config());
417        collector.initialize(None).await.unwrap();
418
419        let entries = test_entries();
420        // Enqueue DEQUEUE_INTERVAL + 1 entries
421        let count = DEQUEUE_INTERVAL + 1;
422        for i in 0..count {
423            let location = format!("batches/batch-{:04}", i);
424            write_batch(&store, &location, &entries).await;
425            producer.enqueue(location, vec![]).await.unwrap();
426        }
427
428        // Ack all entries; dequeue fires at DEQUEUE_INTERVAL
429        for i in 0..count {
430            let batch = collector.next_batch().await.unwrap().unwrap();
431            assert_eq!(batch.sequence, i);
432            collector.ack(batch.sequence).await.unwrap();
433        }
434
435        // After DEQUEUE_INTERVAL acks, entries up to that point are dequeued.
436        // The last entry (index DEQUEUE_INTERVAL) was acked but not yet dequeued.
437        assert_eq!(collector.len(), 1);
438
439        // Flush to dequeue the remaining entry.
440        collector.flush().await.unwrap();
441
442        let result = collector.next_batch().await.unwrap();
443        assert!(result.is_none());
444    }
445
446    #[tokio::test]
447    async fn should_flush_pending_acks() {
448        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
449        let (producer, collector) = make_collector(&store, test_collector_config());
450        collector.initialize(None).await.unwrap();
451
452        let entries = test_entries();
453        write_batch(&store, "batches/first", &entries).await;
454        write_batch(&store, "batches/second", &entries).await;
455        producer
456            .enqueue("batches/first".to_string(), vec![])
457            .await
458            .unwrap();
459        producer
460            .enqueue("batches/second".to_string(), vec![])
461            .await
462            .unwrap();
463
464        let batch = collector.next_batch().await.unwrap().unwrap();
465        collector.ack(batch.sequence).await.unwrap();
466
467        // Without flush, the entry is still in the manifest.
468        // Verify by checking queue length hasn't changed.
469        assert_eq!(collector.len(), 2);
470
471        // After flush, dequeue removes the acked entry
472        collector.flush().await.unwrap();
473        assert_eq!(collector.len(), 1);
474    }
475
476    #[tokio::test]
477    async fn should_close_flush_and_consume() {
478        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
479        let (producer, collector) = make_collector(&store, test_collector_config());
480        collector.initialize(None).await.unwrap();
481
482        let entries = test_entries();
483        write_batch(&store, "batches/first", &entries).await;
484        producer
485            .enqueue("batches/first".to_string(), vec![])
486            .await
487            .unwrap();
488
489        let batch = collector.next_batch().await.unwrap().unwrap();
490        collector.ack(batch.sequence).await.unwrap();
491        collector.close().await.unwrap();
492
493        // After close, entries should be dequeued
494        let (_, collector2) = make_collector(&store, test_collector_config());
495        collector2.initialize(None).await.unwrap();
496        let result = collector2.next_batch().await.unwrap();
497        assert!(result.is_none());
498    }
499
500    #[tokio::test]
501    async fn should_fence_previous_collector() {
502        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
503        let (producer, collector1) = make_collector(&store, test_collector_config());
504        collector1.initialize(None).await.unwrap();
505
506        let entries = test_entries();
507        write_batch(&store, "batches/first", &entries).await;
508        producer
509            .enqueue("batches/first".to_string(), vec![])
510            .await
511            .unwrap();
512
513        // Second collector fences the first
514        let (_, collector2) = make_collector(&store, test_collector_config());
515        collector2.initialize(None).await.unwrap();
516
517        // First collector should get a Fenced error
518        let result = collector1.next_batch().await;
519        assert!(matches!(result, Err(Error::Fenced)));
520    }
521
522    #[tokio::test]
523    async fn should_iterate_multiple_sequential_batches() {
524        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
525        let (producer, collector) = make_collector(&store, test_collector_config());
526        collector.initialize(None).await.unwrap();
527
528        let entries = test_entries();
529        let locations = ["batches/a", "batches/b", "batches/c"];
530        for loc in &locations {
531            write_batch(&store, loc, &entries).await;
532            producer.enqueue(loc.to_string(), vec![]).await.unwrap();
533        }
534
535        for (i, expected_loc) in locations.iter().enumerate() {
536            let batch = collector.next_batch().await.unwrap().unwrap();
537            assert_eq!(batch.sequence, i as u64);
538            assert_eq!(batch.location, *expected_loc);
539            collector.ack(batch.sequence).await.unwrap();
540        }
541
542        let result = collector.next_batch().await.unwrap();
543        assert!(result.is_none());
544    }
545
546    #[tokio::test]
547    async fn should_initialize_none_with_pre_existing_entries() {
548        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
549        let (producer, _) = make_collector(&store, test_collector_config());
550
551        // Enqueue and dequeue placeholder entries to advance the sequence counter
552        let entries = test_entries();
553        for i in 0..5 {
554            let loc = format!("batches/placeholder-{}", i);
555            write_batch(&store, &loc, &entries).await;
556            producer.enqueue(loc, vec![]).await.unwrap();
557        }
558        // Use a temporary collector to dequeue placeholders
559        let (_, tmp_collector) = make_collector(&store, test_collector_config());
560        tmp_collector.initialize(None).await.unwrap();
561        for _ in 0..5 {
562            let batch = tmp_collector.next_batch().await.unwrap().unwrap();
563            tmp_collector.ack(batch.sequence).await.unwrap();
564        }
565        tmp_collector.close().await.unwrap();
566
567        // Now enqueue the real entry — it gets sequence 5
568        write_batch(&store, "batches/pre-existing", &entries).await;
569        producer
570            .enqueue("batches/pre-existing".to_string(), vec![])
571            .await
572            .unwrap();
573
574        // New collector with initialize(None) should find this entry
575        let (_, collector) = make_collector(&store, test_collector_config());
576        collector.initialize(None).await.unwrap();
577
578        let batch = collector.next_batch().await.unwrap().unwrap();
579        assert_eq!(batch.location, "batches/pre-existing");
580        assert_eq!(batch.sequence, 5);
581    }
582
583    #[tokio::test]
584    async fn should_initialize_with_sequence_dequeue_already_processed() {
585        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
586        let (producer, _) = make_collector(&store, test_collector_config());
587
588        let entries = test_entries();
589        write_batch(&store, "batches/first", &entries).await;
590        write_batch(&store, "batches/second", &entries).await;
591        producer
592            .enqueue("batches/first".to_string(), vec![])
593            .await
594            .unwrap();
595        producer
596            .enqueue("batches/second".to_string(), vec![])
597            .await
598            .unwrap();
599
600        // Simulate restart: new collector resumes after sequence 0
601        let (_, collector) = make_collector(&store, test_collector_config());
602        collector.initialize(Some(0)).await.unwrap();
603
604        // The flush in initialize should have dequeued entries through sequence 0
605        assert_eq!(collector.len(), 1);
606
607        let batch = collector.next_batch().await.unwrap().unwrap();
608        assert_eq!(batch.location, "batches/second");
609        assert_eq!(batch.sequence, 1);
610    }
611
612    async fn assert_batch_deleted(store: &Arc<dyn ObjectStore>, location: &str) {
613        // Yield to let the spawned deletion task run.
614        tokio::task::yield_now().await;
615        let path = Path::from(location);
616        let result = store.get(&path).await;
617        assert!(
618            result.is_err(),
619            "expected batch file to be deleted: {location}"
620        );
621    }
622
623    #[tokio::test]
624    async fn should_delete_batch_file_after_flush() {
625        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
626        let (producer, collector) = make_collector(&store, test_collector_config());
627        collector.initialize(None).await.unwrap();
628
629        let entries = test_entries();
630        let location = "batches/delete-me";
631        write_batch(&store, location, &entries).await;
632        producer
633            .enqueue(location.to_string(), vec![])
634            .await
635            .unwrap();
636
637        let batch = collector.next_batch().await.unwrap().unwrap();
638        collector.ack(batch.sequence).await.unwrap();
639        collector.flush().await.unwrap();
640
641        assert_batch_deleted(&store, location).await;
642    }
643
644    #[tokio::test]
645    async fn should_delete_batch_files_after_dequeue_interval() {
646        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
647        let (producer, collector) = make_collector(&store, test_collector_config());
648        collector.initialize(None).await.unwrap();
649
650        let entries = test_entries();
651        let mut locations = Vec::new();
652        for i in 0..DEQUEUE_INTERVAL {
653            let location = format!("batches/interval-{:04}", i);
654            write_batch(&store, &location, &entries).await;
655            producer.enqueue(location.clone(), vec![]).await.unwrap();
656            locations.push(location);
657        }
658
659        for _ in 0..DEQUEUE_INTERVAL {
660            let batch = collector.next_batch().await.unwrap().unwrap();
661            collector.ack(batch.sequence).await.unwrap();
662        }
663
664        // The DEQUEUE_INTERVAL-th ack triggers dequeue + deletion.
665        for location in &locations {
666            assert_batch_deleted(&store, location).await;
667        }
668    }
669
670    #[tokio::test]
671    async fn should_delete_batch_files_on_close() {
672        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
673        let (producer, collector) = make_collector(&store, test_collector_config());
674        collector.initialize(None).await.unwrap();
675
676        let entries = test_entries();
677        let location = "batches/close-me";
678        write_batch(&store, location, &entries).await;
679        producer
680            .enqueue(location.to_string(), vec![])
681            .await
682            .unwrap();
683
684        let batch = collector.next_batch().await.unwrap().unwrap();
685        collector.ack(batch.sequence).await.unwrap();
686        collector.close().await.unwrap();
687
688        assert_batch_deleted(&store, location).await;
689    }
690}