use std::ops::RangeBounds;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use common::clock::{Clock, SystemClock};
use common::coordinator::{Durability, EpochWatcher, EpochWatermarks};
use common::storage::factory::create_storage;
use common::{StorageRuntime, StorageSemantics};
use tokio::sync::RwLock;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use crate::config::{CountOptions, ScanOptions, SegmentConfig};
use crate::error::{AppendResult, Error, Result};
use crate::listing::ListingCache;
use crate::listing::LogKeyIterator;
use crate::model::{AppendOutput, Record, Segment, SegmentId, Sequence};
use crate::range::{normalize_segment_id, normalize_sequence};
use crate::reader::{LogIterator, LogRead, LogReadView};
use crate::segment::{LogSegment, SegmentCache};
use crate::serde::SEQ_BLOCK_KEY;
use crate::writer::{LogWrite, LogWriteHandle, LogWriter, LogWriterConfig, WrittenView};
/// Append-only log database: a write handle plus background tasks that
/// keep a shared read view in sync with what the writer has flushed.
pub struct LogDb {
    // Submits writes to the background writer; dropping it lets the
    // writer task shut down (see `close`).
    handle: LogWriteHandle,
    // The spawned writer task; awaited during `close`.
    writer_task: JoinHandle<()>,
    storage: Arc<dyn common::Storage>,
    // Source of timestamps for appended batches (SystemClock in practice).
    clock: Arc<dyn Clock>,
    // Snapshot served to readers; refreshed by the flushed-subscriber task.
    read_view: Arc<RwLock<LogReadView>>,
    // Lets readers wait until a target epoch is reflected in `read_view`.
    epoch_watcher: EpochWatcher,
    // Task mirroring writer progress into `read_view`; aborted on `close`.
    flushed_subscriber_task: JoinHandle<()>,
}
impl LogDb {
    /// Opens a log database from `config` using the default storage runtime.
    ///
    /// Convenience wrapper around [`LogDbBuilder`].
    pub async fn open(config: crate::config::Config) -> Result<Self> {
        LogDbBuilder::new(config).build().await
    }

    /// Registers storage-level metrics with the given Prometheus registry.
    #[cfg(feature = "http-server")]
    pub fn register_metrics(&self, registry: &mut prometheus_client::registry::Registry) {
        self.storage.register_metrics(registry);
    }

    /// Appends `records` without waiting for writer capacity.
    pub async fn try_append(&self, records: Vec<Record>) -> AppendResult<AppendOutput> {
        self.append_inner(records, None).await
    }

    /// Appends `records`, waiting up to `timeout` for the writer to accept them.
    pub async fn append_timeout(
        &self,
        records: Vec<Record>,
        timeout: Duration,
    ) -> AppendResult<AppendOutput> {
        self.append_inner(records, Some(timeout)).await
    }

    /// Shared append path for [`Self::try_append`] and [`Self::append_timeout`].
    ///
    /// An empty batch is a no-op reported as `start_sequence: 0`.
    async fn append_inner(
        &self,
        records: Vec<Record>,
        timeout: Option<Duration>,
    ) -> AppendResult<AppendOutput> {
        if records.is_empty() {
            return Ok(AppendOutput { start_sequence: 0 });
        }
        let write = LogWrite {
            records,
            timestamp_ms: self.current_time_ms(),
        };
        let outcome = match timeout {
            Some(limit) => self.handle.append_timeout(write, limit).await,
            None => self.handle.try_append(write).await,
        }?;
        // Empty batches were filtered out above, so the writer must have
        // produced an output for this one.
        Ok(outcome.expect("non-empty append must produce output"))
    }

    /// Current wall-clock time as milliseconds since the Unix epoch.
    fn current_time_ms(&self) -> i64 {
        let since_epoch = self
            .clock
            .now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap();
        since_epoch.as_millis() as i64
    }

    /// Health probe: performs a single point read against storage.
    #[cfg(feature = "http-server")]
    pub(crate) async fn check_storage(&self) -> Result<()> {
        let probe_key = Bytes::from_static(&crate::serde::SEQ_BLOCK_KEY);
        let _ = self.storage.get(probe_key).await?;
        Ok(())
    }

    /// Test helper: force-seals the active segment, then flushes.
    #[cfg(test)]
    pub(crate) async fn seal_segment(&self) -> Result<()> {
        self.handle.force_seal(self.current_time_ms()).await?;
        self.flush().await?;
        Ok(())
    }

    /// Flushes all buffered writes through the writer.
    pub async fn flush(&self) -> Result<()> {
        self.handle.flush().await
    }

    /// Waits until the read view reflects everything the writer has
    /// flushed so far.
    async fn sync_to_flushed(&self) -> Result<()> {
        let epoch = self.handle.flushed_epoch();
        self.epoch_watcher
            .clone()
            .wait(epoch, Durability::Written)
            .await
            .map_err(|_| Error::Internal("writer shut down".into()))?;
        Ok(())
    }

    /// Flushes outstanding writes, then tears down the background tasks
    /// and closes storage.
    pub async fn close(self) -> Result<()> {
        self.flush().await?;
        // Dropping the handle closes the writer's input, allowing its
        // task to finish; join errors are intentionally ignored here.
        drop(self.handle);
        let _ = self.writer_task.await;
        self.flushed_subscriber_task.abort();
        self.storage
            .close()
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        Ok(())
    }

    /// Test constructor over an existing storage handle with default
    /// segmentation settings.
    #[cfg(test)]
    pub(crate) async fn new(storage: Arc<dyn common::Storage>) -> Result<Self> {
        Self::from_storage(storage, SegmentConfig::default()).await
    }

    /// Wires the writer, read view, and flush subscriber together on top
    /// of an already-created storage instance.
    async fn from_storage(
        storage: Arc<dyn common::Storage>,
        segment_config: SegmentConfig,
    ) -> Result<Self> {
        let clock: Arc<dyn Clock> = Arc::new(SystemClock);
        // Recover persisted sequence-allocator state from its fixed key.
        let seq_key = Bytes::from_static(&SEQ_BLOCK_KEY);
        let sequence_allocator = common::SequenceAllocator::load(storage.as_ref(), seq_key)
            .await
            .map_err(|e| Error::Internal(e.to_string()))?;
        let snapshot = storage
            .snapshot()
            .await
            .map_err(|e| Error::Storage(e.to_string()))?;
        let segment_cache = SegmentCache::open(snapshot.as_ref(), segment_config).await?;
        let listing_cache = ListingCache::new();
        let (writer, mut handle) = LogWriter::new(
            storage.clone(),
            sequence_allocator,
            segment_cache.clone(),
            listing_cache,
            LogWriterConfig::default(),
        )
        .await
        .map_err(Error::Storage)?;
        // Subscribe before spawning the writer so no update is missed.
        let written_rx = handle.written_rx();
        let writer_task = handle.spawn(writer);
        let read_view = Arc::new(RwLock::new(LogReadView::new(
            snapshot as Arc<dyn common::StorageRead>,
            segment_cache,
        )));
        let (epoch_watcher, flushed_subscriber_task) =
            spawn_flushed_subscriber(written_rx, Arc::clone(&read_view));
        Ok(Self {
            handle,
            writer_task,
            storage,
            clock,
            read_view,
            epoch_watcher,
            flushed_subscriber_task,
        })
    }
}
#[async_trait]
impl LogRead for LogDb {
    /// Streams entries for `key` whose sequences fall inside `seq_range`.
    async fn scan_with_options(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
        options: ScanOptions,
    ) -> Result<LogIterator> {
        // Reads first wait for the view to reflect all flushed writes.
        self.sync_to_flushed().await?;
        let bounds = normalize_sequence(&seq_range);
        let guard = self.read_view.read().await;
        Ok(guard.scan_with_options(key, bounds, &options))
    }

    /// Counting is not implemented yet.
    async fn count_with_options(
        &self,
        _key: Bytes,
        _seq_range: impl RangeBounds<Sequence> + Send,
        _options: CountOptions,
    ) -> Result<u64> {
        todo!()
    }

    /// Lists distinct keys appearing in the segments selected by
    /// `segment_range`.
    async fn list_keys(
        &self,
        segment_range: impl RangeBounds<SegmentId> + Send,
    ) -> Result<LogKeyIterator> {
        self.sync_to_flushed().await?;
        let bounds = normalize_segment_id(&segment_range);
        let guard = self.read_view.read().await;
        guard.list_keys(bounds).await
    }

    /// Lists segments overlapping `seq_range`.
    async fn list_segments(
        &self,
        seq_range: impl RangeBounds<Sequence> + Send,
    ) -> Result<Vec<Segment>> {
        self.sync_to_flushed().await?;
        let bounds = normalize_sequence(&seq_range);
        let guard = self.read_view.read().await;
        Ok(guard.list_segments(&bounds))
    }
}
/// Builder for [`LogDb`] allowing the storage runtime to be customized
/// before the database is opened.
pub struct LogDbBuilder {
    config: crate::config::Config,
    // Runtime passed to `create_storage`; defaults to `StorageRuntime::new()`.
    storage_runtime: StorageRuntime,
}
impl LogDbBuilder {
    /// Starts a builder for `config` with a default storage runtime.
    pub fn new(config: crate::config::Config) -> Self {
        let storage_runtime = StorageRuntime::new();
        Self {
            config,
            storage_runtime,
        }
    }

    /// Overrides the storage runtime used when the database is built.
    pub fn with_storage_runtime(mut self, runtime: StorageRuntime) -> Self {
        self.storage_runtime = runtime;
        self
    }

    /// Creates the storage backend and opens the log on top of it.
    pub async fn build(self) -> Result<LogDb> {
        let storage = create_storage(
            &self.config.storage,
            self.storage_runtime,
            StorageSemantics::new(),
        )
        .await
        .map_err(|e| Error::Storage(e.to_string()))?;
        LogDb::from_storage(storage, self.config.segmentation).await
    }
}
/// Spawns the background task that mirrors the writer's published state
/// into the shared read view and advances the written watermark.
///
/// Returns the watcher readers use to wait on epochs, plus the task
/// handle so the subscriber can be torn down on close.
fn spawn_flushed_subscriber(
    mut written_rx: watch::Receiver<WrittenView>,
    read_view: Arc<RwLock<LogReadView>>,
) -> (EpochWatcher, JoinHandle<()>) {
    let (watermarks, watcher) = EpochWatermarks::new();
    let task = tokio::spawn(async move {
        // Segment list applied last, so an unchanged list (same Arc,
        // compared by pointer) is not re-applied on every update.
        let mut applied: Option<Arc<[LogSegment]>> = None;
        // The loop ends once the writer drops its sender side.
        while written_rx.changed().await.is_ok() {
            let update = written_rx.borrow_and_update().clone();
            let mut guard = read_view.write().await;
            guard.update_snapshot(update.snapshot as Arc<dyn common::StorageRead>);
            let segments_unchanged = applied
                .as_ref()
                .is_some_and(|prev| Arc::ptr_eq(prev, &update.segments));
            if !segments_unchanged {
                guard.replace_segments(&update.segments);
                applied = Some(Arc::clone(&update.segments));
            }
            watermarks.update_written(update.epoch);
        }
    });
    (watcher, task)
}
#[cfg(test)]
/// End-to-end tests over an in-memory storage backend, covering append,
/// scan, key listing, segment listing, and read-after-write visibility.
mod tests {
    use common::StorageConfig;
    use common::storage::factory::create_storage;
    use super::*;
    use crate::config::Config;
    use crate::reader::LogDbReader;

    // In-memory storage keeps the suite hermetic and fast.
    fn test_config() -> Config {
        Config {
            storage: StorageConfig::InMemory,
            ..Default::default()
        }
    }

    // --- open / append ------------------------------------------------------

    #[tokio::test]
    async fn should_open_log_with_in_memory_config() {
        let config = test_config();
        let result = LogDb::open(config).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn should_append_single_record() {
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![Record {
            key: Bytes::from("orders"),
            value: Bytes::from("order-1"),
        }];
        log.try_append(records).await.unwrap();
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 0);
        assert_eq!(entry.value, Bytes::from("order-1"));
        assert!(iter.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn should_append_multiple_records_in_batch() {
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-1"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-2"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-3"),
            },
        ];
        log.try_append(records).await.unwrap();
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
        let entry0 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry0.sequence, 0);
        assert_eq!(entry0.value, Bytes::from("order-1"));
        let entry1 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry1.sequence, 1);
        assert_eq!(entry1.value, Bytes::from("order-2"));
        let entry2 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry2.sequence, 2);
        assert_eq!(entry2.value, Bytes::from("order-3"));
        assert!(iter.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn should_append_empty_records_without_error() {
        let log = LogDb::open(test_config()).await.unwrap();
        let records: Vec<Record> = vec![];
        let result = log.try_append(records).await;
        assert!(result.is_ok());
        let mut iter = log.scan(Bytes::from("any-key"), ..).await.unwrap();
        assert!(iter.next().await.unwrap().is_none());
    }

    // Sequences are global: they continue across separate append calls.
    #[tokio::test]
    async fn should_assign_sequential_sequences_across_appends() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-1"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-2"),
            },
        ])
        .await
        .unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("events"),
            value: Bytes::from("event-3"),
        }])
        .await
        .unwrap();
        let mut iter = log.scan(Bytes::from("events"), ..).await.unwrap();
        let entry0 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry0.sequence, 0);
        let entry1 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry1.sequence, 1);
        let entry2 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry2.sequence, 2);
        assert!(iter.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn should_store_records_with_correct_keys_and_values() {
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![
            Record {
                key: Bytes::from("topic-a"),
                value: Bytes::from("message-a"),
            },
            Record {
                key: Bytes::from("topic-b"),
                value: Bytes::from("message-b"),
            },
        ];
        log.try_append(records).await.unwrap();
        let mut iter_a = log.scan(Bytes::from("topic-a"), ..).await.unwrap();
        let entry_a = iter_a.next().await.unwrap().unwrap();
        assert_eq!(entry_a.key, Bytes::from("topic-a"));
        assert_eq!(entry_a.value, Bytes::from("message-a"));
        assert!(iter_a.next().await.unwrap().is_none());
        let mut iter_b = log.scan(Bytes::from("topic-b"), ..).await.unwrap();
        let entry_b = iter_b.next().await.unwrap().unwrap();
        assert_eq!(entry_b.key, Bytes::from("topic-b"));
        assert_eq!(entry_b.value, Bytes::from("message-b"));
        assert!(iter_b.next().await.unwrap().is_none());
    }

    // --- scan ---------------------------------------------------------------

    #[tokio::test]
    async fn should_scan_all_entries_for_key() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-1"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-2"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-3"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[0].value, Bytes::from("order-1"));
        assert_eq!(entries[1].sequence, 1);
        assert_eq!(entries[1].value, Bytes::from("order-2"));
        assert_eq!(entries[2].sequence, 2);
        assert_eq!(entries[2].value, Bytes::from("order-3"));
    }

    // Range 1..4 is half-open: sequences 1, 2, 3.
    #[tokio::test]
    async fn should_scan_with_sequence_range() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-0"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-1"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-2"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-3"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-4"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.scan(Bytes::from("events"), 1..4).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].sequence, 1);
        assert_eq!(entries[1].sequence, 2);
        assert_eq!(entries[2].sequence, 3);
    }

    #[tokio::test]
    async fn should_scan_from_starting_sequence() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-0"),
            },
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-1"),
            },
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-2"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.scan(Bytes::from("logs"), 1..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].sequence, 1);
        assert_eq!(entries[1].sequence, 2);
    }

    #[tokio::test]
    async fn should_scan_up_to_ending_sequence() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-0"),
            },
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-1"),
            },
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-2"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.scan(Bytes::from("logs"), ..2).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[1].sequence, 1);
    }

    // Interleaved keys: scanning one key must skip the other's entries.
    #[tokio::test]
    async fn should_scan_only_entries_for_specified_key() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a-0"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b-0"),
            },
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a-1"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b-1"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.scan(Bytes::from("key-a"), ..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].key, Bytes::from("key-a"));
        assert_eq!(entries[0].value, Bytes::from("value-a-0"));
        assert_eq!(entries[1].key, Bytes::from("key-a"));
        assert_eq!(entries[1].value, Bytes::from("value-a-1"));
    }

    #[tokio::test]
    async fn should_return_empty_iterator_for_unknown_key() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("existing"),
            value: Bytes::from("value"),
        }])
        .await
        .unwrap();
        let mut iter = log.scan(Bytes::from("unknown"), ..).await.unwrap();
        let entry = iter.next().await.unwrap();
        assert!(entry.is_none());
    }

    #[tokio::test]
    async fn should_return_empty_iterator_for_empty_range() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("value-0"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("value-1"),
            },
        ])
        .await
        .unwrap();
        // Range entirely past the written sequences.
        let mut iter = log.scan(Bytes::from("key"), 10..20).await.unwrap();
        let entry = iter.next().await.unwrap();
        assert!(entry.is_none());
    }

    // A standalone reader over the same storage sees flushed data.
    #[tokio::test]
    async fn should_scan_entries_via_log_reader() {
        let storage = create_storage(
            &StorageConfig::InMemory,
            StorageRuntime::new(),
            StorageSemantics::new(),
        )
        .await
        .unwrap();
        let log = LogDb::new(storage.clone()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-1"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-2"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-3"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let reader = LogDbReader::new(storage).await.unwrap();
        let mut iter = reader.scan(Bytes::from("orders"), ..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[0].value, Bytes::from("order-1"));
        assert_eq!(entries[1].sequence, 1);
        assert_eq!(entries[1].value, Bytes::from("order-2"));
        assert_eq!(entries[2].sequence, 2);
        assert_eq!(entries[2].value, Bytes::from("order-3"));
    }

    #[tokio::test]
    async fn should_scan_across_multiple_segments() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-0"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-1"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-2"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-3"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.scan(Bytes::from("events"), ..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 4);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[0].value, Bytes::from("event-0"));
        assert_eq!(entries[1].sequence, 1);
        assert_eq!(entries[1].value, Bytes::from("event-1"));
        assert_eq!(entries[2].sequence, 2);
        assert_eq!(entries[2].value, Bytes::from("event-2"));
        assert_eq!(entries[3].sequence, 3);
        assert_eq!(entries[3].value, Bytes::from("event-3"));
    }

    // Range 1..5 spans parts of three segments (seqs 0-1, 2-3, 4-5).
    #[tokio::test]
    async fn should_scan_range_spanning_segments() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("data"),
                value: Bytes::from("seg0-0"),
            },
            Record {
                key: Bytes::from("data"),
                value: Bytes::from("seg0-1"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("data"),
                value: Bytes::from("seg1-2"),
            },
            Record {
                key: Bytes::from("data"),
                value: Bytes::from("seg1-3"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("data"),
                value: Bytes::from("seg2-4"),
            },
            Record {
                key: Bytes::from("data"),
                value: Bytes::from("seg2-5"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.scan(Bytes::from("data"), 1..5).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 4);
        assert_eq!(entries[0].sequence, 1);
        assert_eq!(entries[1].sequence, 2);
        assert_eq!(entries[2].sequence, 3);
        assert_eq!(entries[3].sequence, 4);
    }

    #[tokio::test]
    async fn should_scan_single_segment_in_multi_segment_log() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v0"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v1"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v2"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v3"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.scan(Bytes::from("key"), 2..4).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].sequence, 2);
        assert_eq!(entries[1].sequence, 3);
    }

    // --- list_keys ----------------------------------------------------------

    #[tokio::test]
    async fn should_list_keys_returns_iterator() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b"),
            },
        ])
        .await
        .unwrap();
        let _iter = log.list_keys(..).await.unwrap();
    }

    #[tokio::test]
    async fn should_list_keys_via_log_reader() {
        let storage = create_storage(
            &StorageConfig::InMemory,
            StorageRuntime::new(),
            StorageSemantics::new(),
        )
        .await
        .unwrap();
        let log = LogDb::new(storage.clone()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let reader = LogDbReader::new(storage).await.unwrap();
        let _iter = reader.list_keys(..).await.unwrap();
    }

    #[tokio::test]
    async fn should_list_keys_in_single_segment() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b"),
            },
            Record {
                key: Bytes::from("key-c"),
                value: Bytes::from("value-c"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.list_keys(..).await.unwrap();
        let mut keys = vec![];
        while let Some(key) = iter.next().await.unwrap() {
            keys.push(key.key);
        }
        assert_eq!(keys.len(), 3);
        assert_eq!(keys[0], Bytes::from("key-a"));
        assert_eq!(keys[1], Bytes::from("key-b"));
        assert_eq!(keys[2], Bytes::from("key-c"));
    }

    #[tokio::test]
    async fn should_list_keys_across_segments_after_roll() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a-0"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b-0"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-c"),
                value: Bytes::from("value-c-1"),
            },
            Record {
                key: Bytes::from("key-d"),
                value: Bytes::from("value-d-1"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.list_keys(..).await.unwrap();
        let mut keys = vec![];
        while let Some(key) = iter.next().await.unwrap() {
            keys.push(key.key);
        }
        assert_eq!(keys.len(), 4);
        assert_eq!(keys[0], Bytes::from("key-a"));
        assert_eq!(keys[1], Bytes::from("key-b"));
        assert_eq!(keys[2], Bytes::from("key-c"));
        assert_eq!(keys[3], Bytes::from("key-d"));
    }

    // The same key written to three segments must be reported once.
    #[tokio::test]
    async fn should_deduplicate_keys_across_segments() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("shared-key"),
            value: Bytes::from("value-0"),
        }])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("shared-key"),
            value: Bytes::from("value-1"),
        }])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("shared-key"),
            value: Bytes::from("value-2"),
        }])
        .await
        .unwrap();
        let mut iter = log.list_keys(..).await.unwrap();
        let mut keys = vec![];
        while let Some(key) = iter.next().await.unwrap() {
            keys.push(key.key);
        }
        assert_eq!(keys.len(), 1);
        assert_eq!(keys[0], Bytes::from("shared-key"));
    }

    // Keys come back sorted regardless of insertion order.
    #[tokio::test]
    async fn should_list_keys_in_lexicographic_order() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("zebra"),
                value: Bytes::from("value"),
            },
            Record {
                key: Bytes::from("apple"),
                value: Bytes::from("value"),
            },
            Record {
                key: Bytes::from("mango"),
                value: Bytes::from("value"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.list_keys(..).await.unwrap();
        let mut keys = vec![];
        while let Some(key) = iter.next().await.unwrap() {
            keys.push(key.key);
        }
        assert_eq!(keys[0], Bytes::from("apple"));
        assert_eq!(keys[1], Bytes::from("mango"));
        assert_eq!(keys[2], Bytes::from("zebra"));
    }

    #[tokio::test]
    async fn should_list_empty_when_no_entries() {
        let log = LogDb::open(test_config()).await.unwrap();
        let mut iter = log.list_keys(..).await.unwrap();
        assert!(iter.next().await.unwrap().is_none());
    }

    // Restricting to segment range 1..2 yields only the middle segment's keys.
    #[tokio::test]
    async fn should_list_keys_respects_segment_range() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-seg0"),
                value: Bytes::from("value"),
            },
            Record {
                key: Bytes::from("key-seg0-b"),
                value: Bytes::from("value"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-seg1"),
                value: Bytes::from("value"),
            },
            Record {
                key: Bytes::from("key-seg1-b"),
                value: Bytes::from("value"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-seg2"),
                value: Bytes::from("value"),
            },
            Record {
                key: Bytes::from("key-seg2-b"),
                value: Bytes::from("value"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.list_keys(1..2).await.unwrap();
        let mut keys = vec![];
        while let Some(key) = iter.next().await.unwrap() {
            keys.push(key.key);
        }
        assert_eq!(keys.len(), 2);
        assert_eq!(keys[0], Bytes::from("key-seg1"));
        assert_eq!(keys[1], Bytes::from("key-seg1-b"));
    }

    // --- list_segments ------------------------------------------------------

    #[tokio::test]
    async fn should_list_segments_returns_empty_when_no_segments() {
        let log = LogDb::open(test_config()).await.unwrap();
        let segments = log.list_segments(..).await.unwrap();
        assert!(segments.is_empty());
    }

    #[tokio::test]
    async fn should_list_segments_returns_single_segment() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value"),
        }])
        .await
        .unwrap();
        let segments = log.list_segments(..).await.unwrap();
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].id, 0);
        assert_eq!(segments[0].start_seq, 0);
    }

    #[tokio::test]
    async fn should_list_segments_returns_multiple_segments() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value-0"),
        }])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value-1"),
        }])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value-2"),
        }])
        .await
        .unwrap();
        let segments = log.list_segments(..).await.unwrap();
        assert_eq!(segments.len(), 3);
        assert_eq!(segments[0].id, 0);
        assert_eq!(segments[0].start_seq, 0);
        assert_eq!(segments[1].id, 1);
        assert_eq!(segments[1].start_seq, 1);
        assert_eq!(segments[2].id, 2);
        assert_eq!(segments[2].start_seq, 2);
    }

    // Sequence range 2..4 falls wholly inside the second segment (seqs 2-3).
    #[tokio::test]
    async fn should_list_segments_filters_by_sequence_range() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v0"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v1"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v2"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v3"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v4"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v5"),
            },
        ])
        .await
        .unwrap();
        let segments = log.list_segments(2..4).await.unwrap();
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].id, 1);
        assert_eq!(segments[0].start_seq, 2);
    }

    #[tokio::test]
    async fn should_list_segments_via_log_reader() {
        let storage = create_storage(
            &StorageConfig::InMemory,
            StorageRuntime::new(),
            StorageSemantics::new(),
        )
        .await
        .unwrap();
        let log = LogDb::new(storage.clone()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value-0"),
        }])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value-1"),
        }])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let reader = LogDbReader::new(storage).await.unwrap();
        let segments = reader.list_segments(..).await.unwrap();
        assert_eq!(segments.len(), 2);
        assert_eq!(segments[0].id, 0);
        assert_eq!(segments[1].id, 1);
    }

    #[tokio::test]
    async fn should_list_segments_includes_start_time() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value"),
        }])
        .await
        .unwrap();
        let segments = log.list_segments(..).await.unwrap();
        assert_eq!(segments.len(), 1);
        // 1577836800000 ms = 2020-01-01T00:00:00Z; guards against a
        // zero or garbage timestamp from the system clock.
        assert!(segments[0].start_time_ms > 1577836800000);
    }

    // --- append variants ----------------------------------------------------

    #[tokio::test]
    async fn should_try_append_single_record() {
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![Record {
            key: Bytes::from("orders"),
            value: Bytes::from("order-1"),
        }];
        let result = log.try_append(records).await.unwrap();
        assert_eq!(result.start_sequence, 0);
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.value, Bytes::from("order-1"));
    }

    #[tokio::test]
    async fn should_append_timeout_single_record() {
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![Record {
            key: Bytes::from("orders"),
            value: Bytes::from("order-1"),
        }];
        let result = log
            .append_timeout(records, Duration::from_secs(5))
            .await
            .unwrap();
        assert_eq!(result.start_sequence, 0);
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.value, Bytes::from("order-1"));
    }

    // Empty batches short-circuit with start_sequence 0 (see append_inner).
    #[tokio::test]
    async fn should_return_empty_records_on_try_append_empty() {
        let log = LogDb::open(test_config()).await.unwrap();
        let result = log.try_append(vec![]).await.unwrap();
        assert_eq!(result.start_sequence, 0);
    }

    #[tokio::test]
    async fn should_return_empty_records_on_append_timeout_empty() {
        let log = LogDb::open(test_config()).await.unwrap();
        let result = log
            .append_timeout(vec![], Duration::from_secs(1))
            .await
            .unwrap();
        assert_eq!(result.start_sequence, 0);
    }

    // --- read-after-write without an explicit flush -------------------------

    #[tokio::test]
    async fn should_scan_without_flush() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v0"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v1"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.scan(Bytes::from("key"), ..).await.unwrap();
        let e0 = iter.next().await.unwrap().unwrap();
        assert_eq!(e0.sequence, 0);
        assert_eq!(e0.value, Bytes::from("v0"));
        let e1 = iter.next().await.unwrap().unwrap();
        assert_eq!(e1.sequence, 1);
        assert_eq!(e1.value, Bytes::from("v1"));
        assert!(iter.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn should_list_keys_without_flush() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("alpha"),
                value: Bytes::from("v"),
            },
            Record {
                key: Bytes::from("beta"),
                value: Bytes::from("v"),
            },
        ])
        .await
        .unwrap();
        let mut iter = log.list_keys(..).await.unwrap();
        let mut keys = vec![];
        while let Some(key) = iter.next().await.unwrap() {
            keys.push(key.key);
        }
        assert_eq!(keys, vec![Bytes::from("alpha"), Bytes::from("beta")]);
    }

    #[tokio::test]
    async fn should_list_segments_without_flush() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value"),
        }])
        .await
        .unwrap();
        let segments = log.list_segments(..).await.unwrap();
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].id, 0);
        assert_eq!(segments[0].start_seq, 0);
    }
}