1use std::ops::{Range, RangeBounds};
8use std::sync::Arc;
9use std::time::Duration;
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use tokio::sync::RwLock;
14use tokio::sync::watch;
15use tokio::task::JoinHandle;
16use tokio::time::MissedTickBehavior;
17
18use common::storage::factory::create_storage_read;
19use common::{StorageRead, StorageSemantics};
20
21use crate::config::{CountOptions, ReaderConfig, ScanOptions, SegmentConfig};
22use crate::error::{Error, Result};
23use crate::listing::LogKeyIterator;
24use crate::model::{LogEntry, Segment, SegmentId, Sequence};
25use crate::range::{normalize_segment_id, normalize_sequence};
26use crate::segment::{LogSegment, SegmentCache};
27use crate::storage::{LogStorageRead, SegmentIterator};
28
29#[async_trait]
56pub trait LogRead {
57 async fn scan(
83 &self,
84 key: Bytes,
85 seq_range: impl RangeBounds<Sequence> + Send,
86 ) -> Result<LogIterator> {
87 self.scan_with_options(key, seq_range, ScanOptions::default())
88 .await
89 }
90
91 async fn scan_with_options(
106 &self,
107 key: Bytes,
108 seq_range: impl RangeBounds<Sequence> + Send,
109 options: ScanOptions,
110 ) -> Result<LogIterator>;
111
112 async fn count(&self, key: Bytes, seq_range: impl RangeBounds<Sequence> + Send) -> Result<u64> {
131 self.count_with_options(key, seq_range, CountOptions::default())
132 .await
133 }
134
135 async fn count_with_options(
147 &self,
148 key: Bytes,
149 seq_range: impl RangeBounds<Sequence> + Send,
150 options: CountOptions,
151 ) -> Result<u64>;
152
153 async fn list_keys(
181 &self,
182 segment_range: impl RangeBounds<SegmentId> + Send,
183 ) -> Result<LogKeyIterator>;
184
185 async fn list_segments(
211 &self,
212 seq_range: impl RangeBounds<Sequence> + Send,
213 ) -> Result<Vec<Segment>>;
214}
215
216pub(crate) struct LogReadInner {
221 pub(crate) storage: LogStorageRead,
222 pub(crate) segments: SegmentCache,
223}
224
225impl LogReadInner {
226 pub(crate) fn new(storage: LogStorageRead, segments: SegmentCache) -> Self {
228 Self { storage, segments }
229 }
230
231 pub(crate) fn scan_with_options(
233 &self,
234 key: Bytes,
235 seq_range: Range<Sequence>,
236 _options: &ScanOptions,
237 ) -> LogIterator {
238 LogIterator::open(self.storage.clone(), &self.segments, key, seq_range)
239 }
240
241 pub(crate) async fn list_keys(
243 &self,
244 segment_range: Range<SegmentId>,
245 ) -> Result<LogKeyIterator> {
246 self.storage.list_keys(segment_range).await
247 }
248
249 pub(crate) fn list_segments(&self, seq_range: &Range<Sequence>) -> Vec<Segment> {
251 self.segments
252 .find_covering(seq_range)
253 .into_iter()
254 .map(|s| s.into())
255 .collect()
256 }
257}
258
259pub struct LogDbReader {
307 inner: Arc<RwLock<LogReadInner>>,
308 shutdown_tx: watch::Sender<bool>,
309 refresh_task: Option<JoinHandle<()>>,
310}
311
312impl LogDbReader {
313 pub async fn open(config: ReaderConfig) -> Result<Self> {
352 let reader_options = slatedb::config::DbReaderOptions {
353 manifest_poll_interval: config.refresh_interval,
354 ..Default::default()
355 };
356 let storage: Arc<dyn StorageRead> =
357 create_storage_read(&config.storage, StorageSemantics::new(), reader_options)
358 .await
359 .map_err(|e| Error::Storage(e.to_string()))?;
360 let log_storage = LogStorageRead::new(storage);
361 let segments = SegmentCache::open(&log_storage, SegmentConfig::default()).await?;
362 let inner = Arc::new(RwLock::new(LogReadInner::new(log_storage, segments)));
363
364 let (shutdown_tx, refresh_task) =
365 Self::spawn_refresh_task(Arc::clone(&inner), config.refresh_interval);
366
367 Ok(Self {
368 inner,
369 shutdown_tx,
370 refresh_task: Some(refresh_task),
371 })
372 }
373
374 fn spawn_refresh_task(
376 inner: Arc<RwLock<LogReadInner>>,
377 interval: Duration,
378 ) -> (watch::Sender<bool>, JoinHandle<()>) {
379 let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
380
381 let task = tokio::spawn(async move {
382 let mut ticker = tokio::time::interval(interval);
383 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
384
385 loop {
386 tokio::select! {
387 _ = ticker.tick() => {
388 let after_segment_id = {
390 let read_inner = inner.read().await;
391 read_inner.segments.latest().map(|s| s.id())
392 };
393
394 let mut read_inner = inner.write().await;
396 let storage = read_inner.storage.clone();
397 if let Err(e) = read_inner.segments.refresh(&storage, after_segment_id).await {
398 tracing::warn!("Failed to refresh segment cache: {}", e);
399 }
400 }
401 _ = shutdown_rx.changed() => {
402 if *shutdown_rx.borrow() {
403 break;
404 }
405 }
406 }
407 }
408 });
409
410 (shutdown_tx, task)
411 }
412
413 #[cfg(test)]
415 pub(crate) async fn new(storage: Arc<dyn StorageRead>) -> Result<Self> {
416 let log_storage = LogStorageRead::new(storage);
417 let segments = SegmentCache::open(&log_storage, SegmentConfig::default()).await?;
418 let inner = Arc::new(RwLock::new(LogReadInner::new(log_storage, segments)));
419 let (shutdown_tx, _) = watch::channel(false);
420 Ok(Self {
421 inner,
422 shutdown_tx,
423 refresh_task: None,
424 })
425 }
426
427 pub async fn close(self) {
432 let _ = self.shutdown_tx.send(true);
434
435 if let Some(task) = self.refresh_task {
437 let timeout = tokio::time::timeout(Duration::from_secs(5), task).await;
438 if timeout.is_err() {
439 tracing::warn!("Refresh task did not stop within timeout");
440 }
441 }
442 }
443}
444
445#[async_trait]
446impl LogRead for LogDbReader {
447 async fn scan_with_options(
448 &self,
449 key: Bytes,
450 seq_range: impl RangeBounds<Sequence> + Send,
451 options: ScanOptions,
452 ) -> Result<LogIterator> {
453 let seq_range = normalize_sequence(&seq_range);
454 let inner = self.inner.read().await;
455 Ok(inner.scan_with_options(key, seq_range, &options))
456 }
457
458 async fn count_with_options(
459 &self,
460 _key: Bytes,
461 _seq_range: impl RangeBounds<Sequence> + Send,
462 _options: CountOptions,
463 ) -> Result<u64> {
464 todo!()
465 }
466
467 async fn list_keys(
468 &self,
469 segment_range: impl RangeBounds<SegmentId> + Send,
470 ) -> Result<LogKeyIterator> {
471 let segment_range = normalize_segment_id(&segment_range);
472 let inner = self.inner.read().await;
473 inner.list_keys(segment_range).await
474 }
475
476 async fn list_segments(
477 &self,
478 seq_range: impl RangeBounds<Sequence> + Send,
479 ) -> Result<Vec<Segment>> {
480 let seq_range = normalize_sequence(&seq_range);
481 let inner = self.inner.read().await;
482 Ok(inner.list_segments(&seq_range))
483 }
484}
485
486pub struct LogIterator {
492 storage: LogStorageRead,
493 segments: Vec<LogSegment>,
494 key: Bytes,
495 seq_range: Range<Sequence>,
496 current_segment_idx: usize,
497 current_iter: Option<SegmentIterator>,
498}
499
500impl LogIterator {
501 pub(crate) fn open(
503 storage: LogStorageRead,
504 segment_cache: &SegmentCache,
505 key: Bytes,
506 seq_range: Range<Sequence>,
507 ) -> Self {
508 let segments = segment_cache.find_covering(&seq_range);
509 Self {
510 storage,
511 segments,
512 key,
513 seq_range,
514 current_segment_idx: 0,
515 current_iter: None,
516 }
517 }
518
519 #[cfg(test)]
521 pub(crate) fn new(
522 storage: LogStorageRead,
523 segments: Vec<LogSegment>,
524 key: Bytes,
525 seq_range: Range<Sequence>,
526 ) -> Self {
527 Self {
528 storage,
529 segments,
530 key,
531 seq_range,
532 current_segment_idx: 0,
533 current_iter: None,
534 }
535 }
536
537 pub async fn next(&mut self) -> Result<Option<LogEntry>> {
539 loop {
540 if let Some(iter) = &mut self.current_iter {
542 if let Some(entry) = iter.next().await? {
543 return Ok(Some(entry));
544 }
545 self.current_iter = None;
547 self.current_segment_idx += 1;
548 }
549
550 if !self.advance_segment().await? {
552 return Ok(None);
553 }
554 }
555 }
556
557 async fn advance_segment(&mut self) -> Result<bool> {
561 if self.current_segment_idx >= self.segments.len() {
562 return Ok(false);
563 }
564
565 let segment = &self.segments[self.current_segment_idx];
566 let iter = self
567 .storage
568 .scan_entries(segment, &self.key, self.seq_range.clone())
569 .await?;
570 self.current_iter = Some(iter);
571 Ok(true)
572 }
573}
574
#[cfg(test)]
mod tests {
    use super::*;
    use crate::serde::SegmentMeta;
    use crate::storage::LogStorage;
    use common::StorageConfig;

    /// Builds a `LogEntry` from borrowed key/value bytes.
    fn entry(key: &[u8], seq: u64, value: &[u8]) -> LogEntry {
        LogEntry {
            key: Bytes::copy_from_slice(key),
            sequence: seq,
            value: Bytes::copy_from_slice(value),
        }
    }

    /// Writes every entry into `segment`, in order, panicking on failure.
    async fn write_all(storage: &LogStorage, segment: &LogSegment, entries: &[LogEntry]) {
        for e in entries {
            storage.write_entry(segment, e).await.unwrap();
        }
    }

    /// Asserts the next entry exists and has the given sequence and value.
    async fn assert_next(iter: &mut LogIterator, seq: u64, value: &[u8]) {
        let got = iter.next().await.unwrap().unwrap();
        assert_eq!(got.sequence, seq);
        assert_eq!(got.value.as_ref(), value);
    }

    /// Asserts the iterator has no more entries.
    async fn assert_exhausted(iter: &mut LogIterator) {
        assert!(iter.next().await.unwrap().is_none());
    }

    /// In-memory reader config with the given refresh interval.
    fn in_memory_config(refresh_interval: Duration) -> ReaderConfig {
        ReaderConfig {
            storage: StorageConfig::InMemory,
            refresh_interval,
        }
    }

    #[tokio::test]
    async fn should_return_none_when_no_segments() {
        let storage = LogStorage::in_memory();

        let mut iter =
            LogIterator::new(storage.as_read(), Vec::new(), Bytes::from("key"), 0..u64::MAX);

        assert_exhausted(&mut iter).await;
    }

    #[tokio::test]
    async fn should_iterate_entries_in_single_segment() {
        let storage = LogStorage::in_memory();
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        write_all(
            &storage,
            &segment,
            &[
                entry(b"key", 0, b"value0"),
                entry(b"key", 1, b"value1"),
                entry(b"key", 2, b"value2"),
            ],
        )
        .await;

        let mut iter = LogIterator::new(
            storage.as_read(),
            vec![segment],
            Bytes::from("key"),
            0..u64::MAX,
        );

        assert_next(&mut iter, 0, b"value0").await;
        assert_next(&mut iter, 1, b"value1").await;
        assert_next(&mut iter, 2, b"value2").await;
        assert_exhausted(&mut iter).await;
    }

    #[tokio::test]
    async fn should_iterate_entries_across_multiple_segments() {
        let storage = LogStorage::in_memory();
        let segment0 = LogSegment::new(0, SegmentMeta::new(0, 1000));
        let segment1 = LogSegment::new(1, SegmentMeta::new(100, 2000));
        write_all(
            &storage,
            &segment0,
            &[entry(b"key", 0, b"value0"), entry(b"key", 1, b"value1")],
        )
        .await;
        write_all(
            &storage,
            &segment1,
            &[entry(b"key", 100, b"value100"), entry(b"key", 101, b"value101")],
        )
        .await;

        let mut iter = LogIterator::new(
            storage.as_read(),
            vec![segment0, segment1],
            Bytes::from("key"),
            0..u64::MAX,
        );

        // Entries from segment0 come first, then those from segment1.
        assert_next(&mut iter, 0, b"value0").await;
        assert_next(&mut iter, 1, b"value1").await;
        assert_next(&mut iter, 100, b"value100").await;
        assert_next(&mut iter, 101, b"value101").await;
        assert_exhausted(&mut iter).await;
    }

    #[tokio::test]
    async fn should_filter_by_sequence_range() {
        let storage = LogStorage::in_memory();
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        write_all(
            &storage,
            &segment,
            &[
                entry(b"key", 0, b"value0"),
                entry(b"key", 1, b"value1"),
                entry(b"key", 2, b"value2"),
                entry(b"key", 3, b"value3"),
            ],
        )
        .await;

        let mut iter = LogIterator::new(storage.as_read(), vec![segment], Bytes::from("key"), 1..3);

        for expected in [1, 2] {
            let got = iter.next().await.unwrap().unwrap();
            assert_eq!(got.sequence, expected);
        }
        assert_exhausted(&mut iter).await;
    }

    #[tokio::test]
    async fn should_filter_entries_for_specified_key() {
        let storage = LogStorage::in_memory();
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        write_all(
            &storage,
            &segment,
            &[
                entry(b"key1", 0, b"k1v0"),
                entry(b"key2", 0, b"k2v0"),
                entry(b"key1", 1, b"k1v1"),
                entry(b"key2", 1, b"k2v1"),
            ],
        )
        .await;

        let mut iter = LogIterator::new(
            storage.as_read(),
            vec![segment],
            Bytes::from("key1"),
            0..u64::MAX,
        );

        for expected in [0, 1] {
            let got = iter.next().await.unwrap().unwrap();
            assert_eq!(got.key.as_ref(), b"key1");
            assert_eq!(got.sequence, expected);
        }
        assert_exhausted(&mut iter).await;
    }

    #[tokio::test]
    async fn should_return_none_when_no_entries_in_range() {
        let storage = LogStorage::in_memory();
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        write_all(
            &storage,
            &segment,
            &[entry(b"key", 0, b"value0"), entry(b"key", 1, b"value1")],
        )
        .await;

        let mut iter =
            LogIterator::new(storage.as_read(), vec![segment], Bytes::from("key"), 10..20);

        assert_exhausted(&mut iter).await;
    }

    #[tokio::test]
    async fn open_spawns_refresh_task() {
        let reader = LogDbReader::open(in_memory_config(Duration::from_millis(100)))
            .await
            .unwrap();

        assert!(reader.refresh_task.is_some());

        reader.close().await;
    }

    #[tokio::test]
    async fn close_stops_refresh_task_gracefully() {
        let reader = LogDbReader::open(in_memory_config(Duration::from_millis(50)))
            .await
            .unwrap();
        assert!(reader.refresh_task.is_some());

        let close_result = tokio::time::timeout(Duration::from_secs(1), reader.close()).await;

        assert!(
            close_result.is_ok(),
            "close() should complete within timeout"
        );
    }
}