Skip to main content

buffer/
consumer.rs

1use std::sync::Arc;
2use std::time::{Instant, SystemTime, UNIX_EPOCH};
3
4use bytes::Bytes;
5use slatedb::object_store::ObjectStore;
6use slatedb::object_store::path::Path;
7use tokio_util::sync::CancellationToken;
8
9use crate::config::ConsumerConfig;
10use crate::error::{Error, Result};
11use crate::gc::GarbageCollector;
12use crate::metric_names as m;
13use crate::model::decode_batch;
14use crate::queue::{Metadata, QueueConsumer};
15
/// Number of acks between manifest `dequeue()` calls; amortizes manifest writes
/// (see [`Consumer::ack`] and [`Consumer::flush`]).
const DEQUEUE_INTERVAL: u64 = 100;
17
/// A batch of entries read from object storage by the [`Consumer`].
///
/// Produced by [`Consumer::next_batch`]; `sequence` is handed back to
/// [`Consumer::ack`] once the batch has been processed.
pub struct ConsumedBatch {
    /// The deserialized opaque byte entries from the data batch.
    pub entries: Vec<Bytes>,
    /// The queue sequence number of this batch.
    pub sequence: u64,
    /// The object storage path of the data batch.
    pub location: String,
    /// Metadata ranges attached by the buffer(s) that contributed to this batch.
    pub metadata: Vec<Metadata>,
}
29
/// Reads batches of ingested entries from object storage via a queue consumer.
///
/// The consumer iterates over entries in the queue manifest in ingestion order,
/// fetches the corresponding data batches from object storage, and makes them
/// available to the caller. Epoch-based fencing ensures only a single active
/// consumer processes entries at any time.
pub struct Consumer {
    /// Cursor over the queue manifest (peek/read/dequeue, with fencing).
    consumer: QueueConsumer,
    /// Store holding the data batches referenced by queue entries.
    object_store: Arc<dyn ObjectStore>,
    /// Token used to signal the background GC task to stop on `close()`.
    gc_shutdown: CancellationToken,
    /// Handle of the spawned GC task, awaited during `close()`.
    gc_handle: tokio::task::JoinHandle<()>,
    /// Total acks received; used to amortize manifest dequeue writes.
    ack_count: u64,
    /// Sequence of the most recently acked batch, if any.
    last_acked_sequence: Option<u64>,
    /// Sequence of the most recently fetched batch, if any.
    last_fetched_sequence: Option<u64>,
}
45
46impl Consumer {
47    /// Create a new consumer from the given configuration.
48    ///
49    /// Initializes the queue consumer (fencing any previous instance) and spawns
50    /// the garbage collector. If `last_acked_sequence` is `Some(seq)`, the
51    /// consumer resumes after that sequence; if `None`, it discovers the first
52    /// available entry.
53    pub async fn new(config: ConsumerConfig, last_acked_sequence: Option<u64>) -> Result<Self> {
54        let object_store = common::storage::factory::create_object_store(&config.object_store)
55            .map_err(|e| Error::Storage(e.to_string()))?;
56        Self::with_object_store(config, object_store, last_acked_sequence).await
57    }
58
59    pub async fn with_object_store(
60        config: ConsumerConfig,
61        object_store: Arc<dyn ObjectStore>,
62        last_acked_sequence: Option<u64>,
63    ) -> Result<Self> {
64        crate::metric_names::describe_consumer_metrics();
65        let consumer =
66            QueueConsumer::with_object_store(config.manifest_path.clone(), object_store.clone());
67
68        // Fence previous consumers and position the cursor before spawning GC.
69        consumer.initialize().await?;
70        if let Some(seq) = last_acked_sequence {
71            consumer.dequeue(seq).await?;
72        }
73
74        let gc_shutdown = CancellationToken::new();
75        let gc = GarbageCollector::new(
76            config.manifest_path,
77            config.data_path_prefix,
78            config.gc_interval,
79            config.gc_grace_period,
80            object_store.clone(),
81        );
82        let gc_handle = tokio::spawn(gc.collect(gc_shutdown.clone()));
83
84        Ok(Self {
85            consumer,
86            object_store,
87            ack_count: 0,
88            last_acked_sequence,
89            gc_shutdown,
90            gc_handle,
91            last_fetched_sequence: last_acked_sequence,
92        })
93    }
94
95    /// Read the next data batch from object storage.
96    ///
97    /// If no batch has been fetched yet, peeks the earliest entry in the queue.
98    /// Otherwise, reads the entry with sequence `last_fetched_sequence + 1`.
99    /// Returns `None` if no matching entry is available.
100    pub async fn next_batch(&mut self) -> Result<Option<ConsumedBatch>> {
101        let queue_entry = match self.last_fetched_sequence {
102            None => self.consumer.peek().await?,
103            Some(seq) => self.consumer.read(seq.wrapping_add(1)).await?,
104        };
105        metrics::gauge!(m::QUEUE_LENGTH).set(self.consumer.len() as f64);
106        match queue_entry {
107            Some(entry) => {
108                let sequence = entry.sequence;
109                let batch = self.fetch_batch(entry).await?;
110                self.last_fetched_sequence = Some(sequence);
111                if let Some(ref b) = batch
112                    && let Some(last_meta) = b.metadata.last()
113                {
114                    let now_ms = SystemTime::now()
115                        .duration_since(UNIX_EPOCH)
116                        .unwrap_or_default()
117                        .as_millis() as i64;
118                    let lag_s = (now_ms - last_meta.ingestion_time_ms) as f64 / 1000.0;
119                    metrics::gauge!(m::CONSUMER_LAG_SECONDS).set(lag_s.max(0.0));
120                }
121                Ok(batch)
122            }
123            None => Ok(None),
124        }
125    }
126
127    async fn fetch_batch(
128        &self,
129        queue_entry: crate::queue::QueueEntry,
130    ) -> Result<Option<ConsumedBatch>> {
131        let start = Instant::now();
132        let path = Path::from(queue_entry.location.as_str());
133        let data = self
134            .object_store
135            .get(&path)
136            .await
137            .map_err(|e| Error::Storage(e.to_string()))?
138            .bytes()
139            .await
140            .map_err(|e| Error::Storage(e.to_string()))?;
141
142        let data_len = data.len() as u64;
143        let entries = decode_batch(data)?;
144
145        metrics::counter!(m::BATCHES_COLLECTED).increment(1);
146        metrics::counter!(m::ENTRIES_COLLECTED).increment(entries.len() as u64);
147        metrics::counter!(m::BYTES_COLLECTED).increment(data_len);
148        metrics::histogram!(m::FETCH_DURATION_SECONDS).record(start.elapsed().as_secs_f64());
149
150        Ok(Some(ConsumedBatch {
151            entries,
152            sequence: queue_entry.sequence,
153            location: queue_entry.location,
154            metadata: queue_entry.metadata,
155        }))
156    }
157
158    /// Acknowledge that the batch with the given sequence number has been processed.
159    ///
160    /// Acks must be in order — the sequence must immediately follow the last acked
161    /// sequence, otherwise an error is returned. To amortize manifest writes, the
162    /// consumer only calls `dequeue()` on the queue consumer every
163    /// 100 acks.
164    pub async fn ack(&mut self, sequence: u64) -> Result<()> {
165        if let Some(last) = self.last_acked_sequence
166            && sequence != last + 1
167        {
168            return Err(Error::Storage(format!(
169                "out-of-order ack: expected sequence {}, got {}",
170                last + 1,
171                sequence
172            )));
173        }
174        self.last_acked_sequence = Some(sequence);
175        self.ack_count += 1;
176        metrics::counter!(m::ACKS).increment(1);
177        if self.ack_count.is_multiple_of(DEQUEUE_INTERVAL) {
178            self.consumer.dequeue(sequence).await?;
179        }
180        Ok(())
181    }
182
183    /// Flush any pending acks by dequeueing up to the last acked sequence.
184    pub async fn flush(&mut self) -> Result<()> {
185        if let Some(seq) = self.last_acked_sequence {
186            self.consumer.dequeue(seq).await?;
187        }
188        Ok(())
189    }
190
191    /// Flush pending acks, shut down the garbage collector, and consume the handle.
192    pub async fn close(mut self) -> Result<()> {
193        self.flush().await?;
194        self.gc_shutdown.cancel();
195        let _ = self.gc_handle.await;
196        Ok(())
197    }
198
199    /// Return the number of entries in the queue as of the last manifest read or write.
200    pub fn len(&self) -> usize {
201        self.consumer.len()
202    }
203
204    /// Return `true` if the queue had no entries as of the last manifest read or write.
205    pub fn is_empty(&self) -> bool {
206        self.len() == 0
207    }
208
209    /// Return the percentage of manifest writes that encountered a conflict.
210    pub fn conflict_rate(&self) -> f64 {
211        self.consumer.conflict_rate()
212    }
213}
214
#[cfg(test)]
mod tests {
    // Integration-style tests driving a QueueProducer and a Consumer
    // ("collector") against a shared in-memory object store.
    use super::*;
    use crate::config::ConsumerConfig;
    use crate::model::{CompressionType, encode_batch};
    use crate::queue::{Metadata, QueueProducer};
    use bytes::Bytes;
    use common::ObjectStoreConfig;
    use slatedb::object_store::PutPayload;
    use slatedb::object_store::memory::InMemory;
    use std::time::Duration;

    const TEST_MANIFEST_PATH: &str = "test/manifest";

    // Consumer config backed by the in-memory store; GC intervals are long
    // enough that the collector never fires during a test run.
    fn test_collector_config() -> ConsumerConfig {
        ConsumerConfig {
            object_store: ObjectStoreConfig::InMemory,
            manifest_path: TEST_MANIFEST_PATH.to_string(),
            data_path_prefix: "ingest".to_string(),
            gc_interval: Duration::from_secs(300),
            gc_grace_period: Duration::from_secs(600),
        }
    }

    // Two small opaque payloads used by every test batch.
    fn test_entries() -> Vec<Bytes> {
        vec![Bytes::from("data1"), Bytes::from("data2")]
    }

    // Encode `entries` (uncompressed) and put the batch at `location` so the
    // consumer can fetch it once the matching queue entry is enqueued.
    async fn write_batch(store: &Arc<dyn ObjectStore>, location: &str, entries: &[Bytes]) {
        let payload = encode_batch(entries, CompressionType::None).unwrap();
        let path = Path::from(location);
        store.put(&path, PutPayload::from(payload)).await.unwrap();
    }

    // Build a producer/consumer pair sharing one store and manifest path.
    // Note: constructing the Consumer fences any previously created one.
    async fn make_collector(
        store: &Arc<dyn ObjectStore>,
        config: ConsumerConfig,
        last_acked_sequence: Option<u64>,
    ) -> (QueueProducer, Consumer) {
        let producer =
            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
        let collector = Consumer::with_object_store(config, store.clone(), last_acked_sequence)
            .await
            .unwrap();
        (producer, collector)
    }

    #[tokio::test]
    async fn should_collect_enqueued_batch() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        let location = "batches/batch-001";
        write_batch(&store, location, &entries).await;
        producer
            .enqueue(location.to_string(), vec![])
            .await
            .unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.entries.len(), 2);
        assert_eq!(batch.entries[0], Bytes::from("data1"));
        assert_eq!(batch.entries[1], Bytes::from("data2"));
        assert_eq!(batch.location, location);
    }

    #[tokio::test]
    async fn should_collect_metadata_from_queue_entry() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        let location = "batches/batch-meta";
        write_batch(&store, location, &entries).await;

        let metadata = vec![Metadata {
            start_index: 0,
            ingestion_time_ms: 1_700_000_000_000,
            payload: Bytes::from(r#"{"topic":"events"}"#),
        }];
        producer
            .enqueue(location.to_string(), metadata)
            .await
            .unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.metadata.len(), 1);
        assert_eq!(batch.metadata[0].start_index, 0);
        assert_eq!(batch.metadata[0].ingestion_time_ms, 1_700_000_000_000);
        assert_eq!(
            batch.metadata[0].payload,
            Bytes::from(r#"{"topic":"events"}"#)
        );
    }

    #[tokio::test]
    async fn should_return_none_when_queue_empty() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (_producer, mut collector) =
            make_collector(&store, test_collector_config(), None).await;

        let result = collector.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_ack_batch() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        let location = "batches/batch-002";
        write_batch(&store, location, &entries).await;
        producer
            .enqueue(location.to_string(), vec![])
            .await
            .unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        collector.ack(batch.sequence).await.unwrap();
        collector.flush().await.unwrap();

        // After flush, entry is dequeued
        let next = collector.next_batch().await.unwrap();
        assert!(next.is_none());
    }

    #[tokio::test]
    async fn should_next_batch_return_batch_after_last_acked() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        write_batch(&store, "batches/second", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();
        producer
            .enqueue("batches/second".to_string(), vec![])
            .await
            .unwrap();

        let first = collector.next_batch().await.unwrap().unwrap();
        collector.ack(first.sequence).await.unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.location, "batches/second");
        assert_eq!(batch.sequence, 1);
        assert_eq!(batch.entries.len(), 2);
    }

    #[tokio::test]
    async fn should_next_batch_advance_before_previous_batch_is_acked() {
        // given
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        write_batch(&store, "batches/second", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();
        producer
            .enqueue("batches/second".to_string(), vec![])
            .await
            .unwrap();

        // when: fetching advances the cursor independently of acks
        let first = collector.next_batch().await.unwrap().unwrap();
        let second = collector.next_batch().await.unwrap().unwrap();

        // then
        assert_eq!(first.location, "batches/first");
        assert_eq!(first.sequence, 0);
        assert_eq!(second.location, "batches/second");
        assert_eq!(second.sequence, 1);
    }

    #[tokio::test]
    async fn should_next_batch_return_none_when_no_more_entries() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();

        let first = collector.next_batch().await.unwrap().unwrap();
        collector.ack(first.sequence).await.unwrap();

        let result = collector.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_resume_from_last_acked_sequence() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) =
            make_collector(&store, test_collector_config(), Some(0)).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        write_batch(&store, "batches/second", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();
        producer
            .enqueue("batches/second".to_string(), vec![])
            .await
            .unwrap();

        // Initialized with last_acked_sequence=0, so next_batch reads sequence 1
        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.location, "batches/second");
        assert_eq!(batch.sequence, 1);
    }

    #[tokio::test]
    async fn should_reject_out_of_order_ack() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        write_batch(&store, "batches/second", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();
        producer
            .enqueue("batches/second".to_string(), vec![])
            .await
            .unwrap();

        let first = collector.next_batch().await.unwrap().unwrap();
        collector.ack(first.sequence).await.unwrap();

        // Acking sequence 5 when last acked was 0 should fail
        let result = collector.ack(5).await;
        assert!(matches!(result, Err(Error::Storage(msg)) if msg.contains("out-of-order ack")));
    }

    #[tokio::test]
    async fn should_batch_dequeue_calls() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        // Enqueue DEQUEUE_INTERVAL + 1 entries
        let count = DEQUEUE_INTERVAL + 1;
        for i in 0..count {
            let location = format!("batches/batch-{:04}", i);
            write_batch(&store, &location, &entries).await;
            producer.enqueue(location, vec![]).await.unwrap();
        }

        // Ack all entries; dequeue fires at DEQUEUE_INTERVAL
        for i in 0..count {
            let batch = collector.next_batch().await.unwrap().unwrap();
            assert_eq!(batch.sequence, i);
            collector.ack(batch.sequence).await.unwrap();
        }

        // After DEQUEUE_INTERVAL acks, entries up to that point are dequeued.
        // The last entry (index DEQUEUE_INTERVAL) was acked but not yet dequeued.
        assert_eq!(collector.len(), 1);

        // Flush to dequeue the remaining entry.
        collector.flush().await.unwrap();

        let result = collector.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_flush_pending_acks() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        write_batch(&store, "batches/second", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();
        producer
            .enqueue("batches/second".to_string(), vec![])
            .await
            .unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        collector.ack(batch.sequence).await.unwrap();

        // Without flush, the entry is still in the manifest.
        // Verify by checking queue length hasn't changed.
        assert_eq!(collector.len(), 2);

        // After flush, dequeue removes the acked entry
        collector.flush().await.unwrap();
        assert_eq!(collector.len(), 1);
    }

    #[tokio::test]
    async fn should_close_flush_and_consume() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        collector.ack(batch.sequence).await.unwrap();
        collector.close().await.unwrap();

        // After close, entries should be dequeued
        let (_, mut collector2) = make_collector(&store, test_collector_config(), None).await;
        let result = collector2.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_fence_previous_collector() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector1) =
            make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();

        // Second collector fences the first
        let (_, _collector2) = make_collector(&store, test_collector_config(), None).await;

        // First collector should get a Fenced error
        let result = collector1.next_batch().await;
        assert!(matches!(result, Err(Error::Fenced)));
    }

    #[tokio::test]
    async fn should_iterate_multiple_sequential_batches() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        let locations = ["batches/a", "batches/b", "batches/c"];
        for loc in &locations {
            write_batch(&store, loc, &entries).await;
            producer.enqueue(loc.to_string(), vec![]).await.unwrap();
        }

        for (i, expected_loc) in locations.iter().enumerate() {
            let batch = collector.next_batch().await.unwrap().unwrap();
            assert_eq!(batch.sequence, i as u64);
            assert_eq!(batch.location, *expected_loc);
            collector.ack(batch.sequence).await.unwrap();
        }

        let result = collector.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_initialize_none_with_pre_existing_entries() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, _) = make_collector(&store, test_collector_config(), None).await;

        // Enqueue and dequeue placeholder entries to advance the sequence counter
        let entries = test_entries();
        for i in 0..5 {
            let loc = format!("batches/placeholder-{}", i);
            write_batch(&store, &loc, &entries).await;
            producer.enqueue(loc, vec![]).await.unwrap();
        }
        // Use a temporary collector to dequeue placeholders
        let (_, mut tmp_collector) = make_collector(&store, test_collector_config(), None).await;
        for _ in 0..5 {
            let batch = tmp_collector.next_batch().await.unwrap().unwrap();
            tmp_collector.ack(batch.sequence).await.unwrap();
        }
        tmp_collector.close().await.unwrap();

        // Now enqueue the real entry — it gets sequence 5
        write_batch(&store, "batches/pre-existing", &entries).await;
        producer
            .enqueue("batches/pre-existing".to_string(), vec![])
            .await
            .unwrap();

        // New collector with initialize(None) should find this entry
        let (_, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.location, "batches/pre-existing");
        assert_eq!(batch.sequence, 5);
    }

    #[tokio::test]
    async fn should_initialize_with_sequence_dequeue_already_processed() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, _) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        write_batch(&store, "batches/second", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();
        producer
            .enqueue("batches/second".to_string(), vec![])
            .await
            .unwrap();

        // Simulate restart: new collector resumes after sequence 0
        let (_, mut collector) = make_collector(&store, test_collector_config(), Some(0)).await;

        // The flush in initialize should have dequeued entries through sequence 0
        assert_eq!(collector.len(), 1);

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.location, "batches/second");
        assert_eq!(batch.sequence, 1);
    }
}
655}