#[cfg(feature = "write-support")]
pub mod cql_to_mutation;
#[cfg(feature = "write-support")]
pub mod export;
#[cfg(feature = "write-support")]
pub mod memtable;
#[cfg(feature = "write-support")]
pub mod merge;
#[cfg(feature = "write-support")]
pub mod merge_policy;
#[cfg(feature = "write-support")]
pub mod mutation;
#[cfg(feature = "write-support")]
pub mod wal;
#[cfg(feature = "write-support")]
pub use export::{ExportOptions, ExportReport};
#[cfg(feature = "write-support")]
pub use memtable::Memtable;
#[cfg(feature = "write-support")]
pub use merge::KWayMerger;
#[cfg(feature = "write-support")]
pub use merge_policy::STCSPolicy;
#[cfg(feature = "write-support")]
pub use mutation::{
CellOperation, ClusteringBound, ClusteringKey, DecoratedKey, Mutation, PartitionKey,
PartitionTombstone, RangeTombstone, TableId,
};
#[cfg(feature = "write-support")]
pub use wal::WriteAheadLog;
use crate::error::{Error, Result};
use crate::schema::TableSchema;
use crate::storage::sstable::writer::SSTableInfo;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
/// Outcome of a single `WriteEngine::maintenance_step` call.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone)]
pub struct MaintenanceReport {
/// Wall-clock time elapsed between entering and leaving the step.
pub time_spent: Duration,
/// Final `Data.db` paths of the merges that were finalized during this step.
pub completed_merges: Vec<PathBuf>,
/// Number of rows written to the merge output during this step.
pub rows_merged: u64,
/// Bytes of merge output published during this step (summed over the renamed components).
pub bytes_written: u64,
/// `true` when a merge is still in progress at the time the step returns.
pub pending_compaction: bool,
}
/// Running totals accumulated over every merge finalized by a `WriteEngine`.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
/// Number of merges that were finalized.
pub compactions_completed: u64,
/// Total number of input SSTables consumed by finalized merges.
pub sstables_merged_in: u64,
/// Total number of output SSTables produced (one per finalized merge).
pub sstables_produced: u64,
/// Sum of the input file sizes recorded when each merge was started.
pub bytes_read: u64,
/// Sum of the on-disk sizes of the published output components.
pub bytes_written: u64,
/// Total rows written to merge outputs.
pub rows_merged: u64,
/// Sum of the elapsed time of each finalized merge, measured from its start.
pub total_time: Duration,
}
/// Strategy that decides which SSTables should be merged together.
#[cfg(feature = "write-support")]
pub trait MergePolicy: Send + std::fmt::Debug {
/// Picks the inputs for the next merge out of `candidates`.
///
/// The engine passes the `Data.db` paths it found on disk and starts a merge
/// only when the returned list is non-empty.
fn select_merge(&self, candidates: &[PathBuf]) -> Result<Vec<PathBuf>>;
}
/// State of a merge that is advanced incrementally across maintenance steps.
#[cfg(feature = "write-support")]
#[derive(Debug)]
struct ActiveMerge {
/// Source of merged partitions; stepped once per partition.
merger: KWayMerger,
/// Writer producing the merge output inside `tmp_dir`.
writer: crate::storage::sstable::writer::SSTableWriter,
/// `Data.db` paths of the SSTables being merged; deleted after the output is published.
input_paths: Vec<PathBuf>,
/// Scratch directory (`.compaction-tmp-<generation>`) holding the unpublished output.
tmp_dir: PathBuf,
/// Directory the finished components are renamed into.
sstable_dir: PathBuf,
/// Rows written to the output so far.
rows_merged: u64,
/// Combined size of the input `Data.db` files, sampled when the merge started.
bytes_read: u64,
/// Instant at which the merge was started.
started_at: Instant,
}
/// Controls whether the write paths record mutations in the write-ahead log.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Durability {
/// Append every mutation to the WAL and sync it before buffering (the default).
#[default]
SyncEachWrite,
/// Skip the WAL append and sync on the write paths.
Disabled,
}
/// Settings used to construct a `WriteEngine`.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone)]
pub struct WriteEngineConfig {
/// Root directory for SSTable output; created on engine start if missing.
pub data_dir: PathBuf,
/// Directory holding the write-ahead log; created on engine start if missing.
pub wal_dir: PathBuf,
/// Size threshold handed to the memtable's `should_flush` check after each write.
pub memtable_flush_threshold: usize,
/// Memtable size (bytes) at or above which new writes are rejected.
pub memtable_hard_limit: usize,
/// Schema of the single table this engine writes to.
pub schema: TableSchema,
/// WAL behaviour for the write paths.
pub durability: Durability,
}
#[cfg(feature = "write-support")]
impl WriteEngineConfig {
    /// Flush threshold applied by [`WriteEngineConfig::new`]: 64 MiB.
    pub const DEFAULT_FLUSH_THRESHOLD: usize = 64 * 1024 * 1024;
    /// Hard limit applied by [`WriteEngineConfig::new`]: 256 MiB.
    pub const DEFAULT_HARD_LIMIT: usize = 256 * 1024 * 1024;

    /// Builds a configuration with the default thresholds and default durability.
    pub fn new(data_dir: PathBuf, wal_dir: PathBuf, schema: TableSchema) -> Self {
        let durability = Durability::default();
        let memtable_flush_threshold = Self::DEFAULT_FLUSH_THRESHOLD;
        let memtable_hard_limit = Self::DEFAULT_HARD_LIMIT;
        Self {
            data_dir,
            wal_dir,
            memtable_flush_threshold,
            memtable_hard_limit,
            schema,
            durability,
        }
    }

    /// Returns the configuration with the flush threshold replaced.
    pub fn with_flush_threshold(self, threshold: usize) -> Self {
        Self {
            memtable_flush_threshold: threshold,
            ..self
        }
    }

    /// Returns the configuration with the hard limit replaced.
    pub fn with_hard_limit(self, limit: usize) -> Self {
        Self {
            memtable_hard_limit: limit,
            ..self
        }
    }

    /// Returns the configuration with the durability mode replaced.
    pub fn with_durability(self, durability: Durability) -> Self {
        Self { durability, ..self }
    }
}
/// Single-table write path: WAL, memtable, SSTable flush and incremental compaction.
#[cfg(feature = "write-support")]
#[derive(Debug)]
pub struct WriteEngine {
/// Settings the engine was opened with.
config: WriteEngineConfig,
/// Write-ahead log; replayed on start and truncated after a successful flush.
wal: WriteAheadLog,
/// In-memory buffer of mutations not yet flushed to an SSTable.
memtable: Memtable,
/// Generation number assigned to the next SSTable (flush or merge output).
generation: u64,
/// Set once `close` has run; checked by the public entry points.
closed: AtomicBool,
/// Merge currently in progress, if any.
active_merge: Option<ActiveMerge>,
/// Policy used to pick merge inputs; maintenance is a no-op while this is `None`.
merge_policy: Option<Box<dyn MergePolicy>>,
/// Totals over all merges finalized by this engine instance.
cumulative_stats: CompactionStats,
}
/// Fails when any plain or TTL write in `mutation` carries a counter value.
///
/// Other operation kinds are not inspected.
#[cfg(feature = "write-support")]
fn reject_counter_cells(mutation: &Mutation) -> Result<()> {
    let writes_counter = mutation.operations.iter().any(|op| {
        matches!(
            op,
            CellOperation::Write {
                value: crate::types::Value::Counter(_),
                ..
            } | CellOperation::WriteWithTtl {
                value: crate::types::Value::Counter(_),
                ..
            }
        )
    });
    if !writes_counter {
        return Ok(());
    }
    Err(Error::invalid_operation(
        "counter writes are not supported via the standard mutation path; \
         counter columns require server-side distributed increment semantics",
    ))
}
#[cfg(feature = "write-support")]
impl WriteEngine {
pub fn new(config: WriteEngineConfig) -> Result<Self> {
std::fs::create_dir_all(&config.data_dir).map_err(|e| {
Error::Storage(format!(
"Failed to create data directory {:?}: {}",
config.data_dir, e
))
})?;
std::fs::create_dir_all(&config.wal_dir).map_err(|e| {
Error::Storage(format!(
"Failed to create WAL directory {:?}: {}",
config.wal_dir, e
))
})?;
Self::sweep_orphaned_compaction_tmp(&config.data_dir);
Self::sweep_orphaned_partial_sstables(
&config.data_dir,
&config.schema.keyspace,
&config.schema.table,
);
let wal_path = config.wal_dir.join(WriteAheadLog::WAL_FILENAME);
let wal = if wal_path.exists() {
WriteAheadLog::open_existing(&wal_path)?
} else {
WriteAheadLog::create(&config.wal_dir)?
};
let mut memtable = Memtable::new();
let mutations = wal.replay()?;
if !mutations.is_empty() {
log::info!("Replaying {} mutations from WAL", mutations.len());
for mutation in mutations {
let decorated_key = mutation.decorated_key(&config.schema)?;
memtable.insert_with_key(decorated_key, mutation)?;
}
log::info!(
"WAL replay complete: {} rows in memtable, {} bytes",
memtable.row_count(),
memtable.size_bytes()
);
}
let generation = Self::determine_next_generation(&config.data_dir)?;
Ok(Self {
config,
wal,
memtable,
generation,
closed: AtomicBool::new(false),
active_merge: None,
merge_policy: None,
cumulative_stats: CompactionStats::default(),
})
}
/// Validates `mutation`, records it in the WAL (when durability is enabled) and
/// buffers it in the memtable.
///
/// Shared admission path of [`Self::write`] and [`Self::write_async`]. Returns
/// whether the memtable reports that it should be flushed.
///
/// The partition key is derived *before* the WAL append: WAL replay in
/// [`Self::new`] derives the key for every record and aborts on failure, so a
/// record whose key cannot be derived must never reach the log.
fn admit_mutation(&mut self, mutation: Mutation) -> Result<bool> {
    if self.closed.load(Ordering::SeqCst) {
        return Err(Error::InvalidInput(
            "WriteEngine has been closed".to_string(),
        ));
    }
    reject_counter_cells(&mutation)?;
    if self.memtable.size_bytes() >= self.config.memtable_hard_limit {
        return Err(Error::Storage(format!(
            "Memtable at hard limit ({} bytes >= {} bytes). Flush required before accepting more writes.",
            self.memtable.size_bytes(),
            self.config.memtable_hard_limit
        )));
    }
    let decorated_key = mutation.decorated_key(&self.config.schema)?;
    if self.config.durability == Durability::SyncEachWrite {
        self.wal.append(&mutation)?;
        self.wal.sync()?;
    }
    self.memtable.insert_with_key(decorated_key, mutation)?;
    Ok(self
        .memtable
        .should_flush(self.config.memtable_flush_threshold))
}

/// Applies one mutation synchronously.
///
/// When the memtable crosses the flush threshold a warning is logged; the flush
/// itself is only triggered here when no Tokio runtime is current on the calling
/// thread. Inside a runtime the caller is expected to call `flush()`.
///
/// # Errors
/// Fails when the engine is closed, the mutation contains counter writes, the
/// memtable is at its hard limit, or key derivation, the WAL, the memtable insert
/// or the automatic flush fails.
pub fn write(&mut self, mutation: Mutation) -> Result<()> {
    let needs_flush = self.admit_mutation(mutation)?;
    if needs_flush {
        log::warn!(
            "Memtable size {} exceeds threshold {} - call flush() manually in async context",
            self.memtable.size_bytes(),
            self.config.memtable_flush_threshold
        );
        if tokio::runtime::Handle::try_current().is_err() {
            log::info!("Triggering automatic flush");
            self.flush_internal()?;
        }
    }
    Ok(())
}

/// Applies one mutation and, when the memtable crosses the flush threshold,
/// awaits a flush before returning.
///
/// # Errors
/// Same conditions as [`Self::write`]; a failed flush is also reported.
pub async fn write_async(&mut self, mutation: Mutation) -> Result<()> {
    let needs_flush = self.admit_mutation(mutation)?;
    if needs_flush {
        log::info!(
            "Memtable size {} exceeds threshold {}, triggering flush",
            self.memtable.size_bytes(),
            self.config.memtable_flush_threshold
        );
        self.flush_internal_async().await?;
    }
    Ok(())
}
/// Parses a CQL statement and applies the resulting mutation(s) via `write`.
///
/// A statement whose trimmed text starts with `BEGIN` (ASCII case-insensitive) is
/// converted to a list of mutations that are written one after another; the first
/// failing write stops the sequence. Any other statement is converted to a single
/// mutation.
pub fn execute(&mut self, statement: &str) -> Result<()> {
    if self.closed.load(Ordering::SeqCst) {
        return Err(Error::InvalidInput(
            "WriteEngine has been closed".to_string(),
        ));
    }

    let trimmed = statement.trim();
    let is_batch = trimmed
        .as_bytes()
        .get(..5)
        .map_or(false, |head| head.eq_ignore_ascii_case(b"BEGIN"));

    if !is_batch {
        let single = self.parse_cql_to_mutation(statement)?;
        return self.write(single);
    }

    let batch = cql_to_mutation::convert_cql_to_mutations(trimmed, &self.config.schema)?;
    for item in batch {
        self.write(item)?;
    }
    Ok(())
}
pub async fn flush(&mut self) -> Result<Option<SSTableInfo>> {
if self.closed.load(Ordering::SeqCst) {
return Err(Error::InvalidInput(
"WriteEngine has been closed".to_string(),
));
}
self.flush_internal_async().await
}
/// Synchronous flush: drives `flush_internal_async` to completion via
/// `merge::block_on_async` and discards the returned SSTable info.
fn flush_internal(&mut self) -> Result<()> {
merge::block_on_async(self.flush_internal_async())?;
Ok(())
}
/// Writes the memtable out as a new SSTable and resets the in-memory state.
///
/// Returns `Ok(None)` without touching disk when the memtable is empty. On
/// success the WAL is truncated (a truncation failure is only logged), the
/// memtable is cleared and the generation counter is advanced.
///
/// # Errors
/// Fails when the SSTable writer cannot be created, a partition cannot be
/// written, or finishing the SSTable fails; in those cases the memtable, WAL and
/// generation are left untouched.
async fn flush_internal_async(&mut self) -> Result<Option<SSTableInfo>> {
    if self.memtable.is_empty() {
        return Ok(None);
    }

    // Counted once and reused for both the log line and the writer's sizing hint.
    let partition_count = self.memtable.iter().count();
    log::info!(
        "Flushing memtable: {} partitions, {} rows, {} bytes",
        partition_count,
        self.memtable.row_count(),
        self.memtable.size_bytes()
    );

    let mut writer = crate::storage::sstable::writer::SSTableWriter::with_expected_partitions(
        self.config.data_dir.clone(),
        self.generation,
        &self.config.schema,
        partition_count,
    )?;
    for (decorated_key, mutations) in self.memtable.iter() {
        writer.write_partition(decorated_key.clone(), mutations.to_vec())?;
    }
    let info = writer.finish().await?;
    log::info!(
        "SSTable flush complete: generation {}, {} partitions, {} bytes",
        self.generation,
        info.partition_count,
        info.data_size
    );

    // The SSTable is already complete at this point, so a failed truncate is
    // reported but does not fail the flush.
    if let Err(e) = self.wal.truncate() {
        log::warn!(
            "Failed to truncate WAL after successful SSTable flush: {}. \
             Data is safe in SSTable, but WAL cleanup failed.",
            e
        );
    }
    self.memtable.clear();
    self.generation += 1;
    Ok(Some(info))
}
/// Closes the engine, flushing any buffered mutations first.
///
/// Calling `close` on an already-closed engine is a no-op. If the final flush
/// fails the engine is marked open again and the flush error is returned. A
/// failed WAL sync is only logged.
pub async fn close(&mut self) -> Result<()> {
    let was_closed = self.closed.swap(true, Ordering::SeqCst);
    if was_closed {
        return Ok(());
    }
    log::info!("Closing WriteEngine");

    if !self.memtable.is_empty() {
        log::info!("Flushing memtable before close");
        if let Err(e) = self.flush_internal_async().await {
            log::error!("Failed to flush memtable during close: {}", e);
            // Re-open so the caller can retry or keep writing.
            self.closed.store(false, Ordering::SeqCst);
            return Err(e);
        }
        log::info!("Memtable flushed successfully");
    }

    if let Err(e) = self.wal.sync() {
        log::warn!("Failed to sync WAL during close: {}", e);
    }
    log::info!("WriteEngine closed");
    Ok(())
}
/// Returns the size in bytes reported by the memtable.
pub fn memtable_size(&self) -> usize {
self.memtable.size_bytes()
}
/// Returns the number of rows reported by the memtable.
pub fn memtable_row_count(&self) -> usize {
self.memtable.row_count()
}
/// Returns the size reported by the write-ahead log.
pub fn wal_size(&self) -> u64 {
self.wal.size()
}
/// Returns the generation number that the next SSTable will be written with.
pub fn generation(&self) -> u64 {
self.generation
}
/// Converts a single CQL statement into a mutation using the engine's schema.
fn parse_cql_to_mutation(&self, statement: &str) -> Result<Mutation> {
cql_to_mutation::convert_cql_to_mutation(statement, &self.config.schema)
}
/// Computes the generation for the next SSTable: one more than the highest
/// generation found under `data_dir`, or `1` when the directory does not exist
/// or holds no SSTables.
fn determine_next_generation(data_dir: &Path) -> Result<u64> {
    if !data_dir.exists() {
        return Ok(1);
    }
    let mut highest_seen = 0u64;
    let depth_limit = crate::storage::sstable::MAX_SSTABLE_SCAN_DEPTH;
    Self::scan_generations(data_dir, &mut highest_seen, depth_limit)?;
    Ok(highest_seen + 1)
}
fn scan_generations(dir: &Path, max_generation: &mut u64, depth: usize) -> Result<()> {
for entry in std::fs::read_dir(dir)
.map_err(|e| Error::Storage(format!("Failed to read data directory: {}", e)))?
{
let entry = entry
.map_err(|e| Error::Storage(format!("Failed to read directory entry: {}", e)))?;
let filename = entry.file_name();
let filename_str = filename.to_string_lossy();
if filename_str.starts_with("nb-") && filename_str.contains("-big-") {
if let Some(gen_str) = filename_str
.strip_prefix("nb-")
.and_then(|s| s.split('-').next())
{
if let Ok(gen) = gen_str.parse::<u64>() {
*max_generation = (*max_generation).max(gen);
}
}
} else if depth > 0 {
let path = entry.path();
if path.is_dir() {
Self::scan_generations(&path, max_generation, depth - 1)?;
}
}
}
Ok(())
}
/// Installs the policy used by `maintenance_step` to pick merge inputs,
/// replacing any previously installed policy. Always returns `Ok(())`.
pub fn set_merge_policy(&mut self, policy: Box<dyn MergePolicy>) -> Result<()> {
self.merge_policy = Some(policy);
Ok(())
}
/// Returns a copy of the totals accumulated over all finalized merges.
pub fn maintenance_stats(&self) -> CompactionStats {
self.cumulative_stats.clone()
}
/// Advances background compaction for roughly `budget` of wall-clock time.
///
/// Without a merge policy this is a no-op. When no merge is active, the policy
/// is asked to pick inputs from the SSTables on disk and a merge is started if
/// it selects any. The active merge is then advanced one partition at a time:
/// at least one partition is always processed, and the loop stops once the
/// elapsed time reaches the budget plus a 10% tolerance, or when the merge
/// completes and is finalized.
///
/// The tolerance is computed with saturating arithmetic, so very large budgets
/// (including `Duration::MAX`) are accepted instead of panicking.
///
/// # Errors
/// Fails when the engine is closed, or when scanning, policy selection,
/// starting, stepping, writing or finalizing the merge fails.
pub fn maintenance_step(&mut self, budget: Duration) -> Result<MaintenanceReport> {
    if self.closed.load(Ordering::SeqCst) {
        return Err(Error::InvalidInput(
            "WriteEngine has been closed".to_string(),
        ));
    }
    let start = Instant::now();
    let mut report = MaintenanceReport {
        time_spent: Duration::from_secs(0),
        completed_merges: Vec::new(),
        rows_merged: 0,
        bytes_written: 0,
        pending_compaction: false,
    };
    let merge_policy = match &self.merge_policy {
        Some(policy) => policy,
        None => {
            report.time_spent = start.elapsed();
            return Ok(report);
        }
    };
    if self.active_merge.is_none() {
        let candidates = self.scan_sstable_candidates()?;
        let selected = merge_policy.select_merge(&candidates)?;
        if selected.is_empty() {
            report.time_spent = start.elapsed();
            return Ok(report);
        }
        self.start_merge(selected)?;
    }

    // budget * 1.1 without the overflow panic of `Duration::mul_f32`.
    let budget_tolerance = budget.saturating_add(budget / 10);
    let mut partitions_processed = 0u64;
    while let Some(active) = &mut self.active_merge {
        if partitions_processed > 0 && start.elapsed() >= budget_tolerance {
            break;
        }
        let step = active.merger.step()?;
        match step {
            merge::MergeStep::Partition { key, rows } => {
                partitions_processed += 1;
                let row_count = rows.len() as u64;
                let mutations = rows
                    .into_iter()
                    .map(|entry| self.merge_entry_to_mutation(entry))
                    .collect::<Result<Vec<_>>>()?;
                if let Some(active) = &mut self.active_merge {
                    active.writer.write_partition(key, mutations)?;
                    active.rows_merged += row_count;
                }
                report.rows_merged += row_count;
            }
            merge::MergeStep::Complete => {
                self.finalize_merge_blocking(&mut report)?;
                break;
            }
        }
    }
    report.pending_compaction = self.active_merge.is_some();
    report.time_spent = start.elapsed();
    Ok(report)
}
/// Removes every `.compaction-tmp-*` directory directly under `data_dir`.
///
/// Best effort: an unreadable `data_dir` or a failed removal is logged and
/// otherwise ignored.
fn sweep_orphaned_compaction_tmp(data_dir: &Path) {
    let listing = match std::fs::read_dir(data_dir) {
        Ok(listing) => listing,
        Err(e) => {
            log::debug!(
                "sweep_orphaned_compaction_tmp: cannot read {:?}: {}",
                data_dir,
                e
            );
            return;
        }
    };

    let orphans = listing
        .flatten()
        .filter(|item| {
            item.file_name()
                .to_string_lossy()
                .starts_with(".compaction-tmp-")
        })
        .map(|item| item.path())
        .filter(|candidate| candidate.is_dir());

    for orphan in orphans {
        log::warn!("removing orphaned compaction tmp directory: {:?}", orphan);
        if let Err(e) = std::fs::remove_dir_all(&orphan) {
            log::warn!(
                "failed to remove orphaned compaction tmp directory {:?}: {}",
                orphan,
                e
            );
        }
    }
}
/// Deletes SSTables in `<data_dir>/<keyspace>/<table>` that have a
/// `nb-…-big-Data.db` file but no matching `TOC.txt`.
///
/// Best effort: an unreadable directory is skipped silently and a failed
/// deletion is only logged.
fn sweep_orphaned_partial_sstables(data_dir: &Path, keyspace: &str, table: &str) {
    let sstable_dir = data_dir.join(keyspace).join(table);
    let listing = match std::fs::read_dir(&sstable_dir) {
        Ok(listing) => listing,
        Err(_) => return,
    };

    for item in listing.flatten() {
        let data_path = item.path();
        let os_name = item.file_name();
        let file_name = os_name.to_string_lossy();

        let is_data_component = file_name.starts_with("nb-")
            && file_name.ends_with("-big-Data.db")
            && data_path.is_file();
        if !is_data_component {
            continue;
        }
        let base = match file_name.strip_suffix("-Data.db") {
            Some(stem) => stem,
            None => continue,
        };
        // Used for log messages only; falls back to the whole base name.
        let generation_label = base
            .strip_prefix("nb-")
            .and_then(|rest| rest.strip_suffix("-big"))
            .unwrap_or(base);

        if sstable_dir.join(format!("{}-TOC.txt", base)).exists() {
            continue;
        }
        log::warn!(
            "removing orphaned partial SSTable components for generation {}: missing TOC.txt",
            generation_label
        );
        if let Err(e) = Self::delete_sstable_files_static(&data_path) {
            log::warn!(
                "failed to remove orphaned partial SSTable for generation {}: {}",
                generation_label,
                e
            );
        }
    }
}
/// Collects the `Data.db` paths of all published SSTables under the data
/// directory; returns an empty list when the directory does not exist.
fn scan_sstable_candidates(&self) -> Result<Vec<PathBuf>> {
    let root = &self.config.data_dir;
    let mut found = Vec::new();
    if root.exists() {
        let depth_limit = crate::storage::sstable::MAX_SSTABLE_SCAN_DEPTH;
        Self::scan_data_files(root, &mut found, depth_limit)?;
    }
    Ok(found)
}
fn scan_data_files(dir: &Path, candidates: &mut Vec<PathBuf>, depth: usize) -> Result<()> {
for entry in std::fs::read_dir(dir)
.map_err(|e| Error::Storage(format!("Failed to read data directory: {}", e)))?
{
let entry = entry
.map_err(|e| Error::Storage(format!("Failed to read directory entry: {}", e)))?;
let path = entry.path();
let filename = path.file_name().unwrap_or_default().to_string_lossy();
if filename.starts_with("nb-") && filename.ends_with("-big-Data.db") {
let base = filename.trim_end_matches("-Data.db");
let toc_path = path.with_file_name(format!("{base}-TOC.txt"));
if toc_path.exists() {
candidates.push(path);
} else {
log::debug!(
"scan_data_files: skipping unpublished SSTable (no TOC.txt): {:?}",
path
);
}
} else if depth > 0 && path.is_dir() {
Self::scan_data_files(&path, candidates, depth - 1)?;
}
}
Ok(())
}
fn start_merge(&mut self, input_paths: Vec<PathBuf>) -> Result<()> {
log::info!(
"Starting compaction merge of {} SSTables",
input_paths.len()
);
let bytes_read: u64 = input_paths
.iter()
.map(|p| std::fs::metadata(p).map(|m| m.len()).unwrap_or(0))
.sum();
let output_generation = self.generation;
let sstable_dir = self
.config
.data_dir
.join(&self.config.schema.keyspace)
.join(&self.config.schema.table);
let tmp_dir = self
.config
.data_dir
.join(format!(".compaction-tmp-{}", output_generation));
std::fs::create_dir_all(&tmp_dir).map_err(|e| {
Error::Storage(format!(
"Failed to create compaction tmp directory {:?}: {}",
tmp_dir, e
))
})?;
let merger = KWayMerger::new(input_paths.clone(), &self.config.schema)?;
let writer = crate::storage::sstable::writer::SSTableWriter::new(
tmp_dir.clone(),
output_generation,
&self.config.schema,
)?;
self.generation += 1;
self.active_merge = Some(ActiveMerge {
merger,
writer,
input_paths,
tmp_dir,
sstable_dir,
rows_merged: 0,
bytes_read,
started_at: Instant::now(),
});
Ok(())
}
/// Synchronous wrapper: drives `finalize_merge_async` to completion via
/// `merge::block_on_async`.
fn finalize_merge_blocking(&mut self, report: &mut MaintenanceReport) -> Result<()> {
merge::block_on_async(self.finalize_merge_async(report))
}
/// Completes the active merge: finishes the output SSTable in the scratch
/// directory, renames its components into the table directory, deletes the
/// inputs and updates `report` and the cumulative statistics.
///
/// Returns `Ok(())` immediately when no merge is active. The active merge is
/// taken out of `self` up front, so after this call (success or failure) no
/// merge is active.
///
/// # Errors
/// Fails when finishing the writer, creating the table directory or renaming a
/// component fails. On writer or rename failure the scratch directory is removed
/// and, for renames, already-moved components are deleted again; the inputs are
/// not touched in either case.
async fn finalize_merge_async(&mut self, report: &mut MaintenanceReport) -> Result<()> {
let merge = match self.active_merge.take() {
Some(m) => m,
None => return Ok(()),
};
let elapsed = merge.started_at.elapsed();
log::info!(
"Finalizing compaction merge: {} rows, {:?} elapsed",
merge.rows_merged,
elapsed
);
// Step 1: finish the output inside the scratch directory.
let tmp_info = match merge.writer.finish().await {
Ok(info) => info,
Err(e) => {
let _ = std::fs::remove_dir_all(&merge.tmp_dir);
return Err(Error::Storage(format!(
"Compaction merge write failed (inputs intact): {}",
e
)));
}
};
log::info!(
"Compaction tmp output: {} bytes, {} partitions",
tmp_info.data_size,
tmp_info.partition_count
);
let sstable_dir = &merge.sstable_dir;
// NOTE(review): an early return here (or from `make_rename` below) leaves the
// scratch directory on disk; it appears to be reclaimed only by the startup sweep.
std::fs::create_dir_all(sstable_dir).map_err(|e| {
Error::Storage(format!(
"Failed to create SSTable directory {:?}: {}",
sstable_dir, e
))
})?;
// Maps a scratch component path to (source, destination-in-table-directory).
let make_rename = |src: &PathBuf| -> Result<(PathBuf, PathBuf)> {
let filename = src
.file_name()
.ok_or_else(|| Error::Storage("Component path has no filename".to_string()))?;
let dst = sstable_dir.join(filename);
Ok((src.clone(), dst))
};
// Step 2: plan the renames. TOC.txt is queued last; the scan and sweep code in
// this file treat a Data.db without TOC.txt as unpublished.
let mut renames: Vec<(PathBuf, PathBuf)> = Vec::new();
for src in &[
&tmp_info.data_path,
&tmp_info.index_path,
&tmp_info.filter_path,
&tmp_info.summary_path,
&tmp_info.stats_path,
&tmp_info.digest_path,
] {
renames.push(make_rename(src)?);
}
if let Some(ref ci_path) = tmp_info.compression_info_path {
renames.push(make_rename(ci_path)?);
}
renames.push(make_rename(&tmp_info.toc_path)?);
// Step 3: perform the renames, remembering what was moved so it can be rolled back.
let mut renamed: Vec<PathBuf> = Vec::with_capacity(renames.len());
let mut rename_error: Option<Error> = None;
for (src, dst) in &renames {
match std::fs::rename(src, dst) {
Ok(()) => {
log::debug!(
"Renamed {:?} → {:?}",
src.file_name().unwrap_or_default(),
dst.file_name().unwrap_or_default()
);
renamed.push(dst.clone());
}
Err(e) => {
rename_error = Some(Error::Storage(format!(
"Atomic rename of {:?} to {:?} failed (rolling back, inputs intact): {}",
src, dst, e
)));
break;
}
}
}
// Rollback: delete the components that were already moved, then drop the scratch directory.
if let Some(err) = rename_error {
for dst in &renamed {
let _ = std::fs::remove_file(dst);
}
let _ = std::fs::remove_dir_all(&merge.tmp_dir);
return Err(err);
}
// Step 4: the output is in place; remove the inputs. Failures are logged, not returned.
for input_path in &merge.input_paths {
if let Err(e) = self.delete_sstable_files(input_path) {
log::warn!(
"Failed to delete compaction input {:?}: {} \
(merge output is valid; inputs will be re-evaluated next cycle)",
input_path,
e
);
}
}
if let Err(e) = std::fs::remove_dir_all(&merge.tmp_dir) {
log::debug!(
"Failed to remove compaction tmp directory {:?}: {}",
merge.tmp_dir,
e
);
}
let final_data_path = sstable_dir.join(
tmp_info
.data_path
.file_name()
.ok_or_else(|| Error::Storage("Data.db path has no filename".to_string()))?,
);
// Step 5: sum the sizes of the published components (TOC.txt is not included);
// a component whose metadata cannot be read counts as 0.
let total_bytes_written: u64 = [
&tmp_info.data_path,
&tmp_info.index_path,
&tmp_info.filter_path,
&tmp_info.summary_path,
&tmp_info.stats_path,
&tmp_info.digest_path,
]
.iter()
.map(|p| {
let filename = p.file_name().unwrap_or_default();
let final_path = sstable_dir.join(filename);
std::fs::metadata(&final_path).map(|m| m.len()).unwrap_or(0)
})
.sum::<u64>()
+ tmp_info
.compression_info_path
.as_ref()
.and_then(|p| {
let filename = p.file_name()?;
std::fs::metadata(sstable_dir.join(filename))
.ok()
.map(|m| m.len())
})
.unwrap_or(0);
// Step 6: report and accumulate statistics.
report.completed_merges.push(final_data_path);
report.bytes_written += total_bytes_written;
self.cumulative_stats.compactions_completed += 1;
self.cumulative_stats.sstables_merged_in += merge.input_paths.len() as u64;
self.cumulative_stats.sstables_produced += 1;
self.cumulative_stats.bytes_read += merge.bytes_read;
self.cumulative_stats.bytes_written += total_bytes_written;
self.cumulative_stats.rows_merged += merge.rows_merged;
self.cumulative_stats.total_time += elapsed;
log::info!(
"Compaction complete: merged {} inputs → 1 output ({} bytes total across all components, {} rows, {:?})",
merge.input_paths.len(),
total_bytes_written,
merge.rows_merged,
elapsed
);
Ok(())
}
/// Method-call convenience for `delete_sstable_files_static`; does not use `self`.
fn delete_sstable_files(&self, data_path: &Path) -> Result<()> {
Self::delete_sstable_files_static(data_path)
}
/// Deletes every component file of the SSTable whose `Data.db` is `data_path`.
///
/// `TOC.txt` is removed first, then the remaining components. Components that
/// do not exist are skipped. Removal is attempted directly and a `NotFound`
/// result is treated as "already gone", so there is no window between an
/// existence check and the delete, and other I/O errors are never silently
/// skipped.
///
/// # Errors
/// Fails when `data_path` has no UTF-8 file name, does not end in `-Data.db`, or
/// has no parent directory. Also fails, after attempting every component, when
/// one or more components could not be removed; the message lists each failure.
fn delete_sstable_files_static(data_path: &Path) -> Result<()> {
    let filename = data_path
        .file_name()
        .and_then(|s| s.to_str())
        .ok_or_else(|| Error::Storage("Invalid SSTable path".to_string()))?;
    let base = filename
        .strip_suffix("-Data.db")
        .ok_or_else(|| Error::Storage("Invalid Data.db filename".to_string()))?;
    let parent_dir = data_path.parent().ok_or_else(|| {
        Error::Storage(format!(
            "Data.db path has no parent directory: {:?}",
            data_path
        ))
    })?;
    let components = [
        "TOC.txt",
        "Data.db",
        "Index.db",
        "Summary.db",
        "Statistics.db",
        "CompressionInfo.db",
        "Filter.db",
        "Digest.crc32",
    ];
    let mut failures: Vec<String> = Vec::new();
    for component in &components {
        let component_path = parent_dir.join(format!("{}-{}", base, component));
        match std::fs::remove_file(&component_path) {
            Ok(()) => log::debug!("Deleted compaction input: {:?}", component_path),
            // Optional or already-removed component: nothing to do.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                log::warn!(
                    "Deferred delete of {:?}: {} (component left as orphan; \
                     unpublished via TOC.txt removal, reclaimed on next startup)",
                    component_path,
                    e
                );
                failures.push(format!("{:?}: {}", component_path, e));
            }
        }
    }
    if failures.is_empty() {
        Ok(())
    } else {
        Err(Error::Storage(format!(
            "Deferred delete left {} orphaned component(s) (unpublished, reclaimed on \
             next startup): {}",
            failures.len(),
            failures.join("; ")
        )))
    }
}
/// Converts one merged entry into a mutation using the engine's schema;
/// delegates to `KWayMerger::merge_entry_to_mutation`.
fn merge_entry_to_mutation(
&self,
entry: merge::MergeEntry,
) -> Result<crate::storage::write_engine::mutation::Mutation> {
merge::KWayMerger::merge_entry_to_mutation(entry, &self.config.schema)
}
}
#[cfg(all(test, feature = "write-support"))]
mod tests {
use super::*;
use crate::schema::{Column, KeyColumn};
use crate::storage::write_engine::mutation::{CellOperation, PartitionKey, TableId};
use crate::types::Value;
use std::collections::HashMap;
use tempfile::TempDir;
/// Schema used by the tests: `test_ks.test_table` with an `int` partition key
/// `id` and a nullable `text` column `name`.
fn create_test_schema() -> TableSchema {
    let column = |name: &str, data_type: &str, nullable: bool| Column {
        name: name.to_string(),
        data_type: data_type.to_string(),
        nullable,
        default: None,
        is_static: false,
    };
    let id_key = KeyColumn {
        name: "id".to_string(),
        data_type: "int".to_string(),
        position: 0,
    };
    TableSchema {
        keyspace: "test_ks".to_string(),
        table: "test_table".to_string(),
        partition_keys: vec![id_key],
        clustering_keys: vec![],
        columns: vec![column("id", "int", false), column("name", "text", true)],
        comments: HashMap::new(),
    }
}
/// Builds a mutation against `test_ks.test_table` that writes `name` for the
/// partition `id` at `timestamp`.
fn create_test_mutation(id: i32, name: &str, timestamp: i64) -> Mutation {
    let write_name = CellOperation::Write {
        column: "name".to_string(),
        value: Value::Text(name.to_string()),
    };
    Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(id)),
        None,
        vec![write_name],
        timestamp,
        None,
    )
}
/// With a policy installed but no SSTables on disk, a maintenance step does nothing.
#[test]
fn test_set_merge_policy() {
    let tmp = TempDir::new().unwrap();
    let cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    );
    let mut engine = WriteEngine::new(cfg).unwrap();

    engine
        .set_merge_policy(Box::new(
            crate::storage::write_engine::STCSPolicy::default(),
        ))
        .unwrap();

    let budget = std::time::Duration::from_millis(100);
    let report = engine.maintenance_step(budget).unwrap();
    assert!(!report.pending_compaction);
    assert_eq!(report.rows_merged, 0);
}
/// Defaults come from the associated constants; the builders override them.
#[test]
fn test_write_engine_config() {
    const MIB: usize = 1024 * 1024;

    let tmp = TempDir::new().unwrap();
    let defaults = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    );
    assert_eq!(
        defaults.memtable_flush_threshold,
        WriteEngineConfig::DEFAULT_FLUSH_THRESHOLD
    );
    assert_eq!(
        defaults.memtable_hard_limit,
        WriteEngineConfig::DEFAULT_HARD_LIMIT
    );

    let tuned = defaults.with_flush_threshold(128 * MIB);
    assert_eq!(tuned.memtable_flush_threshold, 128 * MIB);

    let tuned = tuned.with_hard_limit(512 * MIB);
    assert_eq!(tuned.memtable_hard_limit, 512 * MIB);
}
/// A fresh engine starts at generation 1, empty and open.
#[test]
fn test_write_engine_new() {
    let tmp = TempDir::new().unwrap();
    let cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    );
    let fresh = WriteEngine::new(cfg).unwrap();

    assert_eq!(fresh.generation(), 1);
    assert_eq!(fresh.memtable_size(), 0);
    assert_eq!(fresh.memtable_row_count(), 0);
    let is_closed = fresh.closed.load(std::sync::atomic::Ordering::Relaxed);
    assert!(!is_closed);
}
/// One write shows up in both the memtable and the WAL.
#[test]
fn test_write_engine_write_single_mutation() {
    let tmp = TempDir::new().unwrap();
    let cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    );
    let mut engine = WriteEngine::new(cfg).unwrap();

    engine
        .write(create_test_mutation(1, "Alice", 1000000))
        .unwrap();

    assert_eq!(engine.memtable_row_count(), 1);
    assert!(engine.memtable_size() > 0);
    assert!(engine.wal_size() > 0);
}
/// Ten writes to distinct partitions yield ten memtable rows.
#[test]
fn test_write_engine_write_multiple_mutations() {
    let tmp = TempDir::new().unwrap();
    let cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    );
    let mut engine = WriteEngine::new(cfg).unwrap();

    (0..10).for_each(|id| {
        let user = format!("User{}", id);
        engine
            .write(create_test_mutation(id, &user, 1000000 + id as i64))
            .unwrap();
    });

    assert_eq!(engine.memtable_row_count(), 10);
    assert!(engine.memtable_size() > 0);
}
/// Flushing an empty memtable produces no SSTable.
#[tokio::test]
async fn test_write_engine_flush_empty() {
    let tmp = TempDir::new().unwrap();
    let cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    );
    let mut engine = WriteEngine::new(cfg).unwrap();

    let flushed = engine.flush().await.unwrap();
    assert!(flushed.is_none());
}
/// Flushing buffered rows writes an SSTable, empties memtable and WAL, and bumps
/// the generation.
#[tokio::test]
async fn test_write_engine_flush_with_data() {
    let tmp = TempDir::new().unwrap();
    let cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    );
    let mut engine = WriteEngine::new(cfg).unwrap();
    for id in 0..5 {
        let user = format!("User{}", id);
        engine
            .write(create_test_mutation(id, &user, 1000000 + id as i64))
            .unwrap();
    }
    let generation_before = engine.generation();

    let flushed = engine.flush().await.unwrap();
    assert!(flushed.is_some());
    let sstable = flushed.unwrap();
    assert_eq!(sstable.partition_count, 5);
    assert!(sstable.data_size > 0);
    assert!(sstable.data_path.exists());

    assert_eq!(engine.memtable_row_count(), 0);
    assert_eq!(engine.memtable_size(), 0);
    assert_eq!(engine.wal_size(), 0);
    assert_eq!(engine.generation(), generation_before + 1);
}
/// With a 1 KiB flush threshold and no async runtime, writes either trigger an
/// automatic flush or keep the memtable small.
#[test]
fn test_write_engine_automatic_flush() {
    let tmp = TempDir::new().unwrap();
    let cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    )
    .with_flush_threshold(1024);
    let mut engine = WriteEngine::new(cfg).unwrap();

    for id in 0..100 {
        let user = format!("User{}", id);
        engine
            .write(create_test_mutation(id, &user, 1000000 + id as i64))
            .unwrap();
    }

    let flushed_at_least_once = engine.generation() > 1;
    assert!(flushed_at_least_once || engine.memtable_size() < 10000);
}
/// Closing an engine with buffered rows leaves files in the data directory.
#[tokio::test]
async fn test_write_engine_close_with_data() {
    let tmp = TempDir::new().unwrap();
    let cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    );
    let mut engine = WriteEngine::new(cfg).unwrap();
    for id in 0..5 {
        let user = format!("User{}", id);
        engine
            .write(create_test_mutation(id, &user, 1000000 + id as i64))
            .unwrap();
    }

    engine.close().await.unwrap();

    let data_root = tmp.path().join("data");
    let listing: Vec<_> = std::fs::read_dir(&data_root).unwrap().collect();
    assert!(!listing.is_empty(), "SSTable files should exist");
}
/// Closing an engine that never received a write succeeds.
#[tokio::test]
async fn test_write_engine_close_empty() {
    let tmp = TempDir::new().unwrap();
    let cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    );
    let mut engine = WriteEngine::new(cfg).unwrap();

    let outcome = engine.close().await;
    outcome.unwrap();
}
/// A closed engine rejects writes, while a new engine opened over the same
/// directories accepts them.
#[test]
fn test_write_engine_write_after_close() {
    let temp_dir = TempDir::new().unwrap();
    let config = WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_test_schema(),
    );
    let mut engine = WriteEngine::new(config).unwrap();
    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(engine.close())
        .unwrap();

    // The behaviour the test name promises: writing to the closed engine fails
    // and leaves the memtable untouched.
    let rejected = engine.write(create_test_mutation(1, "Alice", 1000000));
    assert!(rejected.is_err(), "write on a closed engine must fail");
    assert_eq!(engine.memtable_row_count(), 0);

    // Re-opening over the same directories gives a usable engine again.
    let config2 = WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        create_test_schema(),
    );
    let mut engine2 = WriteEngine::new(config2).unwrap();
    let mutation = create_test_mutation(1, "Alice", 1000000);
    engine2.write(mutation).unwrap();
    assert_eq!(engine2.memtable_row_count(), 1);
}
/// Rows written without a flush are restored from the WAL by the next engine.
#[test]
fn test_write_engine_wal_recovery() {
    let tmp = TempDir::new().unwrap();
    let schema = create_test_schema();
    let first_cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        schema.clone(),
    );

    // First engine: write and drop without flushing or closing.
    {
        let mut writer = WriteEngine::new(first_cfg.clone()).unwrap();
        for id in 0..5 {
            let user = format!("User{}", id);
            writer
                .write(create_test_mutation(id, &user, 1000000 + id as i64))
                .unwrap();
        }
    }

    let second_cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        schema,
    );
    let recovered = WriteEngine::new(second_cfg).unwrap();
    assert_eq!(recovered.memtable_row_count(), 5);
    assert!(recovered.memtable_size() > 0);
}
/// The generation advances on flush and is rediscovered from disk on reopen.
#[test]
fn test_write_engine_generation_tracking() {
    let tmp = TempDir::new().unwrap();
    let schema = create_test_schema();
    let first_cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        schema.clone(),
    );

    {
        let mut first = WriteEngine::new(first_cfg.clone()).unwrap();
        assert_eq!(first.generation(), 1);
        first
            .write(create_test_mutation(1, "Alice", 1000000))
            .unwrap();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(first.flush()).unwrap();
        assert_eq!(first.generation(), 2);
    }

    let second_cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        schema,
    );
    let reopened = WriteEngine::new(second_cfg).unwrap();
    assert_eq!(reopened.generation(), 2);
}
/// A statement targeting a table other than the schema's is rejected with a
/// message naming both tables.
#[test]
fn test_write_engine_execute_table_mismatch() {
    let tmp = TempDir::new().unwrap();
    let cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    );
    let mut engine = WriteEngine::new(cfg).unwrap();

    let outcome = engine.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')");
    let message = outcome.unwrap_err().to_string();

    let names_target = message.contains("targets table 'users'");
    let names_schema = message.contains("schema is for 'test_table'");
    assert!(
        names_target && names_schema,
        "Expected table mismatch error, got: {}",
        message
    );
}
/// A valid INSERT against the schema's table adds one memtable row.
#[test]
fn test_write_engine_execute_insert_success() {
    let tmp = TempDir::new().unwrap();
    let cfg = WriteEngineConfig::new(
        tmp.path().join("data"),
        tmp.path().join("wal"),
        create_test_schema(),
    );
    let mut engine = WriteEngine::new(cfg).unwrap();
    assert_eq!(engine.memtable_row_count(), 0);

    let outcome = engine.execute("INSERT INTO test_table (id, name) VALUES (1, 'Alice')");
    if let Err(e) = &outcome {
        panic!("execute() failed: {:?}", e);
    }

    assert_eq!(engine.memtable_row_count(), 1);
}
/// An existing but empty data directory yields 1 as the next generation.
#[test]
fn test_determine_next_generation_empty_dir() {
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    std::fs::create_dir_all(&data_dir).unwrap();

    let next = WriteEngine::determine_next_generation(&data_dir).unwrap();

    assert_eq!(next, 1);
}
/// With Data.db files for generations 1, 2 and 5 present, the next generation
/// is 6 (one past the highest, not the first gap).
#[test]
fn test_determine_next_generation_with_sstables() {
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    std::fs::create_dir_all(&data_dir).unwrap();

    // Empty placeholder files; only their names are relevant to this test.
    let existing = ["nb-1-big-Data.db", "nb-2-big-Data.db", "nb-5-big-Data.db"];
    for name in &existing {
        std::fs::write(data_dir.join(name), b"").unwrap();
    }

    let next = WriteEngine::determine_next_generation(&data_dir).unwrap();
    assert_eq!(next, 6);
}
/// Calling `close` twice succeeds both times and leaves the `closed` flag set.
#[tokio::test]
async fn test_write_engine_close_idempotent() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let mut engine = WriteEngine::new(config).unwrap();

    // First pass closes the engine; second pass must be a harmless repeat.
    for _attempt in 0..2 {
        engine.close().await.unwrap();
        assert!(engine.closed.load(Ordering::SeqCst));
    }
}
/// After one write followed by `close`, the engine reports a WAL size of 0.
#[tokio::test]
async fn test_write_engine_close_syncs_wal() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let mut engine =
        WriteEngine::new(WriteEngineConfig::new(root.join("data"), root.join("wal"), schema))
            .unwrap();

    engine
        .write(create_test_mutation(1, "Alice", 1000000))
        .unwrap();
    engine.close().await.unwrap();

    assert_eq!(engine.wal_size(), 0);
}
/// Exercises the engine's `closed` flag directly through its atomic API:
/// it starts `false`, can be stored to `true`, and `swap` returns the
/// previous value while installing the new one.
///
/// The flag is mutated through a non-`mut` binding of the engine, so the
/// sequence below relies on the flag's interior mutability.
#[test]
fn test_write_engine_closed_flag_atomic() {
let temp_dir = TempDir::new().unwrap();
let schema = create_test_schema();
let config = WriteEngineConfig::new(
temp_dir.path().join("data"),
temp_dir.path().join("wal"),
schema,
);
let engine = WriteEngine::new(config).unwrap();
// A freshly constructed engine is not closed.
assert!(!engine.closed.load(Ordering::SeqCst));
// Set the flag by hand (without going through `close`) and read it back.
engine.closed.store(true, Ordering::SeqCst);
assert!(engine.closed.load(Ordering::SeqCst));
// `swap` must hand back the old value (`true`) and leave `false` behind.
let prev = engine.closed.swap(false, Ordering::SeqCst);
assert!(prev);
assert!(!engine.closed.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_write_engine_write_after_close_fails() {
let temp_dir = TempDir::new().unwrap();
let schema = create_test_schema();
let config = WriteEngineConfig::new(
temp_dir.path().join("data"),
temp_dir.path().join("wal"),
schema,
);
let mut engine = WriteEngine::new(config).unwrap();
engine.close().await.unwrap();
let mutation = create_test_mutation(1, "Alice", 1000000);
let result = engine.write(mutation);
assert!(result.is_err());
match result {
Err(Error::InvalidInput(_)) => {}
_ => panic!("Expected InvalidInput error"),
}
}
#[tokio::test]
async fn test_write_engine_flush_after_close_fails() {
let temp_dir = TempDir::new().unwrap();
let schema = create_test_schema();
let config = WriteEngineConfig::new(
temp_dir.path().join("data"),
temp_dir.path().join("wal"),
schema,
);
let mut engine = WriteEngine::new(config).unwrap();
engine.close().await.unwrap();
let result = engine.flush().await;
assert!(result.is_err());
match result {
Err(Error::InvalidInput(_)) => {}
_ => panic!("Expected InvalidInput error"),
}
}
/// Writes through the synchronous path until the engine refuses with a
/// `Storage` error mentioning "hard limit", and checks that this happens
/// after at least one, but fewer than 1000, accepted writes.
#[test]
fn test_write_engine_hard_limit_enforcement() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    // The hard limit (2048) is configured below the flush threshold (10 KiB).
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema)
        .with_flush_threshold(10 * 1024)
        .with_hard_limit(2048);
    let mut engine = WriteEngine::new(config).unwrap();

    let mut accepted = 0;
    for id in 0..1000 {
        let name = format!("User{}", id);
        match engine.write(create_test_mutation(id, &name, 1000000 + id as i64)) {
            Err(Error::Storage(msg)) => {
                assert!(msg.contains("hard limit"));
                break;
            }
            Err(e) => panic!("Expected Storage error, got: {:?}", e),
            Ok(()) => accepted += 1,
        }
    }

    assert!(
        accepted < 1000,
        "Should have hit hard limit before 1000 writes"
    );
    assert!(
        accepted > 0,
        "Should have accepted at least some writes before hitting limit"
    );
}
/// Async counterpart of the hard-limit test: `write_async` must eventually
/// fail with a `Storage` error mentioning "hard limit", after at least one
/// but fewer than 1000 accepted writes.
#[tokio::test]
async fn test_write_engine_hard_limit_enforcement_async() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    // The hard limit (2048) is configured below the flush threshold (10 KiB).
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema)
        .with_flush_threshold(10 * 1024)
        .with_hard_limit(2048);
    let mut engine = WriteEngine::new(config).unwrap();

    let mut accepted = 0;
    for id in 0..1000 {
        let name = format!("User{}", id);
        let mutation = create_test_mutation(id, &name, 1000000 + id as i64);
        match engine.write_async(mutation).await {
            Err(Error::Storage(msg)) => {
                assert!(msg.contains("hard limit"));
                break;
            }
            Err(e) => panic!("Expected Storage error, got: {:?}", e),
            Ok(()) => accepted += 1,
        }
    }

    assert!(
        accepted < 1000,
        "Should have hit hard limit before 1000 writes"
    );
    assert!(
        accepted > 0,
        "Should have accepted at least some writes before hitting limit"
    );
}
/// Writes until the first rejected write (or 1000 writes), flushes, and then
/// checks that a new write is accepted and is the only row in the memtable.
///
/// NOTE(review): the first phase stops on *any* write error and does not
/// assert that the hard limit was actually reached — confirm that is intended.
#[tokio::test]
async fn test_write_engine_hard_limit_recovery_after_flush() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema)
        .with_flush_threshold(1024)
        .with_hard_limit(2048);
    let mut engine = WriteEngine::new(config).unwrap();

    // Count the writes accepted before the first failure.
    let accepted_before_failure = (0..1000)
        .take_while(|&id| {
            let mutation = create_test_mutation(id, &format!("User{}", id), 1000000 + id as i64);
            engine.write(mutation).is_ok()
        })
        .count();
    assert!(
        accepted_before_failure > 0,
        "Should have accepted some writes before limit"
    );

    engine.flush().await.unwrap();

    let after_flush = engine.write(create_test_mutation(9999, "After flush", 2000000));
    assert!(after_flush.is_ok(), "Should accept writes after flush");
    assert_eq!(engine.memtable_row_count(), 1);
}
/// Pins the generation counter's type to `u64` (via typed bindings that would
/// fail to compile otherwise) and its initial value to 1.
///
/// The trailing arithmetic only demonstrates that a value past `u32::MAX`
/// fits in `u64`; it does not touch the engine.
#[test]
fn test_generation_counter_is_u64() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let engine =
        WriteEngine::new(WriteEngineConfig::new(root.join("data"), root.join("wal"), schema))
            .unwrap();

    let generation: u64 = engine.generation();
    assert_eq!(generation, 1u64);
    let _type_check: u64 = generation;

    let u32_ceiling = u64::from(u32::MAX);
    let large_generation: u64 = u32_ceiling + 1000;
    assert!(large_generation > u32_ceiling);
    assert_eq!(large_generation, 4_294_968_295u64);
}
/// A Data.db file whose generation exceeds `u32::MAX` is parsed correctly and
/// the next generation is one past it.
#[test]
fn test_determine_next_generation_large_numbers() {
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    std::fs::create_dir_all(&data_dir).unwrap();

    let u32_ceiling = u64::from(u32::MAX);
    let large_gen: u64 = u32_ceiling + 100;
    let file_name = format!("nb-{}-big-Data.db", large_gen);
    std::fs::write(data_dir.join(file_name), b"").unwrap();

    let next = WriteEngine::determine_next_generation(&data_dir).unwrap();

    assert_eq!(next, large_gen + 1);
    assert!(next > u32_ceiling);
}
/// Without a merge policy installed, `maintenance_step` reports no work: zero
/// rows/bytes, no completed merges, nothing pending, and under 50 ms spent.
#[test]
fn test_maintenance_step_no_policy() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let mut engine = WriteEngine::new(config).unwrap();

    let budget = Duration::from_millis(100);
    let report = engine.maintenance_step(budget).unwrap();

    assert_eq!(report.rows_merged, 0);
    assert_eq!(report.bytes_written, 0);
    assert_eq!(report.completed_merges.len(), 0);
    assert!(!report.pending_compaction);
    // Wall-clock bound: half of the budget passed in above.
    assert!(report.time_spent < Duration::from_millis(50));
}
/// `maintenance_step` on a closed engine fails with `Error::InvalidInput`
/// whose message mentions "closed".
#[test]
fn test_maintenance_step_with_closed_engine() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let mut engine = WriteEngine::new(config).unwrap();

    // `close` is async; run it to completion on a throw-away runtime.
    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(engine.close())
        .unwrap();

    let result = engine.maintenance_step(Duration::from_millis(100));

    assert!(result.is_err());
    if let Err(Error::InvalidInput(msg)) = result {
        assert!(msg.contains("closed"));
    } else {
        panic!("Expected InvalidInput error");
    }
}
/// Constructs a `MaintenanceReport` literal and reads every field back.
#[test]
fn test_maintenance_report_creation() {
    let one_mib: u64 = 1024 * 1024;
    let merged_outputs = vec![PathBuf::from("data/nb-5-big-Data.db")];

    let report = MaintenanceReport {
        time_spent: Duration::from_millis(250),
        completed_merges: merged_outputs,
        rows_merged: 1000,
        bytes_written: one_mib,
        pending_compaction: true,
    };

    assert_eq!(report.time_spent.as_millis(), 250);
    assert_eq!(report.completed_merges.len(), 1);
    assert_eq!(report.rows_merged, 1000);
    assert_eq!(report.bytes_written, 1024 * 1024);
    assert!(report.pending_compaction);
}
/// A freshly created engine with no SSTables on disk yields no compaction
/// candidates.
#[test]
fn test_scan_sstable_candidates_empty_dir() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let engine =
        WriteEngine::new(WriteEngineConfig::new(root.join("data"), root.join("wal"), schema))
            .unwrap();

    let candidates = engine.scan_sstable_candidates().unwrap();

    assert_eq!(candidates.len(), 0);
}
/// Populates the data directory with a mix of files and checks that only
/// Data.db files that have a sibling TOC.txt are returned as candidates.
#[test]
fn test_scan_sstable_candidates_with_sstables() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let engine = WriteEngine::new(config).unwrap();

    let data_dir = root.join("data");
    std::fs::create_dir_all(&data_dir).unwrap();

    // Generations 1 and 2 are published (Data.db + TOC.txt). Generation 3 has
    // only an Index component, `other-file.txt` is unrelated, and generation 4
    // has a Data.db but no TOC.txt.
    let fixture_files = [
        "nb-1-big-Data.db",
        "nb-1-big-TOC.txt",
        "nb-2-big-Data.db",
        "nb-2-big-TOC.txt",
        "nb-3-big-Index.db",
        "other-file.txt",
        "nb-4-big-Data.db",
    ];
    for name in &fixture_files {
        std::fs::write(data_dir.join(name), b"").unwrap();
    }

    let candidates = engine.scan_sstable_candidates().unwrap();

    assert_eq!(candidates.len(), 2);
    assert!(candidates
        .iter()
        .all(|p| p.to_string_lossy().contains("Data.db")));
    let includes_unpublished = candidates
        .iter()
        .any(|p| p.to_string_lossy().contains("nb-4-big"));
    assert!(
        !includes_unpublished,
        "unpublished Data.db (no TOC.txt) must be excluded (Issue #591)"
    );
}
/// Creates four components of generation 5 and checks that
/// `delete_sstable_files`, given the Data.db path, removes all of them.
#[test]
fn test_delete_sstable_files() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let engine = WriteEngine::new(config).unwrap();

    let data_dir = root.join("data");
    std::fs::create_dir_all(&data_dir).unwrap();

    // The Data.db entry comes first; it is the path handed to the delete call.
    let components = [
        "nb-5-big-Data.db",
        "nb-5-big-Index.db",
        "nb-5-big-Summary.db",
        "nb-5-big-Statistics.db",
    ];

    // Create every component, then confirm the fixture is in place.
    for component in &components {
        std::fs::write(data_dir.join(component), b"dummy").unwrap();
    }
    for component in &components {
        assert!(data_dir.join(component).exists());
    }

    let data_path = data_dir.join(components[0]);
    engine.delete_sstable_files(&data_path).unwrap();

    for component in &components {
        assert!(!data_dir.join(component).exists());
    }
}
/// Covers two related behaviours around TOC.txt as the publication marker:
///
/// 1. `delete_sstable_files_static` removes both the TOC.txt and the Data.db
///    of the targeted SSTable.
/// 2. `scan_data_files` ignores a Data.db that has no sibling TOC.txt and
///    reports it once the TOC.txt appears.
///
/// NOTE(review): the test name says the TOC is removed *first*, but only the
/// end state is asserted here; the deletion order itself is not observed.
#[test]
fn test_delete_removes_toc_first_unpublishing_atomically() {
let temp_dir = TempDir::new().unwrap();
let data_dir = temp_dir.path().join("data");
std::fs::create_dir_all(&data_dir).unwrap();
// Fixture: a published generation-7 SSTable (TOC.txt present).
for comp in &[
"nb-7-big-Data.db",
"nb-7-big-Index.db",
"nb-7-big-Statistics.db",
"nb-7-big-TOC.txt",
] {
std::fs::write(data_dir.join(comp), b"x").unwrap();
}
let data_path = data_dir.join("nb-7-big-Data.db");
WriteEngine::delete_sstable_files_static(&data_path).unwrap();
assert!(!data_dir.join("nb-7-big-TOC.txt").exists());
assert!(!data_path.exists());
// Generation 8 starts out unpublished: Data.db only, no TOC.txt.
std::fs::write(data_dir.join("nb-8-big-Data.db"), b"x").unwrap();
let mut candidates = Vec::new();
// NOTE(review): the meaning of the trailing `1` argument is not visible
// from this test — confirm against `scan_data_files`.
WriteEngine::scan_data_files(&data_dir, &mut candidates, 1).unwrap();
assert!(
candidates.is_empty(),
"a Data.db without a sibling TOC.txt must NOT be a compaction candidate \
(publication barrier, Issue #591); got {:?}",
candidates
);
// Publishing generation 8 by adding its TOC.txt makes it discoverable.
std::fs::write(data_dir.join("nb-8-big-TOC.txt"), b"x").unwrap();
let mut candidates = Vec::new();
WriteEngine::scan_data_files(&data_dir, &mut candidates, 1).unwrap();
assert_eq!(
candidates.len(),
1,
"a published Data.db (TOC.txt present) must be discovered"
);
}
/// Merge policy stub for tests: ignores the candidates it is offered and
/// always answers with a fixed, pre-configured selection.
#[derive(Debug)]
#[allow(dead_code)]
struct TestMergePolicy {
    /// The selection returned verbatim from every `select_merge` call.
    files_to_select: Vec<PathBuf>,
}

impl MergePolicy for TestMergePolicy {
    /// Returns a copy of `files_to_select`, regardless of `_candidates`.
    fn select_merge(&self, _candidates: &[PathBuf]) -> Result<Vec<PathBuf>> {
        let selection = self.files_to_select.to_vec();
        Ok(selection)
    }
}
/// With a policy that selects nothing, `maintenance_step` reports no work:
/// zero rows/bytes, no completed merges, and nothing pending.
#[test]
fn test_maintenance_step_with_policy_no_work() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let mut engine = WriteEngine::new(config).unwrap();

    let select_nothing = TestMergePolicy {
        files_to_select: Vec::new(),
    };
    engine.set_merge_policy(Box::new(select_nothing)).unwrap();

    let report = engine.maintenance_step(Duration::from_millis(100)).unwrap();

    assert_eq!(report.rows_merged, 0);
    assert_eq!(report.bytes_written, 0);
    assert_eq!(report.completed_merges.len(), 0);
    assert!(!report.pending_compaction);
}
/// With a policy that selects nothing and a 10 ms budget, the reported
/// `time_spent` must stay below 1.5x the budget.
#[test]
fn test_maintenance_step_budget_honored() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let mut engine = WriteEngine::new(config).unwrap();

    let select_nothing = TestMergePolicy {
        files_to_select: Vec::new(),
    };
    engine.set_merge_policy(Box::new(select_nothing)).unwrap();

    let budget = Duration::from_millis(10);
    let report = engine.maintenance_step(budget).unwrap();

    // Allow 50% slack over the requested budget.
    let ceiling = budget.mul_f32(1.5);
    assert!(
        report.time_spent < ceiling,
        "Time spent {:?} exceeded budget {:?} by >50%",
        report.time_spent,
        budget
    );
}
/// Test helper: produces `n` SSTables by writing 5 rows and flushing, `n`
/// times over, and returns the `data_path` of each flushed SSTable in order.
///
/// Row ids are `batch * 100 + row`, so batches never reuse an id. The flushes
/// are driven on a private current-thread runtime so the helper can be called
/// from synchronous tests. Panics if any write or flush fails, or if a flush
/// reports that no SSTable was produced.
fn flush_n_sstables_sync(engine: &mut WriteEngine, n: usize) -> Vec<PathBuf> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    (0..n)
        .map(|batch| {
            for row in 0..5 {
                let id = batch * 100 + row;
                let name = format!("User-{}-{}", batch, row);
                engine
                    .write(create_test_mutation(id as i32, &name, 1_000_000 + id as i64))
                    .unwrap();
            }
            rt.block_on(engine.flush()).unwrap().unwrap().data_path
        })
        .collect()
}
/// Every counter in `maintenance_stats()` is zero on a freshly created engine.
#[test]
fn test_maintenance_stats_initial_zero() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let engine =
        WriteEngine::new(WriteEngineConfig::new(root.join("data"), root.join("wal"), schema))
            .unwrap();

    let stats = engine.maintenance_stats();

    // SSTable and compaction counts.
    assert_eq!(stats.compactions_completed, 0);
    assert_eq!(stats.sstables_merged_in, 0);
    assert_eq!(stats.sstables_produced, 0);
    // Volume counters.
    assert_eq!(stats.bytes_read, 0);
    assert_eq!(stats.bytes_written, 0);
    assert_eq!(stats.rows_merged, 0);
    // Accumulated time.
    assert_eq!(stats.total_time, Duration::ZERO);
}
/// Given four Data.db files of identical size (60 MiB each), the default
/// `STCSPolicy` selects all four, and every selected path is one of the inputs.
#[test]
fn test_stcs_selects_expected_group_by_size() {
    let policy = crate::storage::write_engine::STCSPolicy::default();
    let temp_dir = TempDir::new().unwrap();

    // `set_len` gives each file its length without writing 60 MiB of data.
    let size_bytes = 60 * 1024 * 1024u64;
    let paths: Vec<PathBuf> = (1..=4)
        .map(|i| {
            let path = temp_dir.path().join(format!("nb-{}-big-Data.db", i));
            std::fs::File::create(&path)
                .unwrap()
                .set_len(size_bytes)
                .unwrap();
            path
        })
        .collect();

    let selected = policy.select_merge(&paths).unwrap();

    assert_eq!(
        selected.len(),
        4,
        "STCS should select all 4 same-sized SSTables as one compaction group"
    );
    for sel in &selected {
        assert!(
            paths.contains(sel),
            "Selected path {:?} not in input set",
            sel
        );
    }
}
/// Given only three same-sized (60 MiB) Data.db files, the default
/// `STCSPolicy` selects nothing.
#[test]
fn test_stcs_does_not_select_below_threshold() {
    let policy = crate::storage::write_engine::STCSPolicy::default();
    let temp_dir = TempDir::new().unwrap();

    // `set_len` gives each file its length without writing 60 MiB of data.
    let paths: Vec<PathBuf> = (1..=3)
        .map(|i| {
            let path = temp_dir.path().join(format!("nb-{}-big-Data.db", i));
            std::fs::File::create(&path)
                .unwrap()
                .set_len(60 * 1024 * 1024)
                .unwrap();
            path
        })
        .collect();

    let selected = policy.select_merge(&paths).unwrap();

    assert!(
        selected.is_empty(),
        "STCS should NOT select when fewer than min_threshold SSTables exist"
    );
}
/// End-to-end compaction check: flush four SSTables, install an STCS policy,
/// run one `maintenance_step`, then verify that exactly one merged output
/// exists, all four inputs are gone, and the cumulative stats reflect one
/// compaction (4 in, 1 out, non-zero time).
#[test]
fn test_maintenance_step_compacts_sstables_atomically() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let policy = crate::storage::write_engine::STCSPolicy::new(4, 32, 0.5, 1.5, 0).unwrap();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let mut engine = WriteEngine::new(config).unwrap();

    // Arrange: four flushed inputs, all present on disk.
    let input_paths = flush_n_sstables_sync(&mut engine, 4);
    assert_eq!(input_paths.len(), 4, "Expected 4 flushed SSTables");
    for p in &input_paths {
        assert!(
            p.exists(),
            "Input file {:?} should exist before compaction",
            p
        );
    }

    // Act: the policy is installed only after the flushes; a generous budget
    // is given to the single maintenance step.
    engine.set_merge_policy(Box::new(policy)).unwrap();
    let report = engine.maintenance_step(Duration::from_secs(60)).unwrap();

    // Assert: one merged output, which exists on disk.
    assert_eq!(
        report.completed_merges.len(),
        1,
        "Expected exactly 1 completed merge, got: {:?}",
        report.completed_merges
    );
    let merged_path = &report.completed_merges[0];
    assert!(
        merged_path.exists(),
        "Merged output file {:?} must exist after compaction",
        merged_path
    );

    // Assert: every input has been removed.
    for p in &input_paths {
        assert!(
            !p.exists(),
            "Input file {:?} should have been deleted after compaction",
            p
        );
    }

    // Assert: cumulative statistics.
    let stats = engine.maintenance_stats();
    assert_eq!(
        stats.compactions_completed, 1,
        "compactions_completed must be 1"
    );
    assert_eq!(
        stats.sstables_merged_in, 4,
        "Should have consumed 4 input SSTables"
    );
    assert_eq!(stats.sstables_produced, 1, "sstables_produced must be 1");
    assert!(stats.total_time > Duration::ZERO, "total_time must be > 0");
}
/// Runs two flush-then-compact cycles on one engine and checks that
/// `maintenance_stats()` accumulates across them: 2 compactions, 8 inputs
/// consumed, 2 outputs produced, and a non-decreasing `total_time`.
#[test]
fn test_maintenance_stats_accumulate_across_cycles() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let policy = crate::storage::write_engine::STCSPolicy::new(4, 32, 0.5, 1.5, 0).unwrap();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let mut engine = WriteEngine::new(config).unwrap();
    engine.set_merge_policy(Box::new(policy)).unwrap();

    let step_budget = Duration::from_secs(60);

    // Cycle 1.
    flush_n_sstables_sync(&mut engine, 4);
    engine.maintenance_step(step_budget).unwrap();
    let stats_after_first = engine.maintenance_stats();
    assert_eq!(stats_after_first.compactions_completed, 1);

    // Cycle 2.
    flush_n_sstables_sync(&mut engine, 4);
    engine.maintenance_step(step_budget).unwrap();
    let stats_after_second = engine.maintenance_stats();

    assert_eq!(
        stats_after_second.compactions_completed, 2,
        "Stats must accumulate across compaction cycles"
    );
    assert_eq!(
        stats_after_second.sstables_merged_in, 8,
        "Should have consumed 8 total input SSTables (2 cycles × 4 each)"
    );
    assert_eq!(
        stats_after_second.sstables_produced, 2,
        "Should have produced 2 output SSTables"
    );
    assert!(
        stats_after_second.total_time >= stats_after_first.total_time,
        "Cumulative total_time must only increase"
    );
}
/// Verifies that a compaction which cannot write into the data directory
/// fails cleanly: `maintenance_step` returns an error, every input SSTable is
/// still on disk, and `compactions_completed` stays at 0.
///
/// The failure is induced by making the data directory read-only (mode 0o555)
/// for the duration of the `maintenance_step` call.
///
/// Unix-only. The failure injection relies on Unix permission bits; on other
/// platforms the directory would never be made read-only, so the compaction
/// would succeed and the `is_err()` assertion would fail. The whole test is
/// therefore compiled only on Unix.
#[cfg(unix)]
#[test]
fn test_maintenance_step_inputs_intact_on_unwriteable_tmp_dir() {
    use std::os::unix::fs::{MetadataExt, PermissionsExt};

    // Skip when running as root — presumably because a privileged process is
    // not stopped by the read-only bit, so the induced failure would not occur.
    // Two probes are used: the owner uid of `/proc/self`, and (where that path
    // does not exist, e.g. macOS) whether a file can be created under `/etc`.
    let is_root = std::fs::metadata("/proc/self")
        .map(|m| m.uid() == 0)
        .unwrap_or(false);
    let is_root_macos = std::fs::write("/etc/cqlite-test-root-check", b"")
        .map(|_| {
            // Best-effort cleanup of the probe file.
            let _ = std::fs::remove_file("/etc/cqlite-test-root-check");
            true
        })
        .unwrap_or(false);
    if is_root || is_root_macos {
        return;
    }

    let temp_dir = TempDir::new().unwrap();
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(
        temp_dir.path().join("data"),
        temp_dir.path().join("wal"),
        schema,
    );
    let mut engine = WriteEngine::new(config).unwrap();

    let input_paths = flush_n_sstables_sync(&mut engine, 4);
    for p in &input_paths {
        assert!(
            p.exists(),
            "Input file {:?} should exist before failure test",
            p
        );
    }

    // Make the data directory read-only so nothing new can be created in it.
    let data_dir = temp_dir.path().join("data");
    std::fs::set_permissions(&data_dir, std::fs::Permissions::from_mode(0o555)).unwrap();

    let policy = crate::storage::write_engine::STCSPolicy::new(4, 32, 0.5, 1.5, 0).unwrap();
    engine.set_merge_policy(Box::new(policy)).unwrap();
    let result = engine.maintenance_step(Duration::from_secs(60));

    // Restore write access before asserting, so a failing assertion does not
    // leave a read-only directory behind for `TempDir` to clean up.
    std::fs::set_permissions(&data_dir, std::fs::Permissions::from_mode(0o755)).unwrap();

    assert!(
        result.is_err(),
        "maintenance_step should return an error when the tmp dir cannot be created"
    );
    for p in &input_paths {
        assert!(
            p.exists(),
            "Input file {:?} must remain intact after failed compaction",
            p
        );
    }
    let stats = engine.maintenance_stats();
    assert_eq!(
        stats.compactions_completed, 0,
        "compactions_completed must not increment on failure"
    );
}
/// After a successful compaction, no entry whose name starts with
/// `.compaction-tmp-` is left directly inside the data directory.
#[test]
fn test_no_tmp_dir_remains_after_successful_merge() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let policy = crate::storage::write_engine::STCSPolicy::new(4, 32, 0.5, 1.5, 0).unwrap();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let mut engine = WriteEngine::new(config).unwrap();

    flush_n_sstables_sync(&mut engine, 4);
    engine.set_merge_policy(Box::new(policy)).unwrap();
    engine.maintenance_step(Duration::from_secs(60)).unwrap();

    // Collect the paths of any leftover temp entries (unreadable directory
    // entries are skipped).
    let data_dir = root.join("data");
    let leftover_tmp: Vec<PathBuf> = std::fs::read_dir(&data_dir)
        .unwrap()
        .filter_map(|entry| entry.ok())
        .filter(|entry| {
            let name = entry.file_name();
            name.to_string_lossy().starts_with(".compaction-tmp-")
        })
        .map(|entry| entry.path())
        .collect();

    assert!(
        leftover_tmp.is_empty(),
        "No .compaction-tmp-* directories should remain after successful compaction, \
         found: {:?}",
        leftover_tmp
    );
}
/// Test helper: builds a `WriteEngineConfig` rooted at `temp_dir`, with
/// `data/` and `wal/` subdirectories and the standard test schema.
fn config_for(temp_dir: &TempDir) -> WriteEngineConfig {
    let root = temp_dir.path();
    let data_dir = root.join("data");
    let wal_dir = root.join("wal");
    WriteEngineConfig::new(data_dir, wal_dir, create_test_schema())
}
/// A `.compaction-tmp-99/` directory (with content) that exists in the data
/// directory before the engine is constructed is gone afterwards.
#[test]
fn test_startup_sweep_removes_orphaned_compaction_tmp_dir() {
    let temp_dir = TempDir::new().unwrap();
    let data_dir = temp_dir.path().join("data");
    std::fs::create_dir_all(&data_dir).unwrap();

    // Plant a leftover temp directory holding one partial file.
    let orphan_dir = data_dir.join(".compaction-tmp-99");
    std::fs::create_dir_all(&orphan_dir).unwrap();
    let partial_file = orphan_dir.join("partial.db");
    std::fs::write(partial_file, b"partial content").unwrap();
    assert!(
        orphan_dir.exists(),
        "Orphan dir should exist before engine creation"
    );

    // Constructing the engine is the action under test.
    let _engine = WriteEngine::new(config_for(&temp_dir)).unwrap();

    assert!(
        !orphan_dir.exists(),
        "Startup sweep must remove orphaned .compaction-tmp-99/ directory"
    );
}
/// Engine construction removes the components of an SSTable that has no
/// TOC.txt (generation 99) while leaving a neighbouring SSTable that does have
/// a TOC.txt (generation 1) untouched.
#[test]
fn test_startup_sweep_removes_orphaned_partial_sstable() {
    let temp_dir = TempDir::new().unwrap();
    let sstable_dir = temp_dir
        .path()
        .join("data")
        .join("test_ks")
        .join("test_table");
    std::fs::create_dir_all(&sstable_dir).unwrap();

    // Generation 99: no TOC.txt.
    let orphan_components = [
        "nb-99-big-Data.db",
        "nb-99-big-Index.db",
        "nb-99-big-Statistics.db",
    ];
    // Generation 1: includes TOC.txt.
    let complete_components = ["nb-1-big-Data.db", "nb-1-big-Index.db", "nb-1-big-TOC.txt"];

    for name in &orphan_components {
        std::fs::write(sstable_dir.join(name), b"orphaned").unwrap();
    }
    for name in &complete_components {
        std::fs::write(sstable_dir.join(name), b"good").unwrap();
    }

    // Constructing the engine is the action under test.
    let _engine = WriteEngine::new(config_for(&temp_dir)).unwrap();

    for name in &orphan_components {
        assert!(
            !sstable_dir.join(name).exists(),
            "Startup sweep must remove orphaned component {:?}",
            name
        );
    }
    for name in &complete_components {
        assert!(
            sstable_dir.join(name).exists(),
            "Startup sweep must NOT remove complete SSTable component {:?}",
            name
        );
    }
}
/// Engine construction leaves every component of an SSTable with a TOC.txt
/// (generation 3, seven components) in place.
#[test]
fn test_startup_sweep_leaves_complete_sstable_intact() {
    let temp_dir = TempDir::new().unwrap();
    let sstable_dir = temp_dir
        .path()
        .join("data")
        .join("test_ks")
        .join("test_table");
    std::fs::create_dir_all(&sstable_dir).unwrap();

    let all_components = [
        "nb-3-big-Data.db",
        "nb-3-big-Index.db",
        "nb-3-big-Summary.db",
        "nb-3-big-Statistics.db",
        "nb-3-big-Filter.db",
        "nb-3-big-Digest.crc32",
        "nb-3-big-TOC.txt",
    ];
    for name in &all_components {
        std::fs::write(sstable_dir.join(name), b"complete").unwrap();
    }

    // Constructing the engine is the action under test.
    let _engine = WriteEngine::new(config_for(&temp_dir)).unwrap();

    for name in &all_components {
        assert!(
            sstable_dir.join(name).exists(),
            "Complete SSTable component {:?} must not be removed by startup sweep",
            name
        );
    }
}
/// Runs one compaction over four flushed SSTables and checks that exactly one
/// compaction completed, one SSTable was produced, and that two consecutive
/// `maintenance_stats()` calls agree on `bytes_written`.
///
/// NOTE(review): despite the test name, nothing here compares `bytes_written`
/// against on-disk sizes. The total size of the input Data.db files is
/// computed but deliberately unused — confirm whether an assertion against it
/// was intended.
#[test]
fn test_bytes_written_includes_all_components() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let policy = crate::storage::write_engine::STCSPolicy::new(4, 32, 0.5, 1.5, 0).unwrap();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let mut engine = WriteEngine::new(config).unwrap();

    let input_paths = flush_n_sstables_sync(&mut engine, 4);
    // Sum of the input Data.db sizes; unreadable files count as 0.
    let _data_db_total: u64 = input_paths
        .iter()
        .map(|p| std::fs::metadata(p).map_or(0, |m| m.len()))
        .sum();

    engine.set_merge_policy(Box::new(policy)).unwrap();
    engine.maintenance_step(Duration::from_secs(60)).unwrap();

    let stats = engine.maintenance_stats();
    assert_eq!(stats.compactions_completed, 1, "compaction must have run");

    let stats2 = engine.maintenance_stats();
    assert_eq!(
        stats.bytes_written, stats2.bytes_written,
        "maintenance_stats() must be consistent across calls"
    );
    assert_eq!(
        stats.sstables_produced, 1,
        "one output SSTable must have been produced"
    );
}
/// `Durability::default()` is `Durability::SyncEachWrite`.
#[test]
fn test_durability_default_is_sync_each_write() {
    let default_mode = Durability::default();
    assert_eq!(default_mode, Durability::SyncEachWrite);
}
/// A config built with `WriteEngineConfig::new` alone has
/// `Durability::SyncEachWrite`.
#[test]
fn test_config_default_durability() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();

    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);

    assert_eq!(config.durability, Durability::SyncEachWrite);
}
/// `with_durability` overrides the config's durability setting.
#[test]
fn test_config_with_durability_builder() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();

    let base = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let config = base.with_durability(Durability::Disabled);

    assert_eq!(config.durability, Durability::Disabled);
}
/// With `Durability::SyncEachWrite`, the WAL starts at size 0 and is non-empty
/// after a single write.
#[test]
fn test_wal_on_produces_wal_growth() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema)
        .with_durability(Durability::SyncEachWrite);
    let mut engine = WriteEngine::new(config).unwrap();
    assert_eq!(engine.wal_size(), 0, "WAL must start empty");

    engine
        .write(create_test_mutation(1, "Alice", 1_000_000))
        .unwrap();

    let wal_bytes = engine.wal_size();
    assert!(
        wal_bytes > 0,
        "WAL must grow after write with SyncEachWrite"
    );
}
/// With `Durability::Disabled`, ten synchronous writes leave the WAL at size 0
/// while all ten rows are present in the memtable.
#[test]
fn test_wal_off_produces_no_wal_growth() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema)
        .with_durability(Durability::Disabled);
    let mut engine = WriteEngine::new(config).unwrap();
    assert_eq!(engine.wal_size(), 0, "WAL must start empty");

    for id in 0..10 {
        let name = format!("User{}", id);
        engine
            .write(create_test_mutation(id, &name, 1_000_000 + id as i64))
            .unwrap();
    }

    assert_eq!(
        engine.wal_size(),
        0,
        "WAL must remain empty with Durability::Disabled"
    );
    assert_eq!(
        engine.memtable_row_count(),
        10,
        "Mutations must reach the memtable even without WAL"
    );
}
/// With `Durability::Disabled`, five `write_async` calls leave the WAL at
/// size 0 while all five rows are present in the memtable.
#[tokio::test]
async fn test_wal_off_write_async_produces_no_wal_growth() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();
    let config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema)
        .with_durability(Durability::Disabled);
    let mut engine = WriteEngine::new(config).unwrap();

    for id in 0..5 {
        let name = format!("User{}", id);
        let mutation = create_test_mutation(id, &name, 1_000_000 + id as i64);
        engine.write_async(mutation).await.unwrap();
    }

    assert_eq!(
        engine.wal_size(),
        0,
        "WAL must remain empty with Durability::Disabled (async path)"
    );
    assert_eq!(engine.memtable_row_count(), 5);
}
/// Rows written with `Durability::Disabled` and never flushed are not present
/// in the memtable of a new engine opened over the same directories.
#[test]
fn test_wal_off_no_replay_on_restart() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();

    // First engine lifetime: five writes with the WAL disabled, then drop.
    {
        let wal_off_config =
            WriteEngineConfig::new(root.join("data"), root.join("wal"), schema.clone())
                .with_durability(Durability::Disabled);
        let mut engine = WriteEngine::new(wal_off_config).unwrap();
        for id in 0..5 {
            let name = format!("User{}", id);
            engine
                .write(create_test_mutation(id, &name, 1_000_000 + id as i64))
                .unwrap();
        }
    }

    // Second engine lifetime, opened with the default durability setting.
    let reopen_config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema);
    let engine2 = WriteEngine::new(reopen_config).unwrap();

    assert_eq!(
        engine2.memtable_row_count(),
        0,
        "No WAL entries were written with Disabled, so no replay is possible"
    );
}
/// Rows written with `Durability::SyncEachWrite` and never flushed reappear in
/// the memtable of a new engine opened over the same directories.
#[test]
fn test_wal_on_replays_on_restart() {
    let temp_dir = TempDir::new().unwrap();
    let root = temp_dir.path();
    let schema = create_test_schema();

    // First engine lifetime: five writes with the WAL enabled, then drop.
    {
        let first_config =
            WriteEngineConfig::new(root.join("data"), root.join("wal"), schema.clone())
                .with_durability(Durability::SyncEachWrite);
        let mut engine = WriteEngine::new(first_config).unwrap();
        for id in 0..5 {
            let name = format!("User{}", id);
            engine
                .write(create_test_mutation(id, &name, 1_000_000 + id as i64))
                .unwrap();
        }
    }

    // Second engine lifetime over the same directories.
    let reopen_config = WriteEngineConfig::new(root.join("data"), root.join("wal"), schema)
        .with_durability(Durability::SyncEachWrite);
    let engine2 = WriteEngine::new(reopen_config).unwrap();

    assert_eq!(
        engine2.memtable_row_count(),
        5,
        "SyncEachWrite must replay mutations durably on restart"
    );
}
}