1use std::ops::RangeBounds;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bytes::Bytes;
12use common::clock::{Clock, SystemClock};
13use common::coordinator::{
14 Durability, WriteCoordinator, WriteCoordinatorConfig, WriteCoordinatorHandle,
15};
16use common::storage::factory::create_storage;
17use common::{StorageRuntime, StorageSemantics};
18use tokio::sync::RwLock;
19use tokio::task::JoinHandle;
20
21use crate::config::{CountOptions, ScanOptions, WriteOptions};
22use crate::delta::{LogContext, LogDelta, LogFlusher, LogWrite};
23use crate::error::{Error, Result};
24use crate::listing::ListingCache;
25use crate::listing::LogKeyIterator;
26use crate::model::{AppendResult, Record, Segment, SegmentId, Sequence};
27use crate::range::{normalize_segment_id, normalize_sequence};
28use crate::reader::{LogIterator, LogRead};
29use crate::segment::SegmentCache;
30use crate::storage::LogStorage;
31
32const WRITE_CHANNEL: &str = "write";
33
34pub struct LogDb {
80 handle: WriteCoordinatorHandle<LogDelta>,
81 coordinator: WriteCoordinator<LogDelta, LogFlusher>,
82 storage: LogStorage,
83 clock: Arc<dyn Clock>,
84 read_inner: Arc<RwLock<crate::reader::LogReadInner>>,
85 flush_subscriber_task: JoinHandle<()>,
86}
87
88impl LogDb {
89 pub async fn open(config: crate::config::Config) -> Result<Self> {
110 LogDbBuilder::new(config).build().await
111 }
112
113 pub async fn append(&self, records: Vec<Record>) -> Result<AppendResult> {
148 self.append_with_options(records, WriteOptions::default())
149 .await
150 }
151
152 pub async fn append_with_options(
183 &self,
184 records: Vec<Record>,
185 options: WriteOptions,
186 ) -> Result<AppendResult> {
187 if records.is_empty() {
188 return Ok(AppendResult { start_sequence: 0 });
189 }
190
191 let write = LogWrite {
192 records,
193 timestamp_ms: self.current_time_ms(),
194 force_seal: false,
195 };
196 let mut write_handle = self.handle.write(write).await?;
197 let result = write_handle.wait(Durability::Applied).await?;
198
199 if options.await_durable {
200 self.flush().await?;
201 }
202
203 Ok(result)
204 }
205
206 fn current_time_ms(&self) -> i64 {
208 self.clock
209 .now()
210 .duration_since(std::time::UNIX_EPOCH)
211 .unwrap()
212 .as_millis() as i64
213 }
214
215 pub async fn check_storage(&self) -> Result<()> {
224 let seq_key = Bytes::from_static(&crate::serde::SEQ_BLOCK_KEY);
227 let _ = self.storage.as_read().get(seq_key).await?;
228 Ok(())
229 }
230
231 #[cfg(test)]
237 pub(crate) async fn seal_segment(&self) -> Result<()> {
238 let write = LogWrite {
239 records: vec![],
240 timestamp_ms: self.current_time_ms(),
241 force_seal: true,
242 };
243 self.handle.write(write).await?;
244 self.flush().await?;
245 Ok(())
246 }
247
248 pub async fn flush(&self) -> Result<()> {
254 let mut flush_handle = self.handle.flush(true).await?;
255 flush_handle.wait(Durability::Durable).await?;
256 let mut inner = self.read_inner.write().await;
258 let after = inner.segments.latest().map(|s| s.id());
259 let storage = inner.storage.clone();
260 inner.segments.refresh(&storage, after).await?;
261 Ok(())
262 }
263
264 pub async fn close(self) -> Result<()> {
269 self.coordinator.stop().await.map_err(Error::Internal)?;
270 self.flush_subscriber_task.abort();
271 self.storage.close().await?;
272 Ok(())
273 }
274
275 #[cfg(test)]
277 pub(crate) async fn new(storage: Arc<dyn common::Storage>) -> Result<Self> {
278 use crate::config::SegmentConfig;
279 use crate::reader::LogReadInner;
280 use crate::serde::SEQ_BLOCK_KEY;
281
282 let log_storage = LogStorage::new(storage.clone());
283 let clock: Arc<dyn Clock> = Arc::new(SystemClock);
284
285 let log_storage_read = log_storage.as_read();
286 let seq_key = Bytes::from_static(&SEQ_BLOCK_KEY);
287 let sequence_allocator = common::SequenceAllocator::load(storage.as_ref(), seq_key)
288 .await
289 .map_err(|e| Error::Internal(e.to_string()))?;
290 let segment_cache = SegmentCache::open(&log_storage_read, SegmentConfig::default()).await?;
291 let listing_cache = ListingCache::new();
292
293 let context = LogContext {
294 sequence_allocator,
295 segment_cache: segment_cache.clone(),
296 listing_cache,
297 };
298
299 let flusher = LogFlusher::new(log_storage.clone());
300 let mut coordinator = WriteCoordinator::new(
301 WriteCoordinatorConfig::default(),
302 vec![WRITE_CHANNEL.to_string()],
303 context,
304 log_storage.snapshot().await?,
305 flusher,
306 );
307 let handle = coordinator.handle(WRITE_CHANNEL);
308
309 let read_inner = Arc::new(RwLock::new(LogReadInner::new(
310 log_storage_read,
311 segment_cache,
312 )));
313
314 let flush_subscriber_task = spawn_flush_subscriber(&coordinator, Arc::clone(&read_inner));
315 coordinator.start();
316
317 Ok(Self {
318 handle,
319 coordinator,
320 storage: log_storage,
321 clock,
322 read_inner,
323 flush_subscriber_task,
324 })
325 }
326}
327
328#[async_trait]
329impl LogRead for LogDb {
330 async fn scan_with_options(
331 &self,
332 key: Bytes,
333 seq_range: impl RangeBounds<Sequence> + Send,
334 options: ScanOptions,
335 ) -> Result<LogIterator> {
336 let seq_range = normalize_sequence(&seq_range);
337 let inner = self.read_inner.read().await;
338 Ok(inner.scan_with_options(key, seq_range, &options))
339 }
340
341 async fn count_with_options(
342 &self,
343 _key: Bytes,
344 _seq_range: impl RangeBounds<Sequence> + Send,
345 _options: CountOptions,
346 ) -> Result<u64> {
347 todo!()
348 }
349
350 async fn list_keys(
351 &self,
352 segment_range: impl RangeBounds<SegmentId> + Send,
353 ) -> Result<LogKeyIterator> {
354 let segment_range = normalize_segment_id(&segment_range);
355 let inner = self.read_inner.read().await;
356 inner.list_keys(segment_range).await
357 }
358
359 async fn list_segments(
360 &self,
361 seq_range: impl RangeBounds<Sequence> + Send,
362 ) -> Result<Vec<Segment>> {
363 let seq_range = normalize_sequence(&seq_range);
364 let inner = self.read_inner.read().await;
365 Ok(inner.list_segments(&seq_range))
366 }
367}
368
369pub struct LogDbBuilder {
397 config: crate::config::Config,
398 storage_runtime: StorageRuntime,
399}
400
401impl LogDbBuilder {
402 pub fn new(config: crate::config::Config) -> Self {
404 Self {
405 config,
406 storage_runtime: StorageRuntime::new(),
407 }
408 }
409
410 pub fn with_storage_runtime(mut self, runtime: StorageRuntime) -> Self {
414 self.storage_runtime = runtime;
415 self
416 }
417
418 pub async fn build(self) -> Result<LogDb> {
420 use crate::reader::LogReadInner;
421 use crate::serde::SEQ_BLOCK_KEY;
422
423 let storage = create_storage(
424 &self.config.storage,
425 self.storage_runtime,
426 StorageSemantics::new(),
427 )
428 .await
429 .map_err(|e| Error::Storage(e.to_string()))?;
430
431 let log_storage = LogStorage::new(storage.clone());
432 let clock: Arc<dyn Clock> = Arc::new(SystemClock);
433
434 let log_storage_read = log_storage.as_read();
435 let seq_key = Bytes::from_static(&SEQ_BLOCK_KEY);
436 let sequence_allocator = common::SequenceAllocator::load(storage.as_ref(), seq_key)
437 .await
438 .map_err(|e| Error::Internal(e.to_string()))?;
439 let segment_cache = SegmentCache::open(&log_storage_read, self.config.segmentation).await?;
440 let listing_cache = ListingCache::new();
441
442 let context = LogContext {
443 sequence_allocator,
444 segment_cache: segment_cache.clone(),
445 listing_cache,
446 };
447
448 let flusher = LogFlusher::new(log_storage.clone());
449 let snapshot = log_storage.snapshot().await?;
450 let mut coordinator = WriteCoordinator::new(
451 WriteCoordinatorConfig::default(),
452 vec![WRITE_CHANNEL.to_string()],
453 context,
454 snapshot,
455 flusher,
456 );
457 let handle = coordinator.handle(WRITE_CHANNEL);
458
459 let read_inner = Arc::new(RwLock::new(LogReadInner::new(
460 log_storage_read,
461 segment_cache,
462 )));
463
464 let flush_subscriber_task = spawn_flush_subscriber(&coordinator, Arc::clone(&read_inner));
465 coordinator.start();
466
467 Ok(LogDb {
468 handle,
469 coordinator,
470 storage: log_storage,
471 clock,
472 read_inner,
473 flush_subscriber_task,
474 })
475 }
476}
477
478fn spawn_flush_subscriber(
479 handle: &WriteCoordinator<LogDelta, LogFlusher>,
480 read_inner: Arc<RwLock<crate::reader::LogReadInner>>,
481) -> JoinHandle<()> {
482 let (mut subscriber, _) = handle.subscribe();
483 tokio::spawn(async move {
484 while let Ok(result) = subscriber.recv().await {
485 let Some(broadcast) = &result.last_flushed_delta else {
486 continue;
487 };
488 if !broadcast.val.new_segments.is_empty() {
489 let mut inner = read_inner.write().await;
490 for segment in &broadcast.val.new_segments {
491 inner.segments.insert(segment.clone());
492 }
493 }
494 }
495 })
496}
497
498#[cfg(test)]
499mod tests {
500 use common::StorageConfig;
501 use common::storage::factory::create_storage;
502
503 use super::*;
504 use crate::config::Config;
505 use crate::reader::LogDbReader;
506
507 fn test_config() -> Config {
508 Config {
509 storage: StorageConfig::InMemory,
510 ..Default::default()
511 }
512 }
513
514 #[tokio::test]
515 async fn should_open_log_with_in_memory_config() {
516 let config = test_config();
518
519 let result = LogDb::open(config).await;
521
522 assert!(result.is_ok());
524 }
525
526 #[tokio::test]
527 async fn should_append_single_record() {
528 let log = LogDb::open(test_config()).await.unwrap();
530 let records = vec![Record {
531 key: Bytes::from("orders"),
532 value: Bytes::from("order-1"),
533 }];
534
535 log.append(records).await.unwrap();
537 log.flush().await.unwrap();
538
539 let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
541 let entry = iter.next().await.unwrap().unwrap();
542 assert_eq!(entry.sequence, 0);
543 assert_eq!(entry.value, Bytes::from("order-1"));
544 assert!(iter.next().await.unwrap().is_none());
545 }
546
547 #[tokio::test]
548 async fn should_append_multiple_records_in_batch() {
549 let log = LogDb::open(test_config()).await.unwrap();
551 let records = vec![
552 Record {
553 key: Bytes::from("orders"),
554 value: Bytes::from("order-1"),
555 },
556 Record {
557 key: Bytes::from("orders"),
558 value: Bytes::from("order-2"),
559 },
560 Record {
561 key: Bytes::from("orders"),
562 value: Bytes::from("order-3"),
563 },
564 ];
565
566 log.append(records).await.unwrap();
568 log.flush().await.unwrap();
569
570 let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
572
573 let entry0 = iter.next().await.unwrap().unwrap();
574 assert_eq!(entry0.sequence, 0);
575 assert_eq!(entry0.value, Bytes::from("order-1"));
576
577 let entry1 = iter.next().await.unwrap().unwrap();
578 assert_eq!(entry1.sequence, 1);
579 assert_eq!(entry1.value, Bytes::from("order-2"));
580
581 let entry2 = iter.next().await.unwrap().unwrap();
582 assert_eq!(entry2.sequence, 2);
583 assert_eq!(entry2.value, Bytes::from("order-3"));
584
585 assert!(iter.next().await.unwrap().is_none());
586 }
587
588 #[tokio::test]
589 async fn should_append_empty_records_without_error() {
590 let log = LogDb::open(test_config()).await.unwrap();
592 let records: Vec<Record> = vec![];
593
594 let result = log.append(records).await;
596
597 assert!(result.is_ok());
599
600 let mut iter = log.scan(Bytes::from("any-key"), ..).await.unwrap();
602 assert!(iter.next().await.unwrap().is_none());
603 }
604
605 #[tokio::test]
606 async fn should_assign_sequential_sequences_across_appends() {
607 let log = LogDb::open(test_config()).await.unwrap();
609
610 log.append(vec![
612 Record {
613 key: Bytes::from("events"),
614 value: Bytes::from("event-1"),
615 },
616 Record {
617 key: Bytes::from("events"),
618 value: Bytes::from("event-2"),
619 },
620 ])
621 .await
622 .unwrap();
623
624 log.append(vec![Record {
626 key: Bytes::from("events"),
627 value: Bytes::from("event-3"),
628 }])
629 .await
630 .unwrap();
631 log.flush().await.unwrap();
632
633 let mut iter = log.scan(Bytes::from("events"), ..).await.unwrap();
635
636 let entry0 = iter.next().await.unwrap().unwrap();
637 assert_eq!(entry0.sequence, 0);
638
639 let entry1 = iter.next().await.unwrap().unwrap();
640 assert_eq!(entry1.sequence, 1);
641
642 let entry2 = iter.next().await.unwrap().unwrap();
643 assert_eq!(entry2.sequence, 2);
644
645 assert!(iter.next().await.unwrap().is_none());
646 }
647
648 #[tokio::test]
649 async fn should_store_records_with_correct_keys_and_values() {
650 let log = LogDb::open(test_config()).await.unwrap();
652 let records = vec![
653 Record {
654 key: Bytes::from("topic-a"),
655 value: Bytes::from("message-a"),
656 },
657 Record {
658 key: Bytes::from("topic-b"),
659 value: Bytes::from("message-b"),
660 },
661 ];
662
663 log.append(records).await.unwrap();
665 log.flush().await.unwrap();
666
667 let mut iter_a = log.scan(Bytes::from("topic-a"), ..).await.unwrap();
669 let entry_a = iter_a.next().await.unwrap().unwrap();
670 assert_eq!(entry_a.key, Bytes::from("topic-a"));
671 assert_eq!(entry_a.value, Bytes::from("message-a"));
672 assert!(iter_a.next().await.unwrap().is_none());
673
674 let mut iter_b = log.scan(Bytes::from("topic-b"), ..).await.unwrap();
676 let entry_b = iter_b.next().await.unwrap().unwrap();
677 assert_eq!(entry_b.key, Bytes::from("topic-b"));
678 assert_eq!(entry_b.value, Bytes::from("message-b"));
679 assert!(iter_b.next().await.unwrap().is_none());
680 }
681
682 #[tokio::test]
683 async fn should_scan_all_entries_for_key() {
684 let log = LogDb::open(test_config()).await.unwrap();
686 log.append(vec![
687 Record {
688 key: Bytes::from("orders"),
689 value: Bytes::from("order-1"),
690 },
691 Record {
692 key: Bytes::from("orders"),
693 value: Bytes::from("order-2"),
694 },
695 Record {
696 key: Bytes::from("orders"),
697 value: Bytes::from("order-3"),
698 },
699 ])
700 .await
701 .unwrap();
702 log.flush().await.unwrap();
703
704 let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
706 let mut entries = vec![];
707 while let Some(entry) = iter.next().await.unwrap() {
708 entries.push(entry);
709 }
710
711 assert_eq!(entries.len(), 3);
713 assert_eq!(entries[0].sequence, 0);
714 assert_eq!(entries[0].value, Bytes::from("order-1"));
715 assert_eq!(entries[1].sequence, 1);
716 assert_eq!(entries[1].value, Bytes::from("order-2"));
717 assert_eq!(entries[2].sequence, 2);
718 assert_eq!(entries[2].value, Bytes::from("order-3"));
719 }
720
721 #[tokio::test]
722 async fn should_scan_with_sequence_range() {
723 let log = LogDb::open(test_config()).await.unwrap();
725 log.append(vec![
726 Record {
727 key: Bytes::from("events"),
728 value: Bytes::from("event-0"),
729 },
730 Record {
731 key: Bytes::from("events"),
732 value: Bytes::from("event-1"),
733 },
734 Record {
735 key: Bytes::from("events"),
736 value: Bytes::from("event-2"),
737 },
738 Record {
739 key: Bytes::from("events"),
740 value: Bytes::from("event-3"),
741 },
742 Record {
743 key: Bytes::from("events"),
744 value: Bytes::from("event-4"),
745 },
746 ])
747 .await
748 .unwrap();
749 log.flush().await.unwrap();
750
751 let mut iter = log.scan(Bytes::from("events"), 1..4).await.unwrap();
753 let mut entries = vec![];
754 while let Some(entry) = iter.next().await.unwrap() {
755 entries.push(entry);
756 }
757
758 assert_eq!(entries.len(), 3);
760 assert_eq!(entries[0].sequence, 1);
761 assert_eq!(entries[1].sequence, 2);
762 assert_eq!(entries[2].sequence, 3);
763 }
764
765 #[tokio::test]
766 async fn should_scan_from_starting_sequence() {
767 let log = LogDb::open(test_config()).await.unwrap();
769 log.append(vec![
770 Record {
771 key: Bytes::from("logs"),
772 value: Bytes::from("log-0"),
773 },
774 Record {
775 key: Bytes::from("logs"),
776 value: Bytes::from("log-1"),
777 },
778 Record {
779 key: Bytes::from("logs"),
780 value: Bytes::from("log-2"),
781 },
782 ])
783 .await
784 .unwrap();
785 log.flush().await.unwrap();
786
787 let mut iter = log.scan(Bytes::from("logs"), 1..).await.unwrap();
789 let mut entries = vec![];
790 while let Some(entry) = iter.next().await.unwrap() {
791 entries.push(entry);
792 }
793
794 assert_eq!(entries.len(), 2);
796 assert_eq!(entries[0].sequence, 1);
797 assert_eq!(entries[1].sequence, 2);
798 }
799
800 #[tokio::test]
801 async fn should_scan_up_to_ending_sequence() {
802 let log = LogDb::open(test_config()).await.unwrap();
804 log.append(vec![
805 Record {
806 key: Bytes::from("logs"),
807 value: Bytes::from("log-0"),
808 },
809 Record {
810 key: Bytes::from("logs"),
811 value: Bytes::from("log-1"),
812 },
813 Record {
814 key: Bytes::from("logs"),
815 value: Bytes::from("log-2"),
816 },
817 ])
818 .await
819 .unwrap();
820 log.flush().await.unwrap();
821
822 let mut iter = log.scan(Bytes::from("logs"), ..2).await.unwrap();
824 let mut entries = vec![];
825 while let Some(entry) = iter.next().await.unwrap() {
826 entries.push(entry);
827 }
828
829 assert_eq!(entries.len(), 2);
831 assert_eq!(entries[0].sequence, 0);
832 assert_eq!(entries[1].sequence, 1);
833 }
834
835 #[tokio::test]
836 async fn should_scan_only_entries_for_specified_key() {
837 let log = LogDb::open(test_config()).await.unwrap();
839 log.append(vec![
840 Record {
841 key: Bytes::from("key-a"),
842 value: Bytes::from("value-a-0"),
843 },
844 Record {
845 key: Bytes::from("key-b"),
846 value: Bytes::from("value-b-0"),
847 },
848 Record {
849 key: Bytes::from("key-a"),
850 value: Bytes::from("value-a-1"),
851 },
852 Record {
853 key: Bytes::from("key-b"),
854 value: Bytes::from("value-b-1"),
855 },
856 ])
857 .await
858 .unwrap();
859 log.flush().await.unwrap();
860
861 let mut iter = log.scan(Bytes::from("key-a"), ..).await.unwrap();
863 let mut entries = vec![];
864 while let Some(entry) = iter.next().await.unwrap() {
865 entries.push(entry);
866 }
867
868 assert_eq!(entries.len(), 2);
870 assert_eq!(entries[0].key, Bytes::from("key-a"));
871 assert_eq!(entries[0].value, Bytes::from("value-a-0"));
872 assert_eq!(entries[1].key, Bytes::from("key-a"));
873 assert_eq!(entries[1].value, Bytes::from("value-a-1"));
874 }
875
876 #[tokio::test]
877 async fn should_return_empty_iterator_for_unknown_key() {
878 let log = LogDb::open(test_config()).await.unwrap();
880 log.append(vec![Record {
881 key: Bytes::from("existing"),
882 value: Bytes::from("value"),
883 }])
884 .await
885 .unwrap();
886 log.flush().await.unwrap();
887
888 let mut iter = log.scan(Bytes::from("unknown"), ..).await.unwrap();
890 let entry = iter.next().await.unwrap();
891
892 assert!(entry.is_none());
894 }
895
896 #[tokio::test]
897 async fn should_return_empty_iterator_for_empty_range() {
898 let log = LogDb::open(test_config()).await.unwrap();
900 log.append(vec![
901 Record {
902 key: Bytes::from("key"),
903 value: Bytes::from("value-0"),
904 },
905 Record {
906 key: Bytes::from("key"),
907 value: Bytes::from("value-1"),
908 },
909 ])
910 .await
911 .unwrap();
912 log.flush().await.unwrap();
913
914 let mut iter = log.scan(Bytes::from("key"), 10..20).await.unwrap();
916 let entry = iter.next().await.unwrap();
917
918 assert!(entry.is_none());
920 }
921
922 #[tokio::test]
923 async fn should_scan_entries_via_log_reader() {
924 let storage = create_storage(
926 &StorageConfig::InMemory,
927 StorageRuntime::new(),
928 StorageSemantics::new(),
929 )
930 .await
931 .unwrap();
932 let log = LogDb::new(storage.clone()).await.unwrap();
933 log.append(vec![
934 Record {
935 key: Bytes::from("orders"),
936 value: Bytes::from("order-1"),
937 },
938 Record {
939 key: Bytes::from("orders"),
940 value: Bytes::from("order-2"),
941 },
942 Record {
943 key: Bytes::from("orders"),
944 value: Bytes::from("order-3"),
945 },
946 ])
947 .await
948 .unwrap();
949 log.flush().await.unwrap();
950
951 let reader = LogDbReader::new(storage).await.unwrap();
953 let mut iter = reader.scan(Bytes::from("orders"), ..).await.unwrap();
954 let mut entries = vec![];
955 while let Some(entry) = iter.next().await.unwrap() {
956 entries.push(entry);
957 }
958
959 assert_eq!(entries.len(), 3);
961 assert_eq!(entries[0].sequence, 0);
962 assert_eq!(entries[0].value, Bytes::from("order-1"));
963 assert_eq!(entries[1].sequence, 1);
964 assert_eq!(entries[1].value, Bytes::from("order-2"));
965 assert_eq!(entries[2].sequence, 2);
966 assert_eq!(entries[2].value, Bytes::from("order-3"));
967 }
968
969 #[tokio::test]
970 async fn should_scan_across_multiple_segments() {
971 let log = LogDb::open(test_config()).await.unwrap();
973
974 log.append(vec![
976 Record {
977 key: Bytes::from("events"),
978 value: Bytes::from("event-0"),
979 },
980 Record {
981 key: Bytes::from("events"),
982 value: Bytes::from("event-1"),
983 },
984 ])
985 .await
986 .unwrap();
987
988 log.seal_segment().await.unwrap();
990
991 log.append(vec![
993 Record {
994 key: Bytes::from("events"),
995 value: Bytes::from("event-2"),
996 },
997 Record {
998 key: Bytes::from("events"),
999 value: Bytes::from("event-3"),
1000 },
1001 ])
1002 .await
1003 .unwrap();
1004 log.flush().await.unwrap();
1005
1006 let mut iter = log.scan(Bytes::from("events"), ..).await.unwrap();
1008 let mut entries = vec![];
1009 while let Some(entry) = iter.next().await.unwrap() {
1010 entries.push(entry);
1011 }
1012
1013 assert_eq!(entries.len(), 4);
1015 assert_eq!(entries[0].sequence, 0);
1016 assert_eq!(entries[0].value, Bytes::from("event-0"));
1017 assert_eq!(entries[1].sequence, 1);
1018 assert_eq!(entries[1].value, Bytes::from("event-1"));
1019 assert_eq!(entries[2].sequence, 2);
1020 assert_eq!(entries[2].value, Bytes::from("event-2"));
1021 assert_eq!(entries[3].sequence, 3);
1022 assert_eq!(entries[3].value, Bytes::from("event-3"));
1023 }
1024
1025 #[tokio::test]
1026 async fn should_scan_range_spanning_segments() {
1027 let log = LogDb::open(test_config()).await.unwrap();
1029
1030 log.append(vec![
1032 Record {
1033 key: Bytes::from("data"),
1034 value: Bytes::from("seg0-0"),
1035 },
1036 Record {
1037 key: Bytes::from("data"),
1038 value: Bytes::from("seg0-1"),
1039 },
1040 ])
1041 .await
1042 .unwrap();
1043
1044 log.seal_segment().await.unwrap();
1045
1046 log.append(vec![
1048 Record {
1049 key: Bytes::from("data"),
1050 value: Bytes::from("seg1-2"),
1051 },
1052 Record {
1053 key: Bytes::from("data"),
1054 value: Bytes::from("seg1-3"),
1055 },
1056 ])
1057 .await
1058 .unwrap();
1059
1060 log.seal_segment().await.unwrap();
1061
1062 log.append(vec![
1064 Record {
1065 key: Bytes::from("data"),
1066 value: Bytes::from("seg2-4"),
1067 },
1068 Record {
1069 key: Bytes::from("data"),
1070 value: Bytes::from("seg2-5"),
1071 },
1072 ])
1073 .await
1074 .unwrap();
1075 log.flush().await.unwrap();
1076
1077 let mut iter = log.scan(Bytes::from("data"), 1..5).await.unwrap();
1079 let mut entries = vec![];
1080 while let Some(entry) = iter.next().await.unwrap() {
1081 entries.push(entry);
1082 }
1083
1084 assert_eq!(entries.len(), 4);
1086 assert_eq!(entries[0].sequence, 1);
1087 assert_eq!(entries[1].sequence, 2);
1088 assert_eq!(entries[2].sequence, 3);
1089 assert_eq!(entries[3].sequence, 4);
1090 }
1091
1092 #[tokio::test]
1093 async fn should_scan_single_segment_in_multi_segment_log() {
1094 let log = LogDb::open(test_config()).await.unwrap();
1096
1097 log.append(vec![
1099 Record {
1100 key: Bytes::from("key"),
1101 value: Bytes::from("v0"),
1102 },
1103 Record {
1104 key: Bytes::from("key"),
1105 value: Bytes::from("v1"),
1106 },
1107 ])
1108 .await
1109 .unwrap();
1110
1111 log.seal_segment().await.unwrap();
1112
1113 log.append(vec![
1115 Record {
1116 key: Bytes::from("key"),
1117 value: Bytes::from("v2"),
1118 },
1119 Record {
1120 key: Bytes::from("key"),
1121 value: Bytes::from("v3"),
1122 },
1123 ])
1124 .await
1125 .unwrap();
1126 log.flush().await.unwrap();
1127
1128 let mut iter = log.scan(Bytes::from("key"), 2..4).await.unwrap();
1130 let mut entries = vec![];
1131 while let Some(entry) = iter.next().await.unwrap() {
1132 entries.push(entry);
1133 }
1134
1135 assert_eq!(entries.len(), 2);
1137 assert_eq!(entries[0].sequence, 2);
1138 assert_eq!(entries[1].sequence, 3);
1139 }
1140
1141 #[tokio::test]
1142 async fn should_list_keys_returns_iterator() {
1143 let log = LogDb::open(test_config()).await.unwrap();
1145 log.append(vec![
1146 Record {
1147 key: Bytes::from("key-a"),
1148 value: Bytes::from("value-a"),
1149 },
1150 Record {
1151 key: Bytes::from("key-b"),
1152 value: Bytes::from("value-b"),
1153 },
1154 ])
1155 .await
1156 .unwrap();
1157 log.flush().await.unwrap();
1158
1159 let _iter = log.list_keys(..).await.unwrap();
1161
1162 }
1164
1165 #[tokio::test]
1166 async fn should_list_keys_via_log_reader() {
1167 let storage = create_storage(
1169 &StorageConfig::InMemory,
1170 StorageRuntime::new(),
1171 StorageSemantics::new(),
1172 )
1173 .await
1174 .unwrap();
1175 let log = LogDb::new(storage.clone()).await.unwrap();
1176 log.append(vec![
1177 Record {
1178 key: Bytes::from("key-a"),
1179 value: Bytes::from("value-a"),
1180 },
1181 Record {
1182 key: Bytes::from("key-b"),
1183 value: Bytes::from("value-b"),
1184 },
1185 ])
1186 .await
1187 .unwrap();
1188 log.flush().await.unwrap();
1189
1190 let reader = LogDbReader::new(storage).await.unwrap();
1192 let _iter = reader.list_keys(..).await.unwrap();
1193
1194 }
1196
1197 #[tokio::test]
1198 async fn should_list_keys_in_single_segment() {
1199 let log = LogDb::open(test_config()).await.unwrap();
1201 log.append(vec![
1202 Record {
1203 key: Bytes::from("key-a"),
1204 value: Bytes::from("value-a"),
1205 },
1206 Record {
1207 key: Bytes::from("key-b"),
1208 value: Bytes::from("value-b"),
1209 },
1210 Record {
1211 key: Bytes::from("key-c"),
1212 value: Bytes::from("value-c"),
1213 },
1214 ])
1215 .await
1216 .unwrap();
1217 log.flush().await.unwrap();
1218
1219 let mut iter = log.list_keys(..).await.unwrap();
1221 let mut keys = vec![];
1222 while let Some(key) = iter.next().await.unwrap() {
1223 keys.push(key.key);
1224 }
1225
1226 assert_eq!(keys.len(), 3);
1228 assert_eq!(keys[0], Bytes::from("key-a"));
1229 assert_eq!(keys[1], Bytes::from("key-b"));
1230 assert_eq!(keys[2], Bytes::from("key-c"));
1231 }
1232
1233 #[tokio::test]
1234 async fn should_list_keys_across_segments_after_roll() {
1235 let log = LogDb::open(test_config()).await.unwrap();
1237
1238 log.append(vec![
1240 Record {
1241 key: Bytes::from("key-a"),
1242 value: Bytes::from("value-a-0"),
1243 },
1244 Record {
1245 key: Bytes::from("key-b"),
1246 value: Bytes::from("value-b-0"),
1247 },
1248 ])
1249 .await
1250 .unwrap();
1251
1252 log.seal_segment().await.unwrap();
1254
1255 log.append(vec![
1257 Record {
1258 key: Bytes::from("key-c"),
1259 value: Bytes::from("value-c-1"),
1260 },
1261 Record {
1262 key: Bytes::from("key-d"),
1263 value: Bytes::from("value-d-1"),
1264 },
1265 ])
1266 .await
1267 .unwrap();
1268 log.flush().await.unwrap();
1269
1270 let mut iter = log.list_keys(..).await.unwrap();
1272 let mut keys = vec![];
1273 while let Some(key) = iter.next().await.unwrap() {
1274 keys.push(key.key);
1275 }
1276
1277 assert_eq!(keys.len(), 4);
1279 assert_eq!(keys[0], Bytes::from("key-a"));
1280 assert_eq!(keys[1], Bytes::from("key-b"));
1281 assert_eq!(keys[2], Bytes::from("key-c"));
1282 assert_eq!(keys[3], Bytes::from("key-d"));
1283 }
1284
1285 #[tokio::test]
1286 async fn should_deduplicate_keys_across_segments() {
1287 let log = LogDb::open(test_config()).await.unwrap();
1289
1290 log.append(vec![Record {
1292 key: Bytes::from("shared-key"),
1293 value: Bytes::from("value-0"),
1294 }])
1295 .await
1296 .unwrap();
1297
1298 log.seal_segment().await.unwrap();
1300
1301 log.append(vec![Record {
1303 key: Bytes::from("shared-key"),
1304 value: Bytes::from("value-1"),
1305 }])
1306 .await
1307 .unwrap();
1308
1309 log.seal_segment().await.unwrap();
1311
1312 log.append(vec![Record {
1314 key: Bytes::from("shared-key"),
1315 value: Bytes::from("value-2"),
1316 }])
1317 .await
1318 .unwrap();
1319 log.flush().await.unwrap();
1320
1321 let mut iter = log.list_keys(..).await.unwrap();
1323 let mut keys = vec![];
1324 while let Some(key) = iter.next().await.unwrap() {
1325 keys.push(key.key);
1326 }
1327
1328 assert_eq!(keys.len(), 1);
1330 assert_eq!(keys[0], Bytes::from("shared-key"));
1331 }
1332
1333 #[tokio::test]
1334 async fn should_list_keys_in_lexicographic_order() {
1335 let log = LogDb::open(test_config()).await.unwrap();
1337 log.append(vec![
1338 Record {
1339 key: Bytes::from("zebra"),
1340 value: Bytes::from("value"),
1341 },
1342 Record {
1343 key: Bytes::from("apple"),
1344 value: Bytes::from("value"),
1345 },
1346 Record {
1347 key: Bytes::from("mango"),
1348 value: Bytes::from("value"),
1349 },
1350 ])
1351 .await
1352 .unwrap();
1353 log.flush().await.unwrap();
1354
1355 let mut iter = log.list_keys(..).await.unwrap();
1357 let mut keys = vec![];
1358 while let Some(key) = iter.next().await.unwrap() {
1359 keys.push(key.key);
1360 }
1361
1362 assert_eq!(keys[0], Bytes::from("apple"));
1364 assert_eq!(keys[1], Bytes::from("mango"));
1365 assert_eq!(keys[2], Bytes::from("zebra"));
1366 }
1367
1368 #[tokio::test]
1369 async fn should_list_empty_when_no_entries() {
1370 let log = LogDb::open(test_config()).await.unwrap();
1372
1373 let mut iter = log.list_keys(..).await.unwrap();
1375
1376 assert!(iter.next().await.unwrap().is_none());
1378 }
1379
1380 #[tokio::test]
1381 async fn should_list_keys_respects_segment_range() {
1382 let log = LogDb::open(test_config()).await.unwrap();
1384
1385 log.append(vec![
1387 Record {
1388 key: Bytes::from("key-seg0"),
1389 value: Bytes::from("value"),
1390 },
1391 Record {
1392 key: Bytes::from("key-seg0-b"),
1393 value: Bytes::from("value"),
1394 },
1395 ])
1396 .await
1397 .unwrap();
1398
1399 log.seal_segment().await.unwrap();
1400
1401 log.append(vec![
1403 Record {
1404 key: Bytes::from("key-seg1"),
1405 value: Bytes::from("value"),
1406 },
1407 Record {
1408 key: Bytes::from("key-seg1-b"),
1409 value: Bytes::from("value"),
1410 },
1411 ])
1412 .await
1413 .unwrap();
1414
1415 log.seal_segment().await.unwrap();
1416
1417 log.append(vec![
1419 Record {
1420 key: Bytes::from("key-seg2"),
1421 value: Bytes::from("value"),
1422 },
1423 Record {
1424 key: Bytes::from("key-seg2-b"),
1425 value: Bytes::from("value"),
1426 },
1427 ])
1428 .await
1429 .unwrap();
1430 log.flush().await.unwrap();
1431
1432 let mut iter = log.list_keys(1..2).await.unwrap();
1434 let mut keys = vec![];
1435 while let Some(key) = iter.next().await.unwrap() {
1436 keys.push(key.key);
1437 }
1438
1439 assert_eq!(keys.len(), 2);
1441 assert_eq!(keys[0], Bytes::from("key-seg1"));
1442 assert_eq!(keys[1], Bytes::from("key-seg1-b"));
1443 }
1444
1445 #[tokio::test]
1446 async fn should_list_segments_returns_empty_when_no_segments() {
1447 let log = LogDb::open(test_config()).await.unwrap();
1449
1450 let segments = log.list_segments(..).await.unwrap();
1452
1453 assert!(segments.is_empty());
1455 }
1456
1457 #[tokio::test]
1458 async fn should_list_segments_returns_single_segment() {
1459 let log = LogDb::open(test_config()).await.unwrap();
1461 log.append(vec![Record {
1462 key: Bytes::from("key"),
1463 value: Bytes::from("value"),
1464 }])
1465 .await
1466 .unwrap();
1467 log.flush().await.unwrap();
1468
1469 let segments = log.list_segments(..).await.unwrap();
1471
1472 assert_eq!(segments.len(), 1);
1474 assert_eq!(segments[0].id, 0);
1475 assert_eq!(segments[0].start_seq, 0);
1476 }
1477
1478 #[tokio::test]
1479 async fn should_list_segments_returns_multiple_segments() {
1480 let log = LogDb::open(test_config()).await.unwrap();
1482
1483 log.append(vec![Record {
1485 key: Bytes::from("key"),
1486 value: Bytes::from("value-0"),
1487 }])
1488 .await
1489 .unwrap();
1490
1491 log.seal_segment().await.unwrap();
1492
1493 log.append(vec![Record {
1495 key: Bytes::from("key"),
1496 value: Bytes::from("value-1"),
1497 }])
1498 .await
1499 .unwrap();
1500
1501 log.seal_segment().await.unwrap();
1502
1503 log.append(vec![Record {
1505 key: Bytes::from("key"),
1506 value: Bytes::from("value-2"),
1507 }])
1508 .await
1509 .unwrap();
1510 log.flush().await.unwrap();
1511
1512 let segments = log.list_segments(..).await.unwrap();
1514
1515 assert_eq!(segments.len(), 3);
1517 assert_eq!(segments[0].id, 0);
1518 assert_eq!(segments[0].start_seq, 0);
1519 assert_eq!(segments[1].id, 1);
1520 assert_eq!(segments[1].start_seq, 1);
1521 assert_eq!(segments[2].id, 2);
1522 assert_eq!(segments[2].start_seq, 2);
1523 }
1524
1525 #[tokio::test]
1526 async fn should_list_segments_filters_by_sequence_range() {
1527 let log = LogDb::open(test_config()).await.unwrap();
1529
1530 log.append(vec![
1532 Record {
1533 key: Bytes::from("key"),
1534 value: Bytes::from("v0"),
1535 },
1536 Record {
1537 key: Bytes::from("key"),
1538 value: Bytes::from("v1"),
1539 },
1540 ])
1541 .await
1542 .unwrap();
1543
1544 log.seal_segment().await.unwrap();
1545
1546 log.append(vec![
1548 Record {
1549 key: Bytes::from("key"),
1550 value: Bytes::from("v2"),
1551 },
1552 Record {
1553 key: Bytes::from("key"),
1554 value: Bytes::from("v3"),
1555 },
1556 ])
1557 .await
1558 .unwrap();
1559
1560 log.seal_segment().await.unwrap();
1561
1562 log.append(vec![
1564 Record {
1565 key: Bytes::from("key"),
1566 value: Bytes::from("v4"),
1567 },
1568 Record {
1569 key: Bytes::from("key"),
1570 value: Bytes::from("v5"),
1571 },
1572 ])
1573 .await
1574 .unwrap();
1575 log.flush().await.unwrap();
1576
1577 let segments = log.list_segments(2..4).await.unwrap();
1579
1580 assert_eq!(segments.len(), 1);
1582 assert_eq!(segments[0].id, 1);
1583 assert_eq!(segments[0].start_seq, 2);
1584 }
1585
1586 #[tokio::test]
1587 async fn should_list_segments_via_log_reader() {
1588 let storage = create_storage(
1590 &StorageConfig::InMemory,
1591 StorageRuntime::new(),
1592 StorageSemantics::new(),
1593 )
1594 .await
1595 .unwrap();
1596 let log = LogDb::new(storage.clone()).await.unwrap();
1597
1598 log.append(vec![Record {
1599 key: Bytes::from("key"),
1600 value: Bytes::from("value-0"),
1601 }])
1602 .await
1603 .unwrap();
1604
1605 log.seal_segment().await.unwrap();
1606
1607 log.append(vec![Record {
1608 key: Bytes::from("key"),
1609 value: Bytes::from("value-1"),
1610 }])
1611 .await
1612 .unwrap();
1613 log.flush().await.unwrap();
1614
1615 let reader = LogDbReader::new(storage).await.unwrap();
1617 let segments = reader.list_segments(..).await.unwrap();
1618
1619 assert_eq!(segments.len(), 2);
1621 assert_eq!(segments[0].id, 0);
1622 assert_eq!(segments[1].id, 1);
1623 }
1624
1625 #[tokio::test]
1626 async fn should_list_segments_includes_start_time() {
1627 let log = LogDb::open(test_config()).await.unwrap();
1629 log.append(vec![Record {
1630 key: Bytes::from("key"),
1631 value: Bytes::from("value"),
1632 }])
1633 .await
1634 .unwrap();
1635 log.flush().await.unwrap();
1636
1637 let segments = log.list_segments(..).await.unwrap();
1639
1640 assert_eq!(segments.len(), 1);
1642 assert!(segments[0].start_time_ms > 1577836800000); }
1644}