Skip to main content

ingest/
ingestor.rs

1use std::sync::Arc;
2use std::time::{Duration, SystemTime};
3
4use bytes::Bytes;
5use common::clock::Clock;
6use slatedb::object_store::path::Path;
7use slatedb::object_store::{ObjectStore, PutPayload};
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use crate::config::IngestorConfig;
12use crate::error::{Error, Result};
13use crate::model::{CompressionType, encode_batch};
14use crate::queue::{Metadata, QueueProducer};
15use crate::util::millis;
16
17type Notifier = tokio::sync::watch::Sender<Option<Result<()>>>;
18
19#[derive(Clone)]
20pub struct DurabilityWatcher {
21    receiver: tokio::sync::watch::Receiver<Option<Result<()>>>,
22}
23
24impl DurabilityWatcher {
25    /// Return the outcome of the write if the batch has already been flushed,
26    /// or `None` if the flush has not completed yet.
27    pub fn result(&self) -> Option<Result<()>> {
28        self.receiver.borrow().clone()
29    }
30
31    /// Wait until the batch containing this write has been durably flushed.
32    pub async fn await_durable(&mut self) -> Result<()> {
33        self.receiver
34            .wait_for(|v| v.is_some())
35            .await
36            .map_err(|_| Error::Storage("ingestor shut down".to_string()))?
37            .clone()
38            .expect("value must be present after wait_for")
39    }
40}
41
42pub struct WriteHandle {
43    pub watcher: DurabilityWatcher,
44}
45
46enum IngestMessage {
47    Add {
48        entries: Vec<Bytes>,
49        metadata: Bytes,
50        ingestion_time_ms: i64,
51        notifier: Notifier,
52    },
53    Flush {
54        result_sender: tokio::sync::oneshot::Sender<Result<()>>,
55    },
56}
57
58#[derive(Default)]
59struct DataAndNotifiers {
60    entries: Vec<Bytes>,
61    metadata: Vec<Metadata>,
62    notifiers: Vec<Notifier>,
63}
64
65impl DataAndNotifiers {
66    fn add(
67        &mut self,
68        entries: Vec<Bytes>,
69        metadata: Bytes,
70        ingestion_time_ms: i64,
71        notifier: Notifier,
72    ) -> Result<()> {
73        let start_index = self.entries.len() as u32;
74        self.entries.extend(entries);
75        if self.entries.len() > u32::MAX as usize {
76            return Err(Error::InvalidInput(format!(
77                "batch entry count {} exceeds u32::MAX",
78                self.entries.len()
79            )));
80        }
81        self.metadata.push(Metadata {
82            start_index,
83            ingestion_time_ms,
84            payload: metadata,
85        });
86        self.notifiers.push(notifier);
87        Ok(())
88    }
89
90    fn is_empty(&self) -> bool {
91        self.entries.is_empty() && self.metadata.is_empty() && self.notifiers.is_empty()
92    }
93}
94
95struct Batch {
96    data_and_notifiers: DataAndNotifiers,
97    size_bytes: usize,
98    started_at: Option<SystemTime>,
99}
100
101impl Batch {
102    fn new() -> Self {
103        Self {
104            data_and_notifiers: DataAndNotifiers::default(),
105            size_bytes: 0,
106            started_at: None,
107        }
108    }
109
110    fn add(
111        &mut self,
112        entries: Vec<Bytes>,
113        metadata: Bytes,
114        ingestion_time_ms: i64,
115        notifier: Notifier,
116        now: SystemTime,
117    ) -> Result<()> {
118        let mut entry_size_sum = 0usize;
119        for e in &entries {
120            if e.len() > u32::MAX as usize {
121                return Err(Error::InvalidInput(format!(
122                    "entry size {} exceeds u32::MAX",
123                    e.len()
124                )));
125            }
126            entry_size_sum += e.len();
127        }
128        self.size_bytes += entry_size_sum + metadata.len();
129        self.data_and_notifiers
130            .add(entries, metadata, ingestion_time_ms, notifier)?;
131        if self.started_at.is_none() {
132            self.started_at = Some(now);
133        }
134        Ok(())
135    }
136
137    fn take(&mut self) -> DataAndNotifiers {
138        self.size_bytes = 0;
139        self.started_at = None;
140        std::mem::take(&mut self.data_and_notifiers)
141    }
142
143    fn is_empty(&self) -> bool {
144        self.data_and_notifiers.is_empty()
145    }
146}
147
148struct BatchWriterTask {
149    object_store: Arc<dyn ObjectStore>,
150    producer: Arc<QueueProducer>,
151    data_path_prefix: String,
152    flush_interval: Duration,
153    flush_size_bytes: usize,
154    batch_compression: CompressionType,
155    batch: Batch,
156    clock: Arc<dyn Clock>,
157}
158
159impl BatchWriterTask {
160    fn new(
161        object_store: Arc<dyn ObjectStore>,
162        producer: Arc<QueueProducer>,
163        data_path_prefix: String,
164        flush_interval: Duration,
165        flush_size_bytes: usize,
166        batch_compression: CompressionType,
167        clock: Arc<dyn Clock>,
168    ) -> Self {
169        Self {
170            object_store,
171            producer,
172            data_path_prefix,
173            flush_interval,
174            flush_size_bytes,
175            batch_compression,
176            batch: Batch::new(),
177            clock,
178        }
179    }
180
181    async fn run(&mut self, mut rx: mpsc::Receiver<IngestMessage>, shutdown: CancellationToken) {
182        loop {
183            let sleep_duration = match self.batch.started_at {
184                Some(started) => (started + self.flush_interval)
185                    .duration_since(self.clock.now())
186                    .unwrap_or(Duration::ZERO),
187                None => self.flush_interval,
188            };
189
190            tokio::select! {
191                biased;
192                _ = shutdown.cancelled() => {
193                    return;
194                },
195                msg = rx.recv() => {
196                    match msg {
197                        Some(IngestMessage::Add { entries, metadata, ingestion_time_ms, notifier }) => {
198                            if let Err(e) = self.batch.add(entries, metadata, ingestion_time_ms, notifier.clone(), self.clock.now()) {
199                                let _ = notifier.send(Some(Err(e)));
200                            } else if self.batch.size_bytes >= self.flush_size_bytes {
201                                let _ = self.write_batch().await;
202                            }
203                        }
204                        Some(IngestMessage::Flush { result_sender }) => {
205                            let _ = result_sender.send(self.write_batch().await);
206                        }
207                        None => break,
208                    }
209                },
210                _ = tokio::time::sleep(sleep_duration) => {
211                    let _ = self.write_batch().await;
212                },
213            }
214        }
215    }
216
217    async fn write_batch(&mut self) -> Result<()> {
218        if self.batch.is_empty() {
219            return Ok(());
220        }
221        let data_and_notifiers = self.batch.take();
222        let result = self
223            .write_and_enqueue(data_and_notifiers.entries, data_and_notifiers.metadata)
224            .await;
225
226        for notifier in data_and_notifiers.notifiers {
227            let _ = notifier.send(Some(result.clone()));
228        }
229
230        result
231    }
232
233    async fn write_and_enqueue(&self, entries: Vec<Bytes>, metadata: Vec<Metadata>) -> Result<()> {
234        let payload = encode_batch(&entries, self.batch_compression)?;
235        let id = ulid::Ulid::new();
236        let path = Path::from(format!("{}/{}.batch", self.data_path_prefix, id));
237        self.object_store
238            .put(&path, PutPayload::from(payload))
239            .await
240            .map_err(|e| Error::Storage(e.to_string()))?;
241
242        self.producer.enqueue(path.to_string(), metadata).await?;
243
244        Ok(())
245    }
246}
247
248struct BatchWriter {
249    producer: Arc<QueueProducer>,
250    sender: mpsc::Sender<IngestMessage>,
251    cancellation_token: CancellationToken,
252    handle: tokio::task::JoinHandle<()>,
253}
254
255impl BatchWriter {
256    fn new(
257        object_store: Arc<dyn ObjectStore>,
258        config: &IngestorConfig,
259        clock: Arc<dyn Clock>,
260    ) -> Self {
261        let (sender, receiver) = mpsc::channel(config.max_buffered_inputs);
262        let producer = Arc::new(QueueProducer::with_object_store(
263            config.manifest_path.clone(),
264            object_store.clone(),
265        ));
266        let mut task = BatchWriterTask::new(
267            object_store,
268            producer.clone(),
269            config.data_path_prefix.clone(),
270            config.flush_interval,
271            config.flush_size_bytes,
272            config.batch_compression,
273            clock,
274        );
275        let shutdown = CancellationToken::new();
276        let cancellation_token = shutdown.clone();
277        let handle = tokio::spawn(async move { task.run(receiver, shutdown).await });
278        Self {
279            producer,
280            sender,
281            cancellation_token,
282            handle,
283        }
284    }
285
286    async fn add(
287        &self,
288        entries: Vec<Bytes>,
289        metadata: Bytes,
290        ingestion_time_ms: i64,
291    ) -> Result<DurabilityWatcher> {
292        let (notifier_sender, notifier_receiver) = tokio::sync::watch::channel(None);
293        self.sender
294            .send(IngestMessage::Add {
295                entries,
296                metadata,
297                ingestion_time_ms,
298                notifier: notifier_sender,
299            })
300            .await
301            .map_err(|_| Error::Storage("ingestor shut down".to_string()))?;
302        Ok(DurabilityWatcher {
303            receiver: notifier_receiver,
304        })
305    }
306
307    async fn flush(&self) -> Result<()> {
308        let (result_sender, result_receiver) = tokio::sync::oneshot::channel();
309        self.sender
310            .send(IngestMessage::Flush { result_sender })
311            .await
312            .map_err(|_| Error::Storage("batch writer task not running".to_string()))?;
313        result_receiver
314            .await
315            .map_err(|_| Error::Storage("batch writer task not running".to_string()))?
316    }
317
318    fn conflict_rate(&self) -> f64 {
319        self.producer.conflict_rate()
320    }
321    async fn close(self) -> Result<()> {
322        self.flush().await?;
323        self.cancellation_token.cancel();
324        let _ = self.handle.await;
325        Ok(())
326    }
327}
328
329pub struct Ingestor {
330    writer: BatchWriter,
331    clock: Arc<dyn Clock>,
332}
333
334impl Ingestor {
335    /// Create a new ingestor from the given configuration and clock.
336    pub fn new(config: IngestorConfig, clock: Arc<dyn Clock>) -> Result<Self> {
337        let object_store = common::storage::factory::create_object_store(&config.object_store)
338            .map_err(|e| Error::Storage(e.to_string()))?;
339        Self::with_object_store(config, object_store, clock)
340    }
341
342    fn with_object_store(
343        config: IngestorConfig,
344        object_store: Arc<dyn ObjectStore>,
345        clock: Arc<dyn Clock>,
346    ) -> Result<Self> {
347        let writer = BatchWriter::new(object_store, &config, clock.clone());
348        Ok(Self { writer, clock })
349    }
350
351    /// Submit a set of entries and associated metadata for ingestion.
352    ///
353    /// Returns a [`WriteHandle`] that can be used to check or await durability.
354    /// Applies backpressure when the message buffer is full.
355    ///
356    /// Returns [`Error::InvalidInput`] if `entries` is empty or if `metadata`
357    /// exceeds 2³²−1 bytes.
358    pub async fn ingest(&self, entries: Vec<Bytes>, metadata: Bytes) -> Result<WriteHandle> {
359        if entries.is_empty() {
360            return Err(Error::InvalidInput("entries must not be empty".to_string()));
361        }
362        if metadata.len() > u32::MAX as usize {
363            return Err(Error::InvalidInput(format!(
364                "metadata size {} exceeds u32::MAX",
365                metadata.len()
366            )));
367        }
368        let ingestion_time_ms = millis(self.clock.now());
369        let durability_watcher = self
370            .writer
371            .add(entries, metadata, ingestion_time_ms)
372            .await?;
373        Ok(WriteHandle {
374            watcher: durability_watcher,
375        })
376    }
377
378    /// Flush the current batch, blocking until all pending entries are durably written.
379    pub async fn flush(&self) -> Result<()> {
380        self.writer.flush().await
381    }
382
383    /// Return the fraction of manifest writes that encountered optimistic-concurrency conflicts.
384    pub fn conflict_rate(&self) -> f64 {
385        self.writer.conflict_rate()
386    }
387
388    /// Shut down the ingestor, flushing any remaining buffered entries before returning.
389    pub async fn close(self) -> Result<()> {
390        self.writer.close().await
391    }
392}
393
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::IngestorConfig;
    use crate::model::decode_batch;
    use crate::queue::{Manifest, QueueEntry};
    use bytes::Bytes;
    use common::ObjectStoreConfig;
    use common::clock::{MockClock, SystemClock};
    use slatedb::object_store::ObjectStore;
    use slatedb::object_store::memory::InMemory;
    use std::time::UNIX_EPOCH;

    /// Manifest location used by `test_config`.
    const MANIFEST_PATH: &str = "test/manifest";

    /// A fresh, empty in-memory object store.
    fn new_store() -> Arc<dyn ObjectStore> {
        Arc::new(InMemory::new())
    }

    /// An ingestor over `store` using `config` and the real system clock.
    fn ingestor_with(config: IngestorConfig, store: &Arc<dyn ObjectStore>) -> Ingestor {
        Ingestor::with_object_store(config, Arc::clone(store), Arc::new(SystemClock)).unwrap()
    }

    /// Read and decode every entry of the manifest stored at `path`.
    async fn read_manifest_entries(store: &Arc<dyn ObjectStore>, path: &str) -> Vec<QueueEntry> {
        let raw = store.get(&Path::from(path)).await.unwrap().bytes().await.unwrap();
        let manifest = Manifest::from_bytes(raw).unwrap();
        manifest.iter().map(|entry| entry.unwrap()).collect()
    }

    /// Download the raw bytes of the object stored at `location`.
    async fn fetch_object(store: &Arc<dyn ObjectStore>, location: &str) -> Bytes {
        let object = store.get(&Path::from(location)).await.unwrap();
        object.bytes().await.unwrap()
    }

    /// Whether a manifest has been written to `store` yet.
    async fn manifest_exists(store: &Arc<dyn ObjectStore>) -> bool {
        store.get(&Path::from(MANIFEST_PATH)).await.is_ok()
    }

    /// Baseline config whose thresholds are large enough that nothing flushes
    /// unless a test asks for it.
    fn test_config() -> IngestorConfig {
        IngestorConfig {
            object_store: ObjectStoreConfig::InMemory,
            data_path_prefix: "test-ingest".to_string(),
            manifest_path: "test/manifest".to_string(),
            flush_interval: Duration::from_hours(24),
            flush_size_bytes: 64 * 1024 * 1024,
            max_buffered_inputs: 1000,
            batch_compression: CompressionType::None,
        }
    }

    #[tokio::test]
    async fn should_ingest_entries_and_enqueue_location() {
        let store = new_store();
        let ingestor = ingestor_with(test_config(), &store);

        for payload in ["data1", "data2"] {
            ingestor
                .ingest(vec![Bytes::from(payload)], Bytes::new())
                .await
                .unwrap();
        }
        ingestor.flush().await.unwrap();

        let entries = read_manifest_entries(&store, MANIFEST_PATH).await;
        assert_eq!(entries.len(), 1);
        let location = &entries[0].location;
        assert!(location.starts_with("test-ingest/"));
        assert!(location.ends_with(".batch"));
    }

    #[tokio::test]
    async fn should_write_valid_batch_to_object_store() {
        let store = new_store();
        let ingestor = ingestor_with(test_config(), &store);

        ingestor
            .ingest(vec![Bytes::from("mydata")], Bytes::new())
            .await
            .unwrap();
        ingestor.flush().await.unwrap();

        let entries = read_manifest_entries(&store, MANIFEST_PATH).await;
        let parsed = decode_batch(fetch_object(&store, &entries[0].location).await).unwrap();

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0], Bytes::from("mydata"));
    }

    #[tokio::test]
    async fn should_flush_when_batch_size_exceeded() {
        let store = new_store();
        let config = IngestorConfig {
            flush_size_bytes: 10,
            ..test_config()
        };
        let ingestor = ingestor_with(config, &store);

        let mut handle = ingestor
            .ingest(vec![Bytes::from("some-long-data")], Bytes::new())
            .await
            .unwrap();
        handle.watcher.await_durable().await.unwrap();

        assert_eq!(read_manifest_entries(&store, MANIFEST_PATH).await.len(), 1);
    }

    #[tokio::test]
    async fn should_flush_when_interval_elapsed() {
        let store = new_store();
        let config = IngestorConfig {
            flush_interval: Duration::from_millis(50),
            flush_size_bytes: 64 * 1024 * 1024,
            ..test_config()
        };
        let ingestor = ingestor_with(config, &store);

        let mut handle = ingestor
            .ingest(vec![Bytes::from("v1")], Bytes::new())
            .await
            .unwrap();

        // Nothing is durable until the interval timer fires.
        assert!(handle.watcher.result().is_none());
        assert!(!manifest_exists(&store).await);

        handle.watcher.await_durable().await.unwrap();

        assert_eq!(read_manifest_entries(&store, MANIFEST_PATH).await.len(), 1);
    }

    #[tokio::test]
    async fn should_not_flush_below_thresholds() {
        let store = new_store();
        let ingestor = ingestor_with(test_config(), &store);

        let handle = ingestor
            .ingest(vec![Bytes::from("v")], Bytes::new())
            .await
            .unwrap();

        assert!(handle.watcher.result().is_none());
        assert!(!manifest_exists(&store).await);

        ingestor.flush().await.unwrap();

        assert!(handle.watcher.result().unwrap().is_ok());
        assert_eq!(read_manifest_entries(&store, MANIFEST_PATH).await.len(), 1);
    }

    #[tokio::test]
    async fn should_batch_multiple_ingests_into_single_file() {
        let store = new_store();
        let ingestor = ingestor_with(test_config(), &store);

        let first = ingestor
            .ingest(vec![Bytes::from("data1")], Bytes::new())
            .await
            .unwrap();
        let second = ingestor
            .ingest(vec![Bytes::from("data2")], Bytes::new())
            .await
            .unwrap();

        ingestor.flush().await.unwrap();

        for handle in [&first, &second] {
            assert!(handle.watcher.result().unwrap().is_ok());
        }

        let entries = read_manifest_entries(&store, MANIFEST_PATH).await;
        assert_eq!(entries.len(), 1);

        let parsed = decode_batch(fetch_object(&store, &entries[0].location).await).unwrap();
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0], Bytes::from("data1"));
        assert_eq!(parsed[1], Bytes::from("data2"));
    }

    #[tokio::test]
    async fn should_apply_backpressure_when_buffer_full() {
        let store = new_store();
        let config = IngestorConfig {
            max_buffered_inputs: 1,
            ..test_config()
        };
        let ingestor = ingestor_with(config, &store);

        // The first ingest occupies the single buffer slot. The second only
        // completes once the background task has consumed the first message.
        for payload in ["data1", "data2"] {
            ingestor
                .ingest(vec![Bytes::from(payload)], Bytes::new())
                .await
                .unwrap();
        }

        ingestor.flush().await.unwrap();

        assert!(!read_manifest_entries(&store, MANIFEST_PATH).await.is_empty());
    }

    #[tokio::test]
    async fn should_record_metadata_and_ingestion_time_in_queue_entry() {
        let store = new_store();
        let fixed_time = UNIX_EPOCH + Duration::from_millis(1_700_000_000_000);
        let clock = Arc::new(MockClock::with_time(fixed_time));
        let ingestor = Ingestor::with_object_store(test_config(), Arc::clone(&store), clock).unwrap();

        let metadata = Bytes::from(r#"{"topic":"events"}"#);
        ingestor
            .ingest(vec![Bytes::from("payload")], metadata.clone())
            .await
            .unwrap();
        ingestor.flush().await.unwrap();

        let entries = read_manifest_entries(&store, MANIFEST_PATH).await;
        assert_eq!(entries.len(), 1);
        let recorded = &entries[0].metadata;
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].payload, metadata);
        assert_eq!(recorded[0].ingestion_time_ms, 1_700_000_000_000);
    }

    #[tokio::test]
    async fn should_flush_remaining_entries_on_close() {
        let store = new_store();
        let ingestor = ingestor_with(test_config(), &store);

        ingestor
            .ingest(vec![Bytes::from("unflushed")], Bytes::new())
            .await
            .unwrap();

        ingestor.close().await.unwrap();

        let entries = read_manifest_entries(&store, MANIFEST_PATH).await;
        assert_eq!(entries.len(), 1);

        let parsed = decode_batch(fetch_object(&store, &entries[0].location).await).unwrap();
        assert_eq!(parsed, vec![Bytes::from("unflushed")]);
    }

    #[tokio::test]
    async fn should_produce_separate_batches_per_flush() {
        let store = new_store();
        let ingestor = ingestor_with(test_config(), &store);

        for payload in ["batch1", "batch2"] {
            ingestor
                .ingest(vec![Bytes::from(payload)], Bytes::new())
                .await
                .unwrap();
            ingestor.flush().await.unwrap();
        }

        let entries = read_manifest_entries(&store, MANIFEST_PATH).await;
        assert_eq!(entries.len(), 2);
        assert_ne!(entries[0].location, entries[1].location);

        for (entry, expected) in entries.iter().zip(["batch1", "batch2"]) {
            let data = fetch_object(&store, &entry.location).await;
            assert_eq!(decode_batch(data).unwrap(), vec![Bytes::from(expected)]);
        }
    }

    #[tokio::test]
    async fn should_not_flush_empty_batch() {
        let store = new_store();
        let ingestor = ingestor_with(test_config(), &store);

        ingestor.flush().await.unwrap();

        assert!(!manifest_exists(&store).await);
    }

    #[tokio::test]
    async fn should_preserve_all_empty_entries_batch() {
        let store = new_store();
        let ingestor = ingestor_with(test_config(), &store);

        ingestor
            .ingest(vec![Bytes::new(), Bytes::new()], Bytes::from("meta"))
            .await
            .unwrap();
        ingestor.flush().await.unwrap();

        let entries = read_manifest_entries(&store, MANIFEST_PATH).await;
        assert_eq!(entries.len(), 1);
        let entry = &entries[0];
        assert!(!entry.location.is_empty());
        assert_eq!(entry.metadata.len(), 1);
        assert_eq!(entry.metadata[0].payload, Bytes::from("meta"));
        assert_eq!(entry.metadata[0].start_index, 0);

        let records = decode_batch(fetch_object(&store, &entry.location).await).unwrap();
        assert_eq!(records, vec![Bytes::new(), Bytes::new()]);
    }

    #[tokio::test]
    async fn should_preserve_empty_and_non_empty_entries_in_batch() {
        let store = new_store();
        let ingestor = ingestor_with(test_config(), &store);

        let submitted = vec![
            Bytes::from("a"),
            Bytes::new(),
            Bytes::from("b"),
            Bytes::new(),
        ];
        ingestor
            .ingest(submitted.clone(), Bytes::from("meta"))
            .await
            .unwrap();
        ingestor.flush().await.unwrap();

        let entries = read_manifest_entries(&store, MANIFEST_PATH).await;
        assert_eq!(entries.len(), 1);
        let entry = &entries[0];
        assert!(!entry.location.is_empty());
        assert_eq!(entry.metadata.len(), 1);
        assert_eq!(entry.metadata[0].payload, Bytes::from("meta"));
        assert_eq!(entry.metadata[0].start_index, 0);

        let records = decode_batch(fetch_object(&store, &entry.location).await).unwrap();
        assert_eq!(records, submitted);
    }

    #[tokio::test]
    async fn should_reject_empty_entries() {
        let store = new_store();
        let ingestor = ingestor_with(test_config(), &store);

        for metadata in [Bytes::new(), Bytes::from("meta")] {
            let result = ingestor.ingest(vec![], metadata).await;
            assert!(matches!(result, Err(Error::InvalidInput(_))));
        }
    }
}