// buffer/producer.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant, SystemTime};
3
4use bytes::Bytes;
5use common::clock::Clock;
6use slatedb::object_store::path::Path;
7use slatedb::object_store::{ObjectStore, PutPayload};
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10
11use crate::config::ProducerConfig;
12use crate::error::{Error, Result};
13use crate::metric_names;
14use crate::model::{CompressionType, encode_batch};
15use crate::queue::{Metadata, QueueProducer};
16use crate::util::millis;
17
/// Watch-channel sender used to publish a batch's flush outcome to every
/// `DurabilityWatcher`; holds `None` until the containing batch is written.
type Notifier = tokio::sync::watch::Sender<Option<Result<()>>>;
19
/// Cloneable handle for observing whether the batch containing a write has
/// been durably flushed.
#[derive(Clone)]
pub struct DurabilityWatcher {
    // Receives `None` until the flush completes, then `Some(outcome)`.
    receiver: tokio::sync::watch::Receiver<Option<Result<()>>>,
}
24
25impl DurabilityWatcher {
26    /// Return the outcome of the write if the batch has already been flushed,
27    /// or `None` if the flush has not completed yet.
28    pub fn result(&self) -> Option<Result<()>> {
29        self.receiver.borrow().clone()
30    }
31
32    /// Wait until the batch containing this write has been durably flushed.
33    pub async fn await_durable(&mut self) -> Result<()> {
34        self.receiver
35            .wait_for(|v| v.is_some())
36            .await
37            .map_err(|_| Error::Storage("buffer shut down".to_string()))?
38            .clone()
39            .expect("value must be present after wait_for")
40    }
41}
42
/// Handle returned from [`Producer::produce`]: a durability watcher plus the
/// ingestion timestamp assigned to the write.
pub struct WriteHandle {
    pub watcher: DurabilityWatcher,
    // Milliseconds since the Unix epoch, taken from the producer's clock.
    pub ingestion_time_ms: i64,
}
47
/// Messages sent from the `BatchWriter` facade to the background task.
enum ProduceMessage {
    // Append entries plus their metadata to the current batch; `notifier`
    // is resolved once the containing batch has been flushed.
    Add {
        entries: Vec<Bytes>,
        metadata: Bytes,
        ingestion_time_ms: i64,
        notifier: Notifier,
    },
    // Force a flush of the current batch and report its outcome.
    Flush {
        result_sender: tokio::sync::oneshot::Sender<Result<()>>,
    },
}
59
/// Buffered batch contents: raw entries, per-produce metadata records, and
/// the notifiers to resolve when the batch is flushed.
#[derive(Default)]
struct DataAndNotifiers {
    entries: Vec<Bytes>,
    metadata: Vec<Metadata>,
    notifiers: Vec<Notifier>,
}
66
67impl DataAndNotifiers {
68    fn add(
69        &mut self,
70        entries: Vec<Bytes>,
71        metadata: Bytes,
72        ingestion_time_ms: i64,
73        notifier: Notifier,
74    ) -> Result<()> {
75        let start_index = self.entries.len() as u32;
76        self.entries.extend(entries);
77        if self.entries.len() > u32::MAX as usize {
78            return Err(Error::InvalidInput(format!(
79                "batch entry count {} exceeds u32::MAX",
80                self.entries.len()
81            )));
82        }
83        self.metadata.push(Metadata {
84            start_index,
85            ingestion_time_ms,
86            payload: metadata,
87        });
88        self.notifiers.push(notifier);
89        Ok(())
90    }
91
92    fn is_empty(&self) -> bool {
93        self.entries.is_empty() && self.metadata.is_empty() && self.notifiers.is_empty()
94    }
95}
96
/// Accumulating batch plus the bookkeeping used to decide when to flush.
struct Batch {
    data_and_notifiers: DataAndNotifiers,
    // Sum of entry and metadata byte lengths currently buffered.
    size_bytes: usize,
    // Arrival time of the first write in the current batch; `None` when empty.
    started_at: Option<SystemTime>,
}
102
103impl Batch {
104    fn new() -> Self {
105        Self {
106            data_and_notifiers: DataAndNotifiers::default(),
107            size_bytes: 0,
108            started_at: None,
109        }
110    }
111
112    fn add(
113        &mut self,
114        entries: Vec<Bytes>,
115        metadata: Bytes,
116        ingestion_time_ms: i64,
117        notifier: Notifier,
118        now: SystemTime,
119    ) -> Result<()> {
120        let mut entry_size_sum = 0usize;
121        for e in &entries {
122            if e.len() > u32::MAX as usize {
123                return Err(Error::InvalidInput(format!(
124                    "entry size {} exceeds u32::MAX",
125                    e.len()
126                )));
127            }
128            entry_size_sum += e.len();
129        }
130        self.size_bytes += entry_size_sum + metadata.len();
131        self.data_and_notifiers
132            .add(entries, metadata, ingestion_time_ms, notifier)?;
133        if self.started_at.is_none() {
134            self.started_at = Some(now);
135        }
136        Ok(())
137    }
138
139    fn take(&mut self) -> DataAndNotifiers {
140        self.size_bytes = 0;
141        self.started_at = None;
142        std::mem::take(&mut self.data_and_notifiers)
143    }
144
145    fn is_empty(&self) -> bool {
146        self.data_and_notifiers.is_empty()
147    }
148}
149
/// State for the background task: buffers writes into a `Batch` and flushes
/// it to the object store (and queue manifest) on size or time thresholds.
struct BatchWriterTask {
    object_store: Arc<dyn ObjectStore>,
    producer: Arc<QueueProducer>,
    data_path_prefix: String,
    flush_interval: Duration,
    flush_size_bytes: usize,
    batch_compression: CompressionType,
    batch: Batch,
    clock: Arc<dyn Clock>,
}
160
impl BatchWriterTask {
    /// Build the task state; the batch starts empty.
    fn new(
        object_store: Arc<dyn ObjectStore>,
        producer: Arc<QueueProducer>,
        data_path_prefix: String,
        flush_interval: Duration,
        flush_size_bytes: usize,
        batch_compression: CompressionType,
        clock: Arc<dyn Clock>,
    ) -> Self {
        Self {
            object_store,
            producer,
            data_path_prefix,
            flush_interval,
            flush_size_bytes,
            batch_compression,
            batch: Batch::new(),
            clock,
        }
    }

    /// Main loop: consume produce/flush messages until the channel closes or
    /// `shutdown` is cancelled. Flushes on size threshold, explicit request,
    /// or expiry of the flush interval.
    async fn run(&mut self, mut rx: mpsc::Receiver<ProduceMessage>, shutdown: CancellationToken) {
        loop {
            // Deadline for the time-based flush: `flush_interval` after the
            // first write of the current batch; a full interval when idle.
            let sleep_duration = match self.batch.started_at {
                Some(started) => (started + self.flush_interval)
                    .duration_since(self.clock.now())
                    .unwrap_or(Duration::ZERO),
                None => self.flush_interval,
            };

            tokio::select! {
                // `biased` polls branches in order, so shutdown wins over
                // other ready branches.
                biased;
                _ = shutdown.cancelled() => {
                    return;
                },
                msg = rx.recv() => {
                    match msg {
                        Some(ProduceMessage::Add { entries, metadata, ingestion_time_ms, notifier }) => {
                            if let Err(e) = self.batch.add(entries, metadata, ingestion_time_ms, notifier.clone(), self.clock.now()) {
                                // Rejected input: fail only this write's watcher.
                                let _ = notifier.send(Some(Err(e)));
                            } else if self.batch.size_bytes >= self.flush_size_bytes {
                                // Size-triggered flush; errors reach callers via notifiers.
                                let _ = self.write_batch().await;
                            }
                        }
                        Some(ProduceMessage::Flush { result_sender }) => {
                            let _ = result_sender.send(self.write_batch().await);
                        }
                        // All senders dropped: stop the loop.
                        None => break,
                    }
                },
                _ = tokio::time::sleep(sleep_duration) => {
                    // Interval-triggered flush; errors reach callers via notifiers.
                    let _ = self.write_batch().await;
                },
            }
        }
    }

    /// Flush the current batch (no-op when empty) and notify every waiting
    /// watcher with the shared outcome.
    async fn write_batch(&mut self) -> Result<()> {
        if self.batch.is_empty() {
            return Ok(());
        }
        let data_and_notifiers = self.batch.take();
        let result = self
            .write_and_enqueue(data_and_notifiers.entries, data_and_notifiers.metadata)
            .await;

        // Broadcast the outcome; receivers may already have been dropped.
        for notifier in data_and_notifiers.notifiers {
            let _ = notifier.send(Some(result.clone()));
        }

        result
    }

    /// Encode the batch, put it to the object store under a fresh ULID-named
    /// `.batch` path, enqueue its location in the manifest, record metrics.
    async fn write_and_enqueue(&self, entries: Vec<Bytes>, metadata: Vec<Metadata>) -> Result<()> {
        let start = Instant::now();
        let raw_bytes: u64 = entries.iter().map(|e| e.len() as u64).sum();
        let entry_count = entries.len() as u64;

        let payload = encode_batch(&entries, self.batch_compression)?;
        let written_bytes = payload.len() as u64;

        let id = ulid::Ulid::new();
        let path = Path::from(format!("{}/{}.batch", self.data_path_prefix, id));
        self.object_store
            .put(&path, PutPayload::from(payload))
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;

        self.producer.enqueue(path.to_string(), metadata).await?;

        metrics::counter!(metric_names::BATCHES_FLUSHED).increment(1);
        metrics::counter!(metric_names::ENTRIES_FLUSHED).increment(entry_count);
        metrics::counter!(metric_names::BYTES_FLUSHED).increment(raw_bytes);
        metrics::counter!(metric_names::BYTES_WRITTEN).increment(written_bytes);
        metrics::histogram!(metric_names::FLUSH_DURATION_SECONDS)
            .record(start.elapsed().as_secs_f64());

        Ok(())
    }
}
262
/// Facade over the background batch-writer task: forwards produce/flush
/// messages over a bounded channel and owns the task's shutdown handles.
struct BatchWriter {
    producer: Arc<QueueProducer>,
    sender: mpsc::Sender<ProduceMessage>,
    cancellation_token: CancellationToken,
    handle: tokio::task::JoinHandle<()>,
}
269
270impl BatchWriter {
271    fn new(
272        object_store: Arc<dyn ObjectStore>,
273        config: &ProducerConfig,
274        clock: Arc<dyn Clock>,
275    ) -> Self {
276        let (sender, receiver) = mpsc::channel(config.max_buffered_inputs);
277        let producer = Arc::new(QueueProducer::with_object_store(
278            config.manifest_path.clone(),
279            object_store.clone(),
280        ));
281        let mut task = BatchWriterTask::new(
282            object_store,
283            producer.clone(),
284            config.data_path_prefix.clone(),
285            config.flush_interval,
286            config.flush_size_bytes,
287            config.batch_compression,
288            clock,
289        );
290        let shutdown = CancellationToken::new();
291        let cancellation_token = shutdown.clone();
292        let handle = tokio::spawn(async move { task.run(receiver, shutdown).await });
293        Self {
294            producer,
295            sender,
296            cancellation_token,
297            handle,
298        }
299    }
300
301    async fn add(
302        &self,
303        entries: Vec<Bytes>,
304        metadata: Bytes,
305        ingestion_time_ms: i64,
306    ) -> Result<DurabilityWatcher> {
307        let (notifier_sender, notifier_receiver) = tokio::sync::watch::channel(None);
308        self.sender
309            .send(ProduceMessage::Add {
310                entries,
311                metadata,
312                ingestion_time_ms,
313                notifier: notifier_sender,
314            })
315            .await
316            .map_err(|_| Error::Storage("buffer shut down".to_string()))?;
317        Ok(DurabilityWatcher {
318            receiver: notifier_receiver,
319        })
320    }
321
322    async fn flush(&self) -> Result<()> {
323        let (result_sender, result_receiver) = tokio::sync::oneshot::channel();
324        self.sender
325            .send(ProduceMessage::Flush { result_sender })
326            .await
327            .map_err(|_| Error::Storage("batch writer task not running".to_string()))?;
328        result_receiver
329            .await
330            .map_err(|_| Error::Storage("batch writer task not running".to_string()))?
331    }
332
333    fn conflict_rate(&self) -> f64 {
334        self.producer.conflict_rate()
335    }
336    async fn close(self) -> Result<()> {
337        self.flush().await?;
338        self.cancellation_token.cancel();
339        let _ = self.handle.await;
340        Ok(())
341    }
342}
343
/// Public entry point: batches produced entries and durably writes them to
/// object storage via a background task.
pub struct Producer {
    writer: BatchWriter,
    clock: Arc<dyn Clock>,
}
348
349impl Producer {
350    /// Create a new buffer from the given configuration and clock.
351    pub fn new(config: ProducerConfig, clock: Arc<dyn Clock>) -> Result<Self> {
352        let object_store = common::storage::factory::create_object_store(&config.object_store)
353            .map_err(|e| Error::Storage(e.to_string()))?;
354        Self::with_object_store(config, object_store, clock)
355    }
356
357    /// Create a new buffer using a pre-built object store.
358    ///
359    /// This is useful when you need to share an object store instance
360    /// between a `Producer` and a [`Consumer`](crate::Consumer) (e.g. in tests).
361    pub fn with_object_store(
362        config: ProducerConfig,
363        object_store: Arc<dyn ObjectStore>,
364        clock: Arc<dyn Clock>,
365    ) -> Result<Self> {
366        metric_names::describe_buffer_metrics();
367        let writer = BatchWriter::new(object_store, &config, clock.clone());
368        Ok(Self { writer, clock })
369    }
370
371    /// Submit a set of entries and associated metadata for ingestion.
372    ///
373    /// Returns a [`WriteHandle`] that can be used to check or await durability.
374    /// Applies backpressure when the message buffer is full.
375    ///
376    /// Returns [`Error::InvalidInput`] if `entries` is empty or if `metadata`
377    /// exceeds 2³²−1 bytes.
378    pub async fn produce(&self, entries: Vec<Bytes>, metadata: Bytes) -> Result<WriteHandle> {
379        if entries.is_empty() {
380            return Err(Error::InvalidInput("entries must not be empty".to_string()));
381        }
382        if metadata.len() > u32::MAX as usize {
383            return Err(Error::InvalidInput(format!(
384                "metadata size {} exceeds u32::MAX",
385                metadata.len()
386            )));
387        }
388        let ingestion_time_ms = millis(self.clock.now());
389        let durability_watcher = self
390            .writer
391            .add(entries, metadata, ingestion_time_ms)
392            .await?;
393        Ok(WriteHandle {
394            watcher: durability_watcher,
395            ingestion_time_ms,
396        })
397    }
398
399    /// Flush the current batch, blocking until all pending entries are durably written.
400    pub async fn flush(&self) -> Result<()> {
401        self.writer.flush().await
402    }
403
404    /// Return the fraction of manifest writes that encountered optimistic-concurrency conflicts.
405    pub fn conflict_rate(&self) -> f64 {
406        self.writer.conflict_rate()
407    }
408
409    /// Shut down the buffer, flushing any remaining buffered entries before returning.
410    pub async fn close(self) -> Result<()> {
411        self.writer.close().await
412    }
413}
414
415#[cfg(test)]
416mod tests {
417    use super::*;
418    use crate::config::ProducerConfig;
419    use crate::model::decode_batch;
420    use crate::queue::{Manifest, QueueEntry};
421    use bytes::Bytes;
422    use common::ObjectStoreConfig;
423    use common::clock::{MockClock, SystemClock};
424    use slatedb::object_store::ObjectStore;
425    use slatedb::object_store::memory::InMemory;
426    use std::time::UNIX_EPOCH;
427
    /// Test helper: fetch the queue manifest at `path` from the store and
    /// decode all of its entries, panicking on any error.
    async fn read_manifest_entries(store: &Arc<dyn ObjectStore>, path: &str) -> Vec<QueueEntry> {
        let path = slatedb::object_store::path::Path::from(path);
        let data = store.get(&path).await.unwrap().bytes().await.unwrap();
        let manifest = Manifest::from_bytes(data).unwrap();
        manifest.iter().map(|e| e.unwrap()).collect()
    }
434
435    fn test_config() -> ProducerConfig {
436        ProducerConfig {
437            object_store: ObjectStoreConfig::InMemory,
438            data_path_prefix: "test-ingest".to_string(),
439            manifest_path: "test/manifest".to_string(),
440            flush_interval: Duration::from_hours(24),
441            flush_size_bytes: 64 * 1024 * 1024,
442            max_buffered_inputs: 1000,
443            batch_compression: CompressionType::None,
444        }
445    }
446
    // Two produces followed by a flush should yield a single manifest entry
    // pointing at a `.batch` object under the configured data path prefix.
    #[tokio::test]
    async fn should_ingest_entries_and_enqueue_location() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer
            .produce(vec![Bytes::from("data1")], Bytes::new())
            .await
            .unwrap();
        buffer
            .produce(vec![Bytes::from("data2")], Bytes::new())
            .await
            .unwrap();
        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
        assert!(entries[0].location.starts_with("test-ingest/"));
        assert!(entries[0].location.ends_with(".batch"));
    }
469
    // The flushed batch object must round-trip through `decode_batch` and
    // contain the original payload.
    #[tokio::test]
    async fn should_write_valid_batch_to_object_store() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer
            .produce(vec![Bytes::from("mydata")], Bytes::new())
            .await
            .unwrap();
        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        let path = Path::from(entries[0].location.as_str());
        let data = store.get(&path).await.unwrap().bytes().await.unwrap();
        let parsed = decode_batch(data).unwrap();

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0], Bytes::from("mydata"));
    }
491
    // With a tiny `flush_size_bytes`, a single oversized produce should
    // trigger a flush without an explicit `flush()` call.
    #[tokio::test]
    async fn should_flush_when_batch_size_exceeded() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        let mut config = test_config();
        config.flush_size_bytes = 10;

        let buffer =
            Producer::with_object_store(config, store.clone(), Arc::new(SystemClock)).unwrap();

        let mut watcher = buffer
            .produce(vec![Bytes::from("some-long-data")], Bytes::new())
            .await
            .unwrap();
        watcher.watcher.await_durable().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
    }
511
    // With a short flush interval and a huge size threshold, the write is not
    // durable immediately but becomes durable once the interval elapses.
    #[tokio::test]
    async fn should_flush_when_interval_elapsed() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        let mut config = test_config();
        config.flush_interval = Duration::from_millis(50);
        config.flush_size_bytes = 64 * 1024 * 1024;

        let buffer =
            Producer::with_object_store(config, store.clone(), Arc::new(SystemClock)).unwrap();

        let mut watcher = buffer
            .produce(vec![Bytes::from("v1")], Bytes::new())
            .await
            .unwrap();

        // Not yet flushed: no result and no manifest object.
        assert!(watcher.watcher.result().is_none());
        let manifest_path = slatedb::object_store::path::Path::from("test/manifest");
        assert!(store.get(&manifest_path).await.is_err());

        watcher.watcher.await_durable().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
    }
537
    // With default (large) thresholds nothing flushes on its own; an explicit
    // `flush()` resolves the watcher and writes the manifest.
    #[tokio::test]
    async fn should_not_flush_below_thresholds() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        let watcher = buffer
            .produce(vec![Bytes::from("v")], Bytes::new())
            .await
            .unwrap();

        assert!(watcher.watcher.result().is_none());

        let manifest_path = slatedb::object_store::path::Path::from("test/manifest");
        assert!(store.get(&manifest_path).await.is_err());

        buffer.flush().await.unwrap();

        assert!(watcher.watcher.result().unwrap().is_ok());

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
    }
562
    // Multiple produces before one flush end up in a single batch object, in
    // produce order, with both watchers resolved successfully.
    #[tokio::test]
    async fn should_batch_multiple_ingests_into_single_file() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        let watcher1 = buffer
            .produce(vec![Bytes::from("data1")], Bytes::new())
            .await
            .unwrap();
        let watcher2 = buffer
            .produce(vec![Bytes::from("data2")], Bytes::new())
            .await
            .unwrap();

        buffer.flush().await.unwrap();

        assert!(watcher1.watcher.result().unwrap().is_ok());
        assert!(watcher2.watcher.result().unwrap().is_ok());

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);

        let path = Path::from(entries[0].location.as_str());
        let data = store.get(&path).await.unwrap().bytes().await.unwrap();
        let parsed = decode_batch(data).unwrap();
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0], Bytes::from("data1"));
        assert_eq!(parsed[1], Bytes::from("data2"));
    }
594
    // With a single-slot channel, the second produce must wait for the
    // background task to drain the first message rather than fail.
    #[tokio::test]
    async fn should_apply_backpressure_when_buffer_full() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        let mut config = test_config();
        config.max_buffered_inputs = 1;

        let buffer =
            Producer::with_object_store(config, store.clone(), Arc::new(SystemClock)).unwrap();

        // First ingest fills the single-slot buffer
        buffer
            .produce(vec![Bytes::from("data1")], Bytes::new())
            .await
            .unwrap();

        // Second ingest succeeds once the background task consumes the first message
        buffer
            .produce(vec![Bytes::from("data2")], Bytes::new())
            .await
            .unwrap();

        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert!(!entries.is_empty());
    }
622
    // With a mocked clock, the queue entry must carry the produce-supplied
    // metadata payload and the fixed ingestion timestamp.
    #[tokio::test]
    async fn should_record_metadata_and_ingestion_time_in_queue_entry() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let millis = 1_700_000_000_000i64;
        let fixed_time = UNIX_EPOCH + Duration::from_millis(millis as u64);
        let clock = Arc::new(MockClock::with_time(fixed_time));

        let buffer = Producer::with_object_store(test_config(), store.clone(), clock).unwrap();

        let metadata = Bytes::from(r#"{"topic":"events"}"#);
        let handle = buffer
            .produce(vec![Bytes::from("payload")], metadata.clone())
            .await
            .unwrap();
        assert_eq!(handle.ingestion_time_ms, millis);
        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].metadata.len(), 1);
        assert_eq!(entries[0].metadata[0].payload, metadata);
        assert_eq!(entries[0].metadata[0].ingestion_time_ms, millis);
    }
646
    // `close()` must flush anything still buffered before shutting down.
    #[tokio::test]
    async fn should_flush_remaining_entries_on_close() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer
            .produce(vec![Bytes::from("unflushed")], Bytes::new())
            .await
            .unwrap();

        buffer.close().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);

        let path = Path::from(entries[0].location.as_str());
        let data = store.get(&path).await.unwrap().bytes().await.unwrap();
        let parsed = decode_batch(data).unwrap();
        assert_eq!(parsed, vec![Bytes::from("unflushed")]);
    }
669
    // Each flush produces its own distinct batch object and manifest entry.
    #[tokio::test]
    async fn should_produce_separate_batches_per_flush() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer
            .produce(vec![Bytes::from("batch1")], Bytes::new())
            .await
            .unwrap();
        buffer.flush().await.unwrap();

        buffer
            .produce(vec![Bytes::from("batch2")], Bytes::new())
            .await
            .unwrap();
        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 2);
        assert_ne!(entries[0].location, entries[1].location);

        let data1 = store
            .get(&Path::from(entries[0].location.as_str()))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(decode_batch(data1).unwrap(), vec![Bytes::from("batch1")]);

        let data2 = store
            .get(&Path::from(entries[1].location.as_str()))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(decode_batch(data2).unwrap(), vec![Bytes::from("batch2")]);
    }
711
    // Flushing with nothing buffered must not create a manifest object.
    #[tokio::test]
    async fn should_not_flush_empty_batch() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer.flush().await.unwrap();

        let manifest_path = slatedb::object_store::path::Path::from("test/manifest");
        assert!(store.get(&manifest_path).await.is_err());
    }
724
    // Zero-length entries are valid payloads and must survive the
    // encode/flush/decode round trip intact.
    #[tokio::test]
    async fn should_preserve_all_empty_entries_batch() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer
            .produce(vec![Bytes::new(), Bytes::new()], Bytes::from("meta"))
            .await
            .unwrap();
        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
        assert!(!entries[0].location.is_empty());
        assert_eq!(entries[0].metadata.len(), 1);
        assert_eq!(entries[0].metadata[0].payload, Bytes::from("meta"));
        assert_eq!(entries[0].metadata[0].start_index, 0);

        let data = store
            .get(&Path::from(entries[0].location.clone()))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        let records = decode_batch(data).unwrap();
        assert_eq!(records, vec![Bytes::new(), Bytes::new()]);
    }
755
    // Mixed empty and non-empty entries must keep their order and identity
    // through the encode/flush/decode round trip.
    #[tokio::test]
    async fn should_preserve_empty_and_non_empty_entries_in_batch() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer
            .produce(
                vec![
                    Bytes::from("a"),
                    Bytes::new(),
                    Bytes::from("b"),
                    Bytes::new(),
                ],
                Bytes::from("meta"),
            )
            .await
            .unwrap();
        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
        assert!(!entries[0].location.is_empty());
        assert_eq!(entries[0].metadata.len(), 1);
        assert_eq!(entries[0].metadata[0].payload, Bytes::from("meta"));
        assert_eq!(entries[0].metadata[0].start_index, 0);

        let data = store
            .get(&Path::from(entries[0].location.clone()))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        let records = decode_batch(data).unwrap();
        assert_eq!(
            records,
            vec![
                Bytes::from("a"),
                Bytes::new(),
                Bytes::from("b"),
                Bytes::new()
            ]
        );
    }
802
    // An empty entries vector is rejected with `InvalidInput`, regardless of
    // whether metadata is supplied.
    #[tokio::test]
    async fn should_reject_empty_entries() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        let result = buffer.produce(vec![], Bytes::new()).await;
        assert!(matches!(result, Err(Error::InvalidInput(_))));

        let result = buffer.produce(vec![], Bytes::from("meta")).await;
        assert!(matches!(result, Err(Error::InvalidInput(_))));
    }
816}