use std::ops::RangeBounds;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use common::clock::{Clock, SystemClock};
use common::coordinator::{
Durability, ViewMonitor, ViewSubscriber, WriteCoordinator, WriteCoordinatorConfig,
WriteCoordinatorHandle, WriteError,
};
use common::storage::factory::create_storage;
use common::{StorageRuntime, StorageSemantics};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use crate::config::{CountOptions, ScanOptions};
use crate::delta::{LogContext, LogDelta, LogFlusher, LogWrite};
use crate::error::{AppendError, AppendResult, Error, Result};
use crate::listing::ListingCache;
use crate::listing::LogKeyIterator;
use crate::model::{AppendOutput, Record, Segment, SegmentId, Sequence};
use crate::range::{normalize_segment_id, normalize_sequence};
use crate::reader::{LogIterator, LogRead};
use crate::segment::SegmentCache;
use crate::storage::LogStorage;
/// Name of the single write channel registered with the write coordinator.
const WRITE_CHANNEL: &str = "write";
/// Append-only, key-partitioned log database.
///
/// Writes flow through a `WriteCoordinator`; reads are served from a shared
/// `LogReadView` that a background task keeps in sync with flushed state.
pub struct LogDb {
    /// Handle for submitting writes and flushes on `WRITE_CHANNEL`.
    handle: WriteCoordinatorHandle<LogDelta>,
    /// Owns the write pipeline; stopped in `close`.
    coordinator: WriteCoordinator<LogDelta, LogFlusher>,
    /// Log-specific wrapper over the underlying storage.
    storage: LogStorage,
    /// Clock used to stamp appends with a wall-clock timestamp (ms).
    clock: Arc<dyn Clock>,
    /// Shared read view; refreshed by `flush_subscriber_task` after flushes.
    read_view: Arc<RwLock<crate::reader::LogReadView>>,
    /// Lets readers wait until a given epoch reaches `Durability::Flushed`.
    view_monitor: ViewMonitor,
    /// Background task driving `read_view` updates; aborted on `close`.
    flush_subscriber_task: JoinHandle<()>,
}
impl LogDb {
    /// Opens a log database from `config` using a default storage runtime.
    pub async fn open(config: crate::config::Config) -> Result<Self> {
        LogDbBuilder::new(config).build().await
    }
    /// Registers storage-level metrics with the Prometheus `registry`.
    #[cfg(feature = "http-server")]
    pub fn register_metrics(&self, registry: &mut prometheus_client::registry::Registry) {
        self.storage.register_metrics(registry);
    }
    /// Appends `records` without waiting for queue capacity; under
    /// backpressure the records are returned in `AppendError::QueueFull`.
    pub async fn try_append(&self, records: Vec<Record>) -> AppendResult<AppendOutput> {
        self.append_inner(records, None).await
    }
    /// Appends `records`, waiting up to `timeout` for queue capacity before
    /// returning the records in `AppendError::Timeout`.
    pub async fn append_timeout(
        &self,
        records: Vec<Record>,
        timeout: Duration,
    ) -> AppendResult<AppendOutput> {
        self.append_inner(records, Some(timeout)).await
    }
    /// Shared append path: submits a `LogWrite` and waits until it is applied.
    ///
    /// An empty batch is a no-op that reports `start_sequence: 0` without
    /// touching the coordinator.
    async fn append_inner(
        &self,
        records: Vec<Record>,
        timeout: Option<Duration>,
    ) -> AppendResult<AppendOutput> {
        if records.is_empty() {
            // NOTE(review): 0 is a sentinel here, not an allocated sequence;
            // callers should not treat it as the position of a real write.
            return Ok(AppendOutput { start_sequence: 0 });
        }
        let write = LogWrite {
            records,
            timestamp_ms: self.current_time_ms(),
            force_seal: false,
        };
        let mut write_handle = if let Some(t) = timeout {
            self.handle.write_timeout(write, t).await
        } else {
            self.handle.try_write(write).await
        }
        // Map coordinator errors to append errors, handing the rejected
        // records back to the caller where possible.
        .map_err(|e| match e {
            WriteError::Backpressure(w) => AppendError::QueueFull(w.records),
            WriteError::TimeoutError(w) => AppendError::Timeout(w.records),
            WriteError::Shutdown => AppendError::Shutdown,
            // Other WriteError variants are assumed not to be produced by
            // try_write/write_timeout; panics if that invariant changes.
            _ => unreachable!(),
        })?;
        let output = write_handle
            // Wait only until the write is applied (visible to in-memory
            // state), not durable; callers use `flush` for durability.
            .wait(Durability::Applied)
            .await
            .map_err(|e| match e {
                WriteError::Shutdown => AppendError::Shutdown,
                WriteError::ApplyError(_, msg) => AppendError::InvalidRecord(msg),
                _ => unreachable!(),
            })?;
        Ok(output.expect("non-empty append must produce output"))
    }
    /// Current wall-clock time as milliseconds since the Unix epoch.
    fn current_time_ms(&self) -> i64 {
        self.clock
            .now()
            // Panics if the clock reports a time before the Unix epoch.
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64
    }
    /// Health probe: performs a single storage read (result discarded) so an
    /// unreachable backend surfaces as an error.
    #[cfg(feature = "http-server")]
    pub(crate) async fn check_storage(&self) -> Result<()> {
        let seq_key = Bytes::from_static(&crate::serde::SEQ_BLOCK_KEY);
        let _ = self.storage.as_read().get(seq_key).await?;
        Ok(())
    }
    /// Test helper: forces the active segment to seal (empty write with
    /// `force_seal`), then flushes durably.
    #[cfg(test)]
    pub(crate) async fn seal_segment(&self) -> Result<()> {
        let write = LogWrite {
            records: vec![],
            timestamp_ms: self.current_time_ms(),
            force_seal: true,
        };
        self.handle
            .try_write(write)
            .await
            .map_err(|e| e.discard_inner())?;
        self.flush().await?;
        Ok(())
    }
    /// Flushes pending writes and waits until they are durable.
    pub async fn flush(&self) -> Result<()> {
        let mut flush_handle = self.handle.flush(true).await?;
        flush_handle.wait(Durability::Durable).await?;
        Ok(())
    }
    /// Test helper: flushes without forcing durability, waiting only for the
    /// `Flushed` stage.
    #[cfg(test)]
    pub(crate) async fn flush_soft(&self) -> Result<()> {
        let mut flush_handle = self.handle.flush(false).await?;
        flush_handle.wait(Durability::Flushed).await?;
        Ok(())
    }
    /// Waits until the view monitor reports the coordinator's current flushed
    /// epoch, so subsequent reads observe everything already flushed.
    async fn sync_reader_to_flushed(&self) -> Result<()> {
        let epoch = self.handle.flushed_epoch();
        self.view_monitor
            .clone()
            .wait(epoch, Durability::Flushed)
            .await
            .map_err(|e| Error::Internal(e.to_string()))?;
        Ok(())
    }
    /// Stops the write coordinator, aborts the view-subscriber task, and
    /// closes storage. Consumes `self`; the log is unusable afterwards.
    pub async fn close(self) -> Result<()> {
        self.coordinator.stop().await.map_err(Error::Internal)?;
        self.flush_subscriber_task.abort();
        self.storage.close().await?;
        Ok(())
    }
    /// Test constructor over an existing storage handle, using the default
    /// segment configuration.
    ///
    /// NOTE(review): duplicates most of `LogDbBuilder::build`; keep the two
    /// startup sequences in sync when changing either.
    #[cfg(test)]
    pub(crate) async fn new(storage: Arc<dyn common::Storage>) -> Result<Self> {
        use crate::config::SegmentConfig;
        use crate::reader::LogReadView;
        use crate::serde::SEQ_BLOCK_KEY;
        use crate::storage::LogStorageRead;
        let log_storage = LogStorage::new(storage.clone());
        let clock: Arc<dyn Clock> = Arc::new(SystemClock);
        let seq_key = Bytes::from_static(&SEQ_BLOCK_KEY);
        // Recover persisted sequence-allocator state from its storage block.
        let sequence_allocator = common::SequenceAllocator::load(storage.as_ref(), seq_key)
            .await
            .map_err(|e| Error::Internal(e.to_string()))?;
        let snapshot = log_storage.snapshot().await?;
        // Bootstrap the segment cache from a read-only view of the snapshot.
        let segment_cache = {
            let bootstrap = LogStorageRead::new(snapshot.clone() as Arc<dyn common::StorageRead>);
            SegmentCache::open(&bootstrap, SegmentConfig::default()).await?
        };
        let listing_cache = ListingCache::new();
        let context = LogContext {
            sequence_allocator,
            segment_cache: segment_cache.clone(),
            listing_cache,
        };
        let flusher = LogFlusher::new(log_storage.clone());
        let mut coordinator = WriteCoordinator::new(
            WriteCoordinatorConfig::default(),
            vec![WRITE_CHANNEL.to_string()],
            context,
            snapshot,
            flusher,
        );
        let handle = coordinator.handle(WRITE_CHANNEL);
        // Subscription happens before `start()` — presumably so no view
        // updates published after startup are missed; confirm against the
        // WriteCoordinator contract before reordering.
        let (mut view_subscriber, monitor) = coordinator.subscribe();
        let initial_view = view_subscriber.initialize();
        let read_view = Arc::new(RwLock::new(LogReadView::new(
            LogStorageRead::new(initial_view.snapshot.clone() as Arc<dyn common::StorageRead>),
            segment_cache,
        )));
        let flush_subscriber_task = spawn_flush_subscriber(view_subscriber, Arc::clone(&read_view));
        coordinator.start();
        Ok(Self {
            handle,
            coordinator,
            storage: log_storage,
            clock,
            read_view,
            view_monitor: monitor,
            flush_subscriber_task,
        })
    }
}
#[async_trait]
impl LogRead for LogDb {
    /// Scans entries for `key` within `seq_range`, applying `options`.
    ///
    /// Synchronizes the read view with flushed state first, so the scan
    /// observes all previously flushed writes.
    async fn scan_with_options(
        &self,
        key: Bytes,
        seq_range: impl RangeBounds<Sequence> + Send,
        options: ScanOptions,
    ) -> Result<LogIterator> {
        self.sync_reader_to_flushed().await?;
        let seq_range = normalize_sequence(&seq_range);
        let view = self.read_view.read().await;
        Ok(view.scan_with_options(key, seq_range, &options))
    }
    /// Not yet implemented — panics via `todo!()` when called.
    async fn count_with_options(
        &self,
        _key: Bytes,
        _seq_range: impl RangeBounds<Sequence> + Send,
        _options: CountOptions,
    ) -> Result<u64> {
        todo!()
    }
    /// Lists keys appearing in segments within `segment_range`, after
    /// synchronizing with flushed state.
    async fn list_keys(
        &self,
        segment_range: impl RangeBounds<SegmentId> + Send,
    ) -> Result<LogKeyIterator> {
        self.sync_reader_to_flushed().await?;
        let segment_range = normalize_segment_id(&segment_range);
        let view = self.read_view.read().await;
        view.list_keys(segment_range).await
    }
    /// Lists segments overlapping `seq_range`, after synchronizing with
    /// flushed state.
    async fn list_segments(
        &self,
        seq_range: impl RangeBounds<Sequence> + Send,
    ) -> Result<Vec<Segment>> {
        self.sync_reader_to_flushed().await?;
        let seq_range = normalize_sequence(&seq_range);
        let view = self.read_view.read().await;
        Ok(view.list_segments(&seq_range))
    }
}
/// Builder for [`LogDb`] that allows supplying a custom `StorageRuntime`.
pub struct LogDbBuilder {
    /// Configuration used to create storage and configure segmentation.
    config: crate::config::Config,
    /// Runtime handed to the storage factory; defaults to a fresh one.
    storage_runtime: StorageRuntime,
}
impl LogDbBuilder {
    /// Creates a builder for `config` with a default `StorageRuntime`.
    pub fn new(config: crate::config::Config) -> Self {
        Self {
            config,
            storage_runtime: StorageRuntime::new(),
        }
    }
    /// Overrides the storage runtime used when creating storage.
    pub fn with_storage_runtime(mut self, runtime: StorageRuntime) -> Self {
        self.storage_runtime = runtime;
        self
    }
    /// Creates storage from the config and assembles a running `LogDb`.
    ///
    /// NOTE(review): mirrors the `#[cfg(test)]` constructor `LogDb::new`,
    /// except that storage comes from config and segmentation uses
    /// `config.segmentation`; keep the two startup sequences in sync.
    pub async fn build(self) -> Result<LogDb> {
        use crate::reader::LogReadView;
        use crate::serde::SEQ_BLOCK_KEY;
        use crate::storage::LogStorageRead;
        let storage = create_storage(
            &self.config.storage,
            self.storage_runtime,
            StorageSemantics::new(),
        )
        .await
        .map_err(|e| Error::Storage(e.to_string()))?;
        let log_storage = LogStorage::new(storage.clone());
        let clock: Arc<dyn Clock> = Arc::new(SystemClock);
        let seq_key = Bytes::from_static(&SEQ_BLOCK_KEY);
        // Recover persisted sequence-allocator state from its storage block.
        let sequence_allocator = common::SequenceAllocator::load(storage.as_ref(), seq_key)
            .await
            .map_err(|e| Error::Internal(e.to_string()))?;
        let snapshot = log_storage.snapshot().await?;
        // Bootstrap the segment cache from a read-only view of the snapshot.
        let segment_cache = {
            let bootstrap = LogStorageRead::new(snapshot.clone() as Arc<dyn common::StorageRead>);
            SegmentCache::open(&bootstrap, self.config.segmentation).await?
        };
        let listing_cache = ListingCache::new();
        let context = LogContext {
            sequence_allocator,
            segment_cache: segment_cache.clone(),
            listing_cache,
        };
        let flusher = LogFlusher::new(log_storage.clone());
        let mut coordinator = WriteCoordinator::new(
            WriteCoordinatorConfig::default(),
            vec![WRITE_CHANNEL.to_string()],
            context,
            snapshot,
            flusher,
        );
        let handle = coordinator.handle(WRITE_CHANNEL);
        // Subscription happens before `start()` — presumably so no view
        // updates published after startup are missed; confirm against the
        // WriteCoordinator contract before reordering.
        let (mut view_subscriber, monitor) = coordinator.subscribe();
        let initial_view = view_subscriber.initialize();
        let read_view = Arc::new(RwLock::new(LogReadView::new(
            LogStorageRead::new(initial_view.snapshot.clone() as Arc<dyn common::StorageRead>),
            segment_cache,
        )));
        let flush_subscriber_task = spawn_flush_subscriber(view_subscriber, Arc::clone(&read_view));
        coordinator.start();
        Ok(LogDb {
            handle,
            coordinator,
            storage: log_storage,
            clock,
            read_view,
            view_monitor: monitor,
            flush_subscriber_task,
        })
    }
}
/// Spawns the background task that mirrors coordinator view updates into the
/// shared `LogReadView`.
///
/// For each received view that carries a flushed delta, the task (while
/// holding the write lock) swaps in the new snapshot, applies any newly
/// flushed segments, then acknowledges the last epoch covered by the delta
/// back to the subscriber. Views without a flushed delta are skipped. The
/// task exits when `recv` returns an error.
fn spawn_flush_subscriber(
    mut subscriber: ViewSubscriber<LogDelta>,
    read_view: Arc<RwLock<crate::reader::LogReadView>>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        loop {
            let view = match subscriber.recv().await {
                Ok(v) => v,
                Err(_) => break,
            };
            if let Some(flushed) = view.last_flushed_delta.as_ref() {
                let mut guard = read_view.write().await;
                guard.update_snapshot(view.snapshot.clone());
                let segments = &flushed.val.new_segments;
                if !segments.is_empty() {
                    guard.apply_new_segments(segments);
                }
                // Acknowledge while still holding the view lock, matching
                // the update just applied.
                subscriber.update_flushed(flushed.epoch_range.end - 1);
            }
        }
    })
}
// Unit tests: exercise append/scan/list behavior end-to-end against
// in-memory storage (`StorageConfig::InMemory`).
#[cfg(test)]
mod tests {
    use common::StorageConfig;
    use common::storage::factory::create_storage;
    use super::*;
    use crate::config::Config;
    use crate::reader::LogDbReader;
    /// Default test configuration backed by in-memory storage.
    fn test_config() -> Config {
        Config {
            storage: StorageConfig::InMemory,
            ..Default::default()
        }
    }
    #[tokio::test]
    async fn should_open_log_with_in_memory_config() {
        let config = test_config();
        let result = LogDb::open(config).await;
        assert!(result.is_ok());
    }
    #[tokio::test]
    async fn should_append_single_record() {
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![Record {
            key: Bytes::from("orders"),
            value: Bytes::from("order-1"),
        }];
        log.try_append(records).await.unwrap();
        log.flush().await.unwrap();
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.sequence, 0);
        assert_eq!(entry.value, Bytes::from("order-1"));
        assert!(iter.next().await.unwrap().is_none());
    }
    #[tokio::test]
    async fn should_append_multiple_records_in_batch() {
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-1"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-2"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-3"),
            },
        ];
        log.try_append(records).await.unwrap();
        log.flush().await.unwrap();
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
        let entry0 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry0.sequence, 0);
        assert_eq!(entry0.value, Bytes::from("order-1"));
        let entry1 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry1.sequence, 1);
        assert_eq!(entry1.value, Bytes::from("order-2"));
        let entry2 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry2.sequence, 2);
        assert_eq!(entry2.value, Bytes::from("order-3"));
        assert!(iter.next().await.unwrap().is_none());
    }
    #[tokio::test]
    async fn should_append_empty_records_without_error() {
        let log = LogDb::open(test_config()).await.unwrap();
        let records: Vec<Record> = vec![];
        let result = log.try_append(records).await;
        assert!(result.is_ok());
        let mut iter = log.scan(Bytes::from("any-key"), ..).await.unwrap();
        assert!(iter.next().await.unwrap().is_none());
    }
    #[tokio::test]
    async fn should_assign_sequential_sequences_across_appends() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-1"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-2"),
            },
        ])
        .await
        .unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("events"),
            value: Bytes::from("event-3"),
        }])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let mut iter = log.scan(Bytes::from("events"), ..).await.unwrap();
        let entry0 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry0.sequence, 0);
        let entry1 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry1.sequence, 1);
        let entry2 = iter.next().await.unwrap().unwrap();
        assert_eq!(entry2.sequence, 2);
        assert!(iter.next().await.unwrap().is_none());
    }
    #[tokio::test]
    async fn should_store_records_with_correct_keys_and_values() {
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![
            Record {
                key: Bytes::from("topic-a"),
                value: Bytes::from("message-a"),
            },
            Record {
                key: Bytes::from("topic-b"),
                value: Bytes::from("message-b"),
            },
        ];
        log.try_append(records).await.unwrap();
        log.flush().await.unwrap();
        let mut iter_a = log.scan(Bytes::from("topic-a"), ..).await.unwrap();
        let entry_a = iter_a.next().await.unwrap().unwrap();
        assert_eq!(entry_a.key, Bytes::from("topic-a"));
        assert_eq!(entry_a.value, Bytes::from("message-a"));
        assert!(iter_a.next().await.unwrap().is_none());
        let mut iter_b = log.scan(Bytes::from("topic-b"), ..).await.unwrap();
        let entry_b = iter_b.next().await.unwrap().unwrap();
        assert_eq!(entry_b.key, Bytes::from("topic-b"));
        assert_eq!(entry_b.value, Bytes::from("message-b"));
        assert!(iter_b.next().await.unwrap().is_none());
    }
    #[tokio::test]
    async fn should_scan_all_entries_for_key() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-1"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-2"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-3"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[0].value, Bytes::from("order-1"));
        assert_eq!(entries[1].sequence, 1);
        assert_eq!(entries[1].value, Bytes::from("order-2"));
        assert_eq!(entries[2].sequence, 2);
        assert_eq!(entries[2].value, Bytes::from("order-3"));
    }
    #[tokio::test]
    async fn should_scan_with_sequence_range() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-0"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-1"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-2"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-3"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-4"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        // Half-open range: sequences 1, 2, 3 (4 excluded).
        let mut iter = log.scan(Bytes::from("events"), 1..4).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].sequence, 1);
        assert_eq!(entries[1].sequence, 2);
        assert_eq!(entries[2].sequence, 3);
    }
    #[tokio::test]
    async fn should_scan_from_starting_sequence() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-0"),
            },
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-1"),
            },
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-2"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let mut iter = log.scan(Bytes::from("logs"), 1..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].sequence, 1);
        assert_eq!(entries[1].sequence, 2);
    }
    #[tokio::test]
    async fn should_scan_up_to_ending_sequence() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-0"),
            },
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-1"),
            },
            Record {
                key: Bytes::from("logs"),
                value: Bytes::from("log-2"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let mut iter = log.scan(Bytes::from("logs"), ..2).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[1].sequence, 1);
    }
    #[tokio::test]
    async fn should_scan_only_entries_for_specified_key() {
        let log = LogDb::open(test_config()).await.unwrap();
        // Interleave two keys; scan must filter to one.
        log.try_append(vec![
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a-0"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b-0"),
            },
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a-1"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b-1"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let mut iter = log.scan(Bytes::from("key-a"), ..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].key, Bytes::from("key-a"));
        assert_eq!(entries[0].value, Bytes::from("value-a-0"));
        assert_eq!(entries[1].key, Bytes::from("key-a"));
        assert_eq!(entries[1].value, Bytes::from("value-a-1"));
    }
    #[tokio::test]
    async fn should_return_empty_iterator_for_unknown_key() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("existing"),
            value: Bytes::from("value"),
        }])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let mut iter = log.scan(Bytes::from("unknown"), ..).await.unwrap();
        let entry = iter.next().await.unwrap();
        assert!(entry.is_none());
    }
    #[tokio::test]
    async fn should_return_empty_iterator_for_empty_range() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("value-0"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("value-1"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        // Range entirely past the written sequences yields nothing.
        let mut iter = log.scan(Bytes::from("key"), 10..20).await.unwrap();
        let entry = iter.next().await.unwrap();
        assert!(entry.is_none());
    }
    #[tokio::test]
    async fn should_scan_entries_via_log_reader() {
        // Shares one storage instance between a writer (LogDb) and an
        // independent reader (LogDbReader).
        let storage = create_storage(
            &StorageConfig::InMemory,
            StorageRuntime::new(),
            StorageSemantics::new(),
        )
        .await
        .unwrap();
        let log = LogDb::new(storage.clone()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-1"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-2"),
            },
            Record {
                key: Bytes::from("orders"),
                value: Bytes::from("order-3"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let reader = LogDbReader::new(storage).await.unwrap();
        let mut iter = reader.scan(Bytes::from("orders"), ..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[0].value, Bytes::from("order-1"));
        assert_eq!(entries[1].sequence, 1);
        assert_eq!(entries[1].value, Bytes::from("order-2"));
        assert_eq!(entries[2].sequence, 2);
        assert_eq!(entries[2].value, Bytes::from("order-3"));
    }
    #[tokio::test]
    async fn should_scan_across_multiple_segments() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-0"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-1"),
            },
        ])
        .await
        .unwrap();
        // Roll to a new segment between the two batches.
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-2"),
            },
            Record {
                key: Bytes::from("events"),
                value: Bytes::from("event-3"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let mut iter = log.scan(Bytes::from("events"), ..).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 4);
        assert_eq!(entries[0].sequence, 0);
        assert_eq!(entries[0].value, Bytes::from("event-0"));
        assert_eq!(entries[1].sequence, 1);
        assert_eq!(entries[1].value, Bytes::from("event-1"));
        assert_eq!(entries[2].sequence, 2);
        assert_eq!(entries[2].value, Bytes::from("event-2"));
        assert_eq!(entries[3].sequence, 3);
        assert_eq!(entries[3].value, Bytes::from("event-3"));
    }
    #[tokio::test]
    async fn should_scan_range_spanning_segments() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("data"),
                value: Bytes::from("seg0-0"),
            },
            Record {
                key: Bytes::from("data"),
                value: Bytes::from("seg0-1"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("data"),
                value: Bytes::from("seg1-2"),
            },
            Record {
                key: Bytes::from("data"),
                value: Bytes::from("seg1-3"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("data"),
                value: Bytes::from("seg2-4"),
            },
            Record {
                key: Bytes::from("data"),
                value: Bytes::from("seg2-5"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        // Range 1..5 crosses all three segments but trims both ends.
        let mut iter = log.scan(Bytes::from("data"), 1..5).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 4);
        assert_eq!(entries[0].sequence, 1);
        assert_eq!(entries[1].sequence, 2);
        assert_eq!(entries[2].sequence, 3);
        assert_eq!(entries[3].sequence, 4);
    }
    #[tokio::test]
    async fn should_scan_single_segment_in_multi_segment_log() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v0"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v1"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v2"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v3"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        // Range 2..4 falls entirely within the second segment.
        let mut iter = log.scan(Bytes::from("key"), 2..4).await.unwrap();
        let mut entries = vec![];
        while let Some(entry) = iter.next().await.unwrap() {
            entries.push(entry);
        }
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].sequence, 2);
        assert_eq!(entries[1].sequence, 3);
    }
    #[tokio::test]
    async fn should_list_keys_returns_iterator() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        // Only asserts that an iterator can be obtained.
        let _iter = log.list_keys(..).await.unwrap();
    }
    #[tokio::test]
    async fn should_list_keys_via_log_reader() {
        let storage = create_storage(
            &StorageConfig::InMemory,
            StorageRuntime::new(),
            StorageSemantics::new(),
        )
        .await
        .unwrap();
        let log = LogDb::new(storage.clone()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let reader = LogDbReader::new(storage).await.unwrap();
        let _iter = reader.list_keys(..).await.unwrap();
    }
    #[tokio::test]
    async fn should_list_keys_in_single_segment() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b"),
            },
            Record {
                key: Bytes::from("key-c"),
                value: Bytes::from("value-c"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let mut iter = log.list_keys(..).await.unwrap();
        let mut keys = vec![];
        while let Some(key) = iter.next().await.unwrap() {
            keys.push(key.key);
        }
        assert_eq!(keys.len(), 3);
        assert_eq!(keys[0], Bytes::from("key-a"));
        assert_eq!(keys[1], Bytes::from("key-b"));
        assert_eq!(keys[2], Bytes::from("key-c"));
    }
    #[tokio::test]
    async fn should_list_keys_across_segments_after_roll() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-a"),
                value: Bytes::from("value-a-0"),
            },
            Record {
                key: Bytes::from("key-b"),
                value: Bytes::from("value-b-0"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-c"),
                value: Bytes::from("value-c-1"),
            },
            Record {
                key: Bytes::from("key-d"),
                value: Bytes::from("value-d-1"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let mut iter = log.list_keys(..).await.unwrap();
        let mut keys = vec![];
        while let Some(key) = iter.next().await.unwrap() {
            keys.push(key.key);
        }
        assert_eq!(keys.len(), 4);
        assert_eq!(keys[0], Bytes::from("key-a"));
        assert_eq!(keys[1], Bytes::from("key-b"));
        assert_eq!(keys[2], Bytes::from("key-c"));
        assert_eq!(keys[3], Bytes::from("key-d"));
    }
    #[tokio::test]
    async fn should_deduplicate_keys_across_segments() {
        let log = LogDb::open(test_config()).await.unwrap();
        // Same key written into three different segments must list once.
        log.try_append(vec![Record {
            key: Bytes::from("shared-key"),
            value: Bytes::from("value-0"),
        }])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("shared-key"),
            value: Bytes::from("value-1"),
        }])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("shared-key"),
            value: Bytes::from("value-2"),
        }])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let mut iter = log.list_keys(..).await.unwrap();
        let mut keys = vec![];
        while let Some(key) = iter.next().await.unwrap() {
            keys.push(key.key);
        }
        assert_eq!(keys.len(), 1);
        assert_eq!(keys[0], Bytes::from("shared-key"));
    }
    #[tokio::test]
    async fn should_list_keys_in_lexicographic_order() {
        let log = LogDb::open(test_config()).await.unwrap();
        // Appended out of order; listing must come back sorted.
        log.try_append(vec![
            Record {
                key: Bytes::from("zebra"),
                value: Bytes::from("value"),
            },
            Record {
                key: Bytes::from("apple"),
                value: Bytes::from("value"),
            },
            Record {
                key: Bytes::from("mango"),
                value: Bytes::from("value"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let mut iter = log.list_keys(..).await.unwrap();
        let mut keys = vec![];
        while let Some(key) = iter.next().await.unwrap() {
            keys.push(key.key);
        }
        assert_eq!(keys[0], Bytes::from("apple"));
        assert_eq!(keys[1], Bytes::from("mango"));
        assert_eq!(keys[2], Bytes::from("zebra"));
    }
    #[tokio::test]
    async fn should_list_empty_when_no_entries() {
        let log = LogDb::open(test_config()).await.unwrap();
        let mut iter = log.list_keys(..).await.unwrap();
        assert!(iter.next().await.unwrap().is_none());
    }
    #[tokio::test]
    async fn should_list_keys_respects_segment_range() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-seg0"),
                value: Bytes::from("value"),
            },
            Record {
                key: Bytes::from("key-seg0-b"),
                value: Bytes::from("value"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-seg1"),
                value: Bytes::from("value"),
            },
            Record {
                key: Bytes::from("key-seg1-b"),
                value: Bytes::from("value"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key-seg2"),
                value: Bytes::from("value"),
            },
            Record {
                key: Bytes::from("key-seg2-b"),
                value: Bytes::from("value"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        // Segment range 1..2 selects only the middle segment's keys.
        let mut iter = log.list_keys(1..2).await.unwrap();
        let mut keys = vec![];
        while let Some(key) = iter.next().await.unwrap() {
            keys.push(key.key);
        }
        assert_eq!(keys.len(), 2);
        assert_eq!(keys[0], Bytes::from("key-seg1"));
        assert_eq!(keys[1], Bytes::from("key-seg1-b"));
    }
    #[tokio::test]
    async fn should_list_segments_returns_empty_when_no_segments() {
        let log = LogDb::open(test_config()).await.unwrap();
        let segments = log.list_segments(..).await.unwrap();
        assert!(segments.is_empty());
    }
    #[tokio::test]
    async fn should_list_segments_returns_single_segment() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value"),
        }])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let segments = log.list_segments(..).await.unwrap();
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].id, 0);
        assert_eq!(segments[0].start_seq, 0);
    }
    #[tokio::test]
    async fn should_list_segments_returns_multiple_segments() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value-0"),
        }])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value-1"),
        }])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value-2"),
        }])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let segments = log.list_segments(..).await.unwrap();
        assert_eq!(segments.len(), 3);
        assert_eq!(segments[0].id, 0);
        assert_eq!(segments[0].start_seq, 0);
        assert_eq!(segments[1].id, 1);
        assert_eq!(segments[1].start_seq, 1);
        assert_eq!(segments[2].id, 2);
        assert_eq!(segments[2].start_seq, 2);
    }
    #[tokio::test]
    async fn should_list_segments_filters_by_sequence_range() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v0"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v1"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v2"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v3"),
            },
        ])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v4"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v5"),
            },
        ])
        .await
        .unwrap();
        log.flush().await.unwrap();
        // Sequences 2..4 live entirely in the middle segment (id 1).
        let segments = log.list_segments(2..4).await.unwrap();
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].id, 1);
        assert_eq!(segments[0].start_seq, 2);
    }
    #[tokio::test]
    async fn should_list_segments_via_log_reader() {
        let storage = create_storage(
            &StorageConfig::InMemory,
            StorageRuntime::new(),
            StorageSemantics::new(),
        )
        .await
        .unwrap();
        let log = LogDb::new(storage.clone()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value-0"),
        }])
        .await
        .unwrap();
        log.seal_segment().await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value-1"),
        }])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let reader = LogDbReader::new(storage).await.unwrap();
        let segments = reader.list_segments(..).await.unwrap();
        assert_eq!(segments.len(), 2);
        assert_eq!(segments[0].id, 0);
        assert_eq!(segments[1].id, 1);
    }
    #[tokio::test]
    async fn should_list_segments_includes_start_time() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value"),
        }])
        .await
        .unwrap();
        log.flush().await.unwrap();
        let segments = log.list_segments(..).await.unwrap();
        assert_eq!(segments.len(), 1);
        // 1577836800000 ms == 2020-01-01T00:00:00Z; any real system clock
        // stamp must be later.
        assert!(segments[0].start_time_ms > 1577836800000);
    }
    #[tokio::test]
    async fn should_try_append_single_record() {
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![Record {
            key: Bytes::from("orders"),
            value: Bytes::from("order-1"),
        }];
        let result = log.try_append(records).await.unwrap();
        assert_eq!(result.start_sequence, 0);
        log.flush().await.unwrap();
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.value, Bytes::from("order-1"));
    }
    #[tokio::test]
    async fn should_append_timeout_single_record() {
        let log = LogDb::open(test_config()).await.unwrap();
        let records = vec![Record {
            key: Bytes::from("orders"),
            value: Bytes::from("order-1"),
        }];
        let result = log
            .append_timeout(records, Duration::from_secs(5))
            .await
            .unwrap();
        assert_eq!(result.start_sequence, 0);
        log.flush().await.unwrap();
        let mut iter = log.scan(Bytes::from("orders"), ..).await.unwrap();
        let entry = iter.next().await.unwrap().unwrap();
        assert_eq!(entry.value, Bytes::from("order-1"));
    }
    #[tokio::test]
    async fn should_return_empty_records_on_try_append_empty() {
        let log = LogDb::open(test_config()).await.unwrap();
        let result = log.try_append(vec![]).await.unwrap();
        // Empty append reports the sentinel start_sequence of 0.
        assert_eq!(result.start_sequence, 0);
    }
    #[tokio::test]
    async fn should_return_empty_records_on_append_timeout_empty() {
        let log = LogDb::open(test_config()).await.unwrap();
        let result = log
            .append_timeout(vec![], Duration::from_secs(1))
            .await
            .unwrap();
        assert_eq!(result.start_sequence, 0);
    }
    #[tokio::test]
    async fn should_scan_after_flush_soft() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v0"),
            },
            Record {
                key: Bytes::from("key"),
                value: Bytes::from("v1"),
            },
        ])
        .await
        .unwrap();
        // Soft flush (Flushed, not Durable) must still make data readable.
        log.flush_soft().await.unwrap();
        let mut iter = log.scan(Bytes::from("key"), ..).await.unwrap();
        let e0 = iter.next().await.unwrap().unwrap();
        assert_eq!(e0.sequence, 0);
        assert_eq!(e0.value, Bytes::from("v0"));
        let e1 = iter.next().await.unwrap().unwrap();
        assert_eq!(e1.sequence, 1);
        assert_eq!(e1.value, Bytes::from("v1"));
        assert!(iter.next().await.unwrap().is_none());
    }
    #[tokio::test]
    async fn should_list_keys_after_flush_soft() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![
            Record {
                key: Bytes::from("alpha"),
                value: Bytes::from("v"),
            },
            Record {
                key: Bytes::from("beta"),
                value: Bytes::from("v"),
            },
        ])
        .await
        .unwrap();
        log.flush_soft().await.unwrap();
        let mut iter = log.list_keys(..).await.unwrap();
        let mut keys = vec![];
        while let Some(key) = iter.next().await.unwrap() {
            keys.push(key.key);
        }
        assert_eq!(keys, vec![Bytes::from("alpha"), Bytes::from("beta")]);
    }
    #[tokio::test]
    async fn should_list_segments_after_flush_soft() {
        let log = LogDb::open(test_config()).await.unwrap();
        log.try_append(vec![Record {
            key: Bytes::from("key"),
            value: Bytes::from("value"),
        }])
        .await
        .unwrap();
        log.flush_soft().await.unwrap();
        let segments = log.list_segments(..).await.unwrap();
        assert_eq!(segments.len(), 1);
        assert_eq!(segments[0].id, 0);
        assert_eq!(segments[0].start_seq, 0);
    }
}