1use std::ops::RangeBounds;
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use common::clock::{Clock, SystemClock};
15use common::coordinator::{Durability, EpochWatcher, EpochWatermarks};
16use common::storage::factory::create_storage;
17use common::{StorageRuntime, StorageSemantics};
18use tokio::sync::RwLock;
19use tokio::sync::watch;
20use tokio::task::JoinHandle;
21
22use crate::config::{CountOptions, ScanOptions, SegmentConfig};
23use crate::error::{AppendResult, Error, Result};
24use crate::listing::ListingCache;
25use crate::listing::LogKeyIterator;
26use crate::model::{AppendOutput, Record, Segment, SegmentId, Sequence};
27use crate::range::{normalize_segment_id, normalize_sequence};
28use crate::reader::{LogIterator, LogRead, LogReadView};
29use crate::segment::{LogSegment, SegmentCache};
30use crate::serde::SEQ_BLOCK_KEY;
31use crate::writer::{LogWrite, LogWriteHandle, LogWriter, LogWriterConfig, WrittenView};
32
33pub struct LogDb {
104 handle: LogWriteHandle,
105 writer_task: JoinHandle<()>,
106 storage: Arc<dyn common::Storage>,
107 clock: Arc<dyn Clock>,
108 read_view: Arc<RwLock<LogReadView>>,
109 epoch_watcher: EpochWatcher,
110 flushed_subscriber_task: JoinHandle<()>,
111}
112
113impl LogDb {
114 pub async fn open(config: crate::config::Config) -> Result<Self> {
135 LogDbBuilder::new(config).build().await
136 }
137
138 #[cfg(feature = "http-server")]
140 pub fn register_metrics(&self, registry: &mut prometheus_client::registry::Registry) {
141 self.storage.register_metrics(registry);
142 }
143
144 pub async fn try_append(&self, records: Vec<Record>) -> AppendResult<AppendOutput> {
177 self.append_inner(records, None).await
178 }
179
180 pub async fn append_timeout(
213 &self,
214 records: Vec<Record>,
215 timeout: Duration,
216 ) -> AppendResult<AppendOutput> {
217 self.append_inner(records, Some(timeout)).await
218 }
219
220 async fn append_inner(
222 &self,
223 records: Vec<Record>,
224 timeout: Option<Duration>,
225 ) -> AppendResult<AppendOutput> {
226 if records.is_empty() {
227 return Ok(AppendOutput { start_sequence: 0 });
228 }
229
230 let write = LogWrite {
231 records,
232 timestamp_ms: self.current_time_ms(),
233 };
234
235 let result = if let Some(t) = timeout {
236 self.handle.append_timeout(write, t).await
237 } else {
238 self.handle.try_append(write).await
239 }?;
240
241 Ok(result.expect("non-empty append must produce output"))
244 }
245
246 fn current_time_ms(&self) -> i64 {
248 self.clock
249 .now()
250 .duration_since(std::time::UNIX_EPOCH)
251 .unwrap()
252 .as_millis() as i64
253 }
254
255 #[cfg(feature = "http-server")]
264 pub(crate) async fn check_storage(&self) -> Result<()> {
265 let seq_key = Bytes::from_static(&crate::serde::SEQ_BLOCK_KEY);
268 let _ = self.storage.get(seq_key).await?;
269 Ok(())
270 }
271
272 #[cfg(test)]
278 pub(crate) async fn seal_segment(&self) -> Result<()> {
279 self.handle.force_seal(self.current_time_ms()).await?;
280 self.flush().await?;
281 Ok(())
282 }
283
284 pub async fn flush(&self) -> Result<()> {
289 self.handle.flush().await
290 }
291
292 async fn sync_to_flushed(&self) -> Result<()> {
294 let target = self.handle.flushed_epoch();
295 self.epoch_watcher
296 .clone()
297 .wait(target, Durability::Written)
298 .await
299 .map_err(|_| Error::Internal("writer shut down".into()))?;
300 Ok(())
301 }
302
303 pub async fn close(self) -> Result<()> {
309 self.flush().await?;
310 drop(self.handle);
312 let _ = self.writer_task.await;
313 self.flushed_subscriber_task.abort();
314 self.storage
315 .close()
316 .await
317 .map_err(|e| Error::Storage(e.to_string()))?;
318 Ok(())
319 }
320
321 #[cfg(test)]
323 pub(crate) async fn new(storage: Arc<dyn common::Storage>) -> Result<Self> {
324 Self::from_storage(storage, SegmentConfig::default()).await
325 }
326
327 async fn from_storage(
329 storage: Arc<dyn common::Storage>,
330 segment_config: SegmentConfig,
331 ) -> Result<Self> {
332 let clock: Arc<dyn Clock> = Arc::new(SystemClock);
333
334 let seq_key = Bytes::from_static(&SEQ_BLOCK_KEY);
335 let sequence_allocator = common::SequenceAllocator::load(storage.as_ref(), seq_key)
336 .await
337 .map_err(|e| Error::Internal(e.to_string()))?;
338 let snapshot = storage
339 .snapshot()
340 .await
341 .map_err(|e| Error::Storage(e.to_string()))?;
342 let segment_cache = SegmentCache::open(snapshot.as_ref(), segment_config).await?;
343 let listing_cache = ListingCache::new();
344
345 let (writer, mut handle) = LogWriter::new(
346 storage.clone(),
347 sequence_allocator,
348 segment_cache.clone(),
349 listing_cache,
350 LogWriterConfig::default(),
351 )
352 .await
353 .map_err(Error::Storage)?;
354
355 let written_rx = handle.written_rx();
356 let writer_task = handle.spawn(writer);
357
358 let read_view = Arc::new(RwLock::new(LogReadView::new(
359 snapshot as Arc<dyn common::StorageRead>,
360 segment_cache,
361 )));
362
363 let (epoch_watcher, flushed_subscriber_task) =
364 spawn_flushed_subscriber(written_rx, Arc::clone(&read_view));
365
366 Ok(Self {
367 handle,
368 writer_task,
369 storage,
370 clock,
371 read_view,
372 epoch_watcher,
373 flushed_subscriber_task,
374 })
375 }
376}
377
378#[async_trait]
379impl LogRead for LogDb {
380 async fn scan_with_options(
381 &self,
382 key: Bytes,
383 seq_range: impl RangeBounds<Sequence> + Send,
384 options: ScanOptions,
385 ) -> Result<LogIterator> {
386 self.sync_to_flushed().await?;
387 let seq_range = normalize_sequence(&seq_range);
388 let view = self.read_view.read().await;
389 Ok(view.scan_with_options(key, seq_range, &options))
390 }
391
392 async fn count_with_options(
393 &self,
394 _key: Bytes,
395 _seq_range: impl RangeBounds<Sequence> + Send,
396 _options: CountOptions,
397 ) -> Result<u64> {
398 todo!()
399 }
400
401 async fn list_keys(
402 &self,
403 segment_range: impl RangeBounds<SegmentId> + Send,
404 ) -> Result<LogKeyIterator> {
405 self.sync_to_flushed().await?;
406 let segment_range = normalize_segment_id(&segment_range);
407 let view = self.read_view.read().await;
408 view.list_keys(segment_range).await
409 }
410
411 async fn list_segments(
412 &self,
413 seq_range: impl RangeBounds<Sequence> + Send,
414 ) -> Result<Vec<Segment>> {
415 self.sync_to_flushed().await?;
416 let seq_range = normalize_sequence(&seq_range);
417 let view = self.read_view.read().await;
418 Ok(view.list_segments(&seq_range))
419 }
420}
421
422pub struct LogDbBuilder {
450 config: crate::config::Config,
451 storage_runtime: StorageRuntime,
452}
453
454impl LogDbBuilder {
455 pub fn new(config: crate::config::Config) -> Self {
457 Self {
458 config,
459 storage_runtime: StorageRuntime::new(),
460 }
461 }
462
463 pub fn with_storage_runtime(mut self, runtime: StorageRuntime) -> Self {
467 self.storage_runtime = runtime;
468 self
469 }
470
471 pub async fn build(self) -> Result<LogDb> {
473 let storage = create_storage(
474 &self.config.storage,
475 self.storage_runtime,
476 StorageSemantics::new(),
477 )
478 .await
479 .map_err(|e| Error::Storage(e.to_string()))?;
480
481 LogDb::from_storage(storage, self.config.segmentation).await
482 }
483}
484
485fn spawn_flushed_subscriber(
486 mut written_rx: watch::Receiver<WrittenView>,
487 read_view: Arc<RwLock<LogReadView>>,
488) -> (EpochWatcher, JoinHandle<()>) {
489 let (watermarks, watcher) = EpochWatermarks::new();
490 let task = tokio::spawn(async move {
491 let mut last_segments: Option<Arc<[LogSegment]>> = None;
492 while written_rx.changed().await.is_ok() {
493 let view = written_rx.borrow_and_update().clone();
494
495 let mut rv = read_view.write().await;
496 rv.update_snapshot(view.snapshot as Arc<dyn common::StorageRead>);
497
498 if !last_segments
500 .as_ref()
501 .is_some_and(|s| Arc::ptr_eq(s, &view.segments))
502 {
503 rv.replace_segments(&view.segments);
504 last_segments = Some(Arc::clone(&view.segments));
505 }
506
507 watermarks.update_written(view.epoch);
508 }
509 });
510 (watcher, task)
511}
512
513#[cfg(test)]
514mod tests {
515 use common::StorageConfig;
516 use common::storage::factory::create_storage;
517
518 use super::*;
519 use crate::config::Config;
520 use crate::reader::LogDbReader;
521
522 fn test_config() -> Config {
523 Config {
524 storage: StorageConfig::InMemory,
525 ..Default::default()
526 }
527 }
528
529 #[tokio::test]
530 async fn should_open_log_with_in_memory_config() {
531 let config = test_config();
533
534 let result = LogDb::open(config).await;
536
537 assert!(result.is_ok());
539 }
540
541 #[tokio::test]
542 async fn should_append_single_record() {
543 let log = LogDb::open(test_config()).await.unwrap();
545 let records = vec![Record {
546 key: Bytes::from("orders"),
547 value: Bytes::from("order-1"),
548 }];
549
550 log.try_append(records).await.unwrap();
552
553 let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
555 let entry = iter.next().await.unwrap().unwrap();
556 assert_eq!(entry.sequence, 0);
557 assert_eq!(entry.value, Bytes::from("order-1"));
558 assert!(iter.next().await.unwrap().is_none());
559 }
560
561 #[tokio::test]
562 async fn should_append_multiple_records_in_batch() {
563 let log = LogDb::open(test_config()).await.unwrap();
565 let records = vec![
566 Record {
567 key: Bytes::from("orders"),
568 value: Bytes::from("order-1"),
569 },
570 Record {
571 key: Bytes::from("orders"),
572 value: Bytes::from("order-2"),
573 },
574 Record {
575 key: Bytes::from("orders"),
576 value: Bytes::from("order-3"),
577 },
578 ];
579
580 log.try_append(records).await.unwrap();
582
583 let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
585
586 let entry0 = iter.next().await.unwrap().unwrap();
587 assert_eq!(entry0.sequence, 0);
588 assert_eq!(entry0.value, Bytes::from("order-1"));
589
590 let entry1 = iter.next().await.unwrap().unwrap();
591 assert_eq!(entry1.sequence, 1);
592 assert_eq!(entry1.value, Bytes::from("order-2"));
593
594 let entry2 = iter.next().await.unwrap().unwrap();
595 assert_eq!(entry2.sequence, 2);
596 assert_eq!(entry2.value, Bytes::from("order-3"));
597
598 assert!(iter.next().await.unwrap().is_none());
599 }
600
601 #[tokio::test]
602 async fn should_append_empty_records_without_error() {
603 let log = LogDb::open(test_config()).await.unwrap();
605 let records: Vec<Record> = vec![];
606
607 let result = log.try_append(records).await;
609
610 assert!(result.is_ok());
612
613 let mut iter = log.scan(Bytes::from("any-key"), ..).await.unwrap();
615 assert!(iter.next().await.unwrap().is_none());
616 }
617
618 #[tokio::test]
619 async fn should_assign_sequential_sequences_across_appends() {
620 let log = LogDb::open(test_config()).await.unwrap();
622
623 log.try_append(vec![
625 Record {
626 key: Bytes::from("events"),
627 value: Bytes::from("event-1"),
628 },
629 Record {
630 key: Bytes::from("events"),
631 value: Bytes::from("event-2"),
632 },
633 ])
634 .await
635 .unwrap();
636
637 log.try_append(vec![Record {
639 key: Bytes::from("events"),
640 value: Bytes::from("event-3"),
641 }])
642 .await
643 .unwrap();
644
645 let mut iter = log.scan(Bytes::from("events"), ..).await.unwrap();
647
648 let entry0 = iter.next().await.unwrap().unwrap();
649 assert_eq!(entry0.sequence, 0);
650
651 let entry1 = iter.next().await.unwrap().unwrap();
652 assert_eq!(entry1.sequence, 1);
653
654 let entry2 = iter.next().await.unwrap().unwrap();
655 assert_eq!(entry2.sequence, 2);
656
657 assert!(iter.next().await.unwrap().is_none());
658 }
659
660 #[tokio::test]
661 async fn should_store_records_with_correct_keys_and_values() {
662 let log = LogDb::open(test_config()).await.unwrap();
664 let records = vec![
665 Record {
666 key: Bytes::from("topic-a"),
667 value: Bytes::from("message-a"),
668 },
669 Record {
670 key: Bytes::from("topic-b"),
671 value: Bytes::from("message-b"),
672 },
673 ];
674
675 log.try_append(records).await.unwrap();
677
678 let mut iter_a = log.scan(Bytes::from("topic-a"), ..).await.unwrap();
680 let entry_a = iter_a.next().await.unwrap().unwrap();
681 assert_eq!(entry_a.key, Bytes::from("topic-a"));
682 assert_eq!(entry_a.value, Bytes::from("message-a"));
683 assert!(iter_a.next().await.unwrap().is_none());
684
685 let mut iter_b = log.scan(Bytes::from("topic-b"), ..).await.unwrap();
687 let entry_b = iter_b.next().await.unwrap().unwrap();
688 assert_eq!(entry_b.key, Bytes::from("topic-b"));
689 assert_eq!(entry_b.value, Bytes::from("message-b"));
690 assert!(iter_b.next().await.unwrap().is_none());
691 }
692
693 #[tokio::test]
694 async fn should_scan_all_entries_for_key() {
695 let log = LogDb::open(test_config()).await.unwrap();
697 log.try_append(vec![
698 Record {
699 key: Bytes::from("orders"),
700 value: Bytes::from("order-1"),
701 },
702 Record {
703 key: Bytes::from("orders"),
704 value: Bytes::from("order-2"),
705 },
706 Record {
707 key: Bytes::from("orders"),
708 value: Bytes::from("order-3"),
709 },
710 ])
711 .await
712 .unwrap();
713
714 let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
716 let mut entries = vec![];
717 while let Some(entry) = iter.next().await.unwrap() {
718 entries.push(entry);
719 }
720
721 assert_eq!(entries.len(), 3);
723 assert_eq!(entries[0].sequence, 0);
724 assert_eq!(entries[0].value, Bytes::from("order-1"));
725 assert_eq!(entries[1].sequence, 1);
726 assert_eq!(entries[1].value, Bytes::from("order-2"));
727 assert_eq!(entries[2].sequence, 2);
728 assert_eq!(entries[2].value, Bytes::from("order-3"));
729 }
730
731 #[tokio::test]
732 async fn should_scan_with_sequence_range() {
733 let log = LogDb::open(test_config()).await.unwrap();
735 log.try_append(vec![
736 Record {
737 key: Bytes::from("events"),
738 value: Bytes::from("event-0"),
739 },
740 Record {
741 key: Bytes::from("events"),
742 value: Bytes::from("event-1"),
743 },
744 Record {
745 key: Bytes::from("events"),
746 value: Bytes::from("event-2"),
747 },
748 Record {
749 key: Bytes::from("events"),
750 value: Bytes::from("event-3"),
751 },
752 Record {
753 key: Bytes::from("events"),
754 value: Bytes::from("event-4"),
755 },
756 ])
757 .await
758 .unwrap();
759
760 let mut iter = log.scan(Bytes::from("events"), 1..4).await.unwrap();
762 let mut entries = vec![];
763 while let Some(entry) = iter.next().await.unwrap() {
764 entries.push(entry);
765 }
766
767 assert_eq!(entries.len(), 3);
769 assert_eq!(entries[0].sequence, 1);
770 assert_eq!(entries[1].sequence, 2);
771 assert_eq!(entries[2].sequence, 3);
772 }
773
774 #[tokio::test]
775 async fn should_scan_from_starting_sequence() {
776 let log = LogDb::open(test_config()).await.unwrap();
778 log.try_append(vec![
779 Record {
780 key: Bytes::from("logs"),
781 value: Bytes::from("log-0"),
782 },
783 Record {
784 key: Bytes::from("logs"),
785 value: Bytes::from("log-1"),
786 },
787 Record {
788 key: Bytes::from("logs"),
789 value: Bytes::from("log-2"),
790 },
791 ])
792 .await
793 .unwrap();
794
795 let mut iter = log.scan(Bytes::from("logs"), 1..).await.unwrap();
797 let mut entries = vec![];
798 while let Some(entry) = iter.next().await.unwrap() {
799 entries.push(entry);
800 }
801
802 assert_eq!(entries.len(), 2);
804 assert_eq!(entries[0].sequence, 1);
805 assert_eq!(entries[1].sequence, 2);
806 }
807
808 #[tokio::test]
809 async fn should_scan_up_to_ending_sequence() {
810 let log = LogDb::open(test_config()).await.unwrap();
812 log.try_append(vec![
813 Record {
814 key: Bytes::from("logs"),
815 value: Bytes::from("log-0"),
816 },
817 Record {
818 key: Bytes::from("logs"),
819 value: Bytes::from("log-1"),
820 },
821 Record {
822 key: Bytes::from("logs"),
823 value: Bytes::from("log-2"),
824 },
825 ])
826 .await
827 .unwrap();
828
829 let mut iter = log.scan(Bytes::from("logs"), ..2).await.unwrap();
831 let mut entries = vec![];
832 while let Some(entry) = iter.next().await.unwrap() {
833 entries.push(entry);
834 }
835
836 assert_eq!(entries.len(), 2);
838 assert_eq!(entries[0].sequence, 0);
839 assert_eq!(entries[1].sequence, 1);
840 }
841
842 #[tokio::test]
843 async fn should_scan_only_entries_for_specified_key() {
844 let log = LogDb::open(test_config()).await.unwrap();
846 log.try_append(vec![
847 Record {
848 key: Bytes::from("key-a"),
849 value: Bytes::from("value-a-0"),
850 },
851 Record {
852 key: Bytes::from("key-b"),
853 value: Bytes::from("value-b-0"),
854 },
855 Record {
856 key: Bytes::from("key-a"),
857 value: Bytes::from("value-a-1"),
858 },
859 Record {
860 key: Bytes::from("key-b"),
861 value: Bytes::from("value-b-1"),
862 },
863 ])
864 .await
865 .unwrap();
866
867 let mut iter = log.scan(Bytes::from("key-a"), ..).await.unwrap();
869 let mut entries = vec![];
870 while let Some(entry) = iter.next().await.unwrap() {
871 entries.push(entry);
872 }
873
874 assert_eq!(entries.len(), 2);
876 assert_eq!(entries[0].key, Bytes::from("key-a"));
877 assert_eq!(entries[0].value, Bytes::from("value-a-0"));
878 assert_eq!(entries[1].key, Bytes::from("key-a"));
879 assert_eq!(entries[1].value, Bytes::from("value-a-1"));
880 }
881
882 #[tokio::test]
883 async fn should_return_empty_iterator_for_unknown_key() {
884 let log = LogDb::open(test_config()).await.unwrap();
886 log.try_append(vec![Record {
887 key: Bytes::from("existing"),
888 value: Bytes::from("value"),
889 }])
890 .await
891 .unwrap();
892
893 let mut iter = log.scan(Bytes::from("unknown"), ..).await.unwrap();
895 let entry = iter.next().await.unwrap();
896
897 assert!(entry.is_none());
899 }
900
901 #[tokio::test]
902 async fn should_return_empty_iterator_for_empty_range() {
903 let log = LogDb::open(test_config()).await.unwrap();
905 log.try_append(vec![
906 Record {
907 key: Bytes::from("key"),
908 value: Bytes::from("value-0"),
909 },
910 Record {
911 key: Bytes::from("key"),
912 value: Bytes::from("value-1"),
913 },
914 ])
915 .await
916 .unwrap();
917
918 let mut iter = log.scan(Bytes::from("key"), 10..20).await.unwrap();
920 let entry = iter.next().await.unwrap();
921
922 assert!(entry.is_none());
924 }
925
926 #[tokio::test]
927 async fn should_scan_entries_via_log_reader() {
928 let storage = create_storage(
930 &StorageConfig::InMemory,
931 StorageRuntime::new(),
932 StorageSemantics::new(),
933 )
934 .await
935 .unwrap();
936 let log = LogDb::new(storage.clone()).await.unwrap();
937 log.try_append(vec![
938 Record {
939 key: Bytes::from("orders"),
940 value: Bytes::from("order-1"),
941 },
942 Record {
943 key: Bytes::from("orders"),
944 value: Bytes::from("order-2"),
945 },
946 Record {
947 key: Bytes::from("orders"),
948 value: Bytes::from("order-3"),
949 },
950 ])
951 .await
952 .unwrap();
953 log.flush().await.unwrap();
954
955 let reader = LogDbReader::new(storage).await.unwrap();
957 let mut iter = reader.scan(Bytes::from("orders"), ..).await.unwrap();
958 let mut entries = vec![];
959 while let Some(entry) = iter.next().await.unwrap() {
960 entries.push(entry);
961 }
962
963 assert_eq!(entries.len(), 3);
965 assert_eq!(entries[0].sequence, 0);
966 assert_eq!(entries[0].value, Bytes::from("order-1"));
967 assert_eq!(entries[1].sequence, 1);
968 assert_eq!(entries[1].value, Bytes::from("order-2"));
969 assert_eq!(entries[2].sequence, 2);
970 assert_eq!(entries[2].value, Bytes::from("order-3"));
971 }
972
973 #[tokio::test]
974 async fn should_scan_across_multiple_segments() {
975 let log = LogDb::open(test_config()).await.unwrap();
977
978 log.try_append(vec![
980 Record {
981 key: Bytes::from("events"),
982 value: Bytes::from("event-0"),
983 },
984 Record {
985 key: Bytes::from("events"),
986 value: Bytes::from("event-1"),
987 },
988 ])
989 .await
990 .unwrap();
991
992 log.seal_segment().await.unwrap();
994
995 log.try_append(vec![
997 Record {
998 key: Bytes::from("events"),
999 value: Bytes::from("event-2"),
1000 },
1001 Record {
1002 key: Bytes::from("events"),
1003 value: Bytes::from("event-3"),
1004 },
1005 ])
1006 .await
1007 .unwrap();
1008
1009 let mut iter = log.scan(Bytes::from("events"), ..).await.unwrap();
1011 let mut entries = vec![];
1012 while let Some(entry) = iter.next().await.unwrap() {
1013 entries.push(entry);
1014 }
1015
1016 assert_eq!(entries.len(), 4);
1018 assert_eq!(entries[0].sequence, 0);
1019 assert_eq!(entries[0].value, Bytes::from("event-0"));
1020 assert_eq!(entries[1].sequence, 1);
1021 assert_eq!(entries[1].value, Bytes::from("event-1"));
1022 assert_eq!(entries[2].sequence, 2);
1023 assert_eq!(entries[2].value, Bytes::from("event-2"));
1024 assert_eq!(entries[3].sequence, 3);
1025 assert_eq!(entries[3].value, Bytes::from("event-3"));
1026 }
1027
1028 #[tokio::test]
1029 async fn should_scan_range_spanning_segments() {
1030 let log = LogDb::open(test_config()).await.unwrap();
1032
1033 log.try_append(vec![
1035 Record {
1036 key: Bytes::from("data"),
1037 value: Bytes::from("seg0-0"),
1038 },
1039 Record {
1040 key: Bytes::from("data"),
1041 value: Bytes::from("seg0-1"),
1042 },
1043 ])
1044 .await
1045 .unwrap();
1046
1047 log.seal_segment().await.unwrap();
1048
1049 log.try_append(vec![
1051 Record {
1052 key: Bytes::from("data"),
1053 value: Bytes::from("seg1-2"),
1054 },
1055 Record {
1056 key: Bytes::from("data"),
1057 value: Bytes::from("seg1-3"),
1058 },
1059 ])
1060 .await
1061 .unwrap();
1062
1063 log.seal_segment().await.unwrap();
1064
1065 log.try_append(vec![
1067 Record {
1068 key: Bytes::from("data"),
1069 value: Bytes::from("seg2-4"),
1070 },
1071 Record {
1072 key: Bytes::from("data"),
1073 value: Bytes::from("seg2-5"),
1074 },
1075 ])
1076 .await
1077 .unwrap();
1078
1079 let mut iter = log.scan(Bytes::from("data"), 1..5).await.unwrap();
1081 let mut entries = vec![];
1082 while let Some(entry) = iter.next().await.unwrap() {
1083 entries.push(entry);
1084 }
1085
1086 assert_eq!(entries.len(), 4);
1088 assert_eq!(entries[0].sequence, 1);
1089 assert_eq!(entries[1].sequence, 2);
1090 assert_eq!(entries[2].sequence, 3);
1091 assert_eq!(entries[3].sequence, 4);
1092 }
1093
1094 #[tokio::test]
1095 async fn should_scan_single_segment_in_multi_segment_log() {
1096 let log = LogDb::open(test_config()).await.unwrap();
1098
1099 log.try_append(vec![
1101 Record {
1102 key: Bytes::from("key"),
1103 value: Bytes::from("v0"),
1104 },
1105 Record {
1106 key: Bytes::from("key"),
1107 value: Bytes::from("v1"),
1108 },
1109 ])
1110 .await
1111 .unwrap();
1112
1113 log.seal_segment().await.unwrap();
1114
1115 log.try_append(vec![
1117 Record {
1118 key: Bytes::from("key"),
1119 value: Bytes::from("v2"),
1120 },
1121 Record {
1122 key: Bytes::from("key"),
1123 value: Bytes::from("v3"),
1124 },
1125 ])
1126 .await
1127 .unwrap();
1128
1129 let mut iter = log.scan(Bytes::from("key"), 2..4).await.unwrap();
1131 let mut entries = vec![];
1132 while let Some(entry) = iter.next().await.unwrap() {
1133 entries.push(entry);
1134 }
1135
1136 assert_eq!(entries.len(), 2);
1138 assert_eq!(entries[0].sequence, 2);
1139 assert_eq!(entries[1].sequence, 3);
1140 }
1141
1142 #[tokio::test]
1143 async fn should_list_keys_returns_iterator() {
1144 let log = LogDb::open(test_config()).await.unwrap();
1146 log.try_append(vec![
1147 Record {
1148 key: Bytes::from("key-a"),
1149 value: Bytes::from("value-a"),
1150 },
1151 Record {
1152 key: Bytes::from("key-b"),
1153 value: Bytes::from("value-b"),
1154 },
1155 ])
1156 .await
1157 .unwrap();
1158
1159 let _iter = log.list_keys(..).await.unwrap();
1161
1162 }
1164
1165 #[tokio::test]
1166 async fn should_list_keys_via_log_reader() {
1167 let storage = create_storage(
1169 &StorageConfig::InMemory,
1170 StorageRuntime::new(),
1171 StorageSemantics::new(),
1172 )
1173 .await
1174 .unwrap();
1175 let log = LogDb::new(storage.clone()).await.unwrap();
1176 log.try_append(vec![
1177 Record {
1178 key: Bytes::from("key-a"),
1179 value: Bytes::from("value-a"),
1180 },
1181 Record {
1182 key: Bytes::from("key-b"),
1183 value: Bytes::from("value-b"),
1184 },
1185 ])
1186 .await
1187 .unwrap();
1188 log.flush().await.unwrap();
1189
1190 let reader = LogDbReader::new(storage).await.unwrap();
1192 let _iter = reader.list_keys(..).await.unwrap();
1193
1194 }
1196
1197 #[tokio::test]
1198 async fn should_list_keys_in_single_segment() {
1199 let log = LogDb::open(test_config()).await.unwrap();
1201 log.try_append(vec![
1202 Record {
1203 key: Bytes::from("key-a"),
1204 value: Bytes::from("value-a"),
1205 },
1206 Record {
1207 key: Bytes::from("key-b"),
1208 value: Bytes::from("value-b"),
1209 },
1210 Record {
1211 key: Bytes::from("key-c"),
1212 value: Bytes::from("value-c"),
1213 },
1214 ])
1215 .await
1216 .unwrap();
1217
1218 let mut iter = log.list_keys(..).await.unwrap();
1220 let mut keys = vec![];
1221 while let Some(key) = iter.next().await.unwrap() {
1222 keys.push(key.key);
1223 }
1224
1225 assert_eq!(keys.len(), 3);
1227 assert_eq!(keys[0], Bytes::from("key-a"));
1228 assert_eq!(keys[1], Bytes::from("key-b"));
1229 assert_eq!(keys[2], Bytes::from("key-c"));
1230 }
1231
1232 #[tokio::test]
1233 async fn should_list_keys_across_segments_after_roll() {
1234 let log = LogDb::open(test_config()).await.unwrap();
1236
1237 log.try_append(vec![
1239 Record {
1240 key: Bytes::from("key-a"),
1241 value: Bytes::from("value-a-0"),
1242 },
1243 Record {
1244 key: Bytes::from("key-b"),
1245 value: Bytes::from("value-b-0"),
1246 },
1247 ])
1248 .await
1249 .unwrap();
1250
1251 log.seal_segment().await.unwrap();
1253
1254 log.try_append(vec![
1256 Record {
1257 key: Bytes::from("key-c"),
1258 value: Bytes::from("value-c-1"),
1259 },
1260 Record {
1261 key: Bytes::from("key-d"),
1262 value: Bytes::from("value-d-1"),
1263 },
1264 ])
1265 .await
1266 .unwrap();
1267
1268 let mut iter = log.list_keys(..).await.unwrap();
1270 let mut keys = vec![];
1271 while let Some(key) = iter.next().await.unwrap() {
1272 keys.push(key.key);
1273 }
1274
1275 assert_eq!(keys.len(), 4);
1277 assert_eq!(keys[0], Bytes::from("key-a"));
1278 assert_eq!(keys[1], Bytes::from("key-b"));
1279 assert_eq!(keys[2], Bytes::from("key-c"));
1280 assert_eq!(keys[3], Bytes::from("key-d"));
1281 }
1282
1283 #[tokio::test]
1284 async fn should_deduplicate_keys_across_segments() {
1285 let log = LogDb::open(test_config()).await.unwrap();
1287
1288 log.try_append(vec![Record {
1290 key: Bytes::from("shared-key"),
1291 value: Bytes::from("value-0"),
1292 }])
1293 .await
1294 .unwrap();
1295
1296 log.seal_segment().await.unwrap();
1298
1299 log.try_append(vec![Record {
1301 key: Bytes::from("shared-key"),
1302 value: Bytes::from("value-1"),
1303 }])
1304 .await
1305 .unwrap();
1306
1307 log.seal_segment().await.unwrap();
1309
1310 log.try_append(vec![Record {
1312 key: Bytes::from("shared-key"),
1313 value: Bytes::from("value-2"),
1314 }])
1315 .await
1316 .unwrap();
1317
1318 let mut iter = log.list_keys(..).await.unwrap();
1320 let mut keys = vec![];
1321 while let Some(key) = iter.next().await.unwrap() {
1322 keys.push(key.key);
1323 }
1324
1325 assert_eq!(keys.len(), 1);
1327 assert_eq!(keys[0], Bytes::from("shared-key"));
1328 }
1329
1330 #[tokio::test]
1331 async fn should_list_keys_in_lexicographic_order() {
1332 let log = LogDb::open(test_config()).await.unwrap();
1334 log.try_append(vec![
1335 Record {
1336 key: Bytes::from("zebra"),
1337 value: Bytes::from("value"),
1338 },
1339 Record {
1340 key: Bytes::from("apple"),
1341 value: Bytes::from("value"),
1342 },
1343 Record {
1344 key: Bytes::from("mango"),
1345 value: Bytes::from("value"),
1346 },
1347 ])
1348 .await
1349 .unwrap();
1350
1351 let mut iter = log.list_keys(..).await.unwrap();
1353 let mut keys = vec![];
1354 while let Some(key) = iter.next().await.unwrap() {
1355 keys.push(key.key);
1356 }
1357
1358 assert_eq!(keys[0], Bytes::from("apple"));
1360 assert_eq!(keys[1], Bytes::from("mango"));
1361 assert_eq!(keys[2], Bytes::from("zebra"));
1362 }
1363
1364 #[tokio::test]
1365 async fn should_list_empty_when_no_entries() {
1366 let log = LogDb::open(test_config()).await.unwrap();
1368
1369 let mut iter = log.list_keys(..).await.unwrap();
1371
1372 assert!(iter.next().await.unwrap().is_none());
1374 }
1375
1376 #[tokio::test]
1377 async fn should_list_keys_respects_segment_range() {
1378 let log = LogDb::open(test_config()).await.unwrap();
1380
1381 log.try_append(vec![
1383 Record {
1384 key: Bytes::from("key-seg0"),
1385 value: Bytes::from("value"),
1386 },
1387 Record {
1388 key: Bytes::from("key-seg0-b"),
1389 value: Bytes::from("value"),
1390 },
1391 ])
1392 .await
1393 .unwrap();
1394
1395 log.seal_segment().await.unwrap();
1396
1397 log.try_append(vec![
1399 Record {
1400 key: Bytes::from("key-seg1"),
1401 value: Bytes::from("value"),
1402 },
1403 Record {
1404 key: Bytes::from("key-seg1-b"),
1405 value: Bytes::from("value"),
1406 },
1407 ])
1408 .await
1409 .unwrap();
1410
1411 log.seal_segment().await.unwrap();
1412
1413 log.try_append(vec![
1415 Record {
1416 key: Bytes::from("key-seg2"),
1417 value: Bytes::from("value"),
1418 },
1419 Record {
1420 key: Bytes::from("key-seg2-b"),
1421 value: Bytes::from("value"),
1422 },
1423 ])
1424 .await
1425 .unwrap();
1426
1427 let mut iter = log.list_keys(1..2).await.unwrap();
1429 let mut keys = vec![];
1430 while let Some(key) = iter.next().await.unwrap() {
1431 keys.push(key.key);
1432 }
1433
1434 assert_eq!(keys.len(), 2);
1436 assert_eq!(keys[0], Bytes::from("key-seg1"));
1437 assert_eq!(keys[1], Bytes::from("key-seg1-b"));
1438 }
1439
1440 #[tokio::test]
1441 async fn should_list_segments_returns_empty_when_no_segments() {
1442 let log = LogDb::open(test_config()).await.unwrap();
1444
1445 let segments = log.list_segments(..).await.unwrap();
1447
1448 assert!(segments.is_empty());
1450 }
1451
1452 #[tokio::test]
1453 async fn should_list_segments_returns_single_segment() {
1454 let log = LogDb::open(test_config()).await.unwrap();
1456 log.try_append(vec![Record {
1457 key: Bytes::from("key"),
1458 value: Bytes::from("value"),
1459 }])
1460 .await
1461 .unwrap();
1462
1463 let segments = log.list_segments(..).await.unwrap();
1465
1466 assert_eq!(segments.len(), 1);
1468 assert_eq!(segments[0].id, 0);
1469 assert_eq!(segments[0].start_seq, 0);
1470 }
1471
1472 #[tokio::test]
1473 async fn should_list_segments_returns_multiple_segments() {
1474 let log = LogDb::open(test_config()).await.unwrap();
1476
1477 log.try_append(vec![Record {
1479 key: Bytes::from("key"),
1480 value: Bytes::from("value-0"),
1481 }])
1482 .await
1483 .unwrap();
1484
1485 log.seal_segment().await.unwrap();
1486
1487 log.try_append(vec![Record {
1489 key: Bytes::from("key"),
1490 value: Bytes::from("value-1"),
1491 }])
1492 .await
1493 .unwrap();
1494
1495 log.seal_segment().await.unwrap();
1496
1497 log.try_append(vec![Record {
1499 key: Bytes::from("key"),
1500 value: Bytes::from("value-2"),
1501 }])
1502 .await
1503 .unwrap();
1504
1505 let segments = log.list_segments(..).await.unwrap();
1507
1508 assert_eq!(segments.len(), 3);
1510 assert_eq!(segments[0].id, 0);
1511 assert_eq!(segments[0].start_seq, 0);
1512 assert_eq!(segments[1].id, 1);
1513 assert_eq!(segments[1].start_seq, 1);
1514 assert_eq!(segments[2].id, 2);
1515 assert_eq!(segments[2].start_seq, 2);
1516 }
1517
1518 #[tokio::test]
1519 async fn should_list_segments_filters_by_sequence_range() {
1520 let log = LogDb::open(test_config()).await.unwrap();
1522
1523 log.try_append(vec![
1525 Record {
1526 key: Bytes::from("key"),
1527 value: Bytes::from("v0"),
1528 },
1529 Record {
1530 key: Bytes::from("key"),
1531 value: Bytes::from("v1"),
1532 },
1533 ])
1534 .await
1535 .unwrap();
1536
1537 log.seal_segment().await.unwrap();
1538
1539 log.try_append(vec![
1541 Record {
1542 key: Bytes::from("key"),
1543 value: Bytes::from("v2"),
1544 },
1545 Record {
1546 key: Bytes::from("key"),
1547 value: Bytes::from("v3"),
1548 },
1549 ])
1550 .await
1551 .unwrap();
1552
1553 log.seal_segment().await.unwrap();
1554
1555 log.try_append(vec![
1557 Record {
1558 key: Bytes::from("key"),
1559 value: Bytes::from("v4"),
1560 },
1561 Record {
1562 key: Bytes::from("key"),
1563 value: Bytes::from("v5"),
1564 },
1565 ])
1566 .await
1567 .unwrap();
1568
1569 let segments = log.list_segments(2..4).await.unwrap();
1571
1572 assert_eq!(segments.len(), 1);
1574 assert_eq!(segments[0].id, 1);
1575 assert_eq!(segments[0].start_seq, 2);
1576 }
1577
1578 #[tokio::test]
1579 async fn should_list_segments_via_log_reader() {
1580 let storage = create_storage(
1582 &StorageConfig::InMemory,
1583 StorageRuntime::new(),
1584 StorageSemantics::new(),
1585 )
1586 .await
1587 .unwrap();
1588 let log = LogDb::new(storage.clone()).await.unwrap();
1589
1590 log.try_append(vec![Record {
1591 key: Bytes::from("key"),
1592 value: Bytes::from("value-0"),
1593 }])
1594 .await
1595 .unwrap();
1596
1597 log.seal_segment().await.unwrap();
1598
1599 log.try_append(vec![Record {
1600 key: Bytes::from("key"),
1601 value: Bytes::from("value-1"),
1602 }])
1603 .await
1604 .unwrap();
1605 log.flush().await.unwrap();
1606
1607 let reader = LogDbReader::new(storage).await.unwrap();
1609 let segments = reader.list_segments(..).await.unwrap();
1610
1611 assert_eq!(segments.len(), 2);
1613 assert_eq!(segments[0].id, 0);
1614 assert_eq!(segments[1].id, 1);
1615 }
1616
1617 #[tokio::test]
1618 async fn should_list_segments_includes_start_time() {
1619 let log = LogDb::open(test_config()).await.unwrap();
1621 log.try_append(vec![Record {
1622 key: Bytes::from("key"),
1623 value: Bytes::from("value"),
1624 }])
1625 .await
1626 .unwrap();
1627
1628 let segments = log.list_segments(..).await.unwrap();
1630
1631 assert_eq!(segments.len(), 1);
1633 assert!(segments[0].start_time_ms > 1577836800000); }
1635
1636 #[tokio::test]
1637 async fn should_try_append_single_record() {
1638 let log = LogDb::open(test_config()).await.unwrap();
1640 let records = vec![Record {
1641 key: Bytes::from("orders"),
1642 value: Bytes::from("order-1"),
1643 }];
1644
1645 let result = log.try_append(records).await.unwrap();
1647
1648 assert_eq!(result.start_sequence, 0);
1650 let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
1651 let entry = iter.next().await.unwrap().unwrap();
1652 assert_eq!(entry.value, Bytes::from("order-1"));
1653 }
1654
1655 #[tokio::test]
1656 async fn should_append_timeout_single_record() {
1657 let log = LogDb::open(test_config()).await.unwrap();
1659 let records = vec![Record {
1660 key: Bytes::from("orders"),
1661 value: Bytes::from("order-1"),
1662 }];
1663
1664 let result = log
1666 .append_timeout(records, Duration::from_secs(5))
1667 .await
1668 .unwrap();
1669
1670 assert_eq!(result.start_sequence, 0);
1672 let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
1673 let entry = iter.next().await.unwrap().unwrap();
1674 assert_eq!(entry.value, Bytes::from("order-1"));
1675 }
1676
1677 #[tokio::test]
1678 async fn should_return_empty_records_on_try_append_empty() {
1679 let log = LogDb::open(test_config()).await.unwrap();
1681
1682 let result = log.try_append(vec![]).await.unwrap();
1684
1685 assert_eq!(result.start_sequence, 0);
1687 }
1688
1689 #[tokio::test]
1690 async fn should_return_empty_records_on_append_timeout_empty() {
1691 let log = LogDb::open(test_config()).await.unwrap();
1693
1694 let result = log
1696 .append_timeout(vec![], Duration::from_secs(1))
1697 .await
1698 .unwrap();
1699
1700 assert_eq!(result.start_sequence, 0);
1702 }
1703
1704 #[tokio::test]
1705 async fn should_scan_without_flush() {
1706 let log = LogDb::open(test_config()).await.unwrap();
1708 log.try_append(vec![
1709 Record {
1710 key: Bytes::from("key"),
1711 value: Bytes::from("v0"),
1712 },
1713 Record {
1714 key: Bytes::from("key"),
1715 value: Bytes::from("v1"),
1716 },
1717 ])
1718 .await
1719 .unwrap();
1720
1721 let mut iter = log.scan(Bytes::from("key"), ..).await.unwrap();
1723 let e0 = iter.next().await.unwrap().unwrap();
1724 assert_eq!(e0.sequence, 0);
1725 assert_eq!(e0.value, Bytes::from("v0"));
1726 let e1 = iter.next().await.unwrap().unwrap();
1727 assert_eq!(e1.sequence, 1);
1728 assert_eq!(e1.value, Bytes::from("v1"));
1729 assert!(iter.next().await.unwrap().is_none());
1730 }
1731
1732 #[tokio::test]
1733 async fn should_list_keys_without_flush() {
1734 let log = LogDb::open(test_config()).await.unwrap();
1736 log.try_append(vec![
1737 Record {
1738 key: Bytes::from("alpha"),
1739 value: Bytes::from("v"),
1740 },
1741 Record {
1742 key: Bytes::from("beta"),
1743 value: Bytes::from("v"),
1744 },
1745 ])
1746 .await
1747 .unwrap();
1748
1749 let mut iter = log.list_keys(..).await.unwrap();
1751 let mut keys = vec![];
1752 while let Some(key) = iter.next().await.unwrap() {
1753 keys.push(key.key);
1754 }
1755 assert_eq!(keys, vec![Bytes::from("alpha"), Bytes::from("beta")]);
1756 }
1757
1758 #[tokio::test]
1759 async fn should_list_segments_without_flush() {
1760 let log = LogDb::open(test_config()).await.unwrap();
1762 log.try_append(vec![Record {
1763 key: Bytes::from("key"),
1764 value: Bytes::from("value"),
1765 }])
1766 .await
1767 .unwrap();
1768
1769 let segments = log.list_segments(..).await.unwrap();
1771 assert_eq!(segments.len(), 1);
1772 assert_eq!(segments[0].id, 0);
1773 assert_eq!(segments[0].start_seq, 0);
1774 }
1775}