1use std::sync::Arc;
2use std::time::{Instant, SystemTime, UNIX_EPOCH};
3
4use bytes::Bytes;
5use slatedb::object_store::ObjectStore;
6use slatedb::object_store::path::Path;
7use tokio_util::sync::CancellationToken;
8
9use crate::config::ConsumerConfig;
10use crate::error::{Error, Result};
11use crate::gc::GarbageCollector;
12use crate::metric_names as m;
13use crate::model::decode_batch;
14use crate::queue::{Metadata, QueueConsumer};
15
/// Number of acks to accumulate before persisting consumer progress with a
/// `dequeue` call (amortizes manifest writes; see `Consumer::ack`/`flush`).
const DEQUEUE_INTERVAL: u64 = 100;
17
/// A decoded batch read from the queue, together with the queue-entry
/// bookkeeping needed to acknowledge it.
pub struct ConsumedBatch {
    /// Decoded payload entries contained in the batch.
    pub entries: Vec<Bytes>,
    /// Queue sequence number of this batch; pass to `Consumer::ack`.
    pub sequence: u64,
    /// Object-store location the batch payload was fetched from.
    pub location: String,
    /// Producer-supplied metadata attached to the queue entry.
    pub metadata: Vec<Metadata>,
}
29
/// Reads batches off the queue, fetches their payloads from the object
/// store, and tracks ack/dequeue progress. Runs a background GC task that
/// is shut down by `close`.
pub struct Consumer {
    /// Underlying manifest-backed queue consumer.
    consumer: QueueConsumer,
    /// Store holding the batch payloads referenced by queue entries.
    object_store: Arc<dyn ObjectStore>,
    /// Cancels the background garbage-collector task on `close`.
    gc_shutdown: CancellationToken,
    /// Handle to the spawned GC task; awaited during `close`.
    gc_handle: tokio::task::JoinHandle<()>,
    /// Total acks since construction; drives the periodic dequeue.
    ack_count: u64,
    /// Highest sequence acknowledged by the caller, if any.
    last_acked_sequence: Option<u64>,
    /// Sequence of the most recently returned batch, if any; lets
    /// `next_batch` read ahead of what has been acked.
    last_fetched_sequence: Option<u64>,
}
45
46impl Consumer {
47 pub async fn new(config: ConsumerConfig, last_acked_sequence: Option<u64>) -> Result<Self> {
54 let object_store = common::storage::factory::create_object_store(&config.object_store)
55 .map_err(|e| Error::Storage(e.to_string()))?;
56 Self::with_object_store(config, object_store, last_acked_sequence).await
57 }
58
59 pub async fn with_object_store(
60 config: ConsumerConfig,
61 object_store: Arc<dyn ObjectStore>,
62 last_acked_sequence: Option<u64>,
63 ) -> Result<Self> {
64 crate::metric_names::describe_consumer_metrics();
65 let consumer =
66 QueueConsumer::with_object_store(config.manifest_path.clone(), object_store.clone());
67
68 consumer.initialize().await?;
70 if let Some(seq) = last_acked_sequence {
71 consumer.dequeue(seq).await?;
72 }
73
74 let gc_shutdown = CancellationToken::new();
75 let gc = GarbageCollector::new(
76 config.manifest_path,
77 config.data_path_prefix,
78 config.gc_interval,
79 config.gc_grace_period,
80 object_store.clone(),
81 );
82 let gc_handle = tokio::spawn(gc.collect(gc_shutdown.clone()));
83
84 Ok(Self {
85 consumer,
86 object_store,
87 ack_count: 0,
88 last_acked_sequence,
89 gc_shutdown,
90 gc_handle,
91 last_fetched_sequence: last_acked_sequence,
92 })
93 }
94
95 pub async fn next_batch(&mut self) -> Result<Option<ConsumedBatch>> {
101 let queue_entry = match self.last_fetched_sequence {
102 None => self.consumer.peek().await?,
103 Some(seq) => self.consumer.read(seq.wrapping_add(1)).await?,
104 };
105 metrics::gauge!(m::QUEUE_LENGTH).set(self.consumer.len() as f64);
106 match queue_entry {
107 Some(entry) => {
108 let sequence = entry.sequence;
109 let batch = self.fetch_batch(entry).await?;
110 self.last_fetched_sequence = Some(sequence);
111 if let Some(ref b) = batch
112 && let Some(last_meta) = b.metadata.last()
113 {
114 let now_ms = SystemTime::now()
115 .duration_since(UNIX_EPOCH)
116 .unwrap_or_default()
117 .as_millis() as i64;
118 let lag_s = (now_ms - last_meta.ingestion_time_ms) as f64 / 1000.0;
119 metrics::gauge!(m::CONSUMER_LAG_SECONDS).set(lag_s.max(0.0));
120 }
121 Ok(batch)
122 }
123 None => Ok(None),
124 }
125 }
126
127 async fn fetch_batch(
128 &self,
129 queue_entry: crate::queue::QueueEntry,
130 ) -> Result<Option<ConsumedBatch>> {
131 let start = Instant::now();
132 let path = Path::from(queue_entry.location.as_str());
133 let data = self
134 .object_store
135 .get(&path)
136 .await
137 .map_err(|e| Error::Storage(e.to_string()))?
138 .bytes()
139 .await
140 .map_err(|e| Error::Storage(e.to_string()))?;
141
142 let data_len = data.len() as u64;
143 let entries = decode_batch(data)?;
144
145 metrics::counter!(m::BATCHES_COLLECTED).increment(1);
146 metrics::counter!(m::ENTRIES_COLLECTED).increment(entries.len() as u64);
147 metrics::counter!(m::BYTES_COLLECTED).increment(data_len);
148 metrics::histogram!(m::FETCH_DURATION_SECONDS).record(start.elapsed().as_secs_f64());
149
150 Ok(Some(ConsumedBatch {
151 entries,
152 sequence: queue_entry.sequence,
153 location: queue_entry.location,
154 metadata: queue_entry.metadata,
155 }))
156 }
157
158 pub async fn ack(&mut self, sequence: u64) -> Result<()> {
165 if let Some(last) = self.last_acked_sequence
166 && sequence != last + 1
167 {
168 return Err(Error::Storage(format!(
169 "out-of-order ack: expected sequence {}, got {}",
170 last + 1,
171 sequence
172 )));
173 }
174 self.last_acked_sequence = Some(sequence);
175 self.ack_count += 1;
176 metrics::counter!(m::ACKS).increment(1);
177 if self.ack_count.is_multiple_of(DEQUEUE_INTERVAL) {
178 self.consumer.dequeue(sequence).await?;
179 }
180 Ok(())
181 }
182
183 pub async fn flush(&mut self) -> Result<()> {
185 if let Some(seq) = self.last_acked_sequence {
186 self.consumer.dequeue(seq).await?;
187 }
188 Ok(())
189 }
190
191 pub async fn close(mut self) -> Result<()> {
193 self.flush().await?;
194 self.gc_shutdown.cancel();
195 let _ = self.gc_handle.await;
196 Ok(())
197 }
198
199 pub fn len(&self) -> usize {
201 self.consumer.len()
202 }
203
204 pub fn is_empty(&self) -> bool {
206 self.len() == 0
207 }
208
209 pub fn conflict_rate(&self) -> f64 {
211 self.consumer.conflict_rate()
212 }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ConsumerConfig;
    use crate::model::{CompressionType, encode_batch};
    use crate::queue::{Metadata, QueueProducer};
    use bytes::Bytes;
    use common::ObjectStoreConfig;
    use slatedb::object_store::PutPayload;
    use slatedb::object_store::memory::InMemory;
    use std::time::Duration;

    const TEST_MANIFEST_PATH: &str = "test/manifest";

    // Config backed by an in-memory store, with GC intervals long enough
    // that the background GC task never fires during a test.
    fn test_collector_config() -> ConsumerConfig {
        ConsumerConfig {
            object_store: ObjectStoreConfig::InMemory,
            manifest_path: TEST_MANIFEST_PATH.to_string(),
            data_path_prefix: "ingest".to_string(),
            gc_interval: Duration::from_secs(300),
            gc_grace_period: Duration::from_secs(600),
        }
    }

    // Two small payload entries shared by most tests.
    fn test_entries() -> Vec<Bytes> {
        vec![Bytes::from("data1"), Bytes::from("data2")]
    }

    // Encodes `entries` (uncompressed) and writes the batch object at
    // `location` in the store.
    async fn write_batch(store: &Arc<dyn ObjectStore>, location: &str, entries: &[Bytes]) {
        let payload = encode_batch(entries, CompressionType::None).unwrap();
        let path = Path::from(location);
        store.put(&path, PutPayload::from(payload)).await.unwrap();
    }

    // Builds a producer and a consumer sharing the same store and
    // manifest path.
    async fn make_collector(
        store: &Arc<dyn ObjectStore>,
        config: ConsumerConfig,
        last_acked_sequence: Option<u64>,
    ) -> (QueueProducer, Consumer) {
        let producer =
            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
        let collector = Consumer::with_object_store(config, store.clone(), last_acked_sequence)
            .await
            .unwrap();
        (producer, collector)
    }

    #[tokio::test]
    async fn should_collect_enqueued_batch() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        let location = "batches/batch-001";
        write_batch(&store, location, &entries).await;
        producer
            .enqueue(location.to_string(), vec![])
            .await
            .unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.entries.len(), 2);
        assert_eq!(batch.entries[0], Bytes::from("data1"));
        assert_eq!(batch.entries[1], Bytes::from("data2"));
        assert_eq!(batch.location, location);
    }

    #[tokio::test]
    async fn should_collect_metadata_from_queue_entry() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        let location = "batches/batch-meta";
        write_batch(&store, location, &entries).await;

        let metadata = vec![Metadata {
            start_index: 0,
            ingestion_time_ms: 1_700_000_000_000,
            payload: Bytes::from(r#"{"topic":"events"}"#),
        }];
        producer
            .enqueue(location.to_string(), metadata)
            .await
            .unwrap();

        // Metadata supplied at enqueue time must round-trip unchanged.
        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.metadata.len(), 1);
        assert_eq!(batch.metadata[0].start_index, 0);
        assert_eq!(batch.metadata[0].ingestion_time_ms, 1_700_000_000_000);
        assert_eq!(
            batch.metadata[0].payload,
            Bytes::from(r#"{"topic":"events"}"#)
        );
    }

    #[tokio::test]
    async fn should_return_none_when_queue_empty() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (_producer, mut collector) =
            make_collector(&store, test_collector_config(), None).await;

        let result = collector.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_ack_batch() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        let location = "batches/batch-002";
        write_batch(&store, location, &entries).await;
        producer
            .enqueue(location.to_string(), vec![])
            .await
            .unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        collector.ack(batch.sequence).await.unwrap();
        collector.flush().await.unwrap();

        // After ack + flush the only entry is consumed.
        let next = collector.next_batch().await.unwrap();
        assert!(next.is_none());
    }

    #[tokio::test]
    async fn should_next_batch_return_batch_after_last_acked() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        write_batch(&store, "batches/second", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();
        producer
            .enqueue("batches/second".to_string(), vec![])
            .await
            .unwrap();

        let first = collector.next_batch().await.unwrap().unwrap();
        collector.ack(first.sequence).await.unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.location, "batches/second");
        assert_eq!(batch.sequence, 1);
        assert_eq!(batch.entries.len(), 2);
    }

    #[tokio::test]
    async fn should_next_batch_advance_before_previous_batch_is_acked() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        write_batch(&store, "batches/second", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();
        producer
            .enqueue("batches/second".to_string(), vec![])
            .await
            .unwrap();

        // Reading ahead without acking must still advance sequences.
        let first = collector.next_batch().await.unwrap().unwrap();
        let second = collector.next_batch().await.unwrap().unwrap();

        assert_eq!(first.location, "batches/first");
        assert_eq!(first.sequence, 0);
        assert_eq!(second.location, "batches/second");
        assert_eq!(second.sequence, 1);
    }

    #[tokio::test]
    async fn should_next_batch_return_none_when_no_more_entries() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();

        let first = collector.next_batch().await.unwrap().unwrap();
        collector.ack(first.sequence).await.unwrap();

        let result = collector.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_resume_from_last_acked_sequence() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        // last_acked_sequence = Some(0): consumption resumes at 1.
        let (producer, mut collector) =
            make_collector(&store, test_collector_config(), Some(0)).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        write_batch(&store, "batches/second", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();
        producer
            .enqueue("batches/second".to_string(), vec![])
            .await
            .unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.location, "batches/second");
        assert_eq!(batch.sequence, 1);
    }

    #[tokio::test]
    async fn should_reject_out_of_order_ack() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        write_batch(&store, "batches/second", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();
        producer
            .enqueue("batches/second".to_string(), vec![])
            .await
            .unwrap();

        let first = collector.next_batch().await.unwrap().unwrap();
        collector.ack(first.sequence).await.unwrap();

        // Acking 5 after 0 skips sequences 1..=4 and must be rejected.
        let result = collector.ack(5).await;
        assert!(matches!(result, Err(Error::Storage(msg)) if msg.contains("out-of-order ack")));
    }

    #[tokio::test]
    async fn should_batch_dequeue_calls() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        // One more batch than the dequeue interval, so exactly one
        // automatic dequeue happens mid-run and one entry remains.
        let count = DEQUEUE_INTERVAL + 1;
        for i in 0..count {
            let location = format!("batches/batch-{:04}", i);
            write_batch(&store, &location, &entries).await;
            producer.enqueue(location, vec![]).await.unwrap();
        }

        for i in 0..count {
            let batch = collector.next_batch().await.unwrap().unwrap();
            assert_eq!(batch.sequence, i);
            collector.ack(batch.sequence).await.unwrap();
        }

        // The automatic dequeue at ack #100 left the final entry queued.
        assert_eq!(collector.len(), 1);

        collector.flush().await.unwrap();

        let result = collector.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_flush_pending_acks() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        write_batch(&store, "batches/second", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();
        producer
            .enqueue("batches/second".to_string(), vec![])
            .await
            .unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        collector.ack(batch.sequence).await.unwrap();

        // A single ack is below DEQUEUE_INTERVAL, so nothing is dequeued yet.
        assert_eq!(collector.len(), 2);

        collector.flush().await.unwrap();
        assert_eq!(collector.len(), 1);
    }

    #[tokio::test]
    async fn should_close_flush_and_consume() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        collector.ack(batch.sequence).await.unwrap();
        collector.close().await.unwrap();

        // `close` flushed the ack, so a fresh consumer sees nothing left.
        let (_, mut collector2) = make_collector(&store, test_collector_config(), None).await;
        let result = collector2.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_fence_previous_collector() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector1) =
            make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();

        // Creating a second consumer on the same manifest fences the first.
        let (_, _collector2) = make_collector(&store, test_collector_config(), None).await;

        let result = collector1.next_batch().await;
        assert!(matches!(result, Err(Error::Fenced)));
    }

    #[tokio::test]
    async fn should_iterate_multiple_sequential_batches() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        let locations = ["batches/a", "batches/b", "batches/c"];
        for loc in &locations {
            write_batch(&store, loc, &entries).await;
            producer.enqueue(loc.to_string(), vec![]).await.unwrap();
        }

        for (i, expected_loc) in locations.iter().enumerate() {
            let batch = collector.next_batch().await.unwrap().unwrap();
            assert_eq!(batch.sequence, i as u64);
            assert_eq!(batch.location, *expected_loc);
            collector.ack(batch.sequence).await.unwrap();
        }

        let result = collector.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_initialize_none_with_pre_existing_entries() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, _) = make_collector(&store, test_collector_config(), None).await;

        // Advance the queue past sequence 4 with a throwaway consumer so
        // the queue's head is no longer sequence 0.
        let entries = test_entries();
        for i in 0..5 {
            let loc = format!("batches/placeholder-{}", i);
            write_batch(&store, &loc, &entries).await;
            producer.enqueue(loc, vec![]).await.unwrap();
        }
        let (_, mut tmp_collector) = make_collector(&store, test_collector_config(), None).await;
        for _ in 0..5 {
            let batch = tmp_collector.next_batch().await.unwrap().unwrap();
            tmp_collector.ack(batch.sequence).await.unwrap();
        }
        tmp_collector.close().await.unwrap();

        write_batch(&store, "batches/pre-existing", &entries).await;
        producer
            .enqueue("batches/pre-existing".to_string(), vec![])
            .await
            .unwrap();

        // A consumer started with `None` must pick up from the queue head
        // (sequence 5), not restart at 0.
        let (_, mut collector) = make_collector(&store, test_collector_config(), None).await;

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.location, "batches/pre-existing");
        assert_eq!(batch.sequence, 5);
    }

    #[tokio::test]
    async fn should_initialize_with_sequence_dequeue_already_processed() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let (producer, _) = make_collector(&store, test_collector_config(), None).await;

        let entries = test_entries();
        write_batch(&store, "batches/first", &entries).await;
        write_batch(&store, "batches/second", &entries).await;
        producer
            .enqueue("batches/first".to_string(), vec![])
            .await
            .unwrap();
        producer
            .enqueue("batches/second".to_string(), vec![])
            .await
            .unwrap();

        // Resuming at Some(0) dequeues sequence 0 during construction.
        let (_, mut collector) = make_collector(&store, test_collector_config(), Some(0)).await;

        assert_eq!(collector.len(), 1);

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.location, "batches/second");
        assert_eq!(batch.sequence, 1);
    }
}