use std::ops::RangeBounds;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use object_store::path::Path;
use object_store::ObjectStore;
use crate::db_state::SsTableId;
use crate::format::sst::SsTableFormat;
use crate::iter::{EmptyIterator, RowEntryIterator};
use crate::manifest::SsTableView;
use crate::object_stores::ObjectStores;
use crate::sst_iter::{SstIterator, SstIteratorOptions};
use crate::tablestore::TableStore;
use crate::types::RowEntry;
/// Async iterator over the row entries stored in a single WAL file.
///
/// Obtained from [`WalFile::iterator`]; drive it with repeated `next()` calls.
pub struct WalFileIterator {
// Boxed trait object: either a real SST iterator or an empty fallback
// when the underlying SST yields no iterator.
iter: Box<dyn RowEntryIterator + 'static>,
}
impl WalFileIterator {
    /// Wraps an already-constructed row-entry iterator.
    fn new(iter: Box<dyn RowEntryIterator + 'static>) -> Self {
        Self { iter }
    }

    /// Returns the next row entry, or `Ok(None)` once the file is exhausted.
    /// Underlying iterator errors are converted into `crate::Error`.
    pub async fn next(&mut self) -> Result<Option<RowEntry>, crate::Error> {
        match self.iter.next().await {
            Ok(entry) => Ok(entry),
            Err(err) => Err(err.into()),
        }
    }
}
/// Object-store metadata for a single WAL file, as reported by the table store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalFileMetadata {
/// Last-modified timestamp reported by the object store.
pub last_modified_dt: DateTime<Utc>,
/// Size of the stored object, in bytes.
pub size_bytes: u64,
/// Full object-store path of the WAL file.
pub location: Path,
}
/// Handle to a single WAL file, identified by its numeric id.
///
/// Constructing a handle performs no I/O; existence is only checked when
/// `metadata()` or `iterator()` is called.
pub struct WalFile {
/// WAL file id (the `SsTableId::Wal` discriminant value).
pub id: u64,
// Shared table store used to resolve and read the file's SST data.
table_store: Arc<TableStore>,
}
impl WalFile {
pub async fn metadata(&self) -> Result<WalFileMetadata, crate::Error> {
let metadata = self.table_store.metadata(&SsTableId::Wal(self.id)).await?;
Ok(WalFileMetadata {
last_modified_dt: metadata.last_modified,
size_bytes: metadata.size,
location: metadata.location,
})
}
pub async fn iterator(&self) -> Result<WalFileIterator, crate::Error> {
let sst = self.table_store.open_sst(&SsTableId::Wal(self.id)).await?;
let iter = match SstIterator::new_owned_initialized(
..,
SsTableView::identity(sst),
Arc::clone(&self.table_store),
SstIteratorOptions {
blocks_to_fetch: 65_536,
..Default::default()
},
)
.await
{
Ok(Some(iter)) => Box::new(iter) as Box<dyn RowEntryIterator + 'static>,
Ok(None) => Box::new(EmptyIterator::new()) as Box<dyn RowEntryIterator + 'static>,
Err(err) => return Err(err.into()),
};
Ok(WalFileIterator::new(iter))
}
pub fn next_id(&self) -> u64 {
self.id + 1
}
pub fn next_file(&self) -> Self {
Self {
id: self.next_id(),
table_store: Arc::clone(&self.table_store),
}
}
}
/// Read-only entry point for listing and opening the WAL files under a
/// database path.
pub struct WalReader {
// Table store shared (via `Arc::clone`) with every `WalFile` handle this
// reader produces.
table_store: Arc<TableStore>,
}
impl WalReader {
    /// Builds a reader rooted at `path` on the given object store, using the
    /// default SST format and no secondary object store or cache.
    pub fn new<P: Into<Path>>(path: P, object_store: Arc<dyn ObjectStore>) -> Self {
        let table_store = Arc::new(TableStore::new(
            ObjectStores::new(object_store, None),
            SsTableFormat::default(),
            path.into(),
            None,
        ));
        Self { table_store }
    }

    /// Lists the WAL files whose ids fall within `range`.
    ///
    /// # Errors
    /// Fails if the underlying object-store listing fails.
    pub async fn list<R: RangeBounds<u64>>(&self, range: R) -> Result<Vec<WalFile>, crate::Error> {
        let ssts = self.table_store.list_wal_ssts(range).await?;
        Ok(ssts
            .into_iter()
            .map(|sst| self.get(sst.id.unwrap_wal_id()))
            .collect())
    }

    /// Returns a handle for the WAL file with the given id.
    ///
    /// No existence check is performed; a handle for a missing file only
    /// fails once `metadata()` or `iterator()` is called on it.
    pub fn get(&self, id: u64) -> WalFile {
        WalFile {
            id,
            table_store: self.table_store.clone(),
        }
    }
}
#[cfg(test)]
mod tests {
use std::error::Error;
use super::*;
use crate::config::{FlushOptions, FlushType, PutOptions, WriteOptions};
use crate::test_utils::StringConcatMergeOperator;
use crate::types::ValueDeletable;
use crate::Db;
use object_store::memory::InMemory;
fn has_not_found_object_store_source(err: &crate::Error) -> bool {
err.source()
.and_then(|source| source.downcast_ref::<object_store::Error>())
.is_some_and(|source| matches!(source, object_store::Error::NotFound { .. }))
|| err
.source()
.and_then(|source| source.downcast_ref::<Arc<object_store::Error>>())
.is_some_and(|source| {
matches!(source.as_ref(), object_store::Error::NotFound { .. })
})
}
#[tokio::test]
async fn test_list_and_iterator() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/test_wal_reader";
    let db = Db::open(path, store.clone()).await.unwrap();
    // Write two keys in reverse key order so we can check WAL ordering below.
    for (key, value) in [(b"k2", b"v2"), (b"k1", b"v1")] {
        db.put_with_options(key, value, &PutOptions::default(), &WriteOptions::default())
            .await
            .unwrap();
    }
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::Wal,
    })
    .await
    .unwrap();

    let reader = WalReader::new(path, store.clone());
    let files = reader.list(..).await.unwrap();
    assert!(!files.is_empty());

    let mut rows = Vec::new();
    for file in files {
        let mut iter = file.iterator().await.unwrap();
        while let Some(entry) = iter.next().await.unwrap() {
            rows.push(entry);
        }
    }

    // Entries come back in insertion order (k2 then k1), not key order.
    assert_eq!(rows.len(), 2);
    assert_eq!(rows[0].key.as_ref(), b"k2");
    assert!(matches!(
        &rows[0].value,
        ValueDeletable::Value(value) if value.as_ref() == b"v2"
    ));
    assert_eq!(rows[1].key.as_ref(), b"k1");
    assert!(matches!(
        &rows[1].value,
        ValueDeletable::Value(value) if value.as_ref() == b"v1"
    ));
}
#[tokio::test]
async fn test_reads_from_wal_object_store() {
    let main_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let wal_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/test_wal_store";
    // Route WAL writes to a dedicated store, separate from the main store.
    let db = Db::builder(path, main_store.clone())
        .with_wal_object_store(wal_store.clone())
        .build()
        .await
        .unwrap();
    db.put_with_options(b"k1", b"v1", &PutOptions::default(), &WriteOptions::default())
        .await
        .unwrap();
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::Wal,
    })
    .await
    .unwrap();

    // The reader must find the WAL data on the WAL store, not the main store.
    let reader = WalReader::new(path, wal_store.clone());
    let files = reader.list(..).await.unwrap();
    assert!(!files.is_empty());

    let mut rows = Vec::new();
    for file in files {
        let mut iter = file.iterator().await.unwrap();
        while let Some(entry) = iter.next().await.unwrap() {
            rows.push(entry);
        }
    }

    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].key.as_ref(), b"k1");
    assert!(matches!(
        &rows[0].value,
        ValueDeletable::Value(value) if value.as_ref() == b"v1"
    ));
}
#[tokio::test]
async fn test_wal_file_metadata_matches_object_store_metadata() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/test_wal_reader_metadata";
    let db = Db::open(path, store.clone()).await.unwrap();
    db.put_with_options(b"k1", b"v1", &PutOptions::default(), &WriteOptions::default())
        .await
        .unwrap();
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::Wal,
    })
    .await
    .unwrap();

    let reader = WalReader::new(path, store.clone());
    let files = reader.list(..).await.unwrap();
    assert!(!files.is_empty());
    // Every field reported by `WalFile::metadata` must agree with a direct
    // `head` call against the underlying object store.
    for file in files {
        let reported = file.metadata().await.unwrap();
        let actual = store.head(&reported.location).await.unwrap();
        assert_eq!(reported.last_modified_dt, actual.last_modified);
        assert_eq!(reported.size_bytes, actual.size);
        assert_eq!(reported.location, actual.location);
    }
}
/// `WalReader::get` is a pure constructor: it hands back a `WalFile` with the
/// requested id without touching the object store. The original test spun up
/// a tokio runtime (`#[tokio::test] async fn`) despite containing no `.await`;
/// a plain `#[test]` is sufficient and matches the sibling sync test
/// `test_wal_file_next_id_and_next_file`.
#[test]
fn test_get_returns_wal_file_for_id() {
    let main_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/test_wal_reader_get_missing";
    let wal_reader = WalReader::new(path, main_store);
    let wal_file = wal_reader.get(42);
    assert_eq!(wal_file.id, 42);
}
#[test]
fn test_wal_file_next_id_and_next_file() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let reader = WalReader::new("/test_wal_reader_next_id", store);
    let file = reader.get(41);
    assert_eq!(file.next_id(), 42);
    let successor = file.next_file();
    assert_eq!(successor.id, 42);
    // The successor shares the same table-store allocation, not a copy.
    assert!(Arc::ptr_eq(&file.table_store, &successor.table_store));
}
#[tokio::test]
async fn test_metadata_returns_error_when_file_deleted() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/test_wal_reader_missing_metadata";
    let db = Db::open(path, store.clone()).await.unwrap();
    db.put_with_options(b"k1", b"v1", &PutOptions::default(), &WriteOptions::default())
        .await
        .unwrap();
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::Wal,
    })
    .await
    .unwrap();

    let reader = WalReader::new(path, store.clone());
    let file = reader
        .list(..)
        .await
        .unwrap()
        .into_iter()
        .next()
        .expect("expected at least one WAL file");

    // Resolve the file's location while it still exists, then delete the
    // backing object out from under the reader.
    let location = file.metadata().await.unwrap().location;
    store.delete(&location).await.unwrap();

    let err = file
        .metadata()
        .await
        .expect_err("expected metadata() to fail after deleting WAL file");
    assert_eq!(err.kind(), crate::ErrorKind::Data);
    assert!(has_not_found_object_store_source(&err));
}
#[tokio::test]
async fn test_iterator_returns_error_when_file_deleted() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/test_wal_reader_missing_iterator";
    let db = Db::open(path, store.clone()).await.unwrap();
    db.put_with_options(b"k1", b"v1", &PutOptions::default(), &WriteOptions::default())
        .await
        .unwrap();
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::Wal,
    })
    .await
    .unwrap();

    let reader = WalReader::new(path, store.clone());
    let file = reader
        .list(..)
        .await
        .unwrap()
        .into_iter()
        .next()
        .expect("expected at least one WAL file");

    // Delete the backing object so opening the SST must fail.
    let location = file.metadata().await.unwrap().location;
    store.delete(&location).await.unwrap();

    // `Option::expect` avoids the explicit match-and-panic without requiring
    // the iterator type to implement `Debug`.
    let err = file
        .iterator()
        .await
        .err()
        .expect("expected iterator() to fail after deleting WAL file");
    assert_eq!(err.kind(), crate::ErrorKind::Data);
    assert!(has_not_found_object_store_source(&err));
}
#[tokio::test]
async fn test_iterator_returns_tombstones() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/test_wal_reader_tombstones";
    let db = Db::open(path, store.clone()).await.unwrap();
    // Deleting a never-written key still records a tombstone in the WAL.
    db.delete(b"k_tombstone").await.unwrap();
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::Wal,
    })
    .await
    .unwrap();

    let reader = WalReader::new(path, store);
    let files = reader.list(..).await.unwrap();
    assert!(!files.is_empty());

    let mut rows = Vec::new();
    for file in files {
        let mut iter = file.iterator().await.unwrap();
        while let Some(entry) = iter.next().await.unwrap() {
            rows.push(entry);
        }
    }

    let tombstone = rows
        .iter()
        .find(|entry| entry.key.as_ref() == b"k_tombstone")
        .expect("expected deleted key in WAL iterator output");
    assert!(matches!(tombstone.value, ValueDeletable::Tombstone));
}
#[tokio::test]
async fn test_iterator_returns_merge_operands() {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = "/test_wal_reader_merges";
    let db = Db::builder(path, store.clone())
        .with_merge_operator(Arc::new(StringConcatMergeOperator))
        .build()
        .await
        .unwrap();
    db.merge(b"k_merge", b"merge_operand").await.unwrap();
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::Wal,
    })
    .await
    .unwrap();

    let reader = WalReader::new(path, store);
    let files = reader.list(..).await.unwrap();
    assert!(!files.is_empty());

    let mut rows = Vec::new();
    for file in files {
        let mut iter = file.iterator().await.unwrap();
        while let Some(entry) = iter.next().await.unwrap() {
            rows.push(entry);
        }
    }

    // The WAL stores the raw merge operand, not a merged result.
    let merge_entry = rows
        .iter()
        .find(|entry| entry.key.as_ref() == b"k_merge")
        .expect("expected merge key in WAL iterator output");
    assert!(matches!(
        &merge_entry.value,
        ValueDeletable::Merge(value) if value.as_ref() == b"merge_operand"
    ));
}
}