#[cfg(feature = "write-support")]
pub mod compressed_data_writer;
#[cfg(feature = "write-support")]
pub mod compression_info_writer;
#[cfg(feature = "write-support")]
pub mod data_writer;
#[cfg(feature = "write-support")]
pub mod digest_writer;
#[cfg(feature = "write-support")]
pub mod filter_writer;
#[cfg(feature = "write-support")]
pub mod index_writer;
#[cfg(feature = "write-support")]
pub mod stats_writer;
#[cfg(feature = "write-support")]
pub mod summary_writer;
#[cfg(feature = "write-support")]
pub mod toc_writer;
#[cfg(all(feature = "write-support", feature = "deflate"))]
pub use compressed_data_writer::DeflateCompressor;
#[cfg(all(feature = "write-support", feature = "lz4"))]
pub use compressed_data_writer::Lz4Compressor;
#[cfg(all(feature = "write-support", feature = "snappy"))]
pub use compressed_data_writer::SnappyCompressor;
#[cfg(all(feature = "write-support", feature = "zstd"))]
pub use compressed_data_writer::ZstdCompressor;
#[cfg(feature = "write-support")]
pub use compressed_data_writer::{
create_compressor, CompressedDataWriter, Compressor, NoopCompressor,
};
#[cfg(feature = "write-support")]
pub use compression_info_writer::{
CompressionAlgorithm, CompressionInfoWriter, CompressionMetadata,
};
#[cfg(feature = "write-support")]
pub use data_writer::DataWriter;
#[cfg(feature = "write-support")]
pub use digest_writer::DigestWriter;
#[cfg(feature = "write-support")]
pub use filter_writer::FilterWriter;
#[cfg(feature = "write-support")]
pub use index_writer::{IndexEntryInfo, IndexWriter};
#[cfg(feature = "write-support")]
pub use stats_writer::{StatisticsMetadata, StatisticsWriter};
#[cfg(feature = "write-support")]
pub use summary_writer::SummaryWriter;
#[cfg(feature = "write-support")]
pub use toc_writer::{ComponentEntry, TocWriter};
use crate::error::{Error, Result};
use crate::schema::TableSchema;
use crate::storage::write_engine::mutation::{DecoratedKey, Mutation};
use std::path::{Path, PathBuf};
#[cfg(feature = "write-support")]
/// Locations and summary figures for an SSTable produced by `SSTableWriter::finish`.
#[derive(Debug, Clone)]
pub struct SSTableInfo {
    /// Path of the `Data.db` component.
    pub data_path: PathBuf,
    /// Path of the `Index.db` component.
    pub index_path: PathBuf,
    /// Path of the `Filter.db` component.
    pub filter_path: PathBuf,
    /// Path of the `Summary.db` component.
    pub summary_path: PathBuf,
    /// Path of the `Statistics.db` component.
    pub stats_path: PathBuf,
    /// Path of the `CompressionInfo.db` component, if one was written.
    /// `SSTableWriter::finish` currently always leaves this `None`.
    pub compression_info_path: Option<PathBuf>,
    /// Path of the `TOC.txt` component listing the SSTable's components.
    pub toc_path: PathBuf,
    /// Path of the `Digest.crc32` component (CRC32 of `Data.db`).
    pub digest_path: PathBuf,
    /// Number of partitions successfully passed to `SSTableWriter::write_partition`.
    pub partition_count: usize,
    /// Size reported by `DataWriter::finish_streaming` (presumably the number of bytes
    /// in `Data.db` — TODO confirm).
    pub data_size: u64,
}
#[cfg(feature = "write-support")]
/// Builds one SSTable generation, one partition at a time, in strictly increasing token order.
///
/// Create it with `new` / `with_expected_partitions`, feed partitions through
/// `write_partition`, then call `finish` to write all components to disk.
#[derive(Debug)]
pub struct SSTableWriter {
    /// `<output_dir>/<keyspace>/<table>`, the directory that receives every component file.
    sstable_dir: PathBuf,
    /// Generation number embedded in component names (`nb-<generation>-big-<Component>`).
    generation: u64,
    /// Table schema, used for clustering-key ordering and passed to the data/statistics writers.
    schema: TableSchema,
    /// Running statistics, refreshed into `data_writer` per partition and written to
    /// `Statistics.db` by `finish`.
    stats: StatisticsMetadata,
    /// Serializes partitions for `Data.db`.
    data_writer: DataWriter,
    /// Collects index entries; its bytes are written to `Index.db` by `finish`.
    index_writer: IndexWriter,
    /// Writer for `Filter.db` (presumably a Bloom filter); always `Some` as constructed here.
    filter_writer: Option<FilterWriter>,
    /// Collects sampled index entries for `Summary.db`.
    summary_writer: SummaryWriter,
    /// Token of the most recently accepted partition, used to enforce strictly ascending order.
    last_token: Option<i64>,
    /// Number of partitions accepted so far.
    partition_count: usize,
    /// Partitions seen so far, used to pick which ones get a summary entry
    /// (always equal to `partition_count` in the current code).
    summary_sample_counter: usize,
    /// A summary entry is added for every `summary_sample_interval`-th partition, starting
    /// with the first.
    summary_sample_interval: usize,
}
#[cfg(feature = "write-support")]
impl SSTableWriter {
    /// Filter sizing hint used by [`SSTableWriter::new`].
    const DEFAULT_EXPECTED_PARTITIONS: usize = 128;

    /// A `Summary.db` entry is recorded for every `SUMMARY_SAMPLE_INTERVAL`-th partition,
    /// starting with the first one.
    const SUMMARY_SAMPLE_INTERVAL: usize = 128;

    /// Creates a writer whose filter is sized for 128 expected partitions.
    ///
    /// See [`SSTableWriter::with_expected_partitions`] for layout and error details.
    pub fn new(output_dir: PathBuf, generation: u64, schema: &TableSchema) -> Result<Self> {
        Self::with_expected_partitions(
            output_dir,
            generation,
            schema,
            Self::DEFAULT_EXPECTED_PARTITIONS,
        )
    }

    /// Creates a writer for SSTable generation `generation` of `schema`.
    ///
    /// Component files are named `nb-<generation>-big-<Component>` and live in
    /// `output_dir/<keyspace>/<table>/`, which [`SSTableWriter::finish`] creates.
    /// `expected_partitions` (clamped to at least 1) sizes the filter, together with a
    /// `0.01` parameter (presumably the target false-positive chance).
    ///
    /// # Errors
    ///
    /// Returns an error if the filter writer cannot be created.
    pub fn with_expected_partitions(
        output_dir: PathBuf,
        generation: u64,
        schema: &TableSchema,
        expected_partitions: usize,
    ) -> Result<Self> {
        // Seed the minimums with the largest value so the first observation replaces them.
        let mut stats = StatisticsMetadata::new();
        stats.min_timestamp = i64::MAX;
        stats.min_ttl = i32::MAX;
        stats.min_local_deletion_time = i32::MAX;

        let sstable_dir = output_dir.join(&schema.keyspace).join(&schema.table);
        let data_path = Self::component_path(&sstable_dir, generation, "Data.db");
        let filter_path = Self::component_path(&sstable_dir, generation, "Filter.db");

        let data_writer = DataWriter::with_sink(stats.clone(), data_path);
        let index_writer = IndexWriter::new();
        let filter_writer = FilterWriter::new(filter_path, expected_partitions.max(1), 0.01)?;
        let summary_sample_interval = Self::SUMMARY_SAMPLE_INTERVAL;
        let summary_writer = SummaryWriter::new(summary_sample_interval as u32);

        Ok(Self {
            sstable_dir,
            generation,
            schema: schema.clone(),
            stats,
            data_writer,
            index_writer,
            filter_writer: Some(filter_writer),
            summary_writer,
            last_token: None,
            partition_count: 0,
            summary_sample_counter: 0,
            summary_sample_interval,
        })
    }

    /// Appends one partition to the SSTable.
    ///
    /// Partitions must arrive in strictly increasing token order (equal tokens are rejected
    /// as well). `mutations` are sorted into clustering order, with mutations that have no
    /// clustering key first, and their timestamps, TTLs, deletion times and tombstones are
    /// folded into the SSTable statistics before the partition is serialized.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidInput`] if `key.token` is not greater than the previous
    /// partition's token. Also propagates errors from the data, index and summary writers.
    /// The token is recorded before serialization starts, so a partition whose write fails
    /// still advances the ordering check.
    pub fn write_partition(
        &mut self,
        key: DecoratedKey,
        mut mutations: Vec<Mutation>,
    ) -> Result<()> {
        if let Some(last_token) = self.last_token {
            if key.token <= last_token {
                return Err(Error::InvalidInput(format!(
                    "Partitions must be written in token order: got token {} after {}",
                    key.token, last_token
                )));
            }
        }
        self.last_token = Some(key.token);

        // Mutations without a clustering key sort first. If the schema-aware comparison fails,
        // fall back to the clustering key's own `Ord`.
        mutations.sort_by(|a, b| match (&a.clustering_key, &b.clustering_key) {
            (None, None) => std::cmp::Ordering::Equal,
            (None, Some(_)) => std::cmp::Ordering::Less,
            (Some(_), None) => std::cmp::Ordering::Greater,
            (Some(ck_a), Some(ck_b)) => ck_a
                .compare(ck_b, &self.schema)
                .unwrap_or_else(|_| ck_a.cmp(ck_b)),
        });

        self.record_partition_stats(&mutations);
        // The data writer holds its own copy of the statistics; refresh it before writing.
        self.data_writer.update_stats(self.stats.clone());

        let partition_tombstone = mutations
            .iter()
            .filter_map(|m| m.partition_tombstone.as_ref())
            .max_by_key(|pt| pt.deletion_time);
        let range_tombstones: Vec<_> = mutations
            .iter()
            .flat_map(|m| m.range_tombstones.iter())
            .cloned()
            .collect();
        let data_offset = self.data_writer.write_partition(
            &key,
            &mutations,
            &self.schema,
            partition_tombstone,
            &range_tombstones,
        )?;

        let entry_info = self.index_writer.add_partition(&key, data_offset)?;
        if let Some(ref mut filter) = self.filter_writer {
            filter.add_key(&key);
        }

        self.summary_writer.note_partition(&key);
        if self.summary_sample_counter % self.summary_sample_interval == 0 {
            self.summary_writer
                .add_entry(&key, entry_info.index_offset)?;
        }
        self.summary_sample_counter += 1;
        self.partition_count += 1;
        self.stats.increment_partition_count();
        Ok(())
    }

    /// Folds every mutation of one partition into `self.stats`: write timestamps, TTLs,
    /// local deletion times, partition/range tombstones, row and column counts.
    fn record_partition_stats(&mut self, mutations: &[Mutation]) {
        use crate::storage::write_engine::mutation::CellOperation;

        // Read the clock once per partition. Every expiring cell in the partition then gets a
        // consistent local deletion time, and cells don't each pay for a clock read.
        let now_seconds = Self::now_epoch_seconds();
        for mutation in mutations {
            self.stats.update_timestamp(mutation.timestamp_micros);
            if let Some(ttl) = mutation.ttl_seconds {
                self.record_expiring(ttl as i32, now_seconds);
            }
            for op in &mutation.operations {
                match op {
                    CellOperation::WriteWithTtl { ttl_seconds, .. } => {
                        self.record_expiring(*ttl_seconds as i32, now_seconds);
                    }
                    CellOperation::Delete { .. } | CellOperation::DeleteRow => {
                        // Deletions use the mutation's write time, converted micros -> seconds.
                        let local_deletion_time = (mutation.timestamp_micros / 1_000_000) as i32;
                        self.stats.update_local_deletion_time(local_deletion_time);
                    }
                    _ => {}
                }
            }
            if let Some(pt) = &mutation.partition_tombstone {
                self.stats.update_timestamp(pt.deletion_time);
                self.stats.update_local_deletion_time(pt.local_deletion_time);
            }
            for rt in &mutation.range_tombstones {
                self.stats.update_timestamp(rt.deletion_time);
                self.stats.update_local_deletion_time(rt.local_deletion_time);
            }
            self.stats.increment_row_count();
            self.stats
                .add_column_count(mutation.operations.len() as u64);
        }
    }

    /// Records one expiring value: its TTL, plus the local deletion time
    /// `now_seconds + ttl_seconds` (saturating).
    fn record_expiring(&mut self, ttl_seconds: i32, now_seconds: i32) {
        self.stats.update_ttl(ttl_seconds);
        self.stats
            .update_local_deletion_time(now_seconds.saturating_add(ttl_seconds));
    }

    /// Current Unix time in whole seconds, or `0` if the system clock is before the epoch.
    ///
    /// Saturates at `i32::MAX` (the 2038 limit of the 32-bit deletion-time fields) rather
    /// than wrapping to a negative value.
    fn now_epoch_seconds() -> i32 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs().min(i32::MAX as u64) as i32)
            .unwrap_or(0)
    }

    /// Finalizes the statistics and writes every component, consuming the writer.
    ///
    /// Steps, in order:
    /// 1. Creates the SSTable directory.
    /// 2. Writes `Statistics.db`.
    /// 3. Finishes `Data.db`, creating an empty file if no data was produced and none exists.
    /// 4. Writes `Index.db`, `Filter.db` and `Summary.db`.
    /// 5. Writes `Digest.crc32` (the CRC32 of `Data.db`).
    /// 6. Writes `TOC.txt`.
    ///
    /// No `CompressionInfo.db` is written, so `compression_info_path` is always `None`.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors and errors from the individual component writers.
    pub async fn finish(mut self) -> Result<SSTableInfo> {
        use crate::storage::sstable::directory::types::SSTableComponent;

        let sstable_dir = self.sstable_dir.clone();
        let generation = self.generation;
        let path_for =
            |component: &str| Self::component_path(&sstable_dir, generation, component);

        tokio::fs::create_dir_all(&sstable_dir).await?;

        self.stats.finalize();
        let stats_path = path_for("Statistics.db");
        let stats_writer = StatisticsWriter::new(stats_path.clone());
        stats_writer.write(&self.stats, Some(&self.schema))?;

        let data_path = path_for("Data.db");
        let data_size = self.data_writer.finish_streaming()?;
        if data_size == 0 && !data_path.exists() {
            // Keep the component set complete (the TOC lists Data.db) even for an empty SSTable.
            tokio::fs::write(&data_path, b"").await?;
        }

        let index_path = path_for("Index.db");
        let index_bytes = self.index_writer.finish()?;
        tokio::fs::write(&index_path, index_bytes).await?;

        let filter_path = path_for("Filter.db");
        if let Some(filter_writer) = self.filter_writer {
            filter_writer.finish().await?;
        }

        let summary_path = path_for("Summary.db");
        let summary_bytes = self.summary_writer.finish()?;
        tokio::fs::write(&summary_path, summary_bytes).await?;

        // The digest covers Data.db, so it must be computed after the data is finished.
        let digest_path = path_for("Digest.crc32");
        let digest_writer = DigestWriter::new(digest_path.clone());
        let crc32_value = Self::compute_crc32(&data_path).await?;
        digest_writer.write(crc32_value)?;

        let toc_path = path_for("TOC.txt");
        let toc_writer = TocWriter::new(toc_path.clone());
        let components = vec![
            ComponentEntry::new(SSTableComponent::Data),
            ComponentEntry::new(SSTableComponent::Index),
            ComponentEntry::new(SSTableComponent::Filter),
            ComponentEntry::new(SSTableComponent::Summary),
            ComponentEntry::new(SSTableComponent::Statistics),
            ComponentEntry::new(SSTableComponent::Digest),
        ];
        toc_writer.write(&components)?;

        Ok(SSTableInfo {
            data_path,
            index_path,
            filter_path,
            summary_path,
            stats_path,
            compression_info_path: None,
            toc_path,
            digest_path,
            partition_count: self.partition_count,
            data_size,
        })
    }

    /// Builds `<output_dir>/nb-<generation>-big-<component>`.
    fn component_path(output_dir: &Path, generation: u64, component: &str) -> PathBuf {
        let filename = format!("nb-{}-big-{}", generation, component);
        output_dir.join(filename)
    }

    /// Computes the CRC32 of the file at `file_path`.
    ///
    /// NOTE(review): this reads the whole file into memory; for large `Data.db` files a
    /// chunked read would bound memory use.
    async fn compute_crc32(file_path: &Path) -> Result<u32> {
        let data = tokio::fs::read(file_path).await?;
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(&data);
        Ok(hasher.finalize())
    }
}
#[cfg(all(test, feature = "write-support"))]
mod tests {
    use super::*;
    use crate::schema::{Column, KeyColumn};
    use crate::storage::write_engine::mutation::{CellOperation, PartitionKey, TableId};
    use crate::types::Value;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Schema for `test_ks.test_table`: partition key `id int`, a nullable `name text`
    /// column, and no clustering keys.
    fn create_test_schema() -> TableSchema {
        TableSchema {
            keyspace: "test_ks".to_string(),
            table: "test_table".to_string(),
            partition_keys: vec![KeyColumn {
                name: "id".to_string(),
                data_type: "int".to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![
                Column {
                    name: "id".to_string(),
                    data_type: "int".to_string(),
                    nullable: false,
                    default: None,
                    is_static: false,
                },
                Column {
                    name: "name".to_string(),
                    data_type: "text".to_string(),
                    nullable: true,
                    default: None,
                    is_static: false,
                },
            ],
            comments: HashMap::new(),
        }
    }

    /// Builds a mutation that writes `name` into partition `id = partition_id` at `timestamp`.
    fn create_test_mutation(
        keyspace: &str,
        table: &str,
        partition_id: i32,
        name: &str,
        timestamp: i64,
    ) -> Mutation {
        let table_id = TableId::new(keyspace, table);
        let pk = PartitionKey::single("id", Value::Integer(partition_id));
        Mutation::new(
            table_id,
            pk,
            None,
            vec![CellOperation::Write {
                column: "name".to_string(),
                value: Value::Text(name.to_string()),
            }],
            timestamp,
            None,
        )
    }

    /// Writes each mutation as its own partition, in ascending token order as the writer
    /// requires. Callers must not rely on partition ids happening to hash in order.
    fn write_in_token_order(
        writer: &mut SSTableWriter,
        schema: &TableSchema,
        mutations: Vec<Mutation>,
    ) {
        let mut keyed_mutations: Vec<_> = mutations
            .into_iter()
            .map(|m| {
                let key = m.decorated_key(schema).unwrap();
                (key, m)
            })
            .collect();
        keyed_mutations.sort_by_key(|(k, _)| k.token);
        for (key, mutation) in keyed_mutations {
            writer.write_partition(key, vec![mutation]).unwrap();
        }
    }

    #[tokio::test]
    async fn test_sstable_writer_single_partition() {
        let temp_dir = TempDir::new().unwrap();
        let schema = create_test_schema();
        let mut writer = SSTableWriter::new(temp_dir.path().to_path_buf(), 1, &schema).unwrap();
        let mutation = create_test_mutation("test_ks", "test_table", 1, "Alice", 1000000);
        let key = mutation.decorated_key(&schema).unwrap();
        writer.write_partition(key, vec![mutation]).unwrap();
        let info = writer.finish().await.unwrap();
        assert!(info.data_path.exists());
        assert!(info.index_path.exists());
        assert!(info.filter_path.exists());
        assert!(info.summary_path.exists());
        assert!(info.stats_path.exists());
        assert!(info.compression_info_path.is_none());
        assert!(info.toc_path.exists());
        assert!(info.digest_path.exists());
        assert_eq!(info.partition_count, 1);
        assert!(info.data_size > 0);
        assert!(info
            .data_path
            .file_name()
            .unwrap()
            .to_str()
            .unwrap()
            .contains("nb-1-big-Data.db"));
    }

    #[tokio::test]
    async fn test_sstable_writer_multiple_partitions() {
        let temp_dir = TempDir::new().unwrap();
        let schema = create_test_schema();
        let mut writer = SSTableWriter::new(temp_dir.path().to_path_buf(), 1, &schema).unwrap();
        let mutations = vec![
            create_test_mutation("test_ks", "test_table", 1, "Alice", 1000000),
            create_test_mutation("test_ks", "test_table", 2, "Bob", 1001000),
            create_test_mutation("test_ks", "test_table", 3, "Charlie", 1002000),
        ];
        write_in_token_order(&mut writer, &schema, mutations);
        let info = writer.finish().await.unwrap();
        assert_eq!(info.partition_count, 3);
        assert!(info.data_size > 0);
    }

    #[tokio::test]
    async fn test_sstable_writer_token_ordering_validation() {
        let temp_dir = TempDir::new().unwrap();
        let schema = create_test_schema();
        let mut writer = SSTableWriter::new(temp_dir.path().to_path_buf(), 1, &schema).unwrap();
        let mutation1 = create_test_mutation("test_ks", "test_table", 1, "Alice", 1000000);
        let key1 = mutation1.decorated_key(&schema).unwrap();
        let token1 = key1.token;
        writer
            .write_partition(key1.clone(), vec![mutation1])
            .unwrap();
        // saturating_sub: still a token <= token1 even if token1 were i64::MIN, and never panics.
        let key2 = DecoratedKey::new(token1.saturating_sub(1), vec![0x00, 0x00, 0x00, 0x02]);
        let mutation2 = create_test_mutation("test_ks", "test_table", 2, "Bob", 1001000);
        let result = writer.write_partition(key2, vec![mutation2]);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("token order"));
    }

    #[tokio::test]
    async fn test_sstable_writer_component_paths() {
        let temp_dir = TempDir::new().unwrap();
        let schema = create_test_schema();
        let _writer = SSTableWriter::new(temp_dir.path().to_path_buf(), 42, &schema).unwrap();
        let data_path = SSTableWriter::component_path(temp_dir.path(), 42, "Data.db");
        assert_eq!(
            data_path.file_name().unwrap().to_str().unwrap(),
            "nb-42-big-Data.db"
        );
    }

    #[tokio::test]
    async fn test_sstable_writer_toc_contents() {
        let temp_dir = TempDir::new().unwrap();
        let schema = create_test_schema();
        let mut writer = SSTableWriter::new(temp_dir.path().to_path_buf(), 1, &schema).unwrap();
        let mutation = create_test_mutation("test_ks", "test_table", 1, "Alice", 1000000);
        let key = mutation.decorated_key(&schema).unwrap();
        writer.write_partition(key, vec![mutation]).unwrap();
        let info = writer.finish().await.unwrap();
        let toc_contents = std::fs::read_to_string(&info.toc_path).unwrap();
        assert!(toc_contents.contains("Data.db"));
        assert!(toc_contents.contains("Index.db"));
        assert!(toc_contents.contains("Filter.db"));
        assert!(toc_contents.contains("Summary.db"));
        assert!(toc_contents.contains("Statistics.db"));
        assert!(!toc_contents.contains("CompressionInfo.db"));
        assert!(toc_contents.contains("Digest.crc32"));
        assert!(toc_contents.contains("TOC.txt"));
    }

    #[tokio::test]
    async fn test_sstable_writer_statistics_metadata() {
        let temp_dir = TempDir::new().unwrap();
        let schema = create_test_schema();
        let mut writer = SSTableWriter::new(temp_dir.path().to_path_buf(), 1, &schema).unwrap();
        let mutations = vec![
            {
                let mut m = create_test_mutation("test_ks", "test_table", 1, "Alice", 1000000);
                m.ttl_seconds = Some(3600);
                m
            },
            create_test_mutation("test_ks", "test_table", 2, "Bob", 2000000),
            {
                let mut m = create_test_mutation("test_ks", "test_table", 3, "Charlie", 1500000);
                m.ttl_seconds = Some(7200);
                m
            },
        ];
        // The writer rejects out-of-order tokens; the min/max assertions below are
        // order-independent, so sorting does not affect what is being checked.
        write_in_token_order(&mut writer, &schema, mutations);
        assert_eq!(writer.stats.min_timestamp, 1000000);
        assert_eq!(writer.stats.max_timestamp, 2000000);
        assert_eq!(writer.stats.min_ttl, 3600);
        assert_eq!(writer.stats.max_ttl, 7200);
        assert_eq!(writer.stats.partition_count, 3);
        let _info = writer.finish().await.unwrap();
    }

    #[tokio::test]
    async fn test_sstable_writer_digest_crc32() {
        let temp_dir = TempDir::new().unwrap();
        let schema = create_test_schema();
        let mut writer = SSTableWriter::new(temp_dir.path().to_path_buf(), 1, &schema).unwrap();
        let mutation = create_test_mutation("test_ks", "test_table", 1, "Alice", 1000000);
        let key = mutation.decorated_key(&schema).unwrap();
        writer.write_partition(key, vec![mutation]).unwrap();
        let info = writer.finish().await.unwrap();
        let digest_contents = std::fs::read_to_string(&info.digest_path).unwrap();
        assert!(!digest_contents.is_empty());
        assert!(digest_contents.parse::<u32>().is_ok());
    }

    #[tokio::test]
    async fn test_sstable_writer_empty_sstable() {
        let temp_dir = TempDir::new().unwrap();
        let schema = create_test_schema();
        let writer = SSTableWriter::new(temp_dir.path().to_path_buf(), 1, &schema).unwrap();
        let info = writer.finish().await.unwrap();
        assert_eq!(info.partition_count, 0);
        assert!(info.data_path.exists());
        assert!(info.toc_path.exists());
    }
}