use crate::block_meta_index::BlockMetaIndex;
use crate::cache::{BlockCache, CacheKey};
use crate::compaction::CompactionRunner;
use crate::error::{FlowError, Result};
use crate::gc::GcRunner;
use crate::manifest::{Manifest, ManifestEntry};
use crate::memtable::MemTables;
use crate::record::{Config, InternalRecord, KeyFilter, Op, Query, ReadOptions, Record, ScanRange, SyncMode};
use crate::sstable::SstReader;
use crate::stats::{EngineStats, StatsCounters};
use crate::wal::Wal;
use crate::write_worker::WriteWorker;
use parking_lot::RwLock;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::sync::Arc;
/// Handle to an open storage engine.
///
/// Built by `Engine::open`. Most components are reference-counted so that
/// the write worker, background maintenance and blocking tasks can each hold
/// their own handle to the same state.
pub struct Engine {
/// Configuration the engine was opened with.
config: Config,
/// Write worker behind a mutex; writes, flushes and WAL syncs go through it.
worker: Arc<parking_lot::Mutex<WriteWorker>>,
/// Source of sequence numbers assigned to written records.
seq_counter: std::sync::atomic::AtomicU64,
/// Shared counters backing `stats()` / `metrics_text()`.
stats: Arc<StatsCounters>,
/// In-memory tables holding not-yet-flushed records.
memtables: Arc<MemTables>,
/// Block metadata index used to pick candidate SST blocks for reads.
index: Arc<RwLock<BlockMetaIndex>>,
/// Manifest describing the current set of SSTables.
manifest: Arc<parking_lot::Mutex<Manifest>>,
/// Cache of decompressed SST blocks.
cache: Arc<BlockCache>,
/// Open SST readers keyed by SST id, populated lazily by `get_reader`.
readers: Arc<RwLock<HashMap<u32, Arc<SstReader>>>>,
/// Background maintenance task; `None` unless `config.auto_background` was set.
_maintenance: Option<tokio::task::JoinHandle<()>>,
}
impl Engine {
pub fn spawn_background_maintenance(&self) -> Option<tokio::task::JoinHandle<()>> {
let worker = self.worker.clone();
let manifest = self.manifest.clone();
let index = self.index.clone();
let cache = self.cache.clone();
let stats = self.stats.clone();
let data_dir = self.config.data_dir.clone();
let block_size = self.config.block_size;
let zstd_level = self.config.zstd_level;
let bloom_bits = self.config.bloom_bits_per_key;
let compaction_threshold = self.config.compaction_threshold;
let flush_interval = self.config.flush_interval_ms.max(1);
let gc_interval = self.config.gc_interval_secs.max(1);
let wal_sync_mode = self.config.wal_sync_mode;
let handle = tokio::task::spawn(async move {
let mut flush_tick = tokio::time::interval(std::time::Duration::from_millis(flush_interval));
let mut compact_tick = tokio::time::interval(std::time::Duration::from_secs(gc_interval.max(1)));
let mut gc_tick = tokio::time::interval(std::time::Duration::from_secs(gc_interval));
let wal_sync_ms = match wal_sync_mode {
SyncMode::IntervalMs(ms) => ms.max(1),
_ => u64::MAX, };
let mut sync_tick = tokio::time::interval(std::time::Duration::from_millis(wal_sync_ms));
loop {
tokio::select! {
_ = flush_tick.tick() => {
let w = worker.clone();
let _ = tokio::task::spawn_blocking(move || {
let _ = w.lock().do_flush();
}).await;
}
_ = compact_tick.tick() => {
let sst_count = manifest.lock().state().sstables.len();
if sst_count >= compaction_threshold {
let compaction = crate::compaction::CompactionRunner::new(
data_dir.clone(),
block_size,
zstd_level,
bloom_bits,
compaction_threshold,
manifest.clone(),
index.clone(),
cache.clone(),
stats.clone(),
);
let _ = tokio::task::spawn_blocking(move || compaction.run()).await;
}
}
_ = gc_tick.tick() => {
let gc = crate::gc::GcRunner::new(
data_dir.clone(),
manifest.clone(),
index.clone(),
cache.clone(),
stats.clone(),
);
let _ = tokio::task::spawn_blocking(move || gc.run()).await;
}
_ = sync_tick.tick() => {
let w = worker.clone();
let _ = tokio::task::spawn_blocking(move || {
let mut wr = w.lock();
let _ = wr.wal.sync_all();
}).await;
}
}
}
});
Some(handle)
}
/// Opens (or creates) the engine rooted at `config.data_dir`.
///
/// Steps, in order:
/// 1. validate the config and create the `WAL`, `SST` and `INDEX` directories;
/// 2. open the WAL and manifest and rebuild the block index from the manifest;
/// 3. rebuild bloom filters whose hash version is not the current one (on
///    failure the filter is removed from the index and an error is logged);
/// 4. replay WAL records newer than `last_flushed_seq` into fresh memtables;
/// 5. seed the stats and the sequence counter;
/// 6. start background maintenance when `config.auto_background` is set.
///
/// # Errors
/// Returns an error if validation fails, if the data directory is missing and
/// `create_if_missing` is false, or if any directory/WAL/manifest operation
/// fails.
pub async fn open(config: Config) -> Result<Self> {
    config.validate()?;
    let data_dir = &config.data_dir;
    if !data_dir.exists() && !config.create_if_missing {
        return Err(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("data directory does not exist: {}", data_dir.display()),
        )
        .into());
    }
    std::fs::create_dir_all(data_dir)?;
    std::fs::create_dir_all(data_dir.join("WAL"))?;
    std::fs::create_dir_all(data_dir.join("SST"))?;
    std::fs::create_dir_all(data_dir.join("INDEX"))?;
    let stats = Arc::new(StatsCounters::new());
    let mut wal = Wal::open(&data_dir.join("WAL"), config.wal_segment_size_mb)?;
    let mut manifest = Manifest::open(data_dir)?;
    let mut index = BlockMetaIndex::new(config.time_bucket_secs);
    // Snapshot of the manifest state taken before any bloom rebuild appends to it.
    let state = manifest.state().clone();
    for sst_id in &state.active_sst_ids {
        if let Some(info) = state.sstables.get(sst_id) {
            if let Some(blocks) = state.block_infos.get(sst_id) {
                index.add_sst(*sst_id, blocks);
                if let Some(ref bloom) = info.bloom {
                    index.set_bloom(*sst_id, bloom.clone());
                }
            }
        }
    }
    // Active SSTs whose persisted bloom filter uses an outdated hash version.
    let stale_ssts: Vec<u32> = state
        .active_sst_ids
        .iter()
        .filter(|&&id| match state.sstables.get(&id) {
            Some(info) => match info.bloom {
                Some(ref b) => b.hash_version() != crate::bloom::CURRENT_HASH_VERSION,
                None => false,
            },
            None => false,
        })
        .copied()
        .collect();
    if !stale_ssts.is_empty() {
        let rebuild_start = std::time::Instant::now();
        tracing::warn!(
            "Bloom hasher upgrade detected: rebuilding {} stale filter(s) ({} total SSTs)",
            stale_ssts.len(),
            state.active_sst_ids.len()
        );
        for sst_id in &stale_ssts {
            if let Err(e) = Engine::rebuild_bloom_for_sst(
                &config,
                &mut manifest,
                &mut index,
                *sst_id,
                config.bloom_bits_per_key,
            ) {
                tracing::error!(
                    "Failed to rebuild bloom for SST {}: {} — falling back to no filter (correctness preserved, point queries may be slower)",
                    sst_id,
                    e
                );
                index.remove_bloom(*sst_id);
            }
        }
        tracing::info!(
            "Bloom rebuild complete: {} sst(s) in {:?}",
            stale_ssts.len(),
            rebuild_start.elapsed()
        );
    }
    let last_flushed_seq = manifest.state().last_flushed_seq;
    let wal_records = wal.replay_from(last_flushed_seq)?;
    let memtables = Arc::new(MemTables::new(
        config.max_frozen_memtables,
        config.memtable_size_mb * 1024 * 1024,
    ));
    // Track the highest replayed sequence so new writes never reuse or
    // undercut a sequence number that already exists in the WAL.
    let mut max_replayed_seq: u64 = 0;
    for rec in &wal_records {
        max_replayed_seq = max_replayed_seq.max(rec.seq);
        memtables.insert(rec.clone());
    }
    let index = Arc::new(RwLock::new(index));
    let manifest = Arc::new(parking_lot::Mutex::new(manifest));
    let cache = Arc::new(BlockCache::new(config.block_cache_capacity_mb));
    let readers = Arc::new(RwLock::new(HashMap::new()));
    let sst_count = manifest.lock().state().sstables.len();
    let sst_bytes: u64 = manifest
        .lock()
        .state()
        .sstables
        .values()
        .map(|s| s.bytes)
        .sum();
    stats.set_sstable(sst_count, sst_bytes);
    stats.set_index_stats(index.read().total_entries(), index.read().bucket_count());
    let max_sst_id = manifest
        .lock()
        .state()
        .sstables
        .keys()
        .max()
        .copied()
        .unwrap_or(0);
    // The SST-id-based value alone can fall below sequences already handed
    // out (e.g. more than a million records per SST). Sequences that regress
    // across a restart would be skipped by the next `replay_from`, so the
    // counter is also floored by what the manifest and the WAL have seen.
    let sst_based_floor = (max_sst_id as u64 + 1) * 1_000_000;
    let next_seq = sst_based_floor
        .max(last_flushed_seq.saturating_add(1))
        .max(max_replayed_seq.saturating_add(1));
    let seq_counter = std::sync::atomic::AtomicU64::new(next_seq);
    let auto_bg = config.auto_background;
    let worker = Arc::new(parking_lot::Mutex::new(WriteWorker::new(
        config.clone(),
        wal,
        memtables.clone(),
        manifest.clone(),
        index.clone(),
        stats.clone(),
    )));
    let mut engine = Self {
        config,
        worker,
        seq_counter,
        stats,
        memtables,
        index,
        manifest,
        cache,
        readers,
        _maintenance: None,
    };
    if auto_bg {
        engine._maintenance = engine.spawn_background_maintenance();
    }
    Ok(engine)
}
/// Writes `batch` with no explicit TTL; delegates to `write_batch_ttl`.
pub async fn write_batch(&self, batch: &[Record]) -> Result<()> {
self.write_batch_ttl(batch, None).await
}
/// Owned-input variant of `write_batch`; delegates to `write_batch_owned_ttl`
/// with no explicit TTL.
pub async fn write_batch_owned(&self, batch: Vec<Record>) -> Result<()> {
self.write_batch_owned_ttl(batch, None).await
}
/// Synchronous write of an owned batch with no explicit TTL.
///
/// An empty batch is a no-op and consumes no sequence numbers.
pub fn write_batch_sync(&self, batch: Vec<Record>) -> Result<()> {
    match batch.is_empty() {
        true => Ok(()),
        false => {
            let converted = self.convert_records(batch, None);
            self.do_write(converted)
        }
    }
}
/// Converts user records into `Op::Put` internal records.
///
/// Reserves `batch.len()` consecutive sequence numbers and assigns them in
/// batch order. When a TTL applies (`ttl_secs`, else the configured default)
/// the expiry becomes `ts + ttl` in microseconds; otherwise the record's own
/// `expire_at` is kept. TTL arithmetic saturates instead of overflowing, so a
/// very large TTL means "effectively never expires".
fn convert_records(&self, batch: Vec<Record>, ttl_secs: Option<u64>) -> Vec<InternalRecord> {
    // TTL in microseconds, clamped into the i64 range.
    let ttl_us: Option<i64> = ttl_secs
        .or(self.config.default_ttl_secs)
        .map(|secs| (secs.min(i64::MAX as u64) as i64).saturating_mul(1_000_000));
    let base = self
        .seq_counter
        .fetch_add(batch.len() as u64, std::sync::atomic::Ordering::Relaxed);
    batch
        .into_iter()
        .enumerate()
        .map(|(i, rec)| {
            let expire_at = match ttl_us {
                Some(delta) => rec.ts.saturating_add(delta),
                None => rec.expire_at,
            };
            InternalRecord {
                seq: base + i as u64,
                op: Op::Put,
                key: rec.key,
                ts: rec.ts,
                expire_at,
                value: rec.value,
                range_end: None,
            }
        })
        .collect()
}
/// Writes an owned batch with an optional TTL override.
///
/// An empty batch returns immediately without reserving sequence numbers.
async fn write_batch_owned_ttl(&self, batch: Vec<Record>, ttl_secs: Option<u64>) -> Result<()> {
    if !batch.is_empty() {
        let converted = self.convert_records(batch, ttl_secs);
        return self.do_write(converted);
    }
    Ok(())
}
/// Writes a borrowed batch with an optional TTL override.
///
/// When a TTL applies (`ttl_secs`, else the configured default) each record
/// expires at `ts + ttl` in microseconds; otherwise its own `expire_at` is
/// kept. TTL arithmetic saturates instead of overflowing. An empty batch is a
/// no-op and consumes no sequence numbers.
pub async fn write_batch_ttl(&self, batch: &[Record], ttl_secs: Option<u64>) -> Result<()> {
    if batch.is_empty() {
        return Ok(());
    }
    // TTL in microseconds, clamped into the i64 range.
    let ttl_us: Option<i64> = ttl_secs
        .or(self.config.default_ttl_secs)
        .map(|secs| (secs.min(i64::MAX as u64) as i64).saturating_mul(1_000_000));
    let base = self
        .seq_counter
        .fetch_add(batch.len() as u64, std::sync::atomic::Ordering::Relaxed);
    let records: Vec<InternalRecord> = batch
        .iter()
        .enumerate()
        .map(|(i, rec)| {
            let mut irec = InternalRecord::from_record(rec, base + i as u64);
            irec.expire_at = match ttl_us {
                Some(delta) => rec.ts.saturating_add(delta),
                None => rec.expire_at,
            };
            irec
        })
        .collect();
    self.do_write(records)
}
/// Encodes `records` for the WAL and hands them to the write worker.
///
/// The recorded write latency covers only the worker call, not the encoding.
/// The worker lock is released before the latency is recorded.
fn do_write(&self, records: Vec<InternalRecord>) -> Result<()> {
    let (encoded, mem_bytes) = crate::wal::encode_batch(&records);
    let count = records.len() as u64;
    let mode = self.config.wal_sync_mode;
    let timer = std::time::Instant::now();
    self.worker
        .lock()
        .process_batch_encoded(records, &encoded, mem_bytes, count, mode)?;
    let elapsed_us = timer.elapsed().as_micros() as u64;
    self.stats.record_write_latency(elapsed_us);
    Ok(())
}
/// Runs `query` and materialises every matching record.
///
/// Fails on the first error yielded by the scan. On success the query
/// latency and the number of records read are added to the stats.
pub async fn query(&self, query: Query) -> Result<Vec<Record>> {
    let timer = std::time::Instant::now();
    let snapshot_us = now_micros();
    let scan = ScanIterator::build(
        &query,
        &self.memtables,
        &self.index,
        &self.cache,
        &self.config,
        &self.readers,
        snapshot_us,
    )?;
    let mut found: Vec<Record> = Vec::new();
    for item in scan {
        found.push(item?);
    }
    let elapsed_us = timer.elapsed().as_micros() as u64;
    self.stats.record_query_latency(elapsed_us);
    self.stats.records_read(found.len() as u64);
    Ok(found)
}
/// Returns all records whose key starts with `key` (delegates to `query`).
pub async fn query_by_prefix(&self, key: &str) -> Result<Vec<Record>> {
self.query(Query::prefix(key)).await
}
/// Runs a key-range query built by `Query::key_range(start, end)`.
pub async fn query_by_key_range(&self, start: &str, end: &str) -> Result<Vec<Record>> {
self.query(Query::key_range(start, end)).await
}
/// Runs a time-range query over all keys, built by `Query::time_range`.
pub async fn query_time_range(&self, start: i64, end: i64) -> Result<Vec<Record>> {
self.query(Query::time_range(start, end)).await
}
/// Runs a combined key-prefix and time-range query, built by
/// `Query::prefix_time_range`.
pub async fn query_prefix_time_range(
&self,
key: &str,
start: i64,
end: i64,
) -> Result<Vec<Record>> {
self.query(Query::prefix_time_range(key, start, end)).await
}
/// Runs a combined key-range and time-range query, built by
/// `Query::key_time_range`.
pub async fn query_key_time_range(
&self,
start_key: &str,
end_key: &str,
start: i64,
end: i64,
) -> Result<Vec<Record>> {
self.query(Query::key_time_range(start_key, end_key, start, end))
.await
}
/// Async wrapper around `get_sync`; never returns `Err`.
pub async fn get(&self, key: &str, ts: i64) -> Result<Option<Record>> {
Ok(self.get_sync(key, ts))
}
/// Point lookup of the record stored under exactly (`key`, `ts`).
///
/// The memtables are consulted first. A hit there is final: a tombstone
/// (`Op::Delete` or `Op::DeleteRange`) yields `None` without falling through
/// to the SSTables. On a miss the block index picks the candidate SST
/// block(s) and each is searched via `block_search`.
pub fn get_sync(&self, key: &str, ts: i64) -> Option<Record> {
    let now_us = now_micros();
    let key_bytes = key.as_bytes();
    if let Some(rec) = self.memtables.get(key_bytes, ts, now_us) {
        // Range tombstones are not data either; the scan iterator already
        // treats both tombstone kinds the same way.
        if matches!(rec.op, Op::Delete | Op::DeleteRange) {
            return None;
        }
        return Some(rec.to_record());
    }
    let idx = self.index.read();
    if let Some((sst_id, block_idx)) = idx.single_sst_point(key_bytes, now_us) {
        // Single candidate: release the index lock before doing block I/O.
        drop(idx);
        if let Some(rec) = self.block_search(key_bytes, ts, now_us, sst_id, block_idx) {
            return Some(rec);
        }
        return None;
    }
    // General case: the index drives the search and stays read-locked meanwhile.
    let found = idx.query_point_inline(key_bytes, now_us, |meta| {
        self.block_search(key_bytes, ts, now_us, meta.sst_id, meta.block_idx)
    });
    drop(idx);
    found
}
/// Searches one SST block for (`key`, `ts`).
///
/// Uses the block cache when the block is already cached; otherwise the
/// block is decompressed, searched, and then inserted into the cache.
/// Reader-open and decompression failures are logged and reported as `None`.
fn block_search(
    &self,
    key: &[u8],
    ts: i64,
    now_us: i64,
    sst_id: u32,
    block_idx: u32,
) -> Option<Record> {
    let reader = Engine::get_reader(&self.readers, &self.config, sst_id)
        .map_err(|e| {
            tracing::error!(
                "SST point lookup: cannot open reader for sst {}: {}",
                sst_id,
                e
            );
        })
        .ok()?;
    if let Some(cached) = reader.read_block_cached(block_idx, &self.cache) {
        return Self::find_in_records(&cached, key, ts, now_us);
    }
    let (_header, records) = reader
        .read_block_decompress(block_idx)
        .map_err(|e| {
            tracing::error!(
                "SST point lookup: block decompress failed sst={} block={}: {}",
                sst_id,
                block_idx,
                e
            );
        })
        .ok()?;
    let hit = Self::find_in_records(&records, key, ts, now_us);
    self.cache.insert(CacheKey { sst_id, block_idx }, records);
    hit
}
/// Binary-searches `records` for (`key`, `ts`).
///
/// Relies on `records` being ordered by key, then timestamp. Returns `None`
/// when there is no exact match, when the match has expired
/// (`expire_at <= now_us`), or when it is a tombstone — either `Op::Delete`
/// or `Op::DeleteRange`, matching what the scan iterator filters out.
fn find_in_records(
    records: &[InternalRecord],
    key: &[u8],
    ts: i64,
    now_us: i64,
) -> Option<Record> {
    let pos = records
        .binary_search_by(|r| r.key.as_slice().cmp(key).then_with(|| r.ts.cmp(&ts)))
        .ok()?;
    let rec = &records[pos];
    let is_tombstone = matches!(rec.op, Op::Delete | Op::DeleteRange);
    if rec.expire_at <= now_us || is_tombstone {
        return None;
    }
    Some(Record {
        key: key.to_vec(),
        ts: rec.ts,
        expire_at: rec.expire_at,
        value: rec.value.clone(),
    })
}
/// Writes one point-delete record per (`key`, `ts`) pair.
///
/// Sequence numbers are reserved as one contiguous block and assigned in
/// input order. An empty input is a no-op.
pub async fn delete_batch(&self, keys_ts: &[(String, i64)]) -> Result<()> {
    if keys_ts.is_empty() {
        return Ok(());
    }
    let first_seq = self
        .seq_counter
        .fetch_add(keys_ts.len() as u64, std::sync::atomic::Ordering::Relaxed);
    let mut deletes: Vec<InternalRecord> = Vec::with_capacity(keys_ts.len());
    for (offset, (key, ts)) in keys_ts.iter().enumerate() {
        let seq = first_seq + offset as u64;
        deletes.push(InternalRecord::delete(key.clone().into_bytes(), *ts, seq));
    }
    self.do_write(deletes)
}
/// Writes a single range-delete record spanning `start_key`..`end_key`.
pub async fn delete_range(&self, start_key: &str, end_key: &str) -> Result<()> {
    let seq = self
        .seq_counter
        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let range_start = start_key.as_bytes().to_vec();
    let range_end = end_key.as_bytes().to_vec();
    let tombstone = InternalRecord::delete_range(range_start, range_end, seq);
    self.do_write(vec![tombstone])
}
pub async fn patch_record(
&self,
key: &str,
ts: i64,
new_value: Option<Vec<u8>>,
new_ttl_secs: Option<u64>,
) -> Result<Record> {
let existing = self.get(key, ts).await?;
let mut rec = match existing {
Some(r) => r,
None => {
return Err(crate::error::FlowError::Other(format!(
"record not found: key={}, ts={}",
key, ts
)));
}
};
if let Some(v) = new_value {
rec.value = v;
}
if let Some(ttl) = new_ttl_secs {
rec.expire_at = rec.ts + (ttl as i64 * 1_000_000);
}
self.write_batch(&[rec.clone()]).await?;
Ok(rec)
}
/// Returns a snapshot of the engine counters, including the current block
/// cache hit rate.
pub fn stats(&self) -> EngineStats {
self.stats.snapshot(self.cache.hit_rate())
}
/// Renders the engine counters via `StatsCounters::to_prometheus`, passing
/// the current block cache hit rate.
pub fn metrics_text(&self) -> String {
self.stats.to_prometheus(self.cache.hit_rate())
}
/// Returns the shared reader for `sst_id`, opening and caching it on a miss.
///
/// The file is opened without holding any lock. If another thread cached a
/// reader for the same SST in the meantime, that one is returned and the
/// freshly opened reader is dropped, so all callers share a single instance.
///
/// # Errors
/// Returns `FlowError::Other` when the SST file does not exist, or the error
/// from `SstReader::open`.
fn get_reader(
    readers: &Arc<RwLock<HashMap<u32, Arc<SstReader>>>>,
    config: &Config,
    sst_id: u32,
) -> Result<Arc<SstReader>> {
    {
        let cached = readers.read();
        if let Some(reader) = cached.get(&sst_id) {
            return Ok(Arc::clone(reader));
        }
    }
    let path = config
        .data_dir
        .join("SST")
        .join(format!("{:09}.sst", sst_id));
    if !path.exists() {
        return Err(FlowError::Other(format!("sst {} not found", sst_id)));
    }
    let opened = Arc::new(SstReader::open(&path, sst_id, 0)?);
    let mut cache = readers.write();
    // Keep whichever reader reached the map first.
    let shared = cache.entry(sst_id).or_insert(opened);
    Ok(Arc::clone(shared))
}
/// Looks up `sst_id` in a reader snapshot, opening an uncached reader on a miss.
///
/// Unlike `get_reader`, a freshly opened reader is not stored anywhere.
///
/// # Errors
/// Returns `FlowError::Other` when the SST file does not exist, or the error
/// from `SstReader::open`.
fn get_reader_from_map(
    readers: &HashMap<u32, Arc<SstReader>>,
    config: &Config,
    sst_id: u32,
) -> Result<Arc<SstReader>> {
    match readers.get(&sst_id) {
        Some(existing) => Ok(existing.clone()),
        None => {
            let sst_path = config
                .data_dir
                .join("SST")
                .join(format!("{:09}.sst", sst_id));
            if sst_path.exists() {
                Ok(Arc::new(SstReader::open(&sst_path, sst_id, 0)?))
            } else {
                Err(FlowError::Other(format!("sst {} not found", sst_id)))
            }
        }
    }
}
/// Rebuilds the bloom filter of one SST from its stored keys.
///
/// Every block is read and keys are collected with consecutive duplicates
/// collapsed. The new filter is appended to the manifest as an `UpdateBloom`
/// entry and then installed in the index.
///
/// # Errors
/// Returns `FlowError::Other` when the SST file is missing or a block cannot
/// be read, or the error from opening the reader or appending to the manifest.
fn rebuild_bloom_for_sst(
    config: &Config,
    manifest: &mut Manifest,
    index: &mut BlockMetaIndex,
    sst_id: u32,
    bits_per_key: usize,
) -> Result<()> {
    let sst_path = config
        .data_dir
        .join("SST")
        .join(format!("{:09}.sst", sst_id));
    if !sst_path.exists() {
        return Err(FlowError::Other(format!(
            "rebuild_bloom: sst {} file missing",
            sst_id
        )));
    }
    let reader = SstReader::open(&sst_path, sst_id, 0)?;
    let mut distinct_keys: Vec<Vec<u8>> = Vec::new();
    for blk_idx in 0..reader.block_count() {
        let block = reader.read_block(blk_idx, None).map_err(|e| {
            FlowError::Other(format!(
                "rebuild_bloom: failed to read block {} of sst {}: {}",
                blk_idx, sst_id, e
            ))
        })?;
        for rec in &block.records {
            // Only adjacent repeats are collapsed; the last pushed key is the
            // one to compare against.
            if distinct_keys.last() != Some(&rec.key) {
                distinct_keys.push(rec.key.clone());
            }
        }
    }
    let rebuilt = crate::bloom::BloomFilter::from_keys_with_bits(&distinct_keys, bits_per_key);
    debug_assert_eq!(rebuilt.hash_version(), crate::bloom::CURRENT_HASH_VERSION);
    manifest.append(&ManifestEntry::UpdateBloom {
        sst_id,
        bloom: rebuilt.clone(),
    })?;
    index.set_bloom(sst_id, rebuilt);
    Ok(())
}
pub async fn flush(&self) -> Result<()> {
let worker = self.worker.clone();
tokio::task::spawn_blocking(move || worker.lock().do_flush())
.await
.map_err(|e| FlowError::Other(format!("flush join error: {e}")))?
}
/// Runs one GC pass on the blocking pool and returns its result count.
///
/// After a successful pass, cached readers for SSTs no longer in the
/// manifest are dropped.
///
/// # Errors
/// Returns `FlowError::Closed` if the blocking task could not be joined, or
/// the error reported by the GC run.
pub async fn trigger_gc(&self) -> Result<u64> {
    let runner = GcRunner::new(
        self.config.data_dir.clone(),
        self.manifest.clone(),
        self.index.clone(),
        self.cache.clone(),
        self.stats.clone(),
    );
    let purged = match tokio::task::spawn_blocking(move || runner.run()).await {
        Ok(outcome) => outcome?,
        Err(_) => return Err(FlowError::Closed),
    };
    self.evict_stale_readers();
    Ok(purged)
}
/// Runs one compaction pass on the blocking pool and returns its result flag.
///
/// After a successful pass, cached readers for SSTs no longer in the
/// manifest are dropped.
///
/// # Errors
/// Returns `FlowError::Closed` if the blocking task could not be joined, or
/// the error reported by the compaction run.
pub async fn trigger_compaction(&self) -> Result<bool> {
    let runner = CompactionRunner::new(
        self.config.data_dir.clone(),
        self.config.block_size,
        self.config.zstd_level,
        self.config.bloom_bits_per_key,
        self.config.compaction_threshold,
        self.manifest.clone(),
        self.index.clone(),
        self.cache.clone(),
        self.stats.clone(),
    );
    let compacted = match tokio::task::spawn_blocking(move || runner.run()).await {
        Ok(outcome) => outcome?,
        Err(_) => return Err(FlowError::Closed),
    };
    self.evict_stale_readers();
    Ok(compacted)
}
/// Consumes the engine: aborts background maintenance (if any), then flushes
/// and flushes the WAL on the calling thread.
///
/// # Errors
/// Returns the first error from the flush or the WAL flush.
pub async fn shutdown(self) -> Result<()> {
    if let Some(task) = self._maintenance.as_ref() {
        task.abort();
    }
    let mut guard = self.worker.lock();
    guard.do_flush()?;
    guard.flush_wal()
}
pub async fn close(&self) -> Result<()> {
let worker = self.worker.clone();
tokio::task::spawn_blocking(move || {
let mut w = worker.lock();
w.do_flush()?;
w.flush_wal()
})
.await
.map_err(|e| FlowError::Other(format!("close join error: {e}")))?
}
/// Drops cached readers whose SST id is no longer listed in the manifest.
///
/// The manifest lock is released before the readers map is write-locked.
fn evict_stale_readers(&self) {
    let live: std::collections::HashSet<u32> =
        self.manifest.lock().state().sstables.keys().copied().collect();
    self.readers
        .write()
        .retain(|sst_id, _| live.contains(sst_id));
}
/// Starts a scan over `range` with default read options.
pub fn scan(&self, range: ScanRange) -> Result<ScanIterator> {
self.scan_opt(range, &ReadOptions::default())
}
/// Starts a scan over `range`.
///
/// `_opts` is accepted for API compatibility but currently unused.
pub fn scan_opt(&self, range: ScanRange, _opts: &ReadOptions) -> Result<ScanIterator> {
    let snapshot_us = now_micros();
    let (key_filter, time_range) = range.to_query_params();
    let query = Query {
        key_filter,
        time_range,
    };
    ScanIterator::build(
        &query,
        &self.memtables,
        &self.index,
        &self.cache,
        &self.config,
        &self.readers,
        snapshot_us,
    )
}
/// Starts a scan over the range built by `ScanRange::prefix(prefix)`.
pub fn scan_prefix(&self, prefix: &str) -> Result<ScanIterator> {
self.scan(ScanRange::prefix(prefix))
}
/// Starts a scan over the range built by `ScanRange::prefix_time_range`.
pub fn scan_prefix_time_range(
&self,
prefix: &str,
ts_start: i64,
ts_end: i64,
) -> Result<ScanIterator> {
self.scan(ScanRange::prefix_time_range(prefix, ts_start, ts_end))
}
/// Returns the live record with the greatest timestamp stored under exactly
/// `key`, or `None` when the key has no live record.
///
/// The underlying scan is a prefix scan, so rows belonging to longer keys
/// that merely start with `key` (e.g. "key10" for "key1") are skipped.
///
/// # Errors
/// Returns the first error produced by the scan.
pub fn get_latest(&self, key: &str) -> Result<Option<Record>> {
    let wanted = key.as_bytes();
    let mut latest: Option<Record> = None;
    for item in self.scan(ScanRange::prefix(key))? {
        let rec = item?;
        if rec.key.as_slice() != wanted {
            continue;
        }
        // `>=` keeps the previous "last row wins" behaviour for equal timestamps.
        let is_newer = latest.as_ref().map_or(true, |cur| rec.ts >= cur.ts);
        if is_newer {
            latest = Some(rec);
        }
    }
    Ok(latest)
}
/// Async wrapper around `get_latest`; the work runs inline on the caller.
pub async fn get_latest_async(&self, key: &str) -> Result<Option<Record>> {
self.get_latest(key)
}
}
/// Iterator over the records matching a query, merging the memtable results
/// with per-block SST results.
pub struct ScanIterator {
/// Record streams being merged; memtable results, when present, are at index 0.
sources: Vec<std::iter::Peekable<std::vec::IntoIter<InternalRecord>>>,
/// Head entry of each source, used for the k-way merge (empty in fast mode).
heap: BinaryHeap<MergeEntry>,
/// (start, end) key ranges from range deletes; keys in `[start, end)` are hidden.
tombstones: Vec<(Vec<u8>, Vec<u8>)>,
/// (key, ts) of the last consumed entry, used to skip later duplicates.
last_dedup: Option<(Vec<u8>, i64)>,
/// `Some(0)` when there is exactly one source and no tombstones, letting
/// `next` read that source directly without the heap.
fast_source: Option<usize>,
}
/// Heap entry describing the record at the head of one merge source.
struct MergeEntry {
/// Key of the head record.
key: Vec<u8>,
/// Timestamp of the head record.
ts: i64,
/// Sequence number of the head record; higher sequences pop first on ties.
seq: u64,
/// Index into `ScanIterator::sources`, or `usize::MAX` as the marker for the
/// memtable source stored at index 0.
source: usize,
}
/// Equality is defined through `Ord` so that `a == b` holds exactly when
/// `a.cmp(&b)` is `Equal` (key, timestamp and sequence all match), as the
/// `Eq`/`Ord` contract requires. The `source` field is not part of identity.
impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}
impl Eq for MergeEntry {}
/// Total order; simply defers to the `Ord` implementation.
impl PartialOrd for MergeEntry {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
/// Ordering tuned for `BinaryHeap`, which is a max-heap.
///
/// Key and timestamp are compared in reverse so the smallest (key, ts) pops
/// first; among equal (key, ts) the entry with the higher sequence pops first.
impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let reversed_position = (&other.key, other.ts).cmp(&(&self.key, self.ts));
        reversed_position.then_with(|| self.seq.cmp(&other.seq))
    }
}
impl ScanIterator {
/// Builds a scan over the memtables and the SST blocks selected by the index.
///
/// Sources handed to the merge must each be ordered by (key, ts):
/// * the memtable results are sorted here and, when non-empty, become
///   source 0 (tagged `usize::MAX` in the heap);
/// * a filtered query contributes one source per candidate SST block;
/// * an unfiltered full scan reads blocks without touching the block cache
///   and combines them into a single source, which is sorted and deduplicated
///   on (key, ts) keeping the highest sequence. Point deletes are kept in
///   that source so they shadow older versions; the iterator drops them.
///
/// Range tombstones found in either the memtables or the SST blocks are
/// collected and applied to every record during iteration. Unreadable SSTs
/// or blocks are logged and skipped rather than failing the scan.
// The argument list mirrors the engine components a scan needs.
#[allow(clippy::too_many_arguments)]
fn build(
    query: &Query,
    memtables: &Arc<MemTables>,
    index: &Arc<RwLock<BlockMetaIndex>>,
    cache: &Arc<BlockCache>,
    config: &Config,
    readers: &Arc<RwLock<HashMap<u32, Arc<SstReader>>>>,
    now_us: i64,
) -> Result<Self> {
    let mem_results = match (&query.key_filter, &query.time_range) {
        (KeyFilter::Prefix(key), None) => memtables.query_prefix(key, now_us),
        (KeyFilter::Range { start, end }, None) => {
            memtables.query_key_range(start, end, now_us)
        }
        (KeyFilter::All, Some((ts_start, ts_end))) => {
            memtables.query_time_range(*ts_start, *ts_end, now_us)
        }
        (KeyFilter::Prefix(key), Some((ts_start, ts_end))) => {
            memtables.query_prefix_time_range(key, *ts_start, *ts_end, now_us)
        }
        (KeyFilter::Range { start, end }, Some((ts_start, ts_end))) => {
            memtables.query_key_time_range(start, end, *ts_start, *ts_end, now_us)
        }
        // An empty prefix matches every key. A `["", "~"]` key range would
        // silently drop keys that sort after "~" (e.g. "~x" or non-ASCII).
        (KeyFilter::All, None) => memtables.query_prefix(&Vec::<u8>::new(), now_us),
    };
    let mut tombstones: Vec<(Vec<u8>, Vec<u8>)> = mem_results
        .iter()
        .filter(|r| r.op == Op::DeleteRange && r.range_end.is_some())
        .map(|r| (r.key.clone(), r.range_end.clone().unwrap()))
        .collect();
    let candidates = {
        let idx = index.read();
        idx.query(&query.key_filter, query.time_range, now_us)
    };
    let is_full_scan = (matches!(&query.key_filter, KeyFilter::All)
        || matches!(&query.key_filter, KeyFilter::Prefix(p) if p.is_empty()))
        && query.time_range.is_none();
    // Work from a snapshot so the readers lock is not held during block I/O.
    let readers_snapshot = {
        let r = readers.read();
        r.clone()
    };
    let mut sst_sources: Vec<std::iter::Peekable<std::vec::IntoIter<InternalRecord>>> =
        Vec::new();
    if is_full_scan {
        let mut all_sst_records: Vec<InternalRecord> = Vec::new();
        for meta in &candidates {
            let reader =
                match Engine::get_reader_from_map(&readers_snapshot, config, meta.sst_id) {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::error!(
                            "Scan: cannot open SST reader for sst {}: {}",
                            meta.sst_id,
                            e
                        );
                        continue;
                    }
                };
            let records = match reader.read_block_decompress(meta.block_idx) {
                Ok((_, recs)) => recs,
                Err(e) => {
                    tracing::error!(
                        "Scan: block decompress failed sst={} block={}: {}",
                        meta.sst_id,
                        meta.block_idx,
                        e
                    );
                    continue;
                }
            };
            for rec in records {
                if rec.expire_at <= now_us {
                    continue;
                }
                if rec.op == Op::DeleteRange {
                    if let Some(ref end) = rec.range_end {
                        tombstones.push((rec.key.clone(), end.clone()));
                    }
                    continue;
                }
                // Point deletes stay in the stream so that they can shadow
                // older versions of the same (key, ts) below.
                all_sst_records.push(rec);
            }
        }
        // Blocks may come from overlapping SSTs, so the concatenation is not
        // ordered. Sort by (key, ts) with the newest sequence first, then keep
        // only that newest version of each (key, ts).
        all_sst_records.sort_by(|a, b| {
            a.key
                .cmp(&b.key)
                .then(a.ts.cmp(&b.ts))
                .then(b.seq.cmp(&a.seq))
        });
        all_sst_records.dedup_by(|later, kept| later.key == kept.key && later.ts == kept.ts);
        if !all_sst_records.is_empty() {
            sst_sources.push(all_sst_records.into_iter().peekable());
        }
    } else {
        for meta in &candidates {
            let reader =
                match Engine::get_reader_from_map(&readers_snapshot, config, meta.sst_id) {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::error!(
                            "Scan: cannot open SST reader for sst {}: {}",
                            meta.sst_id,
                            e
                        );
                        continue;
                    }
                };
            let records = match reader.read_block_arc(meta.block_idx, cache) {
                Ok(arc) => arc,
                Err(e) => {
                    tracing::error!(
                        "Scan: read_block_arc failed sst={} block={}: {}",
                        meta.sst_id,
                        meta.block_idx,
                        e
                    );
                    continue;
                }
            };
            let filtered = filter_sst_block(
                &records,
                &query.key_filter,
                query.time_range,
                false,
                now_us,
                &mut tombstones,
            );
            if !filtered.is_empty() {
                sst_sources.push(filtered.into_iter().peekable());
            }
        }
    }
    let mut mem_sorted = mem_results;
    mem_sorted.sort_by(|a, b| a.key.cmp(&b.key).then(a.ts.cmp(&b.ts)));
    let mut sources: Vec<std::iter::Peekable<std::vec::IntoIter<InternalRecord>>> =
        Vec::with_capacity(1 + sst_sources.len());
    let memtable_present = !mem_sorted.is_empty();
    if memtable_present {
        sources.push(mem_sorted.into_iter().peekable());
    }
    sources.extend(sst_sources);
    // With a single source and no range tombstones the heap is unnecessary.
    let fast_source = if sources.len() == 1 && tombstones.is_empty() {
        Some(0)
    } else {
        None
    };
    let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::with_capacity(sources.len().max(1));
    if fast_source.is_none() {
        for (i, src) in sources.iter_mut().enumerate() {
            if let Some(r) = src.peek() {
                // The memtable source is tagged with usize::MAX; `advance_source`
                // maps that marker back to index 0.
                let source_id = if memtable_present && i == 0 {
                    usize::MAX
                } else {
                    i
                };
                heap.push(MergeEntry {
                    key: r.key.clone(),
                    ts: r.ts,
                    seq: r.seq,
                    source: source_id,
                });
            }
        }
    }
    Ok(Self {
        sources,
        heap,
        tombstones,
        last_dedup: None,
        fast_source,
    })
}
/// Pops the head record of `source` and, if the source has more records,
/// pushes its new head onto the heap under the same `source` tag.
///
/// `usize::MAX` is the marker for the memtable source stored at index 0.
/// Returns `None` when the source does not exist or is exhausted.
fn advance_source(&mut self, source: usize) -> Option<InternalRecord> {
    let slot = match source {
        usize::MAX => 0,
        other => other,
    };
    let stream = self.sources.get_mut(slot)?;
    let current = stream.next()?;
    let upcoming = stream.peek().map(|head| MergeEntry {
        key: head.key.clone(),
        ts: head.ts,
        seq: head.seq,
        source,
    });
    if let Some(entry) = upcoming {
        self.heap.push(entry);
    }
    Some(current)
}
/// Returns true when `rec.key` falls inside any collected range tombstone,
/// i.e. `start <= key < end`.
fn is_tombstoned(&self, rec: &InternalRecord) -> bool {
    let key = rec.key.as_slice();
    for (start, end) in &self.tombstones {
        if start.as_slice() <= key && key < end.as_slice() {
            return true;
        }
    }
    false
}
}
fn filter_sst_block(
records: &[InternalRecord],
key_filter: &KeyFilter,
time_range: Option<(i64, i64)>,
is_full_scan: bool,
now_us: i64,
tombstones: &mut Vec<(Vec<u8>, Vec<u8>)>,
) -> Vec<InternalRecord> {
let mut filtered = Vec::with_capacity(records.len().min(64));
for rec in records {
if rec.expire_at <= now_us {
continue;
}
if rec.op == Op::DeleteRange {
if let Some(ref end) = rec.range_end {
tombstones.push((rec.key.clone(), end.clone()));
}
continue;
}
if is_full_scan {
if rec.op == Op::Delete {
continue;
}
filtered.push(InternalRecord {
seq: 0,
op: Op::Put,
key: rec.key.clone(),
ts: rec.ts,
expire_at: rec.expire_at,
value: rec.value.clone(),
range_end: None,
});
continue;
}
let matches_key = match key_filter {
KeyFilter::Prefix(key) => rec.key.starts_with(key.as_slice()),
KeyFilter::Range { start, end } => {
rec.key.as_slice() >= start.as_slice() && rec.key.as_slice() <= end.as_slice()
}
KeyFilter::All => true,
};
if !matches_key {
continue;
}
if let Some((ts_start, ts_end)) = time_range {
if rec.ts < ts_start || rec.ts > ts_end {
continue;
}
}
filtered.push(InternalRecord {
seq: 0,
op: rec.op,
key: rec.key.clone(),
ts: rec.ts,
expire_at: rec.expire_at,
value: rec.value.clone(),
range_end: rec.range_end.clone(),
});
}
filtered
}
impl Iterator for ScanIterator {
type Item = Result<Record>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(idx) = self.fast_source {
loop {
let src = self.sources.get_mut(idx)?;
let rec = src.next()?;
if rec.op == Op::Delete || rec.op == Op::DeleteRange {
continue;
}
return Some(Ok(rec.into_record_owned()));
}
}
loop {
let entry = self.heap.pop()?;
let dedup_key = (entry.key.clone(), entry.ts);
if self.last_dedup.as_ref() == Some(&dedup_key) {
self.advance_source(entry.source);
continue;
}
let rec = match self.advance_source(entry.source) {
Some(r) => r,
None => continue,
};
if rec.op == Op::Delete || rec.op == Op::DeleteRange {
self.last_dedup = Some(dedup_key);
continue;
}
if self.is_tombstoned(&rec) {
self.last_dedup = Some(dedup_key);
continue;
}
self.last_dedup = Some(dedup_key);
return Some(Ok(rec.into_record_owned()));
}
}
}
// Marker impl: `next` keeps returning `None` once its source/heap is drained.
impl std::iter::FusedIterator for ScanIterator {}
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_micros() -> i64 {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    since_epoch.as_micros() as i64
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Test configuration rooted at `dir`: small memtable/block sizes, long
// maintenance intervals, and background maintenance disabled so each test
// drives flush/compaction/GC explicitly.
fn make_config(dir: &std::path::Path) -> Config {
Config {
data_dir: dir.to_path_buf(),
memtable_size_mb: 1,
block_size: 100,
gc_interval_secs: 3600,
max_frozen_memtables: 2,
zstd_level: 1,
flush_interval_ms: 60000,
time_bucket_secs: 3600,
block_cache_capacity_mb: 16,
index_memory_budget_mb: 64,
default_ttl_secs: None,
bloom_bits_per_key: 10,
wal_segment_size_mb: 64,
compaction_threshold: 2,
create_if_missing: true,
wal_sync_mode: SyncMode::IntervalMs(u64::MAX),
auto_background: false,
}
}
// Builds a never-expiring record with a fixed three-byte value.
fn make_record(key: &str, ts: i64) -> Record {
Record {
key: key.as_bytes().to_vec(),
ts,
expire_at: i64::MAX,
value: vec![1, 2, 3],
}
}
// Writes three records and checks that a prefix query returns only the
// matching one.
#[tokio::test]
async fn test_engine_write_read_roundtrip() {
let dir = TempDir::new().unwrap();
let config = make_config(dir.path());
let engine = Engine::open(config).await.unwrap();
engine
.write_batch(&[
make_record("key1", 100),
make_record("key2", 200),
make_record("key3", 300),
])
.await
.unwrap();
let results = engine.query_by_prefix("key1").await.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].key, b"key1");
engine.shutdown().await.unwrap();
}
// A key-range query over "b".."c" returns both endpoints (inclusive range).
#[tokio::test]
async fn test_engine_key_range_query() {
let dir = TempDir::new().unwrap();
let config = make_config(dir.path());
let engine = Engine::open(config).await.unwrap();
engine
.write_batch(&[
make_record("a", 100),
make_record("b", 200),
make_record("c", 300),
make_record("d", 400),
])
.await
.unwrap();
let results = engine.query_by_key_range("b", "c").await.unwrap();
assert_eq!(results.len(), 2);
engine.shutdown().await.unwrap();
}
// A time-range query over 150..=300 returns the records at ts 200 and 300.
#[tokio::test]
async fn test_engine_time_range_query() {
let dir = TempDir::new().unwrap();
let config = make_config(dir.path());
let engine = Engine::open(config).await.unwrap();
engine
.write_batch(&[
make_record("a", 100),
make_record("b", 200),
make_record("c", 300),
])
.await
.unwrap();
let results = engine.query_time_range(150, 300).await.unwrap();
assert_eq!(results.len(), 2);
engine.shutdown().await.unwrap();
}
// Combining a key prefix with a time window selects the single "a" record
// whose timestamp falls inside the window.
#[tokio::test]
async fn test_engine_prefix_time_range() {
let dir = TempDir::new().unwrap();
let config = make_config(dir.path());
let engine = Engine::open(config).await.unwrap();
engine
.write_batch(&[
make_record("a", 100),
make_record("a", 200),
make_record("a", 300),
make_record("b", 200),
])
.await
.unwrap();
let results = engine.query_prefix_time_range("a", 150, 250).await.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].ts, 200);
engine.shutdown().await.unwrap();
}
// A record whose expire_at is already in the past is invisible to queries.
#[tokio::test]
async fn test_engine_ttl_expiry() {
let dir = TempDir::new().unwrap();
let config = make_config(dir.path());
let engine = Engine::open(config).await.unwrap();
let mut rec = make_record("exp", 100);
rec.expire_at = 1;
engine.write_batch(&[rec]).await.unwrap();
let results = engine.query_by_prefix("exp").await.unwrap();
assert!(results.is_empty());
engine.shutdown().await.unwrap();
}
// Records remain queryable after an explicit flush.
#[tokio::test]
async fn test_engine_flush_and_query() {
let dir = TempDir::new().unwrap();
let mut config = make_config(dir.path());
config.memtable_size_mb = 64;
let engine = Engine::open(config).await.unwrap();
let records: Vec<Record> = (0..50)
.map(|i| make_record(&format!("key_{:04}", i), i * 10))
.collect();
engine.write_batch(&records).await.unwrap();
engine.flush().await.unwrap();
let results = engine.query_by_prefix("key_").await.unwrap();
assert_eq!(results.len(), 50);
engine.shutdown().await.unwrap();
}
// The stats snapshot reflects the number of records written and a small uptime.
#[tokio::test]
async fn test_engine_stats() {
let dir = TempDir::new().unwrap();
let config = make_config(dir.path());
let engine = Engine::open(config).await.unwrap();
engine
.write_batch(&[make_record("a", 100), make_record("b", 200)])
.await
.unwrap();
let stats = engine.stats();
assert_eq!(stats.total_records_written, 2);
// NOTE(review): the first disjunct is implied by the second.
assert!(stats.uptime_secs == 0 || stats.uptime_secs <= 5);
engine.shutdown().await.unwrap();
}
// Data written before shutdown is visible after reopening the same directory.
#[tokio::test]
async fn test_engine_recovery() {
let dir = TempDir::new().unwrap();
let config = make_config(dir.path());
{
let engine = Engine::open(config.clone()).await.unwrap();
engine
.write_batch(&[make_record("a", 100), make_record("b", 200)])
.await
.unwrap();
engine.shutdown().await.unwrap();
}
{
let engine = Engine::open(config).await.unwrap();
let results = engine.query_by_prefix("a").await.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].key, b"a");
let results = engine.query_by_prefix("b").await.unwrap();
assert_eq!(results.len(), 1);
engine.shutdown().await.unwrap();
}
}
// Four tasks each write 25 single-record batches; the write counter must
// account for all 100.
#[tokio::test]
async fn test_engine_concurrent_writes() {
let dir = TempDir::new().unwrap();
let config = make_config(dir.path());
let engine = Arc::new(Engine::open(config).await.unwrap());
let mut handles = Vec::new();
for t in 0..4u64 {
let e = engine.clone();
handles.push(tokio::spawn(async move {
for i in 0..25u64 {
let rec = make_record(&format!("t{}-{}", t, i), (t * 100 + i) as i64);
e.write_batch(&[rec]).await.unwrap();
}
}));
}
for h in handles {
h.await.unwrap();
}
let stats = engine.stats();
assert_eq!(stats.total_records_written, 100);
// All task clones have been dropped, so the Arc can be unwrapped for shutdown.
let engine = Arc::try_unwrap(engine)
.unwrap_or_else(|e| std::sync::Arc::<Engine>::into_inner(e).unwrap());
engine.shutdown().await.unwrap();
}
// Ten flushed batches are compacted on demand and all 50 records stay readable.
#[tokio::test]
async fn test_engine_compaction() {
let dir = TempDir::new().unwrap();
let config = make_config(dir.path());
let engine = Engine::open(config).await.unwrap();
for batch in 0..10u64 {
let records: Vec<Record> = (0..5)
.map(|i| Record {
key: format!("compact_{}-{}", batch, i).into_bytes(),
ts: (batch * 100 + i) as i64,
expire_at: i64::MAX,
value: vec![1, 2, 3],
})
.collect();
engine.write_batch(&records).await.unwrap();
engine.flush().await.unwrap();
}
let did = engine.trigger_compaction().await.unwrap();
assert!(did);
let results = engine.query_by_prefix("compact_").await.unwrap();
assert_eq!(results.len(), 50);
engine.shutdown().await.unwrap();
}
// Records written with a one-second TTL are readable at first, then purged by
// GC once the TTL has elapsed.
#[tokio::test]
async fn test_engine_gc_removes_expired_sstables() {
let dir = TempDir::new().unwrap();
let config = make_config(dir.path());
let engine = Engine::open(config).await.unwrap();
let now_us = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_micros() as i64;
let records: Vec<Record> = (0..10)
.map(|i| Record {
key: format!("gc_key_{}", i).into_bytes(),
ts: now_us,
expire_at: i64::MAX,
value: vec![1, 2, 3],
})
.collect();
engine.write_batch_ttl(&records, Some(1)).await.unwrap();
engine.flush().await.unwrap();
let results = engine.query_by_prefix("gc_key_").await.unwrap();
assert_eq!(results.len(), 10);
// Wait past the one-second TTL before running GC.
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let purged = engine.trigger_gc().await.unwrap();
assert!(purged > 0);
let results = engine.query_by_prefix("gc_key_").await.unwrap();
assert!(results.is_empty());
engine.shutdown().await.unwrap();
}
/// Calls `delete_range("b", "d")` on four unflushed records and expects a
/// key-range query over `"a"`..`"d"` to return exactly `"a"` and `"d"`.
#[tokio::test]
async fn test_engine_delete_range() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let seed = [
        make_record("a", 100),
        make_record("b", 200),
        make_record("c", 300),
        make_record("d", 400),
    ];
    engine.write_batch(&seed).await.unwrap();
    engine.delete_range("b", "d").await.unwrap();

    // The asserted outcome: "b" and "c" are gone while the end key "d" is kept.
    let remaining = engine.query_by_key_range("a", "d").await.unwrap();
    assert_eq!(remaining.len(), 2);
    assert_eq!(remaining[0].key, b"a");
    assert_eq!(remaining[1].key, b"d");
    engine.shutdown().await.unwrap();
}
/// Flushes three records, issues `delete_range("s1", "s3")`, flushes again,
/// and expects a prefix query for `"s"` to return only `"s3"`.
#[tokio::test]
async fn test_engine_delete_range_flush_and_query() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let seed = [
        make_record("s1", 100),
        make_record("s2", 200),
        make_record("s3", 300),
    ];
    engine.write_batch(&seed).await.unwrap();
    engine.flush().await.unwrap();

    // The range delete is issued after the data flush and is itself flushed.
    engine.delete_range("s1", "s3").await.unwrap();
    engine.flush().await.unwrap();

    let survivors = engine.query_by_prefix("s").await.unwrap();
    assert_eq!(survivors.len(), 1);
    assert_eq!(survivors[0].key, b"s3");
    engine.shutdown().await.unwrap();
}
/// Writes six five-record batches with a flush after each, triggers
/// compaction, and checks that it reports success and that all 30 records are
/// still returned by a prefix query.
#[tokio::test]
async fn test_engine_size_tiered_compaction() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    for batch in 0..6u64 {
        let mut batch_records: Vec<Record> = Vec::with_capacity(5);
        for i in 0..5u64 {
            batch_records.push(Record {
                key: format!("st_{}-{}", batch, i).into_bytes(),
                ts: (batch * 100 + i) as i64,
                expire_at: i64::MAX,
                value: vec![1, 2, 3],
            });
        }
        engine.write_batch(&batch_records).await.unwrap();
        engine.flush().await.unwrap();
    }

    let compacted = engine.trigger_compaction().await.unwrap();
    assert!(compacted);

    let found = engine.query_by_prefix("st_").await.unwrap();
    assert_eq!(found.len(), 30);
    engine.shutdown().await.unwrap();
}
/// Deletes the range `"x-a"`..`"x-c"` and checks that only `"x-c"` remains
/// under the `"x-"` prefix while both `"y-"` records are untouched.
#[tokio::test]
async fn test_engine_delete_range_preserves_outside() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let seed = [
        make_record("x-a", 100),
        make_record("x-b", 200),
        make_record("x-c", 300),
        make_record("y-a", 100),
        make_record("y-b", 200),
    ];
    engine.write_batch(&seed).await.unwrap();
    engine.delete_range("x-a", "x-c").await.unwrap();

    let under_x = engine.query_by_prefix("x-").await.unwrap();
    assert_eq!(under_x.len(), 1);
    assert_eq!(under_x[0].key, b"x-c");

    // Keys outside the deleted range must be unaffected.
    let under_y = engine.query_by_prefix("y-").await.unwrap();
    assert_eq!(under_y.len(), 2);
    engine.shutdown().await.unwrap();
}
/// Scans the `"alpha"` prefix and expects its two records back, with
/// timestamp 100 before timestamp 200, and no `"beta"` record.
#[tokio::test]
async fn test_scan_prefix_basic() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let seed = [
        make_record("alpha", 100),
        make_record("alpha", 200),
        make_record("beta", 150),
    ];
    engine.write_batch(&seed).await.unwrap();

    let scanner = engine.scan_prefix("alpha").unwrap();
    let hits: Vec<Record> = scanner.map(|item| item.unwrap()).collect();
    assert_eq!(hits.len(), 2);
    assert_eq!(hits[0].ts, 100);
    assert_eq!(hits[1].ts, 200);
    engine.shutdown().await.unwrap();
}
/// Scans prefix `"k1"` restricted to the time bounds `12` and `25` and expects
/// only the record with timestamp 20.
#[tokio::test]
async fn test_scan_prefix_time_range() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let seed = [
        make_record("k1", 10),
        make_record("k1", 20),
        make_record("k1", 30),
        make_record("k2", 15),
    ];
    engine.write_batch(&seed).await.unwrap();

    let scanner = engine.scan_prefix_time_range("k1", 12, 25).unwrap();
    let hits: Vec<Record> = scanner.map(|item| item.unwrap()).collect();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].ts, 20);
    engine.shutdown().await.unwrap();
}
/// Scans with `ScanRange::all()` and expects the three written records to come
/// back in the order `"a"`, `"b"`, `"c"`.
#[tokio::test]
async fn test_scan_range_all() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let seed = [make_record("a", 1), make_record("b", 2), make_record("c", 3)];
    engine.write_batch(&seed).await.unwrap();

    let scanner = engine.scan(ScanRange::all()).unwrap();
    let hits: Vec<Record> = scanner.map(|item| item.unwrap()).collect();
    assert_eq!(hits.len(), 3);
    assert_eq!(hits[0].key, b"a");
    assert_eq!(hits[1].key, b"b");
    assert_eq!(hits[2].key, b"c");
    engine.shutdown().await.unwrap();
}
/// Scans `ScanRange::key_range("b", "c")` over keys `"a"`..`"d"` and expects
/// exactly two records.
#[tokio::test]
async fn test_scan_key_range() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let seed = [
        make_record("a", 1),
        make_record("b", 2),
        make_record("c", 3),
        make_record("d", 4),
    ];
    engine.write_batch(&seed).await.unwrap();

    let scanner = engine.scan(ScanRange::key_range("b", "c")).unwrap();
    let hits: Vec<Record> = scanner.map(|item| item.unwrap()).collect();
    assert_eq!(hits.len(), 2);
    engine.shutdown().await.unwrap();
}
/// Scans `ScanRange::time_range(150, 300)` over records with timestamps 100,
/// 200 and 300 and expects exactly two of them back.
#[tokio::test]
async fn test_scan_time_range() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let seed = [
        make_record("a", 100),
        make_record("b", 200),
        make_record("c", 300),
    ];
    engine.write_batch(&seed).await.unwrap();

    let scanner = engine.scan(ScanRange::time_range(150, 300)).unwrap();
    let hits: Vec<Record> = scanner.map(|item| item.unwrap()).collect();
    assert_eq!(hits.len(), 2);
    engine.shutdown().await.unwrap();
}
/// Checks that `scan_prefix` and `query_by_prefix` agree: same number of
/// results and, pairwise in order, the same key and timestamp.
#[tokio::test]
async fn test_scan_matches_query() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let mut seeded: Vec<Record> = Vec::with_capacity(50);
    for i in 0..50i64 {
        seeded.push(make_record(&format!("key_{:04}", i), i * 10));
    }
    engine.write_batch(&seeded).await.unwrap();

    let scanner = engine.scan_prefix("key_").unwrap();
    let from_scan: Vec<Record> = scanner.map(|item| item.unwrap()).collect();
    let from_query = engine.query_by_prefix("key_").await.unwrap();

    assert_eq!(from_scan.len(), from_query.len());
    for (scanned, queried) in from_scan.iter().zip(&from_query) {
        assert_eq!(scanned.key, queried.key);
        assert_eq!(scanned.ts, queried.ts);
    }
    engine.shutdown().await.unwrap();
}
/// Flushes three records and expects a prefix scan for `"s"` to still return
/// all three.
#[tokio::test]
async fn test_scan_after_flush() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let seed = [
        make_record("s1", 100),
        make_record("s2", 200),
        make_record("s3", 300),
    ];
    engine.write_batch(&seed).await.unwrap();
    engine.flush().await.unwrap();

    let scanner = engine.scan_prefix("s").unwrap();
    let hits: Vec<Record> = scanner.map(|item| item.unwrap()).collect();
    assert_eq!(hits.len(), 3);
    engine.shutdown().await.unwrap();
}
/// After `delete_range("b", "d")`, a scan over `key_range("a", "d")` is
/// expected to yield only `"a"` and `"d"`.
#[tokio::test]
async fn test_scan_with_delete_range() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let seed = [
        make_record("a", 100),
        make_record("b", 200),
        make_record("c", 300),
        make_record("d", 400),
    ];
    engine.write_batch(&seed).await.unwrap();
    engine.delete_range("b", "d").await.unwrap();

    let scanner = engine.scan(ScanRange::key_range("a", "d")).unwrap();
    let hits: Vec<Record> = scanner.map(|item| item.unwrap()).collect();
    assert_eq!(hits.len(), 2);
    assert_eq!(hits[0].key, b"a");
    assert_eq!(hits[1].key, b"d");
    engine.shutdown().await.unwrap();
}
/// A prefix scan for a prefix that matches no stored key yields no records.
#[tokio::test]
async fn test_scan_empty_range() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let seed = [make_record("a", 1)];
    engine.write_batch(&seed).await.unwrap();

    let scanner = engine.scan_prefix("nonexistent").unwrap();
    let hits: Vec<Record> = scanner.map(|item| item.unwrap()).collect();
    assert!(hits.is_empty());
    engine.shutdown().await.unwrap();
}
/// `get_latest` returns the record with the highest timestamp for a key
/// (300 for `"key1"`, 500 for `"key2"`) and `None` for an unknown key.
#[tokio::test]
async fn test_get_latest() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    // "key1" timestamps are deliberately written out of order.
    let seed = [
        make_record("key1", 100),
        make_record("key1", 300),
        make_record("key1", 200),
        make_record("key2", 500),
    ];
    engine.write_batch(&seed).await.unwrap();

    let newest_key1 = engine.get_latest("key1").unwrap();
    assert!(newest_key1.is_some());
    let newest_key1 = newest_key1.unwrap();
    assert_eq!(newest_key1.ts, 300);

    let newest_key2 = engine.get_latest("key2").unwrap();
    assert!(newest_key2.is_some());
    let newest_key2 = newest_key2.unwrap();
    assert_eq!(newest_key2.ts, 500);

    let missing = engine.get_latest("nonexistent").unwrap();
    assert!(missing.is_none());
    engine.shutdown().await.unwrap();
}
/// After flushing timestamps 10/20/30 for key `"k"` and then writing
/// timestamp 99 without flushing, `get_latest` must return the 99 record.
#[tokio::test]
async fn test_get_latest_after_flush() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let flushed = [
        make_record("k", 10),
        make_record("k", 20),
        make_record("k", 30),
    ];
    engine.write_batch(&flushed).await.unwrap();
    engine.flush().await.unwrap();

    // This newer record is written after the flush and is not flushed itself.
    let unflushed = [make_record("k", 99)];
    engine.write_batch(&unflushed).await.unwrap();

    let newest = engine.get_latest("k").unwrap();
    assert!(newest.is_some());
    let newest = newest.unwrap();
    assert_eq!(newest.ts, 99);
    engine.shutdown().await.unwrap();
}
/// Writes 100 records one batch at a time, then takes only the first five
/// items of a full scan and checks they are `k000` through `k004`.
///
/// The assertions cover count and ordering of the taken prefix; laziness
/// itself is not measured directly.
#[tokio::test]
async fn test_scan_lazy_does_not_materialize_all() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    for i in 0..100u64 {
        let single = [Record {
            key: format!("k{:03}", i).into_bytes(),
            ts: i as i64,
            expire_at: i64::MAX,
            value: vec![42u8; 32],
        }];
        engine.write_batch(&single).await.unwrap();
    }

    let scanner = engine.scan(ScanRange::all()).unwrap();
    let head: Vec<Record> = scanner.take(5).map(|item| item.unwrap()).collect();
    assert_eq!(head.len(), 5);
    assert_eq!(head[0].key, b"k000");
    assert_eq!(head[4].key, b"k004");
    engine.shutdown().await.unwrap();
}
/// `scan_opt` with cache filling and checksum verification both disabled
/// still returns the single record stored under prefix `"x"`.
#[tokio::test]
async fn test_scan_opt_with_read_options() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let seed = [make_record("x", 1)];
    engine.write_batch(&seed).await.unwrap();

    let read_opts = ReadOptions {
        fill_cache: false,
        verify_checksums: false,
    };
    let scanner = engine.scan_opt(ScanRange::prefix("x"), &read_opts).unwrap();
    let hits: Vec<Record> = scanner.map(|item| item.unwrap()).collect();
    assert_eq!(hits.len(), 1);
    engine.shutdown().await.unwrap();
}
/// Writes one record, flushes, writes a second record, then drops the engine
/// without calling `shutdown`. A fresh engine opened on the same directory
/// must return both records.
#[tokio::test]
async fn test_recovery_after_flush_no_data_loss() {
    let tmp = TempDir::new().unwrap();
    let data_path = tmp.path().to_path_buf();

    // First lifetime: the engine is dropped rather than shut down.
    {
        let engine = Engine::open(make_config(&data_path)).await.unwrap();
        let before_flush = [make_record("flushed", 100)];
        engine.write_batch(&before_flush).await.unwrap();
        engine.flush().await.unwrap();
        let after_flush = [make_record("post-flush", 200)];
        engine.write_batch(&after_flush).await.unwrap();
        drop(engine);
    }

    // Second lifetime: both records must be readable again.
    {
        let engine = Engine::open(make_config(&data_path)).await.unwrap();
        let flushed = engine.get("flushed", 100).await.unwrap();
        assert!(flushed.is_some(), "flushed record must survive restart");
        let post_flush = engine.get("post-flush", 200).await.unwrap();
        assert!(
            post_flush.is_some(),
            "post-flush record must survive restart (WAL truncation bug check)"
        );
    }
}
/// With `memtable_size_mb` set to 1, writes 5000 records of 4 KiB each via
/// `write_batch_sync`, flushes, and checks that a prefix scan returns all
/// 5000 records with 5000 distinct keys.
#[tokio::test]
async fn test_memtable_backpressure_under_pressure() {
    let tmp = TempDir::new().unwrap();
    let mut config = make_config(tmp.path());
    config.memtable_size_mb = 1;
    let engine = Engine::open(config).await.unwrap();

    let payload = vec![0xABu8; 4096];
    for i in 0..5000 {
        let rec = Record {
            key: format!("pressure_{:05}", i).into_bytes(),
            ts: i as i64,
            expire_at: i64::MAX,
            value: payload.clone(),
        };
        engine.write_batch_sync(vec![rec]).unwrap();
    }
    engine.flush().await.unwrap();

    let scanner = engine.scan(ScanRange::prefix("pressure_")).unwrap();
    let survivors: Vec<Record> = scanner.map(|item| item.unwrap()).collect();
    assert_eq!(survivors.len(), 5000, "all 5000 records must survive");

    // Sort + dedup the keys: the count only stays at 5000 if no key repeats.
    let mut distinct: Vec<Vec<u8>> = Vec::with_capacity(survivors.len());
    for rec in &survivors {
        distinct.push(rec.key.clone());
    }
    distinct.sort();
    distinct.dedup();
    assert_eq!(distinct.len(), 5000, "no duplicate keys");
}
/// Spawns a `flush` task and a `get` task against the same shared engine and
/// requires both to complete, with the `get` finding the previously flushed
/// record.
#[tokio::test]
async fn test_flush_does_not_block_runtime() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Arc::new(Engine::open(config).await.unwrap());

    let seed = [make_record("a", 1), make_record("b", 2)];
    engine.write_batch(&seed).await.unwrap();
    engine.flush().await.unwrap();

    let flusher = {
        let engine = engine.clone();
        tokio::spawn(async move {
            engine.flush().await.unwrap();
        })
    };
    let reader = {
        let engine = engine.clone();
        tokio::spawn(async move {
            let found = engine.get("a", 1).await.unwrap();
            assert!(found.is_some());
        })
    };

    flusher.await.unwrap();
    reader.await.unwrap();
}
// Writes and flushes 200 records, shuts the engine down, rewrites every
// `"hash_version":1` in the MANIFEST text to `"hash_version":0`, and reopens
// the engine. The reopened engine must still find all 200 keys and must not
// find an absent key. After the second shutdown the manifest text must
// contain `"update_bloom"`, and every bloom present in the reloaded manifest
// state must report `CURRENT_HASH_VERSION`.
// NOTE(review): the rewrite presumably simulates a manifest produced with an
// older bloom hash version — confirm against the manifest format.
#[tokio::test]
async fn test_bloom_rebuild_on_open_after_hash_upgrade() {
use crate::bloom::{BloomFilter, CURRENT_HASH_VERSION};
let dir = TempDir::new().unwrap();
let mut config = make_config(dir.path());
config.memtable_size_mb = 1;
// First engine lifetime: populate and flush, then shut down cleanly.
let engine = Engine::open(config.clone()).await.unwrap();
let records: Vec<Record> = (0..200)
.map(|i| Record {
key: format!("bloom_key_{:04}", i).into_bytes(),
ts: i as i64,
expire_at: i64::MAX,
value: vec![1, 2, 3],
})
.collect();
engine.write_batch(&records).await.unwrap();
engine.flush().await.unwrap();
engine.shutdown().await.unwrap();
// Edit the manifest on disk while no engine has it open.
let manifest_path = dir.path().join("MANIFEST");
let original = std::fs::read_to_string(&manifest_path).unwrap();
let tampered = original.replace(
"\"hash_version\":1",
"\"hash_version\":0",
);
// The file is only rewritten when the replacement actually changed the text.
if tampered != original {
std::fs::write(&manifest_path, tampered).unwrap();
}
// Second engine lifetime: every written key must still be found.
let engine2 = Engine::open(config.clone()).await.unwrap();
for i in 0..200 {
let key = format!("bloom_key_{:04}", i);
let rec = engine2.get(&key, i as i64).await.unwrap();
assert!(
rec.is_some(),
"key {} missing after bloom rebuild — false negative",
key
);
}
// A key that was never written must not be reported as present.
let absent = engine2.get("bloom_key_9999", 0).await.unwrap();
assert!(
absent.is_none(),
"absent key should not be found, got {:?}",
absent
);
engine2.shutdown().await.unwrap();
// Inspect the manifest text left behind by the second lifetime.
let manifest_text = std::fs::read_to_string(&manifest_path).unwrap();
assert!(
manifest_text.contains("\"update_bloom\""),
"expected UpdateBloom entry in manifest after rebuild, got: {}",
manifest_text
);
// Reload the manifest directly and check the hash version of each bloom that is present.
let mf = crate::manifest::Manifest::open(dir.path()).unwrap();
for info in mf.state().sstables.values() {
if let Some(b) = &info.bloom {
assert_eq!(
b.hash_version(),
CURRENT_HASH_VERSION,
"all blooms should be current after rebuild"
);
}
}
// Panics unless at least one sstable exists and its bloom is present; the
// annotation also pins the bloom's type to `BloomFilter`.
let _: &BloomFilter = mf
.state()
.sstables
.values()
.next()
.unwrap()
.bloom
.as_ref()
.unwrap();
}
/// Writes and flushes 50 records, shuts the engine down cleanly, reopens it on
/// the same directory, and requires every record to be found by point lookup.
///
/// The assertion message attributes a miss to a possible bloom-filter false
/// negative; the test itself only observes that `get` returned `None`.
#[tokio::test]
async fn test_bloom_remains_valid_across_clean_restart() {
    let dir = TempDir::new().unwrap();
    let mut config = make_config(dir.path());
    config.memtable_size_mb = 1;
    let records: Vec<Record> = (0..50)
        .map(|i| Record {
            key: format!("restart_key_{:03}", i).into_bytes(),
            ts: i as i64,
            expire_at: i64::MAX,
            value: vec![9],
        })
        .collect();

    // First lifetime: persist the records and shut down cleanly.
    {
        let engine = Engine::open(config.clone()).await.unwrap();
        engine.write_batch(&records).await.unwrap();
        engine.flush().await.unwrap();
        engine.shutdown().await.unwrap();
    }

    // Second lifetime: `config` is not needed afterwards, so it is moved
    // instead of cloned.
    {
        let engine = Engine::open(config).await.unwrap();
        for rec in &records {
            // Computed once and reused for both the lookup and the failure message.
            let key = String::from_utf8_lossy(&rec.key);
            let got = engine.get(&key, rec.ts).await.unwrap();
            assert!(
                got.is_some(),
                "key {} vanished after restart (bloom false negative?)",
                key
            );
        }
        engine.shutdown().await.unwrap();
    }
}
/// Writing an empty batch must succeed.
#[tokio::test]
async fn test_write_batch_empty() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let outcome = engine.write_batch(&[]).await;
    assert!(outcome.is_ok());
    engine.shutdown().await.unwrap();
}
/// Deleting an empty batch must succeed.
#[tokio::test]
async fn test_delete_batch_empty() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let outcome = engine.delete_batch(&[]).await;
    assert!(outcome.is_ok());
    engine.shutdown().await.unwrap();
}
/// A point lookup on a freshly opened, empty engine returns `None`.
#[tokio::test]
async fn test_get_nonexistent_key() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let lookup = engine.get("no_such_key", 0).await.unwrap();
    assert!(lookup.is_none());
    engine.shutdown().await.unwrap();
}
/// Patching a record that was never written must return an error.
#[tokio::test]
async fn test_patch_nonexistent_record_error() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let new_value = Some(vec![1, 2, 3]);
    let outcome = engine.patch_record("no_such_key", 0, new_value, None).await;
    assert!(outcome.is_err());
    engine.shutdown().await.unwrap();
}
/// `delete_range` with an empty start key must succeed on an empty engine.
#[tokio::test]
async fn test_delete_range_empty_strings() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let outcome = engine.delete_range("", "z").await;
    assert!(outcome.is_ok());
    engine.shutdown().await.unwrap();
}
/// Querying with an empty prefix on an empty engine returns no records.
#[tokio::test]
async fn test_query_by_prefix_empty_string() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let found = engine.query_by_prefix("").await.unwrap();
    assert!(found.is_empty());
    engine.shutdown().await.unwrap();
}
/// `get_latest` on an empty engine returns `Ok(None)`.
#[tokio::test]
async fn test_get_latest_nonexistent() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let newest = engine.get_latest("no_such_key").unwrap();
    assert!(newest.is_none());
    engine.shutdown().await.unwrap();
}
/// Flushing an engine that holds no data must succeed.
#[tokio::test]
async fn test_flush_on_empty_engine() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let outcome = engine.flush().await;
    assert!(outcome.is_ok());
    engine.shutdown().await.unwrap();
}
/// A full scan of an empty engine yields nothing.
#[tokio::test]
async fn test_scan_all_range() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let scanned: Vec<_> = engine
        .scan(ScanRange::all())
        .unwrap()
        .map(|item| item.unwrap())
        .collect();
    assert!(scanned.is_empty());
    engine.shutdown().await.unwrap();
}
/// `Engine::open` must fail when `data_dir` points at a directory that does
/// not exist and `create_if_missing` is `false`.
#[tokio::test]
async fn test_config_create_if_missing_false() {
    let dir = TempDir::new().unwrap();
    let mut cfg = make_config(dir.path());
    // Point the engine at a child path that is never created. The path is
    // moved straight into the config; no clone is needed.
    cfg.data_dir = dir.path().join("nonexistent");
    cfg.create_if_missing = false;
    let result = Engine::open(cfg).await;
    assert!(result.is_err(), "should fail when dir does not exist and create_if_missing is false");
}
/// A key-range query on an empty engine returns no records.
#[tokio::test]
async fn test_query_empty_result() {
    let tmp = TempDir::new().unwrap();
    let engine = Engine::open(make_config(tmp.path())).await.unwrap();

    let found = engine.query_by_key_range("z", "zzz").await.unwrap();
    assert!(found.is_empty());
    engine.shutdown().await.unwrap();
}
/// Opens an engine with `auto_background` enabled and short intervals, writes
/// one record, waits 80 ms, and requires a clean shutdown.
///
/// The test asserts only that nothing fails; it does not observe the
/// maintenance work itself.
#[tokio::test]
async fn test_engine_auto_background_starts_maintenance() {
    let tmp = TempDir::new().unwrap();
    let mut cfg = make_config(tmp.path());
    cfg.auto_background = true;
    cfg.flush_interval_ms = 10;
    cfg.gc_interval_secs = 1;
    cfg.wal_sync_mode = SyncMode::IntervalMs(10);
    let engine = Engine::open(cfg).await.unwrap();

    let seed = [make_record("bg", 1)];
    engine.write_batch(&seed).await.unwrap();

    // Leave the engine running for 80 ms before shutting down.
    let settle = std::time::Duration::from_millis(80);
    tokio::time::sleep(settle).await;
    engine.shutdown().await.unwrap();
}
/// Calls `spawn_background_maintenance` directly, requires it to return a
/// handle, lets the task run for 40 ms, aborts it, and shuts the engine down.
#[tokio::test]
async fn test_spawn_background_maintenance_explicit() {
    let tmp = TempDir::new().unwrap();
    let mut cfg = make_config(tmp.path());
    cfg.flush_interval_ms = 10;
    cfg.gc_interval_secs = 1;
    cfg.wal_sync_mode = SyncMode::IntervalMs(10);
    let engine = Engine::open(cfg).await.unwrap();

    let maintenance = engine.spawn_background_maintenance();
    let task = maintenance.expect("handle should be Some");

    let settle = std::time::Duration::from_millis(40);
    tokio::time::sleep(settle).await;
    task.abort();
    engine.shutdown().await.unwrap();
}
/// Records written through `write_batch_owned` are readable by `get`.
#[tokio::test]
async fn test_write_batch_owned() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let batch = vec![make_record("owned1", 1), make_record("owned2", 2)];
    engine.write_batch_owned(batch).await.unwrap();

    let fetched = engine.get("owned1", 1).await.unwrap();
    assert!(fetched.is_some());
    engine.shutdown().await.unwrap();
}
/// Records written through `write_batch_sync` are readable by `get`, and an
/// empty synchronous batch is accepted.
#[tokio::test]
async fn test_write_batch_sync() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let batch = vec![make_record("sync1", 1), make_record("sync2", 2)];
    engine.write_batch_sync(batch).unwrap();

    let fetched = engine.get("sync1", 1).await.unwrap();
    assert!(fetched.is_some());

    // An empty batch must not fail.
    engine.write_batch_sync(vec![]).unwrap();
    engine.shutdown().await.unwrap();
}
/// `write_batch_owned` accepts an empty vector.
#[tokio::test]
async fn test_write_batch_owned_empty() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    engine.write_batch_owned(vec![]).await.unwrap();
    engine.shutdown().await.unwrap();
}
/// A record written via `write_batch_ttl` with an explicit TTL is stored with
/// an `expire_at` strictly below `i64::MAX`.
#[tokio::test]
async fn test_write_batch_ttl_path() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let seed = [make_record("ttl1", 1)];
    engine
        .write_batch_ttl(&seed, Some(u32::MAX as u64))
        .await
        .unwrap();

    let fetched = engine.get("ttl1", 1).await.unwrap();
    let stored = fetched.unwrap();
    assert!(stored.expire_at < i64::MAX);
    engine.shutdown().await.unwrap();
}
/// A record written via `write_batch_owned_ttl` with an explicit TTL is stored
/// with an `expire_at` strictly below `i64::MAX`.
#[tokio::test]
async fn test_write_batch_owned_ttl_path() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let batch = vec![make_record("ot", 1)];
    engine
        .write_batch_owned_ttl(batch, Some(u32::MAX as u64))
        .await
        .unwrap();

    let fetched = engine.get("ot", 1).await.unwrap();
    let stored = fetched.unwrap();
    assert!(stored.expire_at < i64::MAX);
    engine.shutdown().await.unwrap();
}
/// Both the async `get` and the synchronous `get_sync` report a missing key
/// as `None`.
#[tokio::test]
async fn test_get_missing_key() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let via_async = engine.get("missing", 1).await.unwrap();
    assert!(via_async.is_none());
    let via_sync = engine.get_sync("missing", 1);
    assert!(via_sync.is_none());
    engine.shutdown().await.unwrap();
}
/// `get_latest_async` returns the record with the highest timestamp (5) among
/// the three written for key `"gl"`.
#[tokio::test]
async fn test_get_latest_async_path() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let seed = [make_record("gl", 1), make_record("gl", 5), make_record("gl", 3)];
    engine.write_batch(&seed).await.unwrap();

    let fetched = engine.get_latest_async("gl").await.unwrap();
    let newest = fetched.unwrap();
    assert_eq!(newest.ts, 5);
    engine.shutdown().await.unwrap();
}
/// `get_latest` for a key that was never written returns `Ok(None)`.
#[tokio::test]
async fn test_get_latest_missing() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let newest = engine.get_latest("no-such-key").unwrap();
    assert!(newest.is_none());
    engine.shutdown().await.unwrap();
}
/// `delete_batch` with no entries completes without error.
#[tokio::test]
async fn test_delete_batch_empty_noop() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    engine.delete_batch(&[]).await.unwrap();
    engine.shutdown().await.unwrap();
}
/// `patch_record` on a key/timestamp that does not exist returns an error.
#[tokio::test]
async fn test_patch_missing_record() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let new_value = Some(b"v".to_vec());
    let outcome = engine.patch_record("ghost", 1, new_value, None).await;
    assert!(outcome.is_err());
    engine.shutdown().await.unwrap();
}
/// Patching both value and TTL of an existing record returns a record that
/// carries the new value and an `expire_at` strictly below `i64::MAX`.
#[tokio::test]
async fn test_patch_value_and_ttl() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let seed = [make_record("p", 1)];
    engine.write_batch(&seed).await.unwrap();

    let new_value = Some(b"new".to_vec());
    let new_ttl = Some(u32::MAX as u64);
    let patched = engine
        .patch_record("p", 1, new_value, new_ttl)
        .await
        .unwrap();
    assert_eq!(patched.value, b"new");
    assert!(patched.expire_at < i64::MAX);
    engine.shutdown().await.unwrap();
}
/// Exercises the query convenience wrappers against three records and checks
/// the result count of each: prefix, time range, prefix + time range, and
/// key range + time range.
#[tokio::test]
async fn test_query_convenience_wrappers() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let seed = [
        make_record("k1", 10),
        make_record("k1", 20),
        make_record("k2", 30),
    ];
    engine.write_batch(&seed).await.unwrap();

    let by_prefix = engine.query_by_prefix("k1").await.unwrap();
    assert_eq!(by_prefix.len(), 2);

    let by_time = engine.query_time_range(0, 100).await.unwrap();
    assert_eq!(by_time.len(), 3);

    let by_prefix_time = engine.query_prefix_time_range("k1", 0, 100).await.unwrap();
    assert_eq!(by_prefix_time.len(), 2);

    let by_key_time = engine
        .query_key_time_range("k1", "k2", 0, 100)
        .await
        .unwrap();
    assert_eq!(by_key_time.len(), 3);
    engine.shutdown().await.unwrap();
}
/// `scan_prefix_time_range("c", 0, 100)` returns both records whose keys
/// start with `"c"`.
#[tokio::test]
async fn test_scan_convenience_wrappers() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let seed = [make_record("c1", 1), make_record("c2", 2)];
    engine.write_batch(&seed).await.unwrap();

    let scanner = engine.scan_prefix_time_range("c", 0, 100).unwrap();
    let hits: Vec<_> = scanner.map(|item| item.unwrap()).collect();
    assert_eq!(hits.len(), 2);
    engine.shutdown().await.unwrap();
}
/// A record that was flushed before a clean shutdown is still readable after
/// the engine is reopened on the same directory.
#[tokio::test]
async fn test_engine_reopen_with_ssts() {
    let tmp = TempDir::new().unwrap();

    // First lifetime: write, flush, and shut down.
    {
        let config = make_config(tmp.path());
        let engine = Engine::open(config).await.unwrap();
        let seed = [make_record("persisted", 1)];
        engine.write_batch(&seed).await.unwrap();
        engine.flush().await.unwrap();
        engine.shutdown().await.unwrap();
    }

    // Second lifetime: the record must be found again.
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();
    let fetched = engine.get("persisted", 1).await.unwrap();
    assert!(fetched.is_some(), "record should survive reopen");
    engine.shutdown().await.unwrap();
}
/// `get_sync` finds a record after it has been flushed and returns the
/// expected value bytes `[1, 2, 3]`.
#[tokio::test]
async fn test_get_sync_after_flush() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let seed = [make_record("ss", 7)];
    engine.write_batch(&seed).await.unwrap();
    engine.flush().await.unwrap();

    // NOTE(review): `[1, 2, 3]` is presumably the value `make_record` assigns — confirm.
    let fetched = engine.get_sync("ss", 7);
    let stored = fetched.unwrap();
    assert_eq!(stored.value, vec![1, 2, 3]);
    engine.shutdown().await.unwrap();
}
/// With `default_ttl_secs` configured, a record written through plain
/// `write_batch` ends up with an `expire_at` strictly below `i64::MAX`.
#[tokio::test]
async fn test_engine_with_default_ttl() {
    let tmp = TempDir::new().unwrap();
    let mut cfg = make_config(tmp.path());
    cfg.default_ttl_secs = Some(u32::MAX as u64);
    let engine = Engine::open(cfg).await.unwrap();

    let seed = [make_record("dttl", 1)];
    engine.write_batch(&seed).await.unwrap();

    let fetched = engine.get("dttl", 1).await.unwrap();
    let stored = fetched.unwrap();
    assert!(stored.expire_at < i64::MAX);
    engine.shutdown().await.unwrap();
}
/// `metrics_text` on a fresh engine is non-empty and mentions the `flowdb_`
/// metric name prefix.
#[tokio::test]
async fn test_metrics_text_nonempty() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let rendered = engine.metrics_text();
    assert!(!rendered.is_empty());
    assert!(rendered.contains("flowdb_"));
    engine.shutdown().await.unwrap();
}
/// After flushing two records, a full scan returns at least two records.
#[tokio::test]
async fn test_scan_all_after_flush() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let seed = [make_record("fa", 1), make_record("fb", 2)];
    engine.write_batch(&seed).await.unwrap();
    engine.flush().await.unwrap();

    let scanner = engine.scan(ScanRange::all()).unwrap();
    let hits: Vec<_> = scanner.map(|item| item.unwrap()).collect();
    assert!(hits.len() >= 2);
    engine.shutdown().await.unwrap();
}
/// Flushes three records, deletes `"tm2"` afterwards without flushing, and
/// checks that a full scan no longer yields any record with key `"tm2"`.
#[tokio::test]
async fn test_scan_all_with_tombstone_merge() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let seed = [
        make_record("tm1", 1),
        make_record("tm2", 1),
        make_record("tm3", 1),
    ];
    engine.write_batch(&seed).await.unwrap();
    engine.flush().await.unwrap();

    // The delete is issued after the flush of the record it targets.
    engine.delete_batch(&[("tm2".into(), 1)]).await.unwrap();

    let scanner = engine.scan(ScanRange::all()).unwrap();
    let hits: Vec<_> = scanner.map(|item| item.unwrap()).collect();
    assert!(hits.iter().all(|rec| rec.key != b"tm2"));
    engine.shutdown().await.unwrap();
}
/// Regression test: after writing a record and then deleting the same
/// key/timestamp pair, `get` must return `None`.
#[tokio::test]
async fn test_regression_get_after_delete_same_key_ts() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    let written = [Record {
        key: b"rd".to_vec(),
        ts: 100,
        expire_at: i64::MAX,
        value: b"data".to_vec(),
    }];
    engine.write_batch(&written).await.unwrap();
    engine.delete_batch(&[("rd".into(), 100)]).await.unwrap();

    let lookup = engine.get("rd", 100).await.unwrap();
    assert!(
        lookup.is_none(),
        "get after delete must return None — the delete tombstone has higher seq"
    );
    engine.shutdown().await.unwrap();
}
/// Regression test: writing the same key/timestamp twice with different
/// values must make `get` return the value from the second write.
#[tokio::test]
async fn test_regression_get_returns_latest_version() {
    let tmp = TempDir::new().unwrap();
    let config = make_config(tmp.path());
    let engine = Engine::open(config).await.unwrap();

    // Two separate batches, in order: first "v1", then "v2".
    for payload in [b"v1", b"v2"] {
        let version = [Record {
            key: b"ov".to_vec(),
            ts: 50,
            expire_at: i64::MAX,
            value: payload.to_vec(),
        }];
        engine.write_batch(&version).await.unwrap();
    }

    let fetched = engine.get("ov", 50).await.unwrap();
    let stored = fetched.unwrap();
    assert_eq!(
        stored.value, b"v2",
        "get must return the latest version (v2), not the stale one (v1)"
    );
    engine.shutdown().await.unwrap();
}
/// Regression test: `Engine::open` must return an error when
/// `time_bucket_secs` is zero.
#[tokio::test]
async fn test_regression_config_validation_in_open() {
    let tmp = TempDir::new().unwrap();
    let mut cfg = make_config(tmp.path());
    cfg.time_bucket_secs = 0;

    let opened = Engine::open(cfg).await;
    assert!(
        opened.is_err(),
        "Engine::open must reject time_bucket_secs=0 (div-by-zero)"
    );
}
/// Regression test: `Engine::open` must return an error when
/// `memtable_size_mb` is zero.
#[tokio::test]
async fn test_regression_config_memtable_zero_rejected() {
    let tmp = TempDir::new().unwrap();
    let mut cfg = make_config(tmp.path());
    cfg.memtable_size_mb = 0;

    let opened = Engine::open(cfg).await;
    assert!(opened.is_err());
}
}