use std::ops::{Range, RangeBounds};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::RwLock;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use crate::config::{CountOptions, ReaderConfig, ScanOptions, SegmentConfig};
use crate::error::{Error, Result};
use crate::listing::LogKeyIterator;
use crate::model::{LogEntry, Segment, SegmentId, Sequence};
use crate::range::{normalize_segment_id, normalize_sequence};
use crate::segment::{LogSegment, SegmentCache};
use crate::storage::{LogStorageRead, SegmentIterator};
use common::storage::StorageSnapshot;
use common::storage::factory::create_storage_read;
use common::{StorageRead, StorageSemantics};
/// Read-only query interface over the log: scan and count entries for a
/// key, and enumerate keys/segments.
#[async_trait]
pub trait LogRead {
    /// Scans entries for `key` within `seq_range` using default options.
    ///
    /// Convenience wrapper around [`LogRead::scan_with_options`].
    async fn scan(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
    ) -> Result<LogIterator> {
        self.scan_with_options(key, seq_range, ScanOptions::default())
            .await
    }

    /// Scans entries for `key` within `seq_range` with explicit `options`.
    async fn scan_with_options(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
        options: ScanOptions,
    ) -> Result<LogIterator>;

    /// Counts entries for `key` within `seq_range` using default options.
    ///
    /// Convenience wrapper around [`LogRead::count_with_options`].
    async fn count(&self, key: Bytes, seq_range: impl RangeBounds<Sequence> + Send) -> Result<u64> {
        self.count_with_options(key, seq_range, CountOptions::default())
            .await
    }

    /// Counts entries for `key` within `seq_range` with explicit `options`.
    async fn count_with_options(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
        options: CountOptions,
    ) -> Result<u64>;

    /// Lists the keys present within `segment_range`.
    async fn list_keys(
        &self,
        segment_range: impl RangeBounds<SegmentId> + Send,
    ) -> Result<LogKeyIterator>;

    /// Lists the segments covering `seq_range`.
    async fn list_segments(
        &self,
        seq_range: impl RangeBounds<Sequence> + Send,
    ) -> Result<Vec<Segment>>;
}
/// Shared read-side state: a storage handle plus the cached segment index.
/// Held behind an `Arc<RwLock<..>>` by `LogDbReader` so the background
/// refresh task can swap/update it while readers hold shared locks.
pub(crate) struct LogReadView {
    // Handle used for all storage reads; replaced wholesale on snapshot update.
    pub(crate) storage: LogStorageRead,
    // In-memory index of known segments, refreshed incrementally.
    pub(crate) segments: SegmentCache,
}
impl LogReadView {
pub(crate) fn new(storage: LogStorageRead, segments: SegmentCache) -> Self {
Self { storage, segments }
}
pub(crate) fn update_snapshot(&mut self, snapshot: Arc<dyn StorageSnapshot>) {
self.storage = LogStorageRead::new(snapshot as Arc<dyn StorageRead>);
}
pub(crate) fn apply_new_segments(&mut self, segments: &[LogSegment]) {
for segment in segments {
self.segments.insert(segment.clone());
}
}
pub(crate) fn scan_with_options(
&self,
key: Bytes,
seq_range: Range<Sequence>,
_options: &ScanOptions,
) -> LogIterator {
LogIterator::open(self.storage.clone(), &self.segments, key, seq_range)
}
pub(crate) async fn list_keys(
&self,
segment_range: Range<SegmentId>,
) -> Result<LogKeyIterator> {
self.storage.list_keys(segment_range).await
}
pub(crate) fn list_segments(&self, seq_range: &Range<Sequence>) -> Vec<Segment> {
self.segments
.find_covering(seq_range)
.into_iter()
.map(|s| s.into())
.collect()
}
}
/// Read-only handle to the log database. Holds a shared read view and a
/// background task that periodically refreshes the segment cache.
pub struct LogDbReader {
    // Shared with the refresh task; readers take shared locks, refresh
    // takes an exclusive lock.
    read_view: Arc<RwLock<LogReadView>>,
    // Set to `true` by `close()` to signal the refresh task to stop.
    shutdown_tx: watch::Sender<bool>,
    // `None` only in tests constructed via `LogDbReader::new`.
    refresh_task: Option<JoinHandle<()>>,
}
impl LogDbReader {
    /// Opens a reader over the configured storage and spawns the background
    /// segment-cache refresh task (polling at `config.refresh_interval`).
    ///
    /// # Errors
    /// Returns [`Error::Storage`] if the underlying store cannot be opened,
    /// or any error from loading the initial segment cache.
    pub async fn open(config: ReaderConfig) -> Result<Self> {
        let reader_options = slatedb::config::DbReaderOptions {
            manifest_poll_interval: config.refresh_interval,
            ..Default::default()
        };
        let storage: Arc<dyn StorageRead> =
            create_storage_read(&config.storage, StorageSemantics::new(), reader_options)
                .await
                .map_err(|e| Error::Storage(e.to_string()))?;
        let log_storage = LogStorageRead::new(storage);
        let segments = SegmentCache::open(&log_storage, SegmentConfig::default()).await?;
        let read_view = Arc::new(RwLock::new(LogReadView::new(log_storage, segments)));
        let (shutdown_tx, refresh_task) =
            Self::spawn_refresh_task(Arc::clone(&read_view), config.refresh_interval);
        Ok(Self {
            read_view,
            shutdown_tx,
            refresh_task: Some(refresh_task),
        })
    }

    /// Spawns the periodic refresh loop. The task stops when the shutdown
    /// flag flips to `true`, or when the sender side of the channel is
    /// dropped (i.e. the reader was dropped without calling `close`).
    fn spawn_refresh_task(
        read_view: Arc<RwLock<LogReadView>>,
        interval: Duration,
    ) -> (watch::Sender<bool>, JoinHandle<()>) {
        let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
        let task = tokio::spawn(async move {
            let mut ticker = tokio::time::interval(interval);
            // If a refresh overruns the interval, skip the backlog instead of
            // firing a burst of back-to-back ticks.
            ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
            loop {
                tokio::select! {
                    _ = ticker.tick() => {
                        // Read the latest cached segment id under a shared lock
                        // so concurrent readers are not blocked while we decide
                        // what to fetch.
                        let after_segment_id = {
                            let view = read_view.read().await;
                            view.segments.latest().map(|s| s.id())
                        };
                        let mut view = read_view.write().await;
                        let storage = view.storage.clone();
                        if let Err(e) = view.segments.refresh(&storage, after_segment_id).await {
                            // Best-effort: keep serving the stale cache and retry
                            // on the next tick.
                            tracing::warn!("Failed to refresh segment cache: {}", e);
                        }
                    }
                    changed = shutdown_rx.changed() => {
                        // BUGFIX: `changed()` returns `Err` once the sender is
                        // dropped, and previously the error was ignored while
                        // `*borrow()` stayed `false` — so dropping the reader
                        // without `close()` left this task busy-spinning forever.
                        // Treat a closed channel as a shutdown signal.
                        if changed.is_err() || *shutdown_rx.borrow() {
                            break;
                        }
                    }
                }
            }
        });
        (shutdown_tx, task)
    }

    /// Test-only constructor that skips spawning the refresh task.
    #[cfg(test)]
    pub(crate) async fn new(storage: Arc<dyn StorageRead>) -> Result<Self> {
        let log_storage = LogStorageRead::new(storage);
        let segments = SegmentCache::open(&log_storage, SegmentConfig::default()).await?;
        let read_view = Arc::new(RwLock::new(LogReadView::new(log_storage, segments)));
        let (shutdown_tx, _) = watch::channel(false);
        Ok(Self {
            read_view,
            shutdown_tx,
            refresh_task: None,
        })
    }

    /// Signals the refresh task to stop and waits (up to 5s) for it to exit.
    pub async fn close(self) {
        // `send` only fails if the receiver is gone, which already means the
        // task has exited — safe to ignore.
        let _ = self.shutdown_tx.send(true);
        if let Some(task) = self.refresh_task {
            match tokio::time::timeout(Duration::from_secs(5), task).await {
                Ok(Ok(())) => {}
                // Surface a panicked/cancelled task instead of swallowing it.
                Ok(Err(e)) => tracing::warn!("Refresh task terminated abnormally: {}", e),
                Err(_) => tracing::warn!("Refresh task did not stop within timeout"),
            }
        }
    }
}
#[async_trait]
impl LogRead for LogDbReader {
    /// Normalizes the sequence range, then scans under a shared read lock.
    async fn scan_with_options(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
        options: ScanOptions,
    ) -> Result<LogIterator> {
        let guard = self.read_view.read().await;
        let range = normalize_sequence(&seq_range);
        Ok(guard.scan_with_options(key, range, &options))
    }

    /// Not yet implemented; panics if called.
    async fn count_with_options(
        &self,
        _key: Bytes,
        _seq_range: impl RangeBounds<Sequence> + Send,
        _options: CountOptions,
    ) -> Result<u64> {
        todo!()
    }

    /// Normalizes the segment range, then lists keys from storage.
    async fn list_keys(
        &self,
        segment_range: impl RangeBounds<SegmentId> + Send,
    ) -> Result<LogKeyIterator> {
        let guard = self.read_view.read().await;
        let range = normalize_segment_id(&segment_range);
        guard.list_keys(range).await
    }

    /// Normalizes the sequence range, then lists covering segments from
    /// the cache.
    async fn list_segments(
        &self,
        seq_range: impl RangeBounds<Sequence> + Send,
    ) -> Result<Vec<Segment>> {
        let guard = self.read_view.read().await;
        let range = normalize_sequence(&seq_range);
        Ok(guard.list_segments(&range))
    }
}
/// Async iterator over the entries for one key across a list of segments,
/// restricted to a sequence range. Segments are consumed in list order.
pub struct LogIterator {
    // Storage handle used to open per-segment entry iterators.
    storage: LogStorageRead,
    // Segments to visit, in order; fixed at construction time.
    segments: Vec<LogSegment>,
    // Key whose entries are being iterated.
    key: Bytes,
    // Half-open sequence range applied within every segment.
    seq_range: Range<Sequence>,
    // Index into `segments` of the segment currently (or next) being read.
    current_segment_idx: usize,
    // Iterator over the current segment; `None` between segments.
    current_iter: Option<SegmentIterator>,
}
impl LogIterator {
pub(crate) fn open(
storage: LogStorageRead,
segment_cache: &SegmentCache,
key: Bytes,
seq_range: Range<Sequence>,
) -> Self {
let segments = segment_cache.find_covering(&seq_range);
Self {
storage,
segments,
key,
seq_range,
current_segment_idx: 0,
current_iter: None,
}
}
#[cfg(test)]
pub(crate) fn new(
storage: LogStorageRead,
segments: Vec<LogSegment>,
key: Bytes,
seq_range: Range<Sequence>,
) -> Self {
Self {
storage,
segments,
key,
seq_range,
current_segment_idx: 0,
current_iter: None,
}
}
pub async fn next(&mut self) -> Result<Option<LogEntry>> {
loop {
if let Some(iter) = &mut self.current_iter {
if let Some(entry) = iter.next().await? {
return Ok(Some(entry));
}
self.current_iter = None;
self.current_segment_idx += 1;
}
if !self.advance_segment().await? {
return Ok(None);
}
}
}
async fn advance_segment(&mut self) -> Result<bool> {
if self.current_segment_idx >= self.segments.len() {
return Ok(false);
}
let segment = &self.segments[self.current_segment_idx];
let iter = self
.storage
.scan_entries(segment, &self.key, self.seq_range.clone())
.await?;
self.current_iter = Some(iter);
Ok(true)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::serde::SegmentMeta;
    use crate::storage::LogStorage;

    // Helper: builds an owned LogEntry from borrowed key/value bytes.
    fn entry(key: &[u8], seq: u64, value: &[u8]) -> LogEntry {
        LogEntry {
            key: Bytes::copy_from_slice(key),
            sequence: seq,
            value: Bytes::copy_from_slice(value),
        }
    }

    // An iterator with an empty segment list yields nothing.
    #[tokio::test]
    async fn should_return_none_when_no_segments() {
        let storage = LogStorage::in_memory();
        let segments = vec![];
        let mut iter =
            LogIterator::new(storage.as_read(), segments, Bytes::from("key"), 0..u64::MAX);
        assert!(iter.next().await.unwrap().is_none());
    }

    // Entries within one segment come back in sequence order, then None.
    #[tokio::test]
    async fn should_iterate_entries_in_single_segment() {
        let storage = LogStorage::in_memory();
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        storage
            .write_entry(&segment, &entry(b"key", 0, b"value0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 1, b"value1"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 2, b"value2"))
            .await
            .unwrap();
        let mut iter = LogIterator::new(
            storage.as_read(),
            vec![segment],
            Bytes::from("key"),
            0..u64::MAX,
        );
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 0);
        assert_eq!(entry.value.as_ref(), b"value0");
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 1);
        assert_eq!(entry.value.as_ref(), b"value1");
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 2);
        assert_eq!(entry.value.as_ref(), b"value2");
        assert!(iter.next().await.unwrap().is_none());
    }

    // The iterator crosses a segment boundary transparently: all of
    // segment0's entries, then all of segment1's.
    #[tokio::test]
    async fn should_iterate_entries_across_multiple_segments() {
        let storage = LogStorage::in_memory();
        let segment0 = LogSegment::new(0, SegmentMeta::new(0, 1000));
        let segment1 = LogSegment::new(1, SegmentMeta::new(100, 2000));
        storage
            .write_entry(&segment0, &entry(b"key", 0, b"value0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment0, &entry(b"key", 1, b"value1"))
            .await
            .unwrap();
        storage
            .write_entry(&segment1, &entry(b"key", 100, b"value100"))
            .await
            .unwrap();
        storage
            .write_entry(&segment1, &entry(b"key", 101, b"value101"))
            .await
            .unwrap();
        let mut iter = LogIterator::new(
            storage.as_read(),
            vec![segment0, segment1],
            Bytes::from("key"),
            0..u64::MAX,
        );
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 0);
        assert_eq!(entry.value.as_ref(), b"value0");
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 1);
        assert_eq!(entry.value.as_ref(), b"value1");
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 100);
        assert_eq!(entry.value.as_ref(), b"value100");
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 101);
        assert_eq!(entry.value.as_ref(), b"value101");
        assert!(iter.next().await.unwrap().is_none());
    }

    // A half-open range 1..3 keeps sequences 1 and 2 and excludes 0 and 3.
    #[tokio::test]
    async fn should_filter_by_sequence_range() {
        let storage = LogStorage::in_memory();
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        storage
            .write_entry(&segment, &entry(b"key", 0, b"value0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 1, b"value1"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 2, b"value2"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 3, b"value3"))
            .await
            .unwrap();
        let mut iter = LogIterator::new(storage.as_read(), vec![segment], Bytes::from("key"), 1..3);
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 1);
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 2);
        assert!(iter.next().await.unwrap().is_none());
    }

    // Only entries for the requested key are yielded, even when other keys
    // are interleaved in the same segment.
    #[tokio::test]
    async fn should_filter_entries_for_specified_key() {
        let storage = LogStorage::in_memory();
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        storage
            .write_entry(&segment, &entry(b"key1", 0, b"k1v0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key2", 0, b"k2v0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key1", 1, b"k1v1"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key2", 1, b"k2v1"))
            .await
            .unwrap();
        let mut iter = LogIterator::new(
            storage.as_read(),
            vec![segment],
            Bytes::from("key1"),
            0..u64::MAX,
        );
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.key.as_ref(), b"key1");
        assert_eq!(entry.sequence, 0);
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.key.as_ref(), b"key1");
        assert_eq!(entry.sequence, 1);
        assert!(iter.next().await.unwrap().is_none());
    }

    // A range disjoint from the stored sequences yields nothing.
    #[tokio::test]
    async fn should_return_none_when_no_entries_in_range() {
        let storage = LogStorage::in_memory();
        let segment = LogSegment::new(0, SegmentMeta::new(0, 1000));
        storage
            .write_entry(&segment, &entry(b"key", 0, b"value0"))
            .await
            .unwrap();
        storage
            .write_entry(&segment, &entry(b"key", 1, b"value1"))
            .await
            .unwrap();
        let mut iter =
            LogIterator::new(storage.as_read(), vec![segment], Bytes::from("key"), 10..20);
        assert!(iter.next().await.unwrap().is_none());
    }

    // open() should start the background refresh task.
    #[tokio::test]
    async fn open_spawns_refresh_task() {
        use common::StorageConfig;
        let config = ReaderConfig {
            storage: StorageConfig::InMemory,
            refresh_interval: Duration::from_millis(100),
        };
        let reader = LogDbReader::open(config).await.unwrap();
        assert!(reader.refresh_task.is_some());
        reader.close().await;
    }

    // close() must return promptly (well under its internal 5s timeout).
    #[tokio::test]
    async fn close_stops_refresh_task_gracefully() {
        use common::StorageConfig;
        let config = ReaderConfig {
            storage: StorageConfig::InMemory,
            refresh_interval: Duration::from_millis(50),
        };
        let reader = LogDbReader::open(config).await.unwrap();
        assert!(reader.refresh_task.is_some());
        let close_result =
            tokio::time::timeout(Duration::from_secs(1), async { reader.close().await }).await;
        assert!(
            close_result.is_ok(),
            "close() should complete within timeout"
        );
    }
}