#[cfg(feature = "write-support")]
use crate::error::{Error, Result};
#[cfg(feature = "write-support")]
use crate::storage::sstable::directory::types::SSTableComponent;
#[cfg(feature = "write-support")]
use std::path::{Path, PathBuf};
/// Caller-supplied settings for a single `WriteEngine::export_sstable` call.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone)]
pub struct ExportOptions {
/// Keyspace name; used as the first directory level below the export output directory.
pub keyspace: String,
/// Table name; used as the second directory level, below `keyspace`.
pub table: String,
/// Generation number embedded in every exported file name (`nb-<generation>-big-<component>`).
pub generation: u64,
/// Deprecated flag: when `true` the export only logs a deprecation warning; no compaction is run.
pub compact_before_export: bool,
/// When `true`, the export fails unless every required component file was copied.
pub validate_after_export: bool,
}
#[cfg(feature = "write-support")]
impl ExportOptions {
    /// Builds options for exporting to `keyspace`/`table` under the given
    /// `generation`, with compaction disabled and post-export validation enabled.
    pub fn new(keyspace: impl Into<String>, table: impl Into<String>, generation: u64) -> Self {
        let keyspace: String = keyspace.into();
        let table: String = table.into();
        Self {
            keyspace,
            table,
            generation,
            compact_before_export: false,
            validate_after_export: true,
        }
    }

    /// Returns the options with `compact_before_export` cleared.
    pub fn skip_compaction(self) -> Self {
        Self {
            compact_before_export: false,
            ..self
        }
    }

    /// Returns the options with `validate_after_export` cleared.
    pub fn skip_validation(self) -> Self {
        Self {
            validate_after_export: false,
            ..self
        }
    }
}
/// Summary of a completed `WriteEngine::export_sstable` call.
#[cfg(feature = "write-support")]
#[derive(Debug, Clone)]
pub struct ExportReport {
/// Directory the component files were written to (`<output_dir>/<keyspace>/<table>`).
pub output_path: PathBuf,
/// Size in bytes of the exported `Data.db`; stays 0 when that component was not copied.
pub data_file_size: u64,
/// Size in bytes of the exported `Index.db`; stays 0 when that component was not copied.
pub index_file_size: u64,
/// Row count read from the exported `Statistics.db` (0 when it could not be parsed).
pub row_count: u64,
/// Number of entries counted in the exported `Index.db` (0 when counting failed).
pub partition_count: u64,
/// Paths of every component file that was actually copied.
pub components: Vec<PathBuf>,
}
#[cfg(feature = "write-support")]
impl ExportReport {
    /// Sums the current on-disk sizes of all exported component files.
    ///
    /// Files whose metadata cannot be read are left out of the total rather
    /// than reported as an error.
    pub fn total_size(&self) -> u64 {
        let mut bytes = 0u64;
        for path in &self.components {
            if let Ok(meta) = std::fs::metadata(path) {
                bytes += meta.len();
            }
        }
        bytes
    }

    /// Checks that every required SSTable component is present in `components`.
    ///
    /// A component counts as present when some exported file name ends with
    /// the component name.
    ///
    /// # Errors
    ///
    /// Returns `Error::Storage` naming the first missing component.
    pub fn validate_components(&self) -> Result<()> {
        const REQUIRED: [&str; 7] = [
            "Data.db",
            "Index.db",
            "Statistics.db",
            "Filter.db",
            "Summary.db",
            "Digest.crc32",
            "TOC.txt",
        ];

        // Only file names that are valid UTF-8 can match a component suffix.
        let exported_names: Vec<&str> = self
            .components
            .iter()
            .filter_map(|path| path.file_name())
            .filter_map(|name| name.to_str())
            .collect();

        let first_missing = REQUIRED
            .iter()
            .find(|suffix| !exported_names.iter().any(|name| name.ends_with(**suffix)));

        match first_missing {
            Some(component) => Err(Error::Storage(format!(
                "Missing required component: {}",
                component
            ))),
            None => Ok(()),
        }
    }
}
/// Validates that `name` is safe to use as a single directory component of
/// the export path.
///
/// `field` is only used to label the error message (e.g. `"keyspace"`).
///
/// # Errors
///
/// Returns `Error::InvalidPath` when `name` is empty, is the lone
/// current-directory segment `"."`, contains `".."`, contains a `/` or `\`
/// path separator, or contains a NUL byte.
#[cfg(feature = "write-support")]
fn validate_export_name(name: &str, field: &str) -> Result<()> {
    if name.is_empty() {
        return Err(Error::InvalidPath(format!(
            "Export {} must not be empty",
            field
        )));
    }
    // A lone "." adds no directory level when joined onto a path, so the
    // `<keyspace>/<table>` layout would silently collapse. Names that merely
    // contain a dot (e.g. "a.b") remain valid.
    if name == "." {
        return Err(Error::InvalidPath(format!(
            "Export {} must not be the current-directory segment '.': {:?}",
            field, name
        )));
    }
    if name.contains("..") {
        return Err(Error::InvalidPath(format!(
            "Export {} contains path traversal sequence '..': {:?}",
            field, name
        )));
    }
    if name.contains('/') || name.contains('\\') {
        return Err(Error::InvalidPath(format!(
            "Export {} contains path separator: {:?}",
            field, name
        )));
    }
    if name.contains('\0') {
        return Err(Error::InvalidPath(format!(
            "Export {} contains null byte: {:?}",
            field, name
        )));
    }
    Ok(())
}
#[cfg(feature = "write-support")]
/// Builds an SSTable component file name of the form
/// `nb-<generation>-big-<component>`.
fn build_cassandra_filename(generation: u64, component: &str) -> String {
    format!("nb-{generation}-big-{component}")
}
/// Decodes one unsigned VInt from `data` starting at byte `offset`.
///
/// Returns the decoded value together with the number of bytes the encoding
/// occupied, so the caller can advance its cursor.
///
/// # Errors
///
/// Returns `Error::Storage` when `offset` lies beyond the end of `data` or
/// when `parse_vuint` rejects the bytes at that position.
#[cfg(feature = "write-support")]
fn decode_unsigned_vint(data: &[u8], offset: usize) -> Result<(u64, usize)> {
    use crate::parser::vint::parse_vuint;

    let Some(tail) = data.get(offset..) else {
        return Err(Error::Storage(format!(
            "VInt: offset {} beyond data length {}",
            offset,
            data.len()
        )));
    };

    match parse_vuint(tail) {
        // The parser hands back the unconsumed remainder; the difference in
        // length is how many bytes the VInt used.
        Ok((rest, value)) => Ok((value, tail.len() - rest.len())),
        Err(_) => Err(Error::Storage(format!(
            "VInt: failed to decode at offset {} (data length {})",
            offset,
            data.len()
        ))),
    }
}
#[cfg(feature = "write-support")]
impl crate::storage::write_engine::WriteEngine {
/// Exports the most recent on-disk SSTable into
/// `output_dir/<keyspace>/<table>/`, renaming every component to the
/// generation given in `options`.
///
/// The export proceeds in this order:
/// 1. reject the call if the engine is closed or the names are unsafe,
/// 2. create the destination directory,
/// 3. flush the memtable when it holds data,
/// 4. locate the SSTable with the highest generation under the data directory,
/// 5. copy each known component that exists (missing ones are skipped with a
///    warning),
/// 6. read row/partition counts from the copied files,
/// 7. optionally validate that all required components were exported.
///
/// # Errors
///
/// * `Error::InvalidInput` if the engine has been closed.
/// * `Error::InvalidPath` if the keyspace or table name fails validation.
/// * `Error::Storage` if the destination directory cannot be created, no
///   SSTable is found, a copy fails, `Statistics.db` was not exported, or
///   validation finds a missing component.
/// * Any error returned by the memtable flush.
pub async fn export_sstable(
    &mut self,
    output_dir: &Path,
    options: ExportOptions,
) -> Result<ExportReport> {
    use std::sync::atomic::Ordering;

    if self.closed.load(Ordering::SeqCst) {
        return Err(Error::InvalidInput(
            "WriteEngine has been closed".to_string(),
        ));
    }
    validate_export_name(&options.keyspace, "keyspace")?;
    validate_export_name(&options.table, "table")?;

    log::info!(
        "Starting SSTable export to {} with keyspace={}, table={}, generation={}",
        output_dir.display(),
        options.keyspace,
        options.table,
        options.generation
    );

    let export_path = output_dir.join(&options.keyspace).join(&options.table);
    tokio::fs::create_dir_all(&export_path).await.map_err(|e| {
        Error::Storage(format!(
            "Failed to create export directory {:?}: {}",
            export_path, e
        ))
    })?;

    // Make sure everything written so far is on disk before picking the
    // SSTable to export.
    if !self.memtable.is_empty() {
        log::info!(
            "Flushing memtable before export ({} rows, {} bytes)",
            self.memtable_row_count(),
            self.memtable_size()
        );
        self.flush_internal_async().await?;
    } else {
        log::info!("Memtable is empty, skipping flush");
    }

    if options.compact_before_export {
        log::warn!(
            "compact_before_export on ExportOptions is deprecated. \
             Use WriteEngine::maintenance_step() before export instead."
        );
    }

    let (source_generation, source_dir) = self.find_most_recent_sstable().await?;

    let mut exported_components = Vec::new();
    let mut data_file_size = 0u64;
    let mut index_file_size = 0u64;

    let components_to_copy = [
        ("Data.db", SSTableComponent::Data),
        ("Index.db", SSTableComponent::Index),
        ("Statistics.db", SSTableComponent::Statistics),
        ("Filter.db", SSTableComponent::Filter),
        ("Summary.db", SSTableComponent::Summary),
        ("Digest.crc32", SSTableComponent::Digest),
        ("TOC.txt", SSTableComponent::TOC),
    ];

    for (component_name, _component_type) in &components_to_copy {
        // Source and destination names share one naming helper so the two
        // can never drift apart.
        let source_filename = build_cassandra_filename(source_generation, component_name);
        let source_path = source_dir.join(&source_filename);

        // Async existence probe: like `Path::exists`, any metadata error is
        // treated as "not there", but without blocking the executor thread.
        if tokio::fs::metadata(&source_path).await.is_err() {
            log::warn!(
                "Component {} not found at {}, skipping",
                component_name,
                source_path.display()
            );
            continue;
        }

        let dest_filename = build_cassandra_filename(options.generation, component_name);
        let dest_path = export_path.join(&dest_filename);

        // `copy` reports how many bytes it wrote, so no extra stat of the
        // destination is needed to learn the file size.
        let bytes_copied = tokio::fs::copy(&source_path, &dest_path)
            .await
            .map_err(|e| {
                Error::Storage(format!(
                    "Failed to copy {} to {}: {}",
                    source_path.display(),
                    dest_path.display(),
                    e
                ))
            })?;
        log::debug!(
            "Copied {} to {}",
            source_path.display(),
            dest_path.display()
        );

        match *component_name {
            "Data.db" => data_file_size = bytes_copied,
            "Index.db" => index_file_size = bytes_copied,
            _ => {}
        }
        exported_components.push(dest_path);
    }

    // NOTE(review): this helper reads Statistics.db and Index.db with
    // blocking std::fs calls; consider moving it to a blocking task if large
    // index files ever stall the runtime.
    let (partition_count, row_count) = read_statistics_from_export(&exported_components)?;

    let report = ExportReport {
        output_path: export_path,
        data_file_size,
        index_file_size,
        row_count,
        partition_count,
        components: exported_components,
    };

    if options.validate_after_export {
        log::info!("Validating exported SSTable");
        report.validate_components()?;
        log::info!("Validation passed");
    }

    log::info!(
        "Export complete: {} partitions, {} rows, {} total bytes",
        report.partition_count,
        report.row_count,
        report.total_size()
    );
    Ok(report)
}
async fn find_most_recent_sstable(&self) -> Result<(u64, PathBuf)> {
if !self.config.data_dir.exists() {
return Err(Error::Storage(
"Data directory does not exist (no SSTables to export)".to_string(),
));
}
Self::find_max_generation(
&self.config.data_dir,
crate::storage::sstable::MAX_SSTABLE_SCAN_DEPTH,
)
.await?
.ok_or_else(|| {
Error::Storage("No SSTables found in data directory (nothing to export)".to_string())
})
}
#[allow(clippy::type_complexity)]
fn find_max_generation<'a>(
dir: &'a Path,
depth: usize,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<Option<(u64, PathBuf)>>> + Send + 'a>,
> {
let dir = dir.to_path_buf();
Box::pin(async move {
let mut best: Option<(u64, PathBuf)> = None;
let mut entries = tokio::fs::read_dir(&dir)
.await
.map_err(|e| Error::Storage(format!("Failed to read directory: {}", e)))?;
while let Some(entry) = entries
.next_entry()
.await
.map_err(|e| Error::Storage(format!("Failed to read directory entry: {}", e)))?
{
let filename = entry.file_name();
let filename_str = filename.to_string_lossy();
if filename_str.starts_with("nb-") && filename_str.contains("-big-") {
if let Some(gen) = filename_str
.strip_prefix("nb-")
.and_then(|s| s.split('-').next())
.and_then(|s| s.parse::<u64>().ok())
{
if best.as_ref().is_none_or(|(cur, _)| gen > *cur) {
best = Some((gen, dir.clone()));
}
}
} else if depth > 0 {
let path = entry.path();
if entry
.file_type()
.await
.map(|ft| ft.is_dir())
.unwrap_or(false)
{
if let Some((gen, sub_dir)) =
Self::find_max_generation(&path, depth - 1).await?
{
if best.as_ref().is_none_or(|(cur, _)| gen > *cur) {
best = Some((gen, sub_dir));
}
}
}
}
}
Ok(best)
})
}
}
/// Reads `(partition_count, row_count)` for an exported SSTable.
///
/// The row count comes from the exported `Statistics.db`: the file's table of
/// contents (starting at byte 8, one 8-byte `type`/`offset` pair per entry) is
/// searched for the component with type `2`, and an 8-byte big-endian value is
/// read at a fixed offset inside that component. The partition count is the
/// number of entries in the exported `Index.db`.
///
/// When `Statistics.db` is too small, has no type-`2` entry, or is too short
/// for the row-count field, a warning is logged and `(0, 0)` is returned. A
/// failure to count `Index.db` entries only zeroes the partition count.
///
/// # Errors
///
/// Returns `Error::Storage` when no `Statistics.db` is among `components` or
/// when it cannot be read.
#[cfg(feature = "write-support")]
fn read_statistics_from_export(components: &[PathBuf]) -> Result<(u64, u64)> {
    // Reads a big-endian u32 at `at`; callers guarantee `at + 4 <= bytes.len()`.
    fn be_u32_at(bytes: &[u8], at: usize) -> u32 {
        u32::from_be_bytes([bytes[at], bytes[at + 1], bytes[at + 2], bytes[at + 3]])
    }

    // Position of the totalRows field relative to the start of the STATS component.
    const TOTAL_ROWS_OFFSET: usize = 161;

    let located = components.iter().find(|path| {
        path.file_name()
            .and_then(|name| name.to_str())
            .is_some_and(|name| name.ends_with("Statistics.db"))
    });
    let Some(stats_path) = located else {
        return Err(Error::Storage(
            "Statistics.db not found in export".to_string(),
        ));
    };

    let file_data = match std::fs::read(stats_path) {
        Ok(bytes) => bytes,
        Err(e) => {
            return Err(Error::Storage(format!(
                "Failed to read Statistics.db: {}",
                e
            )))
        }
    };

    if file_data.len() < 8 {
        log::warn!("Statistics.db too small to parse, returning zero counts");
        return Ok((0, 0));
    }

    let num_components = be_u32_at(&file_data, 0) as usize;

    // Walk the TOC until the STATS entry (type 2) is found or the entries run
    // past the end of the file.
    let stats_offset = (0..num_components)
        .map(|i| 8 + i * 8)
        .take_while(|entry_base| entry_base + 8 <= file_data.len())
        .find(|&entry_base| be_u32_at(&file_data, entry_base) == 2)
        .map(|entry_base| be_u32_at(&file_data, entry_base + 4) as usize);

    let Some(stats_offset) = stats_offset else {
        log::warn!("STATS component not found in Statistics.db TOC");
        return Ok((0, 0));
    };

    let abs_offset = stats_offset + TOTAL_ROWS_OFFSET;
    if abs_offset + 8 > file_data.len() {
        log::warn!(
            "Statistics.db STATS component too short for totalRows (need {} + 8, have {})",
            abs_offset,
            file_data.len()
        );
        return Ok((0, 0));
    }

    let mut raw_rows = [0u8; 8];
    raw_rows.copy_from_slice(&file_data[abs_offset..abs_offset + 8]);
    let row_count = u64::from_be_bytes(raw_rows);

    let partition_count = match count_index_entries(components) {
        Ok(entries) => entries,
        Err(e) => {
            log::warn!("Failed to count Index.db entries: {}, defaulting to 0", e);
            0
        }
    };

    log::info!(
        "Read from export: row_count={}, partition_count={}",
        row_count,
        partition_count
    );
    Ok((partition_count, row_count))
}
/// Counts the partition entries in the exported `Index.db`.
///
/// Each entry is parsed as: a 2-byte big-endian key length, the key bytes, an
/// unsigned VInt position, an unsigned VInt promoted-index size, and then that
/// many bytes of promoted index. Parsing stops with a warning (keeping the
/// count so far) when a key or promoted index would run past the end of the
/// file.
///
/// # Errors
///
/// Returns `Error::Storage` when no `Index.db` is among `components`, when it
/// cannot be read, or when a VInt cannot be decoded.
#[cfg(feature = "write-support")]
fn count_index_entries(components: &[PathBuf]) -> Result<u64> {
    let index_path = components
        .iter()
        .find(|p| {
            p.file_name()
                .and_then(|n| n.to_str())
                .map(|s| s.ends_with("Index.db"))
                .unwrap_or(false)
        })
        .ok_or_else(|| Error::Storage("Index.db not found in export".to_string()))?;
    let index_data = std::fs::read(index_path)
        .map_err(|e| Error::Storage(format!("Failed to read Index.db: {}", e)))?;

    let mut count = 0u64;
    let mut offset = 0usize;
    // The smallest possible entry is 4 bytes: key length (2) plus two
    // single-byte VInts.
    while offset + 4 <= index_data.len() {
        let key_len = u16::from_be_bytes([index_data[offset], index_data[offset + 1]]) as usize;
        offset += 2;
        if offset + key_len > index_data.len() {
            log::warn!(
                "Index.db: key at offset {} exceeds file bounds (key_len={})",
                offset,
                key_len
            );
            break;
        }
        offset += key_len;

        let (_, pos_bytes) = decode_unsigned_vint(&index_data, offset)?;
        offset += pos_bytes;
        let (promoted_size, prom_bytes) = decode_unsigned_vint(&index_data, offset)?;
        offset += prom_bytes;

        if promoted_size > 0 {
            // Compare against the bytes that are left instead of computing
            // `offset + promoted_size`: a corrupt size near u64::MAX would
            // overflow that sum (panic in debug builds, wrap-around past the
            // bounds check in release builds).
            let remaining = (index_data.len() - offset) as u64;
            if promoted_size > remaining {
                log::warn!(
                    "Index.db: promoted index at offset {} exceeds file bounds",
                    offset
                );
                break;
            }
            // Lossless: promoted_size <= remaining, which came from a usize.
            offset += promoted_size as usize;
        }
        count += 1;
    }

    log::debug!("Counted {} partition entries in Index.db", count);
    Ok(count)
}
/// Unit and integration tests for SSTable export.
#[cfg(all(test, feature = "write-support"))]
mod tests {
    use super::*;
    use crate::schema::{Column, KeyColumn, TableSchema};
    use crate::storage::write_engine::mutation::{CellOperation, Mutation, PartitionKey, TableId};
    use crate::storage::write_engine::{WriteEngine, WriteEngineConfig};
    use crate::types::Value;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Schema for `test_ks.test_table`: an `int` partition key `id` and a
    /// nullable `text` column `name`.
    fn create_test_schema() -> TableSchema {
        let column = |name: &str, data_type: &str, nullable: bool| Column {
            name: name.to_string(),
            data_type: data_type.to_string(),
            nullable,
            default: None,
            is_static: false,
        };
        let id_key = KeyColumn {
            name: "id".to_string(),
            data_type: "int".to_string(),
            position: 0,
        };
        TableSchema {
            keyspace: "test_ks".to_string(),
            table: "test_table".to_string(),
            partition_keys: vec![id_key],
            clustering_keys: vec![],
            columns: vec![column("id", "int", false), column("name", "text", true)],
            comments: HashMap::new(),
        }
    }

    /// Mutation writing `name` for the partition `id` at `timestamp`.
    fn create_test_mutation(id: i32, name: &str, timestamp: i64) -> Mutation {
        let write_name = CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text(name.to_string()),
        };
        Mutation::new(
            TableId::new("test_ks", "test_table"),
            PartitionKey::single("id", Value::Integer(id)),
            None,
            vec![write_name],
            timestamp,
            None,
        )
    }

    /// Opens an engine whose data and WAL directories live under `root`.
    fn open_engine(root: &TempDir) -> WriteEngine {
        let config = WriteEngineConfig::new(
            root.path().join("data"),
            root.path().join("wal"),
            create_test_schema(),
        );
        WriteEngine::new(config).unwrap()
    }

    /// Writes rows `User0..User{count-1}` with ids `0..count`.
    fn write_users(engine: &mut WriteEngine, count: i32) {
        for i in 0..count {
            let mutation = create_test_mutation(i, &format!("User{}", i), 1000000 + i as i64);
            engine.write(mutation).unwrap();
        }
    }

    /// Export options for the test table with validation and compaction skipped.
    fn quiet_options(generation: u64) -> ExportOptions {
        ExportOptions::new("test_ks", "test_table", generation)
            .skip_validation()
            .skip_compaction()
    }

    /// Path of an exported file inside `<export_dir>/test_ks/test_table/`.
    fn exported_file(export_dir: &TempDir, file_name: &str) -> PathBuf {
        export_dir
            .path()
            .join("test_ks")
            .join("test_table")
            .join(file_name)
    }

    #[test]
    fn test_export_path_traversal_rejected() {
        for name in ["../etc", "foo/bar", "foo\\bar", "", "test\0ks", ".."] {
            let outcome = validate_export_name(name, "keyspace");
            assert!(outcome.is_err(), "Should reject {:?} as export name", name);
        }
    }

    #[test]
    fn test_export_valid_names_accepted() {
        for name in ["test_ks", "my-table", "T1", "keyspace123", "a.b"] {
            let outcome = validate_export_name(name, "keyspace");
            assert!(
                outcome.is_ok(),
                "Should accept {:?} as export name: {:?}",
                name,
                outcome
            );
        }
    }

    #[test]
    fn test_export_options_defaults() {
        let defaults = ExportOptions::new("test_ks", "users", 1);
        assert_eq!(defaults.keyspace, "test_ks");
        assert_eq!(defaults.table, "users");
        assert_eq!(defaults.generation, 1);
        assert!(!defaults.compact_before_export);
        assert!(defaults.validate_after_export);
    }

    #[test]
    fn test_export_options_skip_compaction() {
        let opts = ExportOptions::new("test_ks", "users", 1).skip_compaction();
        assert!(!opts.compact_before_export);
        assert!(opts.validate_after_export);
    }

    #[test]
    fn test_export_options_skip_validation() {
        let opts = ExportOptions::new("test_ks", "users", 1).skip_validation();
        assert!(!opts.compact_before_export);
        assert!(!opts.validate_after_export);
    }

    #[test]
    fn test_build_cassandra_filename() {
        assert_eq!(build_cassandra_filename(1, "Data.db"), "nb-1-big-Data.db");
        assert_eq!(
            build_cassandra_filename(42, "Index.db"),
            "nb-42-big-Index.db"
        );
    }

    #[tokio::test]
    async fn test_export_empty_engine() {
        let temp_dir = TempDir::new().unwrap();
        let export_dir = TempDir::new().unwrap();
        let mut engine = open_engine(&temp_dir);

        let options = ExportOptions::new("test_ks", "test_table", 1).skip_compaction();
        let result = engine.export_sstable(export_dir.path(), options).await;

        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert!(message.contains("No SSTables found"));
    }

    #[tokio::test]
    async fn test_export_single_sstable() {
        let temp_dir = TempDir::new().unwrap();
        let export_dir = TempDir::new().unwrap();
        let mut engine = open_engine(&temp_dir);
        write_users(&mut engine, 5);
        engine.flush().await.unwrap();

        let report = engine
            .export_sstable(export_dir.path(), quiet_options(1))
            .await;
        assert!(report.is_ok());
        let report = report.unwrap();

        assert!(report.data_file_size > 0);
        assert!(!report.components.is_empty());
        assert_eq!(report.row_count, 5, "Expected 5 rows");
        assert!(
            report.partition_count > 0,
            "Expected non-zero partition count, got {}",
            report.partition_count
        );

        assert!(exported_file(&export_dir, "nb-1-big-Data.db").exists());
        assert!(exported_file(&export_dir, "nb-1-big-Index.db").exists());
        assert!(
            !exported_file(&export_dir, "nb-1-big-CompressionInfo.db").exists(),
            "CompressionInfo.db must NOT be included for uncompressed data"
        );
    }

    #[tokio::test]
    async fn test_export_with_memtable_flush() {
        let temp_dir = TempDir::new().unwrap();
        let export_dir = TempDir::new().unwrap();
        let mut engine = open_engine(&temp_dir);
        write_users(&mut engine, 3);
        // Nothing flushed yet: the export itself must flush the memtable.
        assert!(engine.memtable_row_count() > 0);

        let report = engine
            .export_sstable(export_dir.path(), quiet_options(1))
            .await;

        assert!(report.is_ok());
        assert_eq!(engine.memtable_row_count(), 0);
    }

    #[tokio::test]
    async fn test_export_report_total_size() {
        let temp_dir = TempDir::new().unwrap();
        let export_dir = TempDir::new().unwrap();
        let mut engine = open_engine(&temp_dir);
        write_users(&mut engine, 5);
        engine.flush().await.unwrap();

        let report = engine
            .export_sstable(export_dir.path(), quiet_options(1))
            .await
            .unwrap();

        let total_size = report.total_size();
        assert!(total_size > 0);
        assert!(total_size >= report.data_file_size + report.index_file_size);
    }

    #[tokio::test]
    async fn test_export_report_validate_components() {
        let temp_dir = TempDir::new().unwrap();
        let export_dir = TempDir::new().unwrap();
        let mut engine = open_engine(&temp_dir);
        engine
            .write(create_test_mutation(1, "Alice", 1000000))
            .unwrap();
        engine.flush().await.unwrap();

        let report = engine
            .export_sstable(export_dir.path(), quiet_options(1))
            .await
            .unwrap();

        let validation_result = report.validate_components();
        assert!(validation_result.is_ok());
    }

    #[tokio::test]
    async fn test_export_after_close_fails() {
        let temp_dir = TempDir::new().unwrap();
        let export_dir = TempDir::new().unwrap();
        let mut engine = open_engine(&temp_dir);
        engine
            .write(create_test_mutation(1, "Alice", 1000000))
            .unwrap();
        engine.flush().await.unwrap();
        engine.close().await.unwrap();

        let options = ExportOptions::new("test_ks", "test_table", 1);
        let result = engine.export_sstable(export_dir.path(), options).await;

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("closed"));
    }

    #[tokio::test]
    async fn test_export_multiple_generations() {
        let temp_dir = TempDir::new().unwrap();
        let export_dir = TempDir::new().unwrap();
        let mut engine = open_engine(&temp_dir);

        // Two separate flushes produce two on-disk generations.
        engine
            .write(create_test_mutation(1, "Alice", 1000000))
            .unwrap();
        engine.flush().await.unwrap();
        engine
            .write(create_test_mutation(2, "Bob", 2000000))
            .unwrap();
        engine.flush().await.unwrap();

        let report = engine
            .export_sstable(export_dir.path(), quiet_options(100))
            .await;

        assert!(report.is_ok());
        assert!(exported_file(&export_dir, "nb-100-big-Data.db").exists());
    }

    #[tokio::test]
    async fn test_export_default_options_does_not_fail() {
        let temp_dir = TempDir::new().unwrap();
        let export_dir = TempDir::new().unwrap();
        let mut engine = open_engine(&temp_dir);
        write_users(&mut engine, 3);
        engine.flush().await.unwrap();

        let options = ExportOptions::new("test_ks", "test_table", 1).skip_validation();
        let result = engine.export_sstable(export_dir.path(), options).await;

        assert!(
            result.is_ok(),
            "Export with default options should not fail: {:?}",
            result
        );
    }

    #[test]
    fn test_count_index_entries_big_format() {
        let temp_dir = TempDir::new().unwrap();

        // Three entries: 2-byte key length (4), 4-byte key, one-byte position
        // VInt, one-byte promoted-index size of zero.
        let mut raw_index = Vec::new();
        for entry in 0u64..3 {
            raw_index.extend_from_slice(&4u16.to_be_bytes());
            raw_index.extend_from_slice(&(entry as u32).to_be_bytes());
            raw_index.push((entry * 50) as u8);
            raw_index.push(0x00);
        }

        let index_path = temp_dir.path().join("nb-1-big-Index.db");
        std::fs::write(&index_path, &raw_index).unwrap();

        let count = count_index_entries(&[index_path]).unwrap();
        assert_eq!(count, 3, "Should count 3 BIG-format index entries");
    }

    #[tokio::test]
    async fn test_export_partition_count_nonzero() {
        let temp_dir = TempDir::new().unwrap();
        let export_dir = TempDir::new().unwrap();
        let mut engine = open_engine(&temp_dir);
        write_users(&mut engine, 10);
        engine.flush().await.unwrap();

        let report = engine
            .export_sstable(export_dir.path(), quiet_options(1))
            .await
            .unwrap();

        assert!(
            report.partition_count > 0,
            "partition_count should be non-zero, got {}",
            report.partition_count
        );
        assert_eq!(report.row_count, 10, "Expected 10 rows");
        assert!(
            report.partition_count <= report.row_count,
            "partition_count ({}) should be <= row_count ({})",
            report.partition_count,
            report.row_count
        );
    }
}