use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use bytes::Bytes;
use common::clock::Clock;
use slatedb::object_store::path::Path;
use slatedb::object_store::{ObjectStore, PutPayload};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::config::ProducerConfig;
use crate::error::{Error, Result};
use crate::metric_names;
use crate::model::{CompressionType, encode_batch};
use crate::queue::{Metadata, QueueProducer};
use crate::util::millis;

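/// Broadcasts a batch's durability outcome (or failure) to its watchers.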
type Notifier = tokio::sync::watch::Sender<Option<Result<()>>>;

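/// A cloneable watch on the durability outcome of a produced batch: `None`
/// until the batch is flushed, then `Some(result)` for every watcher.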
#[derive(Clone)]
pub struct DurabilityWatcher {
    receiver: tokio::sync::watch::Receiver<Option<Result<()>>>,
}

impl DurabilityWatcher {
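    /// Returns the current durability result without blocking, or `None` if
    /// the batch has not been flushed yet.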
    pub fn result(&self) -> Option<Result<()>> {
        self.receiver.borrow().clone()
    }

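    /// Waits until the batch has been flushed (or the buffer shuts down) and
    /// returns the durability result.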
    pub async fn await_durable(&mut self) -> Result<()> {
        self.receiver
            .wait_for(|v| v.is_some())
            .await
            .map_err(|_| Error::Storage("buffer shut down".to_string()))?
            .clone()
            .expect("value must be present after wait_for")
    }
}

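/// Returned by [`Producer::produce`]: a durability watcher for the batch the
/// entries landed in, plus the ingestion timestamp assigned to them.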
pub struct WriteHandle {
    pub watcher: DurabilityWatcher,
    pub ingestion_time_ms: i64,
}

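/// Messages sent from `Producer` to the background batch-writer task.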
enum ProduceMessage {
    Add {
        entries: Vec<Bytes>,
        metadata: Bytes,
        ingestion_time_ms: i64,
        notifier: Notifier,
    },
    Flush {
        result_sender: tokio::sync::oneshot::Sender<Result<()>>,
    },
}

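/// The contents of an in-progress batch: the entries, their per-produce
/// metadata, and the notifiers to signal once the batch is written.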
#[derive(Default)]
struct DataAndNotifiers {
    entries: Vec<Bytes>,
    metadata: Vec<Metadata>,
    notifiers: Vec<Notifier>,
}

impl DataAndNotifiers {
    fn add(
        &mut self,
        entries: Vec<Bytes>,
        metadata: Bytes,
        ingestion_time_ms: i64,
        notifier: Notifier,
    ) -> Result<()> {
        // Validate before mutating so a rejected add leaves the batch
        // unchanged (the previous order extended `entries` first, leaving
        // them behind without matching metadata or a notifier on error).
        let new_len = self.entries.len() + entries.len();
        if new_len > u32::MAX as usize {
            return Err(Error::InvalidInput(format!(
                "batch entry count {} exceeds u32::MAX",
                new_len
            )));
        }
        let start_index = self.entries.len() as u32;
        self.entries.extend(entries);
        self.metadata.push(Metadata {
            start_index,
            ingestion_time_ms,
            payload: metadata,
        });
        self.notifiers.push(notifier);
        Ok(())
    }

    fn is_empty(&self) -> bool {
        self.entries.is_empty() && self.metadata.is_empty() && self.notifiers.is_empty()
    }
}

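/// The in-progress batch plus the bookkeeping used to decide when to flush:
/// accumulated payload size and the time the batch was started.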
struct Batch {
    data_and_notifiers: DataAndNotifiers,
    size_bytes: usize,
    started_at: Option<SystemTime>,
}

impl Batch {
    fn new() -> Self {
        Self {
            data_and_notifiers: DataAndNotifiers::default(),
            size_bytes: 0,
            started_at: None,
        }
    }

    fn add(
        &mut self,
        entries: Vec<Bytes>,
        metadata: Bytes,
        ingestion_time_ms: i64,
        notifier: Notifier,
        now: SystemTime,
    ) -> Result<()> {
        let mut entry_size_sum = 0usize;
        for e in &entries {
            if e.len() > u32::MAX as usize {
                return Err(Error::InvalidInput(format!(
                    "entry size {} exceeds u32::MAX",
                    e.len()
                )));
            }
            entry_size_sum += e.len();
        }
        // Only update the size and start time once the add has succeeded, so
        // a rejected add does not skew the flush accounting.
        let added_bytes = entry_size_sum + metadata.len();
        self.data_and_notifiers
            .add(entries, metadata, ingestion_time_ms, notifier)?;
        self.size_bytes += added_bytes;
        if self.started_at.is_none() {
            self.started_at = Some(now);
        }
        Ok(())
    }

    fn take(&mut self) -> DataAndNotifiers {
        self.size_bytes = 0;
        self.started_at = None;
        std::mem::take(&mut self.data_and_notifiers)
    }

    fn is_empty(&self) -> bool {
        self.data_and_notifiers.is_empty()
    }
}

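/// Background task that owns the current batch. It accumulates `Add` messages
/// and flushes when the batch reaches `flush_size_bytes`, when
/// `flush_interval` elapses, or on an explicit `Flush`.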
struct BatchWriterTask {
    object_store: Arc<dyn ObjectStore>,
    producer: Arc<QueueProducer>,
    data_path_prefix: String,
    flush_interval: Duration,
    flush_size_bytes: usize,
    batch_compression: CompressionType,
    batch: Batch,
    clock: Arc<dyn Clock>,
}

impl BatchWriterTask {
    fn new(
        object_store: Arc<dyn ObjectStore>,
        producer: Arc<QueueProducer>,
        data_path_prefix: String,
        flush_interval: Duration,
        flush_size_bytes: usize,
        batch_compression: CompressionType,
        clock: Arc<dyn Clock>,
    ) -> Self {
        Self {
            object_store,
            producer,
            data_path_prefix,
            flush_interval,
            flush_size_bytes,
            batch_compression,
            batch: Batch::new(),
            clock,
        }
    }

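    /// The task's event loop. The `biased` select checks shutdown first, then
    /// incoming messages, and otherwise flushes once the oldest buffered
    /// entry has waited `flush_interval`.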
    async fn run(&mut self, mut rx: mpsc::Receiver<ProduceMessage>, shutdown: CancellationToken) {
        loop {
            let sleep_duration = match self.batch.started_at {
                Some(started) => (started + self.flush_interval)
                    .duration_since(self.clock.now())
                    .unwrap_or(Duration::ZERO),
                None => self.flush_interval,
            };

            tokio::select! {
                biased;
                _ = shutdown.cancelled() => {
                    return;
                },
                msg = rx.recv() => {
                    match msg {
                        Some(ProduceMessage::Add { entries, metadata, ingestion_time_ms, notifier }) => {
                            if let Err(e) = self.batch.add(entries, metadata, ingestion_time_ms, notifier.clone(), self.clock.now()) {
                                let _ = notifier.send(Some(Err(e)));
                            } else if self.batch.size_bytes >= self.flush_size_bytes {
                                let _ = self.write_batch().await;
                            }
                        }
                        Some(ProduceMessage::Flush { result_sender }) => {
                            let _ = result_sender.send(self.write_batch().await);
                        }
                        None => break,
                    }
                },
                _ = tokio::time::sleep(sleep_duration) => {
                    let _ = self.write_batch().await;
                },
            }
        }
    }

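    /// Flushes the current batch and notifies every pending watcher of the
    /// outcome. A no-op when the batch is empty.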
    async fn write_batch(&mut self) -> Result<()> {
        if self.batch.is_empty() {
            return Ok(());
        }
        let data_and_notifiers = self.batch.take();
        let result = self
            .write_and_enqueue(data_and_notifiers.entries, data_and_notifiers.metadata)
            .await;

        for notifier in data_and_notifiers.notifiers {
            let _ = notifier.send(Some(result.clone()));
        }

        result
    }

    async fn write_and_enqueue(&self, entries: Vec<Bytes>, metadata: Vec<Metadata>) -> Result<()> {
        let start = Instant::now();
        let raw_bytes: u64 = entries.iter().map(|e| e.len() as u64).sum();
        let entry_count = entries.len() as u64;

        let payload = encode_batch(&entries, self.batch_compression)?;
        let written_bytes = payload.len() as u64;

        let id = ulid::Ulid::new();
        let path = Path::from(format!("{}/{}.batch", self.data_path_prefix, id));
        self.object_store
            .put(&path, PutPayload::from(payload))
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;

        self.producer.enqueue(path.to_string(), metadata).await?;

        metrics::counter!(metric_names::BATCHES_FLUSHED).increment(1);
        metrics::counter!(metric_names::ENTRIES_FLUSHED).increment(entry_count);
        metrics::counter!(metric_names::BYTES_FLUSHED).increment(raw_bytes);
        metrics::counter!(metric_names::BYTES_WRITTEN).increment(written_bytes);
        metrics::histogram!(metric_names::FLUSH_DURATION_SECONDS)
            .record(start.elapsed().as_secs_f64());

        Ok(())
    }
}

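/// Handle to the spawned `BatchWriterTask`: sends it messages and manages its
/// lifecycle (flush on demand, cancel and join on close).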
struct BatchWriter {
    producer: Arc<QueueProducer>,
    sender: mpsc::Sender<ProduceMessage>,
    cancellation_token: CancellationToken,
    handle: tokio::task::JoinHandle<()>,
}

impl BatchWriter {
    fn new(
        object_store: Arc<dyn ObjectStore>,
        config: &ProducerConfig,
        clock: Arc<dyn Clock>,
    ) -> Self {
        let (sender, receiver) = mpsc::channel(config.max_buffered_inputs);
        let producer = Arc::new(QueueProducer::with_object_store(
            config.manifest_path.clone(),
            object_store.clone(),
        ));
        let mut task = BatchWriterTask::new(
            object_store,
            producer.clone(),
            config.data_path_prefix.clone(),
            config.flush_interval,
            config.flush_size_bytes,
            config.batch_compression,
            clock,
        );
        let shutdown = CancellationToken::new();
        let cancellation_token = shutdown.clone();
        let handle = tokio::spawn(async move { task.run(receiver, shutdown).await });
        Self {
            producer,
            sender,
            cancellation_token,
            handle,
        }
    }

    async fn add(
        &self,
        entries: Vec<Bytes>,
        metadata: Bytes,
        ingestion_time_ms: i64,
    ) -> Result<DurabilityWatcher> {
        let (notifier_sender, notifier_receiver) = tokio::sync::watch::channel(None);
        self.sender
            .send(ProduceMessage::Add {
                entries,
                metadata,
                ingestion_time_ms,
                notifier: notifier_sender,
            })
            .await
            .map_err(|_| Error::Storage("buffer shut down".to_string()))?;
        Ok(DurabilityWatcher {
            receiver: notifier_receiver,
        })
    }

    async fn flush(&self) -> Result<()> {
        let (result_sender, result_receiver) = tokio::sync::oneshot::channel();
        self.sender
            .send(ProduceMessage::Flush { result_sender })
            .await
            .map_err(|_| Error::Storage("batch writer task not running".to_string()))?;
        result_receiver
            .await
            .map_err(|_| Error::Storage("batch writer task not running".to_string()))?
    }

    fn conflict_rate(&self) -> f64 {
        self.producer.conflict_rate()
    }

    async fn close(self) -> Result<()> {
        self.flush().await?;
        self.cancellation_token.cancel();
        let _ = self.handle.await;
        Ok(())
    }
}

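/// Buffers produced entries in memory and writes them to object storage in
/// batches, recording each batch file's location on the queue manifest.
///
/// A sketch of the intended call sequence (assumes a `ProducerConfig` named
/// `config` is in scope; marked `ignore` because it needs an async context):
///
/// ```ignore
/// use std::sync::Arc;
/// use bytes::Bytes;
/// use common::clock::SystemClock;
///
/// let producer = Producer::new(config, Arc::new(SystemClock))?;
/// let mut handle = producer
///     .produce(vec![Bytes::from("event")], Bytes::new())
///     .await?;
/// // Resolves once the batch containing this entry is durable.
/// handle.watcher.await_durable().await?;
/// producer.close().await?;
/// ```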
pub struct Producer {
    writer: BatchWriter,
    clock: Arc<dyn Clock>,
}

impl Producer {
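    /// Creates a producer backed by the object store described in
    /// `config.object_store`.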
    pub fn new(config: ProducerConfig, clock: Arc<dyn Clock>) -> Result<Self> {
        let object_store = common::storage::factory::create_object_store(&config.object_store)
            .map_err(|e| Error::Storage(e.to_string()))?;
        Self::with_object_store(config, object_store, clock)
    }

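    /// Creates a producer on a caller-supplied object store (as the tests in
    /// this module do).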
    pub fn with_object_store(
        config: ProducerConfig,
        object_store: Arc<dyn ObjectStore>,
        clock: Arc<dyn Clock>,
    ) -> Result<Self> {
        metric_names::describe_buffer_metrics();
        let writer = BatchWriter::new(object_store, &config, clock.clone());
        Ok(Self { writer, clock })
    }

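    /// Buffers `entries`, tagged with an opaque `metadata` blob, for the next
    /// batch. Returns a handle whose watcher resolves once the batch is
    /// durable. Awaiting the send applies backpressure when the input channel
    /// is full.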
    pub async fn produce(&self, entries: Vec<Bytes>, metadata: Bytes) -> Result<WriteHandle> {
        if entries.is_empty() {
            return Err(Error::InvalidInput("entries must not be empty".to_string()));
        }
        if metadata.len() > u32::MAX as usize {
            return Err(Error::InvalidInput(format!(
                "metadata size {} exceeds u32::MAX",
                metadata.len()
            )));
        }
        let ingestion_time_ms = millis(self.clock.now());
        let durability_watcher = self
            .writer
            .add(entries, metadata, ingestion_time_ms)
            .await?;
        Ok(WriteHandle {
            watcher: durability_watcher,
            ingestion_time_ms,
        })
    }

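    /// Forces the current batch to be written out and waits for the result.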
    pub async fn flush(&self) -> Result<()> {
        self.writer.flush().await
    }

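    /// Manifest write-conflict rate, as reported by the underlying
    /// `QueueProducer`.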
    pub fn conflict_rate(&self) -> f64 {
        self.writer.conflict_rate()
    }

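    /// Flushes any buffered entries, then stops and joins the background task.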
    pub async fn close(self) -> Result<()> {
        self.writer.close().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ProducerConfig;
    use crate::model::decode_batch;
    use crate::queue::{Manifest, QueueEntry};
    use bytes::Bytes;
    use common::ObjectStoreConfig;
    use common::clock::{MockClock, SystemClock};
    use slatedb::object_store::ObjectStore;
    use slatedb::object_store::memory::InMemory;
    use std::time::UNIX_EPOCH;

    async fn read_manifest_entries(store: &Arc<dyn ObjectStore>, path: &str) -> Vec<QueueEntry> {
        let path = slatedb::object_store::path::Path::from(path);
        let data = store.get(&path).await.unwrap().bytes().await.unwrap();
        let manifest = Manifest::from_bytes(data).unwrap();
        manifest.iter().map(|e| e.unwrap()).collect()
    }

    fn test_config() -> ProducerConfig {
        ProducerConfig {
            object_store: ObjectStoreConfig::InMemory,
            data_path_prefix: "test-ingest".to_string(),
            manifest_path: "test/manifest".to_string(),
            // 24 hours, so the interval never triggers a flush in these tests
            // (spelled out in seconds; Duration::from_hours is not stable).
            flush_interval: Duration::from_secs(24 * 60 * 60),
            flush_size_bytes: 64 * 1024 * 1024,
            max_buffered_inputs: 1000,
            batch_compression: CompressionType::None,
        }
    }

    #[tokio::test]
    async fn should_ingest_entries_and_enqueue_location() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer
            .produce(vec![Bytes::from("data1")], Bytes::new())
            .await
            .unwrap();
        buffer
            .produce(vec![Bytes::from("data2")], Bytes::new())
            .await
            .unwrap();
        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
        assert!(entries[0].location.starts_with("test-ingest/"));
        assert!(entries[0].location.ends_with(".batch"));
    }

    #[tokio::test]
    async fn should_write_valid_batch_to_object_store() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer
            .produce(vec![Bytes::from("mydata")], Bytes::new())
            .await
            .unwrap();
        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        let path = Path::from(entries[0].location.as_str());
        let data = store.get(&path).await.unwrap().bytes().await.unwrap();
        let parsed = decode_batch(data).unwrap();

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0], Bytes::from("mydata"));
    }

    #[tokio::test]
    async fn should_flush_when_batch_size_exceeded() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        let mut config = test_config();
        config.flush_size_bytes = 10;

        let buffer =
            Producer::with_object_store(config, store.clone(), Arc::new(SystemClock)).unwrap();

        let mut handle = buffer
            .produce(vec![Bytes::from("some-long-data")], Bytes::new())
            .await
            .unwrap();
        handle.watcher.await_durable().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
    }

    #[tokio::test]
    async fn should_flush_when_interval_elapsed() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        let mut config = test_config();
        config.flush_interval = Duration::from_millis(50);
        config.flush_size_bytes = 64 * 1024 * 1024;

        let buffer =
            Producer::with_object_store(config, store.clone(), Arc::new(SystemClock)).unwrap();

        let mut handle = buffer
            .produce(vec![Bytes::from("v1")], Bytes::new())
            .await
            .unwrap();

        assert!(handle.watcher.result().is_none());
        let manifest_path = slatedb::object_store::path::Path::from("test/manifest");
        assert!(store.get(&manifest_path).await.is_err());

        handle.watcher.await_durable().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
    }

    #[tokio::test]
    async fn should_not_flush_below_thresholds() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        let handle = buffer
            .produce(vec![Bytes::from("v")], Bytes::new())
            .await
            .unwrap();

        assert!(handle.watcher.result().is_none());

        let manifest_path = slatedb::object_store::path::Path::from("test/manifest");
        assert!(store.get(&manifest_path).await.is_err());

        buffer.flush().await.unwrap();

        assert!(handle.watcher.result().unwrap().is_ok());

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
    }

    #[tokio::test]
    async fn should_batch_multiple_ingests_into_single_file() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        let handle1 = buffer
            .produce(vec![Bytes::from("data1")], Bytes::new())
            .await
            .unwrap();
        let handle2 = buffer
            .produce(vec![Bytes::from("data2")], Bytes::new())
            .await
            .unwrap();

        buffer.flush().await.unwrap();

        assert!(handle1.watcher.result().unwrap().is_ok());
        assert!(handle2.watcher.result().unwrap().is_ok());

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);

        let path = Path::from(entries[0].location.as_str());
        let data = store.get(&path).await.unwrap().bytes().await.unwrap();
        let parsed = decode_batch(data).unwrap();
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0], Bytes::from("data1"));
        assert_eq!(parsed[1], Bytes::from("data2"));
    }

    #[tokio::test]
    async fn should_apply_backpressure_when_buffer_full() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

        let mut config = test_config();
        config.max_buffered_inputs = 1;

        let buffer =
            Producer::with_object_store(config, store.clone(), Arc::new(SystemClock)).unwrap();

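        // With max_buffered_inputs = 1, the input channel holds one message.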
        buffer
            .produce(vec![Bytes::from("data1")], Bytes::new())
            .await
            .unwrap();

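        // This second send may have to wait for the task to drain the first
        // message; the test passes as long as both entries are accepted and
        // flushed rather than dropped.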
        buffer
            .produce(vec![Bytes::from("data2")], Bytes::new())
            .await
            .unwrap();

        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert!(!entries.is_empty());
    }

    #[tokio::test]
    async fn should_record_metadata_and_ingestion_time_in_queue_entry() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        // Named to avoid shadowing the imported `millis` helper.
        let fixed_millis = 1_700_000_000_000i64;
        let fixed_time = UNIX_EPOCH + Duration::from_millis(fixed_millis as u64);
        let clock = Arc::new(MockClock::with_time(fixed_time));

        let buffer = Producer::with_object_store(test_config(), store.clone(), clock).unwrap();

        let metadata = Bytes::from(r#"{"topic":"events"}"#);
        let handle = buffer
            .produce(vec![Bytes::from("payload")], metadata.clone())
            .await
            .unwrap();
        assert_eq!(handle.ingestion_time_ms, fixed_millis);
        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].metadata.len(), 1);
        assert_eq!(entries[0].metadata[0].payload, metadata);
        assert_eq!(entries[0].metadata[0].ingestion_time_ms, fixed_millis);
    }

    #[tokio::test]
    async fn should_flush_remaining_entries_on_close() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer
            .produce(vec![Bytes::from("unflushed")], Bytes::new())
            .await
            .unwrap();

        buffer.close().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);

        let path = Path::from(entries[0].location.as_str());
        let data = store.get(&path).await.unwrap().bytes().await.unwrap();
        let parsed = decode_batch(data).unwrap();
        assert_eq!(parsed, vec![Bytes::from("unflushed")]);
    }

    #[tokio::test]
    async fn should_produce_separate_batches_per_flush() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer
            .produce(vec![Bytes::from("batch1")], Bytes::new())
            .await
            .unwrap();
        buffer.flush().await.unwrap();

        buffer
            .produce(vec![Bytes::from("batch2")], Bytes::new())
            .await
            .unwrap();
        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 2);
        assert_ne!(entries[0].location, entries[1].location);

        let data1 = store
            .get(&Path::from(entries[0].location.as_str()))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(decode_batch(data1).unwrap(), vec![Bytes::from("batch1")]);

        let data2 = store
            .get(&Path::from(entries[1].location.as_str()))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(decode_batch(data2).unwrap(), vec![Bytes::from("batch2")]);
    }

    #[tokio::test]
    async fn should_not_flush_empty_batch() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer.flush().await.unwrap();

        let manifest_path = slatedb::object_store::path::Path::from("test/manifest");
        assert!(store.get(&manifest_path).await.is_err());
    }

    #[tokio::test]
    async fn should_preserve_all_empty_entries_batch() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer
            .produce(vec![Bytes::new(), Bytes::new()], Bytes::from("meta"))
            .await
            .unwrap();
        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
        assert!(!entries[0].location.is_empty());
        assert_eq!(entries[0].metadata.len(), 1);
        assert_eq!(entries[0].metadata[0].payload, Bytes::from("meta"));
        assert_eq!(entries[0].metadata[0].start_index, 0);

        let data = store
            .get(&Path::from(entries[0].location.as_str()))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        let records = decode_batch(data).unwrap();
        assert_eq!(records, vec![Bytes::new(), Bytes::new()]);
    }

    #[tokio::test]
    async fn should_preserve_empty_and_non_empty_entries_in_batch() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        buffer
            .produce(
                vec![
                    Bytes::from("a"),
                    Bytes::new(),
                    Bytes::from("b"),
                    Bytes::new(),
                ],
                Bytes::from("meta"),
            )
            .await
            .unwrap();
        buffer.flush().await.unwrap();

        let entries = read_manifest_entries(&store, "test/manifest").await;
        assert_eq!(entries.len(), 1);
        assert!(!entries[0].location.is_empty());
        assert_eq!(entries[0].metadata.len(), 1);
        assert_eq!(entries[0].metadata[0].payload, Bytes::from("meta"));
        assert_eq!(entries[0].metadata[0].start_index, 0);

        let data = store
            .get(&Path::from(entries[0].location.as_str()))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        let records = decode_batch(data).unwrap();
        assert_eq!(
            records,
            vec![
                Bytes::from("a"),
                Bytes::new(),
                Bytes::from("b"),
                Bytes::new()
            ]
        );
    }

    #[tokio::test]
    async fn should_reject_empty_entries() {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let buffer =
            Producer::with_object_store(test_config(), store.clone(), Arc::new(SystemClock))
                .unwrap();

        let result = buffer.produce(vec![], Bytes::new()).await;
        assert!(matches!(result, Err(Error::InvalidInput(_))));

        let result = buffer.produce(vec![], Bytes::from("meta")).await;
        assert!(matches!(result, Err(Error::InvalidInput(_))));
    }
}
816}