1use std::cell::Cell;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use futures::StreamExt;
6use futures::stream;
7use slatedb::object_store::ObjectStore;
8use slatedb::object_store::path::Path;
9
10use crate::config::CollectorConfig;
11use crate::error::{Error, Result};
12use crate::model::decode_batch;
13use crate::queue::{Metadata, QueueConsumer, QueueEntry};
14
/// Ack cadence for dequeuing: `Collector::ack` issues a dequeue whenever its
/// running ack count is a multiple of this value.
const DEQUEUE_INTERVAL: u64 = 100;
16
/// A batch fetched from the object store, paired with the queue entry that
/// referenced it.
pub struct CollectedBatch {
    /// Entries obtained by decoding the bytes of the batch object.
    pub entries: Vec<Bytes>,
    /// Sequence number copied from the queue entry; this is the value passed
    /// to `Collector::ack` once the batch has been processed.
    pub sequence: u64,
    /// Object-store location copied from the queue entry.
    pub location: String,
    /// Metadata copied from the queue entry.
    pub metadata: Vec<Metadata>,
}
28
/// Reads batches referenced by a queue, tracks acknowledgements in sequence
/// order, and periodically dequeues (and deletes) acknowledged batches.
pub struct Collector {
    /// Queue consumer used to peek/read entries and to dequeue acknowledged ones.
    consumer: QueueConsumer,
    /// Store the batch objects are read from and deleted from.
    object_store: Arc<dyn ObjectStore>,
    /// Number of acks accepted so far; drives the periodic dequeue in `ack`.
    ack_count: Cell<u64>,
    /// Most recently acknowledged sequence, or `None` if nothing has been
    /// acknowledged (and no resume point was supplied to `initialize`).
    last_acked_sequence: Cell<Option<u64>>,
}
41
42impl Collector {
43 pub fn new(config: CollectorConfig) -> Result<Self> {
45 let object_store = common::storage::factory::create_object_store(&config.object_store)
46 .map_err(|e| Error::Storage(e.to_string()))?;
47 Ok(Self::with_object_store(config, object_store))
48 }
49
50 fn with_object_store(config: CollectorConfig, object_store: Arc<dyn ObjectStore>) -> Self {
51 let consumer = QueueConsumer::with_object_store(config.manifest_path, object_store.clone());
52 Self {
53 consumer,
54 object_store,
55 ack_count: Cell::new(0),
56 last_acked_sequence: Cell::new(None),
57 }
58 }
59
60 pub async fn initialize(&self, last_acked_sequence: Option<u64>) -> Result<()> {
67 self.consumer.initialize().await?;
68 if let Some(seq) = last_acked_sequence {
69 self.last_acked_sequence.set(Some(seq));
70 self.flush().await?;
71 }
72 Ok(())
73 }
74
75 pub async fn next_batch(&self) -> Result<Option<CollectedBatch>> {
81 let queue_entry = match self.last_acked_sequence.get() {
82 Some(seq) => self.consumer.read(seq.wrapping_add(1)).await?,
83 None => self.consumer.peek().await?,
84 };
85 match queue_entry {
86 Some(entry) => self.fetch_batch(entry).await,
87 None => Ok(None),
88 }
89 }
90
91 async fn fetch_batch(
92 &self,
93 queue_entry: crate::queue::QueueEntry,
94 ) -> Result<Option<CollectedBatch>> {
95 let path = Path::from(queue_entry.location.as_str());
96 let data = self
97 .object_store
98 .get(&path)
99 .await
100 .map_err(|e| Error::Storage(e.to_string()))?
101 .bytes()
102 .await
103 .map_err(|e| Error::Storage(e.to_string()))?;
104
105 let entries = decode_batch(data)?;
106
107 Ok(Some(CollectedBatch {
108 entries,
109 sequence: queue_entry.sequence,
110 location: queue_entry.location,
111 metadata: queue_entry.metadata,
112 }))
113 }
114
115 pub async fn ack(&self, sequence: u64) -> Result<()> {
122 if let Some(last) = self.last_acked_sequence.get()
123 && sequence != last + 1
124 {
125 return Err(Error::Storage(format!(
126 "out-of-order ack: expected sequence {}, got {}",
127 last + 1,
128 sequence
129 )));
130 }
131 self.last_acked_sequence.set(Some(sequence));
132 let count = self.ack_count.get() + 1;
133 self.ack_count.set(count);
134 if count.is_multiple_of(DEQUEUE_INTERVAL) {
135 let dequeued = self.consumer.dequeue(sequence).await?;
136 delete_dequeued_batches(self.object_store.clone(), dequeued);
137 }
138 Ok(())
139 }
140
141 pub async fn flush(&self) -> Result<()> {
143 if let Some(seq) = self.last_acked_sequence.get() {
144 let dequeued = self.consumer.dequeue(seq).await?;
145 delete_dequeued_batches(self.object_store.clone(), dequeued);
146 }
147 Ok(())
148 }
149
150 pub async fn close(self) -> Result<()> {
152 self.flush().await
153 }
154
155 pub fn len(&self) -> usize {
157 self.consumer.len()
158 }
159
160 pub fn is_empty(&self) -> bool {
162 self.len() == 0
163 }
164
165 pub fn conflict_rate(&self) -> f64 {
167 self.consumer.conflict_rate()
168 }
169}
170
171fn delete_dequeued_batches(object_store: Arc<dyn ObjectStore>, entries: Vec<QueueEntry>) {
172 if entries.is_empty() {
173 return;
174 }
175 tokio::spawn(async move {
176 let locations = stream::iter(entries.iter().map(|e| Ok(Path::from(e.location.as_str()))));
177 let mut results = object_store.delete_stream(locations.boxed());
178 let mut i = 0;
179 while let Some(result) = results.next().await {
180 if let Err(e) = result {
181 tracing::warn!(
182 path = entries[i].location,
183 error = %e,
184 "failed to delete ingested data batch"
185 );
186 }
187 i += 1;
188 }
189 });
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::CollectorConfig;
    use crate::model::{CompressionType, encode_batch};
    use crate::queue::{Metadata, QueueProducer};
    use bytes::Bytes;
    use common::ObjectStoreConfig;
    use slatedb::object_store::PutPayload;
    use slatedb::object_store::memory::InMemory;

    const TEST_MANIFEST_PATH: &str = "test/manifest";

    /// Upper bound on scheduler yields while waiting for a background delete.
    const MAX_DELETE_POLLS: usize = 64;

    /// Collector configuration pointing at the shared test manifest.
    fn test_collector_config() -> CollectorConfig {
        CollectorConfig {
            object_store: ObjectStoreConfig::InMemory,
            manifest_path: TEST_MANIFEST_PATH.to_string(),
        }
    }

    /// The two-entry payload written into every test batch.
    fn test_entries() -> Vec<Bytes> {
        vec![Bytes::from("data1"), Bytes::from("data2")]
    }

    /// Fresh in-memory object store.
    fn new_store() -> Arc<dyn ObjectStore> {
        Arc::new(InMemory::new())
    }

    /// Encodes `entries` uncompressed and stores them at `location`.
    async fn write_batch(store: &Arc<dyn ObjectStore>, location: &str, entries: &[Bytes]) {
        let payload = encode_batch(entries, CompressionType::None).unwrap();
        let path = Path::from(location);
        store.put(&path, PutPayload::from(payload)).await.unwrap();
    }

    /// Builds a producer and an (uninitialized) collector sharing `store`.
    fn make_collector(
        store: &Arc<dyn ObjectStore>,
        config: CollectorConfig,
    ) -> (QueueProducer, Collector) {
        let producer =
            QueueProducer::with_object_store(TEST_MANIFEST_PATH.to_string(), store.clone());
        let collector = Collector::with_object_store(config, store.clone());
        (producer, collector)
    }

    /// Store, producer and a collector already initialized without a resume point.
    async fn setup() -> (Arc<dyn ObjectStore>, QueueProducer, Collector) {
        let store = new_store();
        let (producer, collector) = make_collector(&store, test_collector_config());
        collector.initialize(None).await.unwrap();
        (store, producer, collector)
    }

    /// Writes the standard test batch at `location` and enqueues it without metadata.
    async fn enqueue_batch(store: &Arc<dyn ObjectStore>, producer: &QueueProducer, location: &str) {
        write_batch(store, location, &test_entries()).await;
        producer
            .enqueue(location.to_string(), vec![])
            .await
            .unwrap();
    }

    /// Collects the next batch, acknowledges it, and returns it.
    async fn collect_and_ack(collector: &Collector) -> CollectedBatch {
        let batch = collector.next_batch().await.unwrap().unwrap();
        collector.ack(batch.sequence).await.unwrap();
        batch
    }

    /// Waits for the detached delete task and asserts the object is gone.
    ///
    /// Deletion runs in a spawned task, so the scheduler is yielded to a bounded
    /// number of times rather than assuming one yield is always enough.
    async fn assert_batch_deleted(store: &Arc<dyn ObjectStore>, location: &str) {
        let path = Path::from(location);
        for _ in 0..MAX_DELETE_POLLS {
            tokio::task::yield_now().await;
            if store.get(&path).await.is_err() {
                return;
            }
        }
        panic!("expected batch file to be deleted: {location}");
    }

    #[tokio::test]
    async fn should_collect_enqueued_batch() {
        let (store, producer, collector) = setup().await;

        let location = "batches/batch-001";
        enqueue_batch(&store, &producer, location).await;

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.entries.len(), 2);
        assert_eq!(batch.entries, test_entries());
        assert_eq!(batch.location, location);
    }

    #[tokio::test]
    async fn should_collect_metadata_from_queue_entry() {
        let (store, producer, collector) = setup().await;

        let location = "batches/batch-meta";
        write_batch(&store, location, &test_entries()).await;

        let metadata = vec![Metadata {
            start_index: 0,
            ingestion_time_ms: 1_700_000_000_000,
            payload: Bytes::from(r#"{"topic":"events"}"#),
        }];
        producer
            .enqueue(location.to_string(), metadata)
            .await
            .unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.metadata.len(), 1);
        let collected = &batch.metadata[0];
        assert_eq!(collected.start_index, 0);
        assert_eq!(collected.ingestion_time_ms, 1_700_000_000_000);
        assert_eq!(collected.payload, Bytes::from(r#"{"topic":"events"}"#));
    }

    #[tokio::test]
    async fn should_return_none_when_queue_empty() {
        let (_store, _producer, collector) = setup().await;

        let result = collector.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_ack_batch() {
        let (store, producer, collector) = setup().await;
        enqueue_batch(&store, &producer, "batches/batch-002").await;

        collect_and_ack(&collector).await;
        collector.flush().await.unwrap();

        let next = collector.next_batch().await.unwrap();
        assert!(next.is_none());
    }

    #[tokio::test]
    async fn should_next_batch_return_batch_after_last_acked() {
        let (store, producer, collector) = setup().await;
        enqueue_batch(&store, &producer, "batches/first").await;
        enqueue_batch(&store, &producer, "batches/second").await;

        collect_and_ack(&collector).await;

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.location, "batches/second");
        assert_eq!(batch.sequence, 1);
        assert_eq!(batch.entries.len(), 2);
    }

    #[tokio::test]
    async fn should_next_batch_return_none_when_no_more_entries() {
        let (store, producer, collector) = setup().await;
        enqueue_batch(&store, &producer, "batches/first").await;

        collect_and_ack(&collector).await;

        let result = collector.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_resume_from_last_acked_sequence() {
        let store = new_store();
        let (producer, collector) = make_collector(&store, test_collector_config());
        collector.initialize(Some(0)).await.unwrap();

        enqueue_batch(&store, &producer, "batches/first").await;
        enqueue_batch(&store, &producer, "batches/second").await;

        // Sequence 0 counts as already acknowledged, so reading starts at 1.
        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.location, "batches/second");
        assert_eq!(batch.sequence, 1);
    }

    #[tokio::test]
    async fn should_reject_out_of_order_ack() {
        let (store, producer, collector) = setup().await;
        enqueue_batch(&store, &producer, "batches/first").await;
        enqueue_batch(&store, &producer, "batches/second").await;

        collect_and_ack(&collector).await;

        let result = collector.ack(5).await;
        assert!(matches!(result, Err(Error::Storage(msg)) if msg.contains("out-of-order ack")));
    }

    #[tokio::test]
    async fn should_batch_dequeue_calls() {
        let (store, producer, collector) = setup().await;

        let count = DEQUEUE_INTERVAL + 1;
        for i in 0..count {
            let location = format!("batches/batch-{:04}", i);
            enqueue_batch(&store, &producer, &location).await;
        }

        for expected_sequence in 0..count {
            let batch = collector.next_batch().await.unwrap().unwrap();
            assert_eq!(batch.sequence, expected_sequence);
            collector.ack(batch.sequence).await.unwrap();
        }

        // The interval-triggered dequeue removed the first DEQUEUE_INTERVAL
        // entries; the last ack is still pending.
        assert_eq!(collector.len(), 1);

        collector.flush().await.unwrap();

        let result = collector.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_flush_pending_acks() {
        let (store, producer, collector) = setup().await;
        enqueue_batch(&store, &producer, "batches/first").await;
        enqueue_batch(&store, &producer, "batches/second").await;

        collect_and_ack(&collector).await;

        // A single ack does not dequeue on its own.
        assert_eq!(collector.len(), 2);

        collector.flush().await.unwrap();
        assert_eq!(collector.len(), 1);
    }

    #[tokio::test]
    async fn should_close_flush_and_consume() {
        let (store, producer, collector) = setup().await;
        enqueue_batch(&store, &producer, "batches/first").await;

        collect_and_ack(&collector).await;
        collector.close().await.unwrap();

        let (_, collector2) = make_collector(&store, test_collector_config());
        collector2.initialize(None).await.unwrap();
        let result = collector2.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_fence_previous_collector() {
        let (store, producer, collector1) = setup().await;
        enqueue_batch(&store, &producer, "batches/first").await;

        let (_, collector2) = make_collector(&store, test_collector_config());
        collector2.initialize(None).await.unwrap();

        let result = collector1.next_batch().await;
        assert!(matches!(result, Err(Error::Fenced)));
    }

    #[tokio::test]
    async fn should_iterate_multiple_sequential_batches() {
        let (store, producer, collector) = setup().await;

        let locations = ["batches/a", "batches/b", "batches/c"];
        for loc in &locations {
            enqueue_batch(&store, &producer, loc).await;
        }

        for (i, expected_loc) in locations.iter().enumerate() {
            let batch = collector.next_batch().await.unwrap().unwrap();
            assert_eq!(batch.sequence, i as u64);
            assert_eq!(batch.location, *expected_loc);
            collector.ack(batch.sequence).await.unwrap();
        }

        let result = collector.next_batch().await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_initialize_none_with_pre_existing_entries() {
        let store = new_store();
        let (producer, _) = make_collector(&store, test_collector_config());

        // Advance the queue's sequence counter by producing and fully
        // consuming five placeholder batches with a temporary collector.
        for i in 0..5 {
            let loc = format!("batches/placeholder-{}", i);
            enqueue_batch(&store, &producer, &loc).await;
        }
        let (_, tmp_collector) = make_collector(&store, test_collector_config());
        tmp_collector.initialize(None).await.unwrap();
        for _ in 0..5 {
            collect_and_ack(&tmp_collector).await;
        }
        tmp_collector.close().await.unwrap();

        enqueue_batch(&store, &producer, "batches/pre-existing").await;

        let (_, collector) = make_collector(&store, test_collector_config());
        collector.initialize(None).await.unwrap();

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.location, "batches/pre-existing");
        assert_eq!(batch.sequence, 5);
    }

    #[tokio::test]
    async fn should_initialize_with_sequence_dequeue_already_processed() {
        let store = new_store();
        let (producer, _) = make_collector(&store, test_collector_config());

        enqueue_batch(&store, &producer, "batches/first").await;
        enqueue_batch(&store, &producer, "batches/second").await;

        let (_, collector) = make_collector(&store, test_collector_config());
        collector.initialize(Some(0)).await.unwrap();

        // Initializing with a resume point flushes, dequeuing sequence 0.
        assert_eq!(collector.len(), 1);

        let batch = collector.next_batch().await.unwrap().unwrap();
        assert_eq!(batch.location, "batches/second");
        assert_eq!(batch.sequence, 1);
    }

    #[tokio::test]
    async fn should_delete_batch_file_after_flush() {
        let (store, producer, collector) = setup().await;

        let location = "batches/delete-me";
        enqueue_batch(&store, &producer, location).await;

        collect_and_ack(&collector).await;
        collector.flush().await.unwrap();

        assert_batch_deleted(&store, location).await;
    }

    #[tokio::test]
    async fn should_delete_batch_files_after_dequeue_interval() {
        let (store, producer, collector) = setup().await;

        let mut locations = Vec::new();
        for i in 0..DEQUEUE_INTERVAL {
            let location = format!("batches/interval-{:04}", i);
            enqueue_batch(&store, &producer, &location).await;
            locations.push(location);
        }

        for _ in 0..DEQUEUE_INTERVAL {
            collect_and_ack(&collector).await;
        }

        // No explicit flush: the interval-th ack triggers the dequeue itself.
        for location in &locations {
            assert_batch_deleted(&store, location).await;
        }
    }

    #[tokio::test]
    async fn should_delete_batch_files_on_close() {
        let (store, producer, collector) = setup().await;

        let location = "batches/close-me";
        enqueue_batch(&store, &producer, location).await;

        collect_and_ack(&collector).await;
        collector.close().await.unwrap();

        assert_batch_deleted(&store, location).await;
    }
}