1use std::sync::Arc;
2use std::time::{Duration, SystemTime};
3
4use bytes::Bytes;
5use common::clock::Clock;
6use slatedb::object_store::path::Path;
7use slatedb::object_store::{ObjectStore, PutPayload};
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use crate::config::IngestorConfig;
12use crate::error::{Error, Result};
13use crate::model::{CompressionType, encode_batch};
14use crate::queue::{Metadata, QueueProducer};
15use crate::util::millis;
16
17type Notifier = tokio::sync::watch::Sender<Option<Result<()>>>;
18
19#[derive(Clone)]
20pub struct DurabilityWatcher {
21 receiver: tokio::sync::watch::Receiver<Option<Result<()>>>,
22}
23
24impl DurabilityWatcher {
25 pub fn result(&self) -> Option<Result<()>> {
28 self.receiver.borrow().clone()
29 }
30
31 pub async fn await_durable(&mut self) -> Result<()> {
33 self.receiver
34 .wait_for(|v| v.is_some())
35 .await
36 .map_err(|_| Error::Storage("ingestor shut down".to_string()))?
37 .clone()
38 .expect("value must be present after wait_for")
39 }
40}
41
42pub struct WriteHandle {
43 pub watcher: DurabilityWatcher,
44}
45
46enum IngestMessage {
47 Add {
48 entries: Vec<Bytes>,
49 metadata: Bytes,
50 ingestion_time_ms: i64,
51 notifier: Notifier,
52 },
53 Flush {
54 result_sender: tokio::sync::oneshot::Sender<Result<()>>,
55 },
56}
57
58#[derive(Default)]
59struct DataAndNotifiers {
60 entries: Vec<Bytes>,
61 metadata: Vec<Metadata>,
62 notifiers: Vec<Notifier>,
63}
64
65impl DataAndNotifiers {
66 fn add(
67 &mut self,
68 entries: Vec<Bytes>,
69 metadata: Bytes,
70 ingestion_time_ms: i64,
71 notifier: Notifier,
72 ) -> Result<()> {
73 let start_index = self.entries.len() as u32;
74 self.entries.extend(entries);
75 if self.entries.len() > u32::MAX as usize {
76 return Err(Error::InvalidInput(format!(
77 "batch entry count {} exceeds u32::MAX",
78 self.entries.len()
79 )));
80 }
81 self.metadata.push(Metadata {
82 start_index,
83 ingestion_time_ms,
84 payload: metadata,
85 });
86 self.notifiers.push(notifier);
87 Ok(())
88 }
89
90 fn is_empty(&self) -> bool {
91 self.entries.is_empty() && self.metadata.is_empty() && self.notifiers.is_empty()
92 }
93}
94
95struct Batch {
96 data_and_notifiers: DataAndNotifiers,
97 size_bytes: usize,
98 started_at: Option<SystemTime>,
99}
100
101impl Batch {
102 fn new() -> Self {
103 Self {
104 data_and_notifiers: DataAndNotifiers::default(),
105 size_bytes: 0,
106 started_at: None,
107 }
108 }
109
110 fn add(
111 &mut self,
112 entries: Vec<Bytes>,
113 metadata: Bytes,
114 ingestion_time_ms: i64,
115 notifier: Notifier,
116 now: SystemTime,
117 ) -> Result<()> {
118 let mut entry_size_sum = 0usize;
119 for e in &entries {
120 if e.len() > u32::MAX as usize {
121 return Err(Error::InvalidInput(format!(
122 "entry size {} exceeds u32::MAX",
123 e.len()
124 )));
125 }
126 entry_size_sum += e.len();
127 }
128 self.size_bytes += entry_size_sum + metadata.len();
129 self.data_and_notifiers
130 .add(entries, metadata, ingestion_time_ms, notifier)?;
131 if self.started_at.is_none() {
132 self.started_at = Some(now);
133 }
134 Ok(())
135 }
136
137 fn take(&mut self) -> DataAndNotifiers {
138 self.size_bytes = 0;
139 self.started_at = None;
140 std::mem::take(&mut self.data_and_notifiers)
141 }
142
143 fn is_empty(&self) -> bool {
144 self.data_and_notifiers.is_empty()
145 }
146}
147
148struct BatchWriterTask {
149 object_store: Arc<dyn ObjectStore>,
150 producer: Arc<QueueProducer>,
151 data_path_prefix: String,
152 flush_interval: Duration,
153 flush_size_bytes: usize,
154 batch_compression: CompressionType,
155 batch: Batch,
156 clock: Arc<dyn Clock>,
157}
158
159impl BatchWriterTask {
160 fn new(
161 object_store: Arc<dyn ObjectStore>,
162 producer: Arc<QueueProducer>,
163 data_path_prefix: String,
164 flush_interval: Duration,
165 flush_size_bytes: usize,
166 batch_compression: CompressionType,
167 clock: Arc<dyn Clock>,
168 ) -> Self {
169 Self {
170 object_store,
171 producer,
172 data_path_prefix,
173 flush_interval,
174 flush_size_bytes,
175 batch_compression,
176 batch: Batch::new(),
177 clock,
178 }
179 }
180
181 async fn run(&mut self, mut rx: mpsc::Receiver<IngestMessage>, shutdown: CancellationToken) {
182 loop {
183 let sleep_duration = match self.batch.started_at {
184 Some(started) => (started + self.flush_interval)
185 .duration_since(self.clock.now())
186 .unwrap_or(Duration::ZERO),
187 None => self.flush_interval,
188 };
189
190 tokio::select! {
191 biased;
192 _ = shutdown.cancelled() => {
193 return;
194 },
195 msg = rx.recv() => {
196 match msg {
197 Some(IngestMessage::Add { entries, metadata, ingestion_time_ms, notifier }) => {
198 if let Err(e) = self.batch.add(entries, metadata, ingestion_time_ms, notifier.clone(), self.clock.now()) {
199 let _ = notifier.send(Some(Err(e)));
200 } else if self.batch.size_bytes >= self.flush_size_bytes {
201 let _ = self.write_batch().await;
202 }
203 }
204 Some(IngestMessage::Flush { result_sender }) => {
205 let _ = result_sender.send(self.write_batch().await);
206 }
207 None => break,
208 }
209 },
210 _ = tokio::time::sleep(sleep_duration) => {
211 let _ = self.write_batch().await;
212 },
213 }
214 }
215 }
216
217 async fn write_batch(&mut self) -> Result<()> {
218 if self.batch.is_empty() {
219 return Ok(());
220 }
221 let data_and_notifiers = self.batch.take();
222 let result = self
223 .write_and_enqueue(data_and_notifiers.entries, data_and_notifiers.metadata)
224 .await;
225
226 for notifier in data_and_notifiers.notifiers {
227 let _ = notifier.send(Some(result.clone()));
228 }
229
230 result
231 }
232
233 async fn write_and_enqueue(&self, entries: Vec<Bytes>, metadata: Vec<Metadata>) -> Result<()> {
234 let payload = encode_batch(&entries, self.batch_compression)?;
235 let id = ulid::Ulid::new();
236 let path = Path::from(format!("{}/{}.batch", self.data_path_prefix, id));
237 self.object_store
238 .put(&path, PutPayload::from(payload))
239 .await
240 .map_err(|e| Error::Storage(e.to_string()))?;
241
242 self.producer.enqueue(path.to_string(), metadata).await?;
243
244 Ok(())
245 }
246}
247
248struct BatchWriter {
249 producer: Arc<QueueProducer>,
250 sender: mpsc::Sender<IngestMessage>,
251 cancellation_token: CancellationToken,
252 handle: tokio::task::JoinHandle<()>,
253}
254
255impl BatchWriter {
256 fn new(
257 object_store: Arc<dyn ObjectStore>,
258 config: &IngestorConfig,
259 clock: Arc<dyn Clock>,
260 ) -> Self {
261 let (sender, receiver) = mpsc::channel(config.max_buffered_inputs);
262 let producer = Arc::new(QueueProducer::with_object_store(
263 config.manifest_path.clone(),
264 object_store.clone(),
265 ));
266 let mut task = BatchWriterTask::new(
267 object_store,
268 producer.clone(),
269 config.data_path_prefix.clone(),
270 config.flush_interval,
271 config.flush_size_bytes,
272 config.batch_compression,
273 clock,
274 );
275 let shutdown = CancellationToken::new();
276 let cancellation_token = shutdown.clone();
277 let handle = tokio::spawn(async move { task.run(receiver, shutdown).await });
278 Self {
279 producer,
280 sender,
281 cancellation_token,
282 handle,
283 }
284 }
285
286 async fn add(
287 &self,
288 entries: Vec<Bytes>,
289 metadata: Bytes,
290 ingestion_time_ms: i64,
291 ) -> Result<DurabilityWatcher> {
292 let (notifier_sender, notifier_receiver) = tokio::sync::watch::channel(None);
293 self.sender
294 .send(IngestMessage::Add {
295 entries,
296 metadata,
297 ingestion_time_ms,
298 notifier: notifier_sender,
299 })
300 .await
301 .map_err(|_| Error::Storage("ingestor shut down".to_string()))?;
302 Ok(DurabilityWatcher {
303 receiver: notifier_receiver,
304 })
305 }
306
307 async fn flush(&self) -> Result<()> {
308 let (result_sender, result_receiver) = tokio::sync::oneshot::channel();
309 self.sender
310 .send(IngestMessage::Flush { result_sender })
311 .await
312 .map_err(|_| Error::Storage("batch writer task not running".to_string()))?;
313 result_receiver
314 .await
315 .map_err(|_| Error::Storage("batch writer task not running".to_string()))?
316 }
317
318 fn conflict_rate(&self) -> f64 {
319 self.producer.conflict_rate()
320 }
321 async fn close(self) -> Result<()> {
322 self.flush().await?;
323 self.cancellation_token.cancel();
324 let _ = self.handle.await;
325 Ok(())
326 }
327}
328
329pub struct Ingestor {
330 writer: BatchWriter,
331 clock: Arc<dyn Clock>,
332}
333
334impl Ingestor {
335 pub fn new(config: IngestorConfig, clock: Arc<dyn Clock>) -> Result<Self> {
337 let object_store = common::storage::factory::create_object_store(&config.object_store)
338 .map_err(|e| Error::Storage(e.to_string()))?;
339 Self::with_object_store(config, object_store, clock)
340 }
341
342 fn with_object_store(
343 config: IngestorConfig,
344 object_store: Arc<dyn ObjectStore>,
345 clock: Arc<dyn Clock>,
346 ) -> Result<Self> {
347 let writer = BatchWriter::new(object_store, &config, clock.clone());
348 Ok(Self { writer, clock })
349 }
350
351 pub async fn ingest(&self, entries: Vec<Bytes>, metadata: Bytes) -> Result<WriteHandle> {
359 if entries.is_empty() {
360 return Err(Error::InvalidInput("entries must not be empty".to_string()));
361 }
362 if metadata.len() > u32::MAX as usize {
363 return Err(Error::InvalidInput(format!(
364 "metadata size {} exceeds u32::MAX",
365 metadata.len()
366 )));
367 }
368 let ingestion_time_ms = millis(self.clock.now());
369 let durability_watcher = self
370 .writer
371 .add(entries, metadata, ingestion_time_ms)
372 .await?;
373 Ok(WriteHandle {
374 watcher: durability_watcher,
375 })
376 }
377
378 pub async fn flush(&self) -> Result<()> {
380 self.writer.flush().await
381 }
382
383 pub fn conflict_rate(&self) -> f64 {
385 self.writer.conflict_rate()
386 }
387
388 pub async fn close(self) -> Result<()> {
390 self.writer.close().await
391 }
392}
393
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::IngestorConfig;
    use crate::model::decode_batch;
    use crate::queue::{Manifest, QueueEntry};
    use bytes::Bytes;
    use common::ObjectStoreConfig;
    use common::clock::{MockClock, SystemClock};
    use slatedb::object_store::ObjectStore;
    use slatedb::object_store::memory::InMemory;
    use std::time::UNIX_EPOCH;

    /// Manifest location used by `test_config`.
    const MANIFEST: &str = "test/manifest";

    /// Config whose thresholds are high enough that nothing flushes unless a
    /// test lowers them or flushes explicitly.
    fn test_config() -> IngestorConfig {
        IngestorConfig {
            object_store: ObjectStoreConfig::InMemory,
            data_path_prefix: "test-ingest".to_string(),
            manifest_path: "test/manifest".to_string(),
            flush_interval: Duration::from_hours(24),
            flush_size_bytes: 64 * 1024 * 1024,
            max_buffered_inputs: 1000,
            batch_compression: CompressionType::None,
        }
    }

    /// Builds an ingestor over a fresh in-memory store and returns both.
    fn setup_with_clock(
        config: IngestorConfig,
        clock: Arc<dyn common::clock::Clock>,
    ) -> (Arc<dyn ObjectStore>, Ingestor) {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let ingestor = Ingestor::with_object_store(config, Arc::clone(&store), clock).unwrap();
        (store, ingestor)
    }

    /// Same as `setup_with_clock`, using the system clock.
    fn setup(config: IngestorConfig) -> (Arc<dyn ObjectStore>, Ingestor) {
        setup_with_clock(config, Arc::new(SystemClock))
    }

    async fn read_manifest_entries(store: &Arc<dyn ObjectStore>, path: &str) -> Vec<QueueEntry> {
        let raw = store
            .get(&Path::from(path))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        Manifest::from_bytes(raw)
            .unwrap()
            .iter()
            .map(|entry| entry.unwrap())
            .collect()
    }

    /// Reads the raw bytes stored at `location`.
    async fn read_object(store: &Arc<dyn ObjectStore>, location: &str) -> Bytes {
        store
            .get(&Path::from(location))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap()
    }

    /// True when no manifest has been written yet.
    async fn manifest_missing(store: &Arc<dyn ObjectStore>) -> bool {
        store.get(&Path::from(MANIFEST)).await.is_err()
    }

    /// Asserts a single manifest entry that carries one `"meta"` metadata
    /// record starting at index 0.
    fn assert_single_meta_entry(entries: &[QueueEntry]) {
        assert_eq!(entries.len(), 1);
        assert!(!entries[0].location.is_empty());
        assert_eq!(entries[0].metadata.len(), 1);
        assert_eq!(entries[0].metadata[0].payload, Bytes::from("meta"));
        assert_eq!(entries[0].metadata[0].start_index, 0);
    }

    #[tokio::test]
    async fn should_ingest_entries_and_enqueue_location() {
        let (store, ingestor) = setup(test_config());

        for payload in ["data1", "data2"] {
            ingestor
                .ingest(vec![Bytes::from(payload)], Bytes::new())
                .await
                .unwrap();
        }
        ingestor.flush().await.unwrap();

        let entries = read_manifest_entries(&store, MANIFEST).await;
        assert_eq!(entries.len(), 1);
        let location = &entries[0].location;
        assert!(location.starts_with("test-ingest/"));
        assert!(location.ends_with(".batch"));
    }

    #[tokio::test]
    async fn should_write_valid_batch_to_object_store() {
        let (store, ingestor) = setup(test_config());

        ingestor
            .ingest(vec![Bytes::from("mydata")], Bytes::new())
            .await
            .unwrap();
        ingestor.flush().await.unwrap();

        let entries = read_manifest_entries(&store, MANIFEST).await;
        let parsed = decode_batch(read_object(&store, entries[0].location.as_str()).await).unwrap();

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0], Bytes::from("mydata"));
    }

    #[tokio::test]
    async fn should_flush_when_batch_size_exceeded() {
        let config = IngestorConfig {
            flush_size_bytes: 10,
            ..test_config()
        };
        let (store, ingestor) = setup(config);

        let mut handle = ingestor
            .ingest(vec![Bytes::from("some-long-data")], Bytes::new())
            .await
            .unwrap();
        handle.watcher.await_durable().await.unwrap();

        assert_eq!(read_manifest_entries(&store, MANIFEST).await.len(), 1);
    }

    #[tokio::test]
    async fn should_flush_when_interval_elapsed() {
        let config = IngestorConfig {
            flush_interval: Duration::from_millis(50),
            flush_size_bytes: 64 * 1024 * 1024,
            ..test_config()
        };
        let (store, ingestor) = setup(config);

        let mut handle = ingestor
            .ingest(vec![Bytes::from("v1")], Bytes::new())
            .await
            .unwrap();

        // Nothing is durable before the interval elapses.
        assert!(handle.watcher.result().is_none());
        assert!(manifest_missing(&store).await);

        handle.watcher.await_durable().await.unwrap();

        assert_eq!(read_manifest_entries(&store, MANIFEST).await.len(), 1);
    }

    #[tokio::test]
    async fn should_not_flush_below_thresholds() {
        let (store, ingestor) = setup(test_config());

        let handle = ingestor
            .ingest(vec![Bytes::from("v")], Bytes::new())
            .await
            .unwrap();

        assert!(handle.watcher.result().is_none());
        assert!(manifest_missing(&store).await);

        ingestor.flush().await.unwrap();

        assert!(handle.watcher.result().unwrap().is_ok());
        assert_eq!(read_manifest_entries(&store, MANIFEST).await.len(), 1);
    }

    #[tokio::test]
    async fn should_batch_multiple_ingests_into_single_file() {
        let (store, ingestor) = setup(test_config());

        let first = ingestor
            .ingest(vec![Bytes::from("data1")], Bytes::new())
            .await
            .unwrap();
        let second = ingestor
            .ingest(vec![Bytes::from("data2")], Bytes::new())
            .await
            .unwrap();
        ingestor.flush().await.unwrap();

        for handle in [&first, &second] {
            assert!(handle.watcher.result().unwrap().is_ok());
        }

        let entries = read_manifest_entries(&store, MANIFEST).await;
        assert_eq!(entries.len(), 1);

        let parsed = decode_batch(read_object(&store, entries[0].location.as_str()).await).unwrap();
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0], Bytes::from("data1"));
        assert_eq!(parsed[1], Bytes::from("data2"));
    }

    #[tokio::test]
    async fn should_apply_backpressure_when_buffer_full() {
        let config = IngestorConfig {
            max_buffered_inputs: 1,
            ..test_config()
        };
        let (store, ingestor) = setup(config);

        // The channel has room for one message, so the second ingest may have
        // to wait for the task to drain the first. Both must still complete.
        for payload in ["data1", "data2"] {
            ingestor
                .ingest(vec![Bytes::from(payload)], Bytes::new())
                .await
                .unwrap();
        }
        ingestor.flush().await.unwrap();

        assert!(!read_manifest_entries(&store, MANIFEST).await.is_empty());
    }

    #[tokio::test]
    async fn should_record_metadata_and_ingestion_time_in_queue_entry() {
        let fixed_time = UNIX_EPOCH + Duration::from_millis(1_700_000_000_000);
        let (store, ingestor) =
            setup_with_clock(test_config(), Arc::new(MockClock::with_time(fixed_time)));

        let metadata = Bytes::from(r#"{"topic":"events"}"#);
        ingestor
            .ingest(vec![Bytes::from("payload")], metadata.clone())
            .await
            .unwrap();
        ingestor.flush().await.unwrap();

        let entries = read_manifest_entries(&store, MANIFEST).await;
        assert_eq!(entries.len(), 1);
        let recorded = &entries[0].metadata;
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].payload, metadata);
        assert_eq!(recorded[0].ingestion_time_ms, 1_700_000_000_000);
    }

    #[tokio::test]
    async fn should_flush_remaining_entries_on_close() {
        let (store, ingestor) = setup(test_config());

        ingestor
            .ingest(vec![Bytes::from("unflushed")], Bytes::new())
            .await
            .unwrap();
        ingestor.close().await.unwrap();

        let entries = read_manifest_entries(&store, MANIFEST).await;
        assert_eq!(entries.len(), 1);

        let parsed = decode_batch(read_object(&store, entries[0].location.as_str()).await).unwrap();
        assert_eq!(parsed, vec![Bytes::from("unflushed")]);
    }

    #[tokio::test]
    async fn should_produce_separate_batches_per_flush() {
        let (store, ingestor) = setup(test_config());

        for payload in ["batch1", "batch2"] {
            ingestor
                .ingest(vec![Bytes::from(payload)], Bytes::new())
                .await
                .unwrap();
            ingestor.flush().await.unwrap();
        }

        let entries = read_manifest_entries(&store, MANIFEST).await;
        assert_eq!(entries.len(), 2);
        assert_ne!(entries[0].location, entries[1].location);

        for (entry, expected) in entries.iter().zip(["batch1", "batch2"]) {
            let data = read_object(&store, entry.location.as_str()).await;
            assert_eq!(decode_batch(data).unwrap(), vec![Bytes::from(expected)]);
        }
    }

    #[tokio::test]
    async fn should_not_flush_empty_batch() {
        let (store, ingestor) = setup(test_config());

        ingestor.flush().await.unwrap();

        assert!(manifest_missing(&store).await);
    }

    #[tokio::test]
    async fn should_preserve_all_empty_entries_batch() {
        let (store, ingestor) = setup(test_config());

        ingestor
            .ingest(vec![Bytes::new(), Bytes::new()], Bytes::from("meta"))
            .await
            .unwrap();
        ingestor.flush().await.unwrap();

        let entries = read_manifest_entries(&store, MANIFEST).await;
        assert_single_meta_entry(&entries);

        let records = decode_batch(read_object(&store, entries[0].location.as_str()).await).unwrap();
        assert_eq!(records, vec![Bytes::new(), Bytes::new()]);
    }

    #[tokio::test]
    async fn should_preserve_empty_and_non_empty_entries_in_batch() {
        let (store, ingestor) = setup(test_config());

        let mixed = vec![
            Bytes::from("a"),
            Bytes::new(),
            Bytes::from("b"),
            Bytes::new(),
        ];
        ingestor
            .ingest(mixed.clone(), Bytes::from("meta"))
            .await
            .unwrap();
        ingestor.flush().await.unwrap();

        let entries = read_manifest_entries(&store, MANIFEST).await;
        assert_single_meta_entry(&entries);

        let records = decode_batch(read_object(&store, entries[0].location.as_str()).await).unwrap();
        assert_eq!(
            records,
            vec![
                Bytes::from("a"),
                Bytes::new(),
                Bytes::from("b"),
                Bytes::new()
            ]
        );
    }

    #[tokio::test]
    async fn should_reject_empty_entries() {
        let (_store, ingestor) = setup(test_config());

        for metadata in [Bytes::new(), Bytes::from("meta")] {
            let result = ingestor.ingest(vec![], metadata).await;
            assert!(matches!(result, Err(Error::InvalidInput(_))));
        }
    }
}