Skip to main content

buffer/
consumer.rs

1use std::sync::Arc;
2use std::time::{Instant, SystemTime, UNIX_EPOCH};
3
4use bytes::Bytes;
5use slatedb::object_store::ObjectStore;
6use slatedb::object_store::path::Path;
7use tokio_util::sync::CancellationToken;
8
9use crate::config::ConsumerConfig;
10use crate::error::{Error, Result};
11use crate::gc::GarbageCollector;
12use crate::metric_names as m;
13use crate::model::decode_batch;
14use crate::queue::{Metadata, QueueConsumer};
15
/// Number of [`Consumer::ack`] calls between amortized manifest `dequeue()`
/// writes. Acks recorded in between stay in memory until a later dequeue
/// (the next multiple of this value, `ack_through`, or `flush`).
const DEQUEUE_INTERVAL: u64 = 100;
17
18/// A batch of entries read from object storage by the [`Consumer`].
19#[derive(Debug)]
20pub struct ConsumedBatch {
21    /// The deserialized opaque byte entries from the data batch.
22    pub entries: Vec<Bytes>,
23    /// The queue sequence number of this batch.
24    pub sequence: u64,
25    /// The object storage path of the data batch.
26    pub location: String,
27    /// Metadata ranges attached by the buffer(s) that contributed to this batch.
28    pub metadata: Vec<Metadata>,
29}
30
31/// Lightweight pointer to one manifest entry. Carries everything a
32/// caller needs to fetch the corresponding data batch from object
33/// storage without re-reading the manifest.
34///
35/// Returned by [`Consumer::next_descriptors`]. See RFC 0003.
36#[derive(Debug, Clone, PartialEq)]
37pub struct BatchDescriptor {
38    /// The queue sequence number of this batch.
39    pub sequence: u64,
40    /// The object storage path of the data batch.
41    pub location: String,
42    /// Per-range metadata items attached to this batch by producers.
43    pub metadata: Vec<Metadata>,
44}
45
46/// Cloneable, concurrency-safe handle for fetching data batches from
47/// object storage given a [`BatchDescriptor`].
48///
49/// The handle holds an `Arc<dyn ObjectStore>` and the manifest path
50/// (used as a metric label only). It carries no manifest cursor and
51/// no mutable state, so it is safe to clone into many fetch worker
52/// tasks while the owning [`Consumer`] keeps `&mut`-only access to
53/// manifest operations (`next_descriptors`, `ack`, `ack_through`,
54/// `flush`).
55///
56/// See RFC 0003 ("Concurrency Model") for the full contract.
57#[derive(Clone)]
58pub struct ConsumerFetchHandle {
59    object_store: Arc<dyn ObjectStore>,
60    manifest_path: String,
61}
62
63impl std::fmt::Debug for ConsumerFetchHandle {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        f.debug_struct("ConsumerFetchHandle")
66            .field("manifest_path", &self.manifest_path)
67            .finish_non_exhaustive()
68    }
69}
70
71impl ConsumerFetchHandle {
72    /// Fetch and decode the batch identified by `descriptor`. Stateless
73    /// — does not consult or mutate manifest state. Safe to call
74    /// concurrently from multiple tasks (against the same handle clone
75    /// or distinct clones); two calls against the same descriptor
76    /// produce identical [`ConsumedBatch`] values.
77    ///
78    /// The handle does not detect epoch fencing on its own; a fenced
79    /// consumer's [`Consumer::next_descriptors`] is what surfaces
80    /// `Error::Fenced`. Fetches against descriptors handed out before
81    /// fencing succeed against object storage as long as the data
82    /// object is still present (i.e. before GC runs).
83    pub async fn fetch(&self, descriptor: BatchDescriptor) -> Result<ConsumedBatch> {
84        let _ = &self.manifest_path; // reserved for future per-source label
85        let start = Instant::now();
86        let path = Path::from(descriptor.location.as_str());
87        let data = self
88            .object_store
89            .get(&path)
90            .await
91            .map_err(|e| Error::Storage(e.to_string()))?
92            .bytes()
93            .await
94            .map_err(|e| Error::Storage(e.to_string()))?;
95
96        let data_len = data.len() as u64;
97        let entries = decode_batch(data)?;
98
99        metrics::counter!(m::BATCHES_COLLECTED).increment(1);
100        metrics::counter!(m::ENTRIES_COLLECTED).increment(entries.len() as u64);
101        metrics::counter!(m::BYTES_COLLECTED).increment(data_len);
102        metrics::histogram!(m::FETCH_DURATION_SECONDS).record(start.elapsed().as_secs_f64());
103
104        Ok(ConsumedBatch {
105            entries,
106            sequence: descriptor.sequence,
107            location: descriptor.location,
108            metadata: descriptor.metadata,
109        })
110    }
111}
112
113/// Reads batches of ingested entries from object storage via a queue consumer.
114///
115/// The consumer iterates over entries in the queue manifest in ingestion order,
116/// fetches the corresponding data batches from object storage, and makes them
117/// available to the caller. Epoch-based fencing ensures only a single active
118/// consumer processes entries at any time.
119pub struct Consumer {
120    consumer: QueueConsumer,
121    object_store: Arc<dyn ObjectStore>,
122    manifest_path: String,
123    gc_shutdown: CancellationToken,
124    gc_handle: tokio::task::JoinHandle<()>,
125    ack_count: u64,
126    last_acked_sequence: Option<u64>,
127    last_fetched_sequence: Option<u64>,
128    /// Read-ahead cursor: highest sequence handed out by
129    /// [`Consumer::next_descriptors`]. Distinct from
130    /// `last_fetched_sequence` because callers may receive
131    /// descriptors before the corresponding fetch resolves.
132    /// Initialized from `last_acked_sequence` on construction.
133    last_handed_out_sequence: Option<u64>,
134}
135
136impl Consumer {
137    /// Create a new consumer from the given configuration.
138    ///
139    /// Initializes the queue consumer (fencing any previous instance) and spawns
140    /// the garbage collector. If `last_acked_sequence` is `Some(seq)`, the
141    /// consumer resumes after that sequence; if `None`, it discovers the first
142    /// available entry.
143    pub async fn new(config: ConsumerConfig, last_acked_sequence: Option<u64>) -> Result<Self> {
144        let object_store = common::storage::factory::create_object_store(&config.object_store)
145            .map_err(|e| Error::Storage(e.to_string()))?;
146        Self::with_object_store(config, object_store, last_acked_sequence).await
147    }
148
149    pub async fn with_object_store(
150        config: ConsumerConfig,
151        object_store: Arc<dyn ObjectStore>,
152        last_acked_sequence: Option<u64>,
153    ) -> Result<Self> {
154        crate::metric_names::describe_consumer_metrics();
155        let manifest_path = config.manifest_path.clone();
156        let consumer =
157            QueueConsumer::with_object_store(manifest_path.clone(), object_store.clone());
158
159        // Fence previous consumers and position the cursor before spawning GC.
160        consumer.initialize().await?;
161        if let Some(seq) = last_acked_sequence {
162            consumer.dequeue(seq).await?;
163        }
164
165        let gc_shutdown = CancellationToken::new();
166        let gc = GarbageCollector::new(
167            config.manifest_path,
168            config.data_path_prefix,
169            config.gc_interval,
170            config.gc_grace_period,
171            object_store.clone(),
172        );
173        let gc_handle = tokio::spawn(gc.collect(gc_shutdown.clone()));
174
175        Ok(Self {
176            consumer,
177            object_store,
178            manifest_path,
179            ack_count: 0,
180            last_acked_sequence,
181            gc_shutdown,
182            gc_handle,
183            last_fetched_sequence: last_acked_sequence,
184            last_handed_out_sequence: last_acked_sequence,
185        })
186    }
187
188    /// Read the next data batch from object storage.
189    ///
190    /// Serial fetch of the next batch: peeks the next manifest entry
191    /// past the last handed-out / fetched sequence, fetches the
192    /// corresponding object, and returns it. Returns `None` if no
193    /// entry is available. May be called repeatedly to walk successive
194    /// batches; the read cursor is independent of the ack frontier.
195    ///
196    /// **Cancellation-safe and fetch-failure-safe.** The cursor
197    /// (`last_handed_out_sequence`) advances **only after** a
198    /// successful fetch. If the fetch fails, or if the future is
199    /// dropped mid-fetch, the cursor is unchanged and a subsequent
200    /// `next_batch` re-fetches the same entry.
201    ///
202    /// This is written as an inline peek-fetch-advance rather than a
203    /// wrapper over [`Consumer::next_descriptors`] +
204    /// [`Consumer::fetch_descriptor`] because `next_descriptors`
205    /// advances the cursor at handout time (a handed-out descriptor is
206    /// not reissued within a process; see the Descriptor Handout
207    /// Contract). Inlining keeps the cursor advance after the fetch
208    /// `await`, so a failed or dropped fetch leaves the cursor
209    /// untouched — the behavior `next_batch` callers expect.
210    pub async fn next_batch(&mut self) -> Result<Option<ConsumedBatch>> {
211        let entries = self
212            .consumer
213            .descriptors_after(self.last_handed_out_sequence, 1)
214            .await?;
215        metrics::gauge!(m::QUEUE_LENGTH).set(self.consumer.len() as f64);
216        let Some(entry) = entries.into_iter().next() else {
217            return Ok(None);
218        };
219        let descriptor = BatchDescriptor {
220            sequence: entry.sequence,
221            location: entry.location,
222            metadata: entry.metadata,
223        };
224        // fetch_descriptor advances last_fetched_sequence on success
225        // and updates the consumer_lag_seconds gauge. We then advance
226        // last_handed_out_sequence ourselves — only on success. If
227        // the fetch errors or the future is cancelled, neither cursor
228        // moves and the next next_batch re-issues the same entry.
229        let batch = self.fetch_descriptor(descriptor).await?;
230        self.last_handed_out_sequence = Some(batch.sequence);
231        Ok(Some(batch))
232    }
233
234    /// Read the manifest once and return up to `max` contiguous
235    /// [`BatchDescriptor`]s past the consumer's read-ahead cursor.
236    ///
237    /// Does not perform any object-store GET. Does not mutate the
238    /// durable ack frontier. Advances the in-memory read-ahead cursor
239    /// (`last_handed_out_sequence`) by the number of descriptors
240    /// returned.
241    ///
242    /// Returns an empty `Vec` if no new entries are available;
243    /// returns `Err(Error::Fenced)` if the consumer's epoch no longer
244    /// matches the manifest's.
245    ///
246    /// **Caller contract**: once a descriptor is returned, the caller
247    /// is responsible for either fetching and processing it or
248    /// accepting that it will be re-handed-out only via process
249    /// restart (`Consumer::new` with a `last_acked_sequence` argument).
250    /// Lost descriptors are not reissued within a process. See RFC
251    /// 0003 "Descriptor Handout Contract" for the full rules.
252    pub async fn next_descriptors(&mut self, max: usize) -> Result<Vec<BatchDescriptor>> {
253        if max == 0 {
254            return Ok(Vec::new());
255        }
256        let entries = self
257            .consumer
258            .descriptors_after(self.last_handed_out_sequence, max)
259            .await?;
260        metrics::gauge!(m::QUEUE_LENGTH).set(self.consumer.len() as f64);
261        if let Some(last) = entries.last() {
262            self.last_handed_out_sequence = Some(last.sequence);
263        }
264        let count = entries.len() as u64;
265        if count > 0 {
266            metrics::counter!(m::DESCRIPTORS_HANDED_OUT).increment(count);
267        }
268        Ok(entries
269            .into_iter()
270            .map(|e| BatchDescriptor {
271                sequence: e.sequence,
272                location: e.location,
273                metadata: e.metadata,
274            })
275            .collect())
276    }
277
278    /// Construct a cloneable fetch handle. O(1); each call returns a
279    /// fresh handle, and the handle itself implements `Clone` for
280    /// further duplication into worker tasks.
281    pub fn fetch_handle(&self) -> ConsumerFetchHandle {
282        ConsumerFetchHandle {
283            object_store: self.object_store.clone(),
284            manifest_path: self.manifest_path.clone(),
285        }
286    }
287
288    /// Fetch and decode a single batch via the consumer's serial
289    /// wrapper. Equivalent to `self.fetch_handle().fetch(descriptor)`,
290    /// but additionally maintains the consumer's serial lag cursor
291    /// (`last_fetched_sequence` + the `consumer_lag_seconds` gauge)
292    /// used by the legacy `next_batch` path.
293    ///
294    /// Parallel-fetch callers should use [`Consumer::fetch_handle`]
295    /// directly; the runtime owns its own per-stage latency
296    /// histograms (RFC 0002).
297    pub async fn fetch_descriptor(&mut self, descriptor: BatchDescriptor) -> Result<ConsumedBatch> {
298        let handle = self.fetch_handle();
299        let batch = handle.fetch(descriptor).await?;
300        self.last_fetched_sequence = match self.last_fetched_sequence {
301            Some(prev) => Some(prev.max(batch.sequence)),
302            None => Some(batch.sequence),
303        };
304        if let Some(last_meta) = batch.metadata.last() {
305            let now_ms = SystemTime::now()
306                .duration_since(UNIX_EPOCH)
307                .unwrap_or_default()
308                .as_millis() as i64;
309            let lag_s = (now_ms - last_meta.ingestion_time_ms) as f64 / 1000.0;
310            metrics::gauge!(m::CONSUMER_LAG_SECONDS).set(lag_s.max(0.0));
311        }
312        Ok(batch)
313    }
314
315    /// Acknowledge that the batch with the given sequence number has been processed.
316    ///
317    /// Acks must be in order — the sequence must immediately follow the last acked
318    /// sequence, otherwise an error is returned. To amortize manifest writes, the
319    /// consumer only calls `dequeue()` on the queue consumer every
320    /// 100 acks.
321    pub async fn ack(&mut self, sequence: u64) -> Result<()> {
322        if let Some(last) = self.last_acked_sequence
323            && sequence != last + 1
324        {
325            return Err(Error::Storage(format!(
326                "out-of-order ack: expected sequence {}, got {}",
327                last + 1,
328                sequence
329            )));
330        }
331        self.last_acked_sequence = Some(sequence);
332        self.ack_count += 1;
333        metrics::counter!(m::ACKS).increment(1);
334        if self.ack_count.is_multiple_of(DEQUEUE_INTERVAL) {
335            self.consumer.dequeue(sequence).await?;
336        }
337        Ok(())
338    }
339
340    /// Advance the durable ack frontier through (and including)
341    /// `sequence`. Performs `dequeue(sequence)` against the manifest
342    /// **first**, then updates in-memory state on success.
343    ///
344    /// On error (storage, fence), `last_acked_sequence` and the
345    /// `buffer.acks` counter remain at their pre-call values. A retry
346    /// against the same sequence is safe.
347    ///
348    /// Errors if `sequence <= last_acked_sequence` (the frontier is
349    /// monotonic).
350    pub async fn ack_through(&mut self, sequence: u64) -> Result<()> {
351        if let Some(last) = self.last_acked_sequence
352            && sequence <= last
353        {
354            return Err(Error::Storage(format!(
355                "non-monotonic ack_through: last_acked={last}, requested={sequence}"
356            )));
357        }
358        let count_advanced = match self.last_acked_sequence {
359            None => sequence + 1,
360            Some(last) => sequence - last,
361        };
362
363        // Durable dequeue first. Bubbles up Fenced and Storage errors
364        // without touching local state.
365        self.consumer.dequeue(sequence).await?;
366
367        // Only mutate in-memory state after dequeue succeeds.
368        self.last_acked_sequence = Some(sequence);
369        self.ack_count = self.ack_count.wrapping_add(count_advanced);
370        metrics::counter!(m::ACKS).increment(count_advanced);
371        Ok(())
372    }
373
374    /// Flush any pending acks by dequeueing up to the last acked sequence.
375    pub async fn flush(&mut self) -> Result<()> {
376        if let Some(seq) = self.last_acked_sequence {
377            self.consumer.dequeue(seq).await?;
378        }
379        Ok(())
380    }
381
382    /// Flush pending acks, shut down the garbage collector, and consume the handle.
383    pub async fn close(mut self) -> Result<()> {
384        self.flush().await?;
385        self.gc_shutdown.cancel();
386        let _ = self.gc_handle.await;
387        Ok(())
388    }
389
390    /// Return the number of entries in the queue as of the last manifest read or write.
391    pub fn len(&self) -> usize {
392        self.consumer.len()
393    }
394
395    /// Return `true` if the queue had no entries as of the last manifest read or write.
396    pub fn is_empty(&self) -> bool {
397        self.len() == 0
398    }
399
400    /// Return the percentage of manifest writes that encountered a conflict.
401    pub fn conflict_rate(&self) -> f64 {
402        self.consumer.conflict_rate()
403    }
404}
405
406#[cfg(test)]
407mod tests {
408    use super::*;
409    use crate::config::ConsumerConfig;
410    use crate::model::{CompressionType, encode_batch};
411    use crate::queue::{Metadata, QueueProducer};
412    use bytes::Bytes;
413    use common::ObjectStoreConfig;
414    use slatedb::object_store::PutPayload;
415    use slatedb::object_store::memory::InMemory;
416    use std::time::Duration;
417
418    const TEST_MANIFEST_PATH: &str = "test/manifest";
419
420    fn test_collector_config() -> ConsumerConfig {
421        ConsumerConfig {
422            object_store: ObjectStoreConfig::InMemory,
423            manifest_path: TEST_MANIFEST_PATH.to_string(),
424            data_path_prefix: "ingest".to_string(),
425            gc_interval: Duration::from_secs(300),
426            gc_grace_period: Duration::from_secs(600),
427        }
428    }
429
430    fn test_entries() -> Vec<Bytes> {
431        vec![Bytes::from("data1"), Bytes::from("data2")]
432    }
433
434    async fn write_batch(store: &Arc<dyn ObjectStore>, location: &str, entries: &[Bytes]) {
435        let payload = encode_batch(entries, CompressionType::None).unwrap();
436        let path = Path::from(location);
437        store.put(&path, PutPayload::from(payload)).await.unwrap();
438    }
439
440    async fn make_collector(
441        store: &Arc<dyn ObjectStore>,
442        config: ConsumerConfig,
443        last_acked_sequence: Option<u64>,
444    ) -> (QueueProducer, Consumer) {
445        let producer =
446            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
447        let collector = Consumer::with_object_store(config, store.clone(), last_acked_sequence)
448            .await
449            .unwrap();
450        (producer, collector)
451    }
452
453    #[tokio::test]
454    async fn should_collect_enqueued_batch() {
455        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
456        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
457
458        let entries = test_entries();
459        let location = "batches/batch-001";
460        write_batch(&store, location, &entries).await;
461        producer
462            .enqueue(location.to_string(), vec![])
463            .await
464            .unwrap();
465
466        let batch = collector.next_batch().await.unwrap().unwrap();
467        assert_eq!(batch.entries.len(), 2);
468        assert_eq!(batch.entries[0], Bytes::from("data1"));
469        assert_eq!(batch.entries[1], Bytes::from("data2"));
470        assert_eq!(batch.location, location);
471    }
472
473    #[tokio::test]
474    async fn should_collect_metadata_from_queue_entry() {
475        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
476        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
477
478        let entries = test_entries();
479        let location = "batches/batch-meta";
480        write_batch(&store, location, &entries).await;
481
482        let metadata = vec![Metadata {
483            start_index: 0,
484            ingestion_time_ms: 1_700_000_000_000,
485            payload: Bytes::from(r#"{"topic":"events"}"#),
486        }];
487        producer
488            .enqueue(location.to_string(), metadata)
489            .await
490            .unwrap();
491
492        let batch = collector.next_batch().await.unwrap().unwrap();
493        assert_eq!(batch.metadata.len(), 1);
494        assert_eq!(batch.metadata[0].start_index, 0);
495        assert_eq!(batch.metadata[0].ingestion_time_ms, 1_700_000_000_000);
496        assert_eq!(
497            batch.metadata[0].payload,
498            Bytes::from(r#"{"topic":"events"}"#)
499        );
500    }
501
502    #[tokio::test]
503    async fn should_return_none_when_queue_empty() {
504        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
505        let (_producer, mut collector) =
506            make_collector(&store, test_collector_config(), None).await;
507
508        let result = collector.next_batch().await.unwrap();
509        assert!(result.is_none());
510    }
511
512    #[tokio::test]
513    async fn should_ack_batch() {
514        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
515        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
516
517        let entries = test_entries();
518        let location = "batches/batch-002";
519        write_batch(&store, location, &entries).await;
520        producer
521            .enqueue(location.to_string(), vec![])
522            .await
523            .unwrap();
524
525        let batch = collector.next_batch().await.unwrap().unwrap();
526        collector.ack(batch.sequence).await.unwrap();
527        collector.flush().await.unwrap();
528
529        // After flush, entry is dequeued
530        let next = collector.next_batch().await.unwrap();
531        assert!(next.is_none());
532    }
533
534    #[tokio::test]
535    async fn should_next_batch_return_batch_after_last_acked() {
536        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
537        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
538
539        let entries = test_entries();
540        write_batch(&store, "batches/first", &entries).await;
541        write_batch(&store, "batches/second", &entries).await;
542        producer
543            .enqueue("batches/first".to_string(), vec![])
544            .await
545            .unwrap();
546        producer
547            .enqueue("batches/second".to_string(), vec![])
548            .await
549            .unwrap();
550
551        let first = collector.next_batch().await.unwrap().unwrap();
552        collector.ack(first.sequence).await.unwrap();
553
554        let batch = collector.next_batch().await.unwrap().unwrap();
555        assert_eq!(batch.location, "batches/second");
556        assert_eq!(batch.sequence, 1);
557        assert_eq!(batch.entries.len(), 2);
558    }
559
560    #[tokio::test]
561    async fn should_next_batch_advance_before_previous_batch_is_acked() {
562        // given
563        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
564        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
565
566        let entries = test_entries();
567        write_batch(&store, "batches/first", &entries).await;
568        write_batch(&store, "batches/second", &entries).await;
569        producer
570            .enqueue("batches/first".to_string(), vec![])
571            .await
572            .unwrap();
573        producer
574            .enqueue("batches/second".to_string(), vec![])
575            .await
576            .unwrap();
577
578        // when
579        let first = collector.next_batch().await.unwrap().unwrap();
580        let second = collector.next_batch().await.unwrap().unwrap();
581
582        // then
583        assert_eq!(first.location, "batches/first");
584        assert_eq!(first.sequence, 0);
585        assert_eq!(second.location, "batches/second");
586        assert_eq!(second.sequence, 1);
587    }
588
589    #[tokio::test]
590    async fn should_next_batch_return_none_when_no_more_entries() {
591        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
592        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
593
594        let entries = test_entries();
595        write_batch(&store, "batches/first", &entries).await;
596        producer
597            .enqueue("batches/first".to_string(), vec![])
598            .await
599            .unwrap();
600
601        let first = collector.next_batch().await.unwrap().unwrap();
602        collector.ack(first.sequence).await.unwrap();
603
604        let result = collector.next_batch().await.unwrap();
605        assert!(result.is_none());
606    }
607
608    #[tokio::test]
609    async fn should_resume_from_last_acked_sequence() {
610        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
611        let (producer, mut collector) =
612            make_collector(&store, test_collector_config(), Some(0)).await;
613
614        let entries = test_entries();
615        write_batch(&store, "batches/first", &entries).await;
616        write_batch(&store, "batches/second", &entries).await;
617        producer
618            .enqueue("batches/first".to_string(), vec![])
619            .await
620            .unwrap();
621        producer
622            .enqueue("batches/second".to_string(), vec![])
623            .await
624            .unwrap();
625
626        // Initialized with last_acked_sequence=0, so next_batch reads sequence 1
627        let batch = collector.next_batch().await.unwrap().unwrap();
628        assert_eq!(batch.location, "batches/second");
629        assert_eq!(batch.sequence, 1);
630    }
631
632    #[tokio::test]
633    async fn should_reject_out_of_order_ack() {
634        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
635        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
636
637        let entries = test_entries();
638        write_batch(&store, "batches/first", &entries).await;
639        write_batch(&store, "batches/second", &entries).await;
640        producer
641            .enqueue("batches/first".to_string(), vec![])
642            .await
643            .unwrap();
644        producer
645            .enqueue("batches/second".to_string(), vec![])
646            .await
647            .unwrap();
648
649        let first = collector.next_batch().await.unwrap().unwrap();
650        collector.ack(first.sequence).await.unwrap();
651
652        // Acking sequence 5 when last acked was 0 should fail
653        let result = collector.ack(5).await;
654        assert!(matches!(result, Err(Error::Storage(msg)) if msg.contains("out-of-order ack")));
655    }
656
657    #[tokio::test]
658    async fn should_batch_dequeue_calls() {
659        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
660        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
661
662        let entries = test_entries();
663        // Enqueue DEQUEUE_INTERVAL + 1 entries
664        let count = DEQUEUE_INTERVAL + 1;
665        for i in 0..count {
666            let location = format!("batches/batch-{:04}", i);
667            write_batch(&store, &location, &entries).await;
668            producer.enqueue(location, vec![]).await.unwrap();
669        }
670
671        // Ack all entries; dequeue fires at DEQUEUE_INTERVAL
672        for i in 0..count {
673            let batch = collector.next_batch().await.unwrap().unwrap();
674            assert_eq!(batch.sequence, i);
675            collector.ack(batch.sequence).await.unwrap();
676        }
677
678        // After DEQUEUE_INTERVAL acks, entries up to that point are dequeued.
679        // The last entry (index DEQUEUE_INTERVAL) was acked but not yet dequeued.
680        assert_eq!(collector.len(), 1);
681
682        // Flush to dequeue the remaining entry.
683        collector.flush().await.unwrap();
684
685        let result = collector.next_batch().await.unwrap();
686        assert!(result.is_none());
687    }
688
689    #[tokio::test]
690    async fn should_flush_pending_acks() {
691        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
692        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
693
694        let entries = test_entries();
695        write_batch(&store, "batches/first", &entries).await;
696        write_batch(&store, "batches/second", &entries).await;
697        producer
698            .enqueue("batches/first".to_string(), vec![])
699            .await
700            .unwrap();
701        producer
702            .enqueue("batches/second".to_string(), vec![])
703            .await
704            .unwrap();
705
706        let batch = collector.next_batch().await.unwrap().unwrap();
707        collector.ack(batch.sequence).await.unwrap();
708
709        // Without flush, the entry is still in the manifest.
710        // Verify by checking queue length hasn't changed.
711        assert_eq!(collector.len(), 2);
712
713        // After flush, dequeue removes the acked entry
714        collector.flush().await.unwrap();
715        assert_eq!(collector.len(), 1);
716    }
717
718    #[tokio::test]
719    async fn should_close_flush_and_consume() {
720        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
721        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
722
723        let entries = test_entries();
724        write_batch(&store, "batches/first", &entries).await;
725        producer
726            .enqueue("batches/first".to_string(), vec![])
727            .await
728            .unwrap();
729
730        let batch = collector.next_batch().await.unwrap().unwrap();
731        collector.ack(batch.sequence).await.unwrap();
732        collector.close().await.unwrap();
733
734        // After close, entries should be dequeued
735        let (_, mut collector2) = make_collector(&store, test_collector_config(), None).await;
736        let result = collector2.next_batch().await.unwrap();
737        assert!(result.is_none());
738    }
739
740    #[tokio::test]
741    async fn should_fence_previous_collector() {
742        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
743        let (producer, mut collector1) =
744            make_collector(&store, test_collector_config(), None).await;
745
746        let entries = test_entries();
747        write_batch(&store, "batches/first", &entries).await;
748        producer
749            .enqueue("batches/first".to_string(), vec![])
750            .await
751            .unwrap();
752
753        // Second collector fences the first
754        let (_, _collector2) = make_collector(&store, test_collector_config(), None).await;
755
756        // First collector should get a Fenced error
757        let result = collector1.next_batch().await;
758        assert!(matches!(result, Err(Error::Fenced)));
759    }
760
761    #[tokio::test]
762    async fn should_iterate_multiple_sequential_batches() {
763        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
764        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
765
766        let entries = test_entries();
767        let locations = ["batches/a", "batches/b", "batches/c"];
768        for loc in &locations {
769            write_batch(&store, loc, &entries).await;
770            producer.enqueue(loc.to_string(), vec![]).await.unwrap();
771        }
772
773        for (i, expected_loc) in locations.iter().enumerate() {
774            let batch = collector.next_batch().await.unwrap().unwrap();
775            assert_eq!(batch.sequence, i as u64);
776            assert_eq!(batch.location, *expected_loc);
777            collector.ack(batch.sequence).await.unwrap();
778        }
779
780        let result = collector.next_batch().await.unwrap();
781        assert!(result.is_none());
782    }
783
784    #[tokio::test]
785    async fn should_initialize_none_with_pre_existing_entries() {
786        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
787        let (producer, _) = make_collector(&store, test_collector_config(), None).await;
788
789        // Enqueue and dequeue placeholder entries to advance the sequence counter
790        let entries = test_entries();
791        for i in 0..5 {
792            let loc = format!("batches/placeholder-{}", i);
793            write_batch(&store, &loc, &entries).await;
794            producer.enqueue(loc, vec![]).await.unwrap();
795        }
796        // Use a temporary collector to dequeue placeholders
797        let (_, mut tmp_collector) = make_collector(&store, test_collector_config(), None).await;
798        for _ in 0..5 {
799            let batch = tmp_collector.next_batch().await.unwrap().unwrap();
800            tmp_collector.ack(batch.sequence).await.unwrap();
801        }
802        tmp_collector.close().await.unwrap();
803
804        // Now enqueue the real entry — it gets sequence 5
805        write_batch(&store, "batches/pre-existing", &entries).await;
806        producer
807            .enqueue("batches/pre-existing".to_string(), vec![])
808            .await
809            .unwrap();
810
811        // New collector with initialize(None) should find this entry
812        let (_, mut collector) = make_collector(&store, test_collector_config(), None).await;
813
814        let batch = collector.next_batch().await.unwrap().unwrap();
815        assert_eq!(batch.location, "batches/pre-existing");
816        assert_eq!(batch.sequence, 5);
817    }
818
819    #[tokio::test]
820    async fn should_initialize_with_sequence_dequeue_already_processed() {
821        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
822        let (producer, _) = make_collector(&store, test_collector_config(), None).await;
823
824        let entries = test_entries();
825        write_batch(&store, "batches/first", &entries).await;
826        write_batch(&store, "batches/second", &entries).await;
827        producer
828            .enqueue("batches/first".to_string(), vec![])
829            .await
830            .unwrap();
831        producer
832            .enqueue("batches/second".to_string(), vec![])
833            .await
834            .unwrap();
835
836        // Simulate restart: new collector resumes after sequence 0
837        let (_, mut collector) = make_collector(&store, test_collector_config(), Some(0)).await;
838
839        // The flush in initialize should have dequeued entries through sequence 0
840        assert_eq!(collector.len(), 1);
841
842        let batch = collector.next_batch().await.unwrap().unwrap();
843        assert_eq!(batch.location, "batches/second");
844        assert_eq!(batch.sequence, 1);
845    }
846
847    // ========================================================================
848    // RFC 0003 read-ahead API tests.
849    // ========================================================================
850
851    async fn enqueue_n(
852        store: &Arc<dyn ObjectStore>,
853        producer: &QueueProducer,
854        n: usize,
855    ) -> Vec<String> {
856        let entries = test_entries();
857        let mut locations = Vec::with_capacity(n);
858        for i in 0..n {
859            let loc = format!("batches/seq-{i:06}");
860            write_batch(store, &loc, &entries).await;
861            producer.enqueue(loc.clone(), vec![]).await.unwrap();
862            locations.push(loc);
863        }
864        locations
865    }
866
867    #[tokio::test]
868    async fn should_next_descriptors_max_zero_return_empty_without_manifest_read() {
869        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
870        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
871        enqueue_n(&store, &producer, 3).await;
872
873        // max=0 returns empty and does not advance the cursor.
874        let v = collector.next_descriptors(0).await.unwrap();
875        assert!(v.is_empty());
876
877        // Subsequent next_descriptors still sees all three.
878        let v = collector.next_descriptors(10).await.unwrap();
879        assert_eq!(v.len(), 3);
880    }
881
882    #[tokio::test]
883    async fn should_next_descriptors_return_contiguous_run_when_available() {
884        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
885        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
886        enqueue_n(&store, &producer, 5).await;
887
888        let v = collector.next_descriptors(3).await.unwrap();
889        assert_eq!(v.len(), 3);
890        assert_eq!(v[0].sequence, 0);
891        assert_eq!(v[1].sequence, 1);
892        assert_eq!(v[2].sequence, 2);
893
894        // Cursor advanced; next call returns sequences 3 and 4 only.
895        let v2 = collector.next_descriptors(10).await.unwrap();
896        assert_eq!(v2.len(), 2);
897        assert_eq!(v2[0].sequence, 3);
898        assert_eq!(v2[1].sequence, 4);
899
900        // Empty when nothing new.
901        let v3 = collector.next_descriptors(10).await.unwrap();
902        assert!(v3.is_empty());
903    }
904
905    #[tokio::test]
906    async fn should_next_descriptors_return_fewer_than_max_on_short_manifest() {
907        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
908        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
909        enqueue_n(&store, &producer, 2).await;
910
911        let v = collector.next_descriptors(10).await.unwrap();
912        assert_eq!(v.len(), 2);
913    }
914
915    #[tokio::test]
916    async fn should_fetch_handle_return_same_consumed_batch_as_fetch_descriptor() {
917        // Fetch the *same* descriptor through both paths and compare:
918        // sequence, location, metadata, entries must all match.
919        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
920        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
921        let entries = test_entries();
922        let location = "batches/handle-vs-wrapper";
923        write_batch(&store, location, &entries).await;
924        let metadata = vec![Metadata {
925            start_index: 0,
926            ingestion_time_ms: 1_700_000_000_000,
927            payload: Bytes::from(r#"{"k":"v"}"#),
928        }];
929        producer
930            .enqueue(location.to_string(), metadata.clone())
931            .await
932            .unwrap();
933
934        let descriptors = collector.next_descriptors(1).await.unwrap();
935        assert_eq!(descriptors.len(), 1);
936        let d = descriptors[0].clone();
937
938        let from_handle = collector.fetch_handle().fetch(d.clone()).await.unwrap();
939        let from_wrapper = collector.fetch_descriptor(d).await.unwrap();
940
941        assert_eq!(from_handle.sequence, from_wrapper.sequence);
942        assert_eq!(from_handle.location, from_wrapper.location);
943        assert_eq!(from_handle.metadata, from_wrapper.metadata);
944        assert_eq!(from_handle.entries, from_wrapper.entries);
945        // And both must match what the producer actually wrote.
946        assert_eq!(from_handle.location, location);
947        assert_eq!(from_handle.entries.len(), 2);
948        assert_eq!(from_handle.metadata, metadata);
949    }
950
951    #[tokio::test]
952    async fn should_next_batch_not_advance_cursor_on_fetch_failure() {
953        // Pre-RFC-0003 contract: next_batch advances its cursor only
954        // after a successful fetch. The wrapper must roll back the
955        // private last_handed_out_sequence on fetch error so a retry
956        // re-issues the same batch.
957        use slatedb::object_store::path::Path;
958        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
959        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
960        let entries = test_entries();
961        let location_a = "batches/a-fails-then-succeeds";
962        let location_b = "batches/b";
963        write_batch(&store, location_a, &entries).await;
964        write_batch(&store, location_b, &entries).await;
965        producer
966            .enqueue(location_a.to_string(), vec![])
967            .await
968            .unwrap();
969        producer
970            .enqueue(location_b.to_string(), vec![])
971            .await
972            .unwrap();
973
974        // Delete A's data object so the fetch will fail.
975        store.delete(&Path::from(location_a)).await.unwrap();
976
977        // First next_batch fails on the fetch.
978        let r = collector.next_batch().await;
979        assert!(
980            matches!(r, Err(Error::Storage(_))),
981            "expected Storage error, got {r:?}"
982        );
983
984        // Re-write A so the next fetch succeeds.
985        write_batch(&store, location_a, &entries).await;
986
987        // Second next_batch must return A — cursor did not advance past
988        // it on the previous failure.
989        let b = collector.next_batch().await.unwrap().unwrap();
990        assert_eq!(b.sequence, 0);
991        assert_eq!(b.location, location_a);
992        // Acknowledge A and confirm forward progress to B.
993        collector.ack(b.sequence).await.unwrap();
994        let b2 = collector.next_batch().await.unwrap().unwrap();
995        assert_eq!(b2.sequence, 1);
996        assert_eq!(b2.location, location_b);
997    }
998
999    #[tokio::test]
1000    async fn should_next_batch_not_advance_cursor_when_future_is_dropped() {
1001        // Cancellation safety: constructing a `next_batch` future and
1002        // dropping it before it completes must leave
1003        // last_handed_out_sequence untouched, so the entry is not
1004        // skipped. The structural property: last_handed_out_sequence is
1005        // mutated *after* the fetch_descriptor await returns Ok, so a
1006        // future dropped before that line leaves the cursor unchanged.
1007        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1008        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
1009        let entries = test_entries();
1010        write_batch(&store, "batches/cancel-a", &entries).await;
1011        producer
1012            .enqueue("batches/cancel-a".to_string(), vec![])
1013            .await
1014            .unwrap();
1015
1016        assert_eq!(collector.last_handed_out_sequence, None);
1017
1018        // Construct the future and drop it without ever polling it.
1019        {
1020            let fut = collector.next_batch();
1021            drop(fut);
1022        }
1023        assert_eq!(
1024            collector.last_handed_out_sequence, None,
1025            "dropped next_batch future must not advance the cursor"
1026        );
1027
1028        // The entry was not consumed: a real next_batch still returns it.
1029        let b = collector.next_batch().await.unwrap().unwrap();
1030        assert_eq!(b.sequence, 0);
1031        assert_eq!(b.location, "batches/cancel-a");
1032        assert_eq!(collector.last_handed_out_sequence, Some(0));
1033    }
1034
1035    #[tokio::test]
1036    async fn should_next_batch_advance_cursor_on_successful_fetch() {
1037        // The success branch (the deterministic counterpart to the
1038        // dropped-future test): a completed next_batch advances
1039        // last_handed_out_sequence to the fetched sequence, and
1040        // successive calls walk forward one entry at a time.
1041        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1042        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
1043        let entries = test_entries();
1044        write_batch(&store, "batches/ok-a", &entries).await;
1045        write_batch(&store, "batches/ok-b", &entries).await;
1046        producer
1047            .enqueue("batches/ok-a".to_string(), vec![])
1048            .await
1049            .unwrap();
1050        producer
1051            .enqueue("batches/ok-b".to_string(), vec![])
1052            .await
1053            .unwrap();
1054
1055        assert_eq!(collector.last_handed_out_sequence, None);
1056
1057        let a = collector.next_batch().await.unwrap().unwrap();
1058        assert_eq!(a.sequence, 0);
1059        assert_eq!(collector.last_handed_out_sequence, Some(0));
1060
1061        let b = collector.next_batch().await.unwrap().unwrap();
1062        assert_eq!(b.sequence, 1);
1063        assert_eq!(collector.last_handed_out_sequence, Some(1));
1064    }
1065
1066    #[tokio::test]
1067    async fn should_fetch_handle_clones_fetch_concurrently_without_blocking_owner() {
1068        // The fetch handle is the parallel-fetch seam: it must be
1069        // Clone + Send + Sync + 'static (compile-time assertion below),
1070        // two clones must fetch from separate spawned tasks
1071        // concurrently, and the owning Consumer must keep &mut access
1072        // throughout (the handle holds an Arc, not a borrow) so it can
1073        // still drive the manifest — checked by the ack_through at the
1074        // end.
1075        fn assert_send_sync_static<T: Send + Sync + 'static>() {}
1076        assert_send_sync_static::<ConsumerFetchHandle>();
1077
1078        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1079        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
1080        enqueue_n(&store, &producer, 4).await;
1081        let descriptors = collector.next_descriptors(4).await.unwrap();
1082
1083        // Two clones of the same handle, run from spawned tasks.
1084        let handle = collector.fetch_handle();
1085        let h1 = handle.clone();
1086        let h2 = handle.clone();
1087        let d0 = descriptors[0].clone();
1088        let d1 = descriptors[1].clone();
1089        let t1 = tokio::spawn(async move { h1.fetch(d0).await.unwrap() });
1090        let t2 = tokio::spawn(async move { h2.fetch(d1).await.unwrap() });
1091        let r1 = t1.await.unwrap();
1092        let r2 = t2.await.unwrap();
1093        assert_eq!(r1.sequence, 0);
1094        assert_eq!(r2.sequence, 1);
1095
1096        // Owner can still drive next_descriptors and ack_through after
1097        // the spawned tasks completed (no &mut conflict because the
1098        // handle holds an Arc, not a borrow).
1099        collector.ack_through(1).await.unwrap();
1100    }
1101
1102    #[tokio::test]
1103    async fn should_ack_through_advance_frontier_in_one_call() {
1104        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1105        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
1106        enqueue_n(&store, &producer, 5).await;
1107
1108        let _v = collector.next_descriptors(5).await.unwrap();
1109
1110        // Advance through sequence 3 in one call.
1111        collector.ack_through(3).await.unwrap();
1112        // Manifest has 1 entry remaining (sequence 4).
1113        assert_eq!(collector.len(), 1);
1114    }
1115
1116    #[tokio::test]
1117    async fn should_ack_through_reject_non_monotonic_input() {
1118        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1119        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
1120        enqueue_n(&store, &producer, 5).await;
1121        collector.next_descriptors(5).await.unwrap();
1122
1123        collector.ack_through(2).await.unwrap();
1124        let err = collector.ack_through(2).await.unwrap_err();
1125        match err {
1126            Error::Storage(msg) => assert!(msg.contains("non-monotonic ack_through")),
1127            other => panic!("expected Error::Storage, got {other:?}"),
1128        }
1129        // The bookkeeping did not advance.
1130        assert_eq!(collector.last_acked_sequence, Some(2));
1131    }
1132
1133    #[tokio::test]
1134    async fn should_ack_through_leave_state_unchanged_on_fence() {
1135        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1136        let (producer, mut collector1) =
1137            make_collector(&store, test_collector_config(), None).await;
1138        enqueue_n(&store, &producer, 5).await;
1139        collector1.next_descriptors(5).await.unwrap();
1140        collector1.ack_through(0).await.unwrap();
1141        assert_eq!(collector1.last_acked_sequence, Some(0));
1142
1143        // Fence collector1 by spinning up a second consumer.
1144        let (_, _collector2) = make_collector(&store, test_collector_config(), None).await;
1145
1146        // ack_through against the fenced consumer must error and leave
1147        // last_acked_sequence at 0.
1148        let err = collector1.ack_through(2).await.unwrap_err();
1149        assert!(matches!(err, Error::Fenced));
1150        assert_eq!(collector1.last_acked_sequence, Some(0));
1151    }
1152
1153    #[tokio::test]
1154    async fn should_next_descriptors_surface_fence() {
1155        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1156        let (producer, mut collector1) =
1157            make_collector(&store, test_collector_config(), None).await;
1158        enqueue_n(&store, &producer, 3).await;
1159
1160        let (_, _collector2) = make_collector(&store, test_collector_config(), None).await;
1161
1162        let result = collector1.next_descriptors(10).await;
1163        assert!(matches!(result, Err(Error::Fenced)));
1164    }
1165
1166    #[tokio::test]
1167    async fn should_next_batch_be_equivalent_to_descriptor_plus_fetch() {
1168        // Equivalence: next_batch == next_descriptors(1) + fetch_descriptor.
1169        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1170        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
1171        let entries = test_entries();
1172        let location = "batches/equiv";
1173        write_batch(&store, location, &entries).await;
1174        producer
1175            .enqueue(location.to_string(), vec![])
1176            .await
1177            .unwrap();
1178
1179        let via_wrapper = collector.next_batch().await.unwrap().unwrap();
1180        assert_eq!(via_wrapper.sequence, 0);
1181        assert_eq!(via_wrapper.location, location);
1182        assert_eq!(via_wrapper.entries.len(), 2);
1183    }
1184
1185    #[tokio::test]
1186    async fn should_descriptor_handout_contract_not_reissue_lost_descriptors() {
1187        // Per RFC 0003: once next_descriptors returns a descriptor,
1188        // it is not handed out again within the same process.
1189        // Recovery is restart from the durable ack frontier.
1190        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
1191        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;
1192        enqueue_n(&store, &producer, 3).await;
1193
1194        // Hand out 2 descriptors; "lose" them (drop without acking).
1195        let lost = collector.next_descriptors(2).await.unwrap();
1196        assert_eq!(lost.len(), 2);
1197        drop(lost);
1198
1199        // The same consumer's next_descriptors moves on past them.
1200        let next = collector.next_descriptors(10).await.unwrap();
1201        assert_eq!(next.len(), 1);
1202        assert_eq!(next[0].sequence, 2);
1203
1204        // Recovery: a fresh consumer initialized from the last durable
1205        // ack (None here, since nothing was acked) re-emits the
1206        // unacked range from the start.
1207        collector.close().await.unwrap();
1208        let (_, mut recovered) = make_collector(&store, test_collector_config(), None).await;
1209        let again = recovered.next_descriptors(10).await.unwrap();
1210        assert_eq!(again.len(), 3);
1211        assert_eq!(again[0].sequence, 0);
1212    }
1213}