use crate::error::{Error, Result};
use crate::schema::{Column, CqlType, TableSchema};
use crate::storage::serialization::types::TypeSerializer;
use crate::storage::serialization::vint::{encode_signed, encode_unsigned, unsigned_len};
use crate::storage::sstable::writer::stats_writer::StatisticsMetadata;
use crate::storage::write_engine::mutation::{
ClusteringBound, ClusteringKey, DecoratedKey, Mutation, PartitionKey, PartitionTombstone,
RangeTombstone, TableId,
};
use crate::types::{ComparatorType, UdtTypeDef, Value};
use std::io::Write;
use std::path::PathBuf;
// Row flag bits. They are OR-ed together into the single flags byte that
// starts every serialized row.
const ROW_HAS_TIMESTAMP: u8 = 0x04;
const ROW_HAS_TTL: u8 = 0x08;
#[allow(dead_code)]
const ROW_HAS_DELETION: u8 = 0x10;
const ROW_HAS_ALL_COLUMNS: u8 = 0x20;
const ROW_HAS_COMPLEX_DELETION: u8 = 0x40;
// When set, a second "extended flags" byte follows the row flags byte.
const ROW_HAS_EXTENDED_FLAGS: u8 = 0x80;
// Extended-flags byte written for static rows.
const EXTENDED_IS_STATIC: u8 = 0x01;
// Cell flag bits, OR-ed into the flags byte that starts every cell.
const CELL_IS_DELETED: u8 = 0x01;
const CELL_IS_EXPIRING: u8 = 0x02;
const CELL_HAS_EMPTY_VALUE: u8 = 0x04;
const CELL_USE_ROW_TIMESTAMP: u8 = 0x08;
#[allow(dead_code)]
const CELL_USE_ROW_TTL: u8 = 0x10;
// NOTE(review): the marker flag and the four bound-kind codes below are not
// referenced in the part of the file reviewed here; presumably they are
// consumed by the range-tombstone marker writer -- confirm.
const IS_MARKER: u8 = 0x02;
const EXCL_END_BOUND: u8 = 0;
const INCL_START_BOUND: u8 = 1;
const INCL_END_BOUND: u8 = 6;
const EXCL_START_BOUND: u8 = 7;
// Single byte appended after the last row/marker of every partition.
const END_OF_PARTITION: u8 = 0x01;
// Capacity (1 MiB) of the buffered writer wrapped around the output file.
const DATA_SINK_BUFFER_BYTES: usize = 1024 * 1024;
/// Serializes partitions into the data-file byte layout.
///
/// The writer works in one of two modes, chosen at construction time:
/// in-memory (no `data_path`; all bytes stay in `buffer` and are returned by
/// `finish`) or streaming (`data_path` set; `buffer` is written to the file
/// and cleared after every partition, and `finish_streaming` reports the
/// total byte count).
#[derive(Debug)]
pub struct DataWriter {
/// Bytes serialized so far that have not been handed to `sink`.
buffer: Vec<u8>,
/// Buffered handle on the output file; opened lazily on the first flush.
sink: Option<std::io::BufWriter<std::fs::File>>,
/// Output file path. `Some` selects streaming mode.
data_path: Option<PathBuf>,
/// Number of bytes already moved from `buffer` to `sink`.
position: u64,
/// Minimum timestamp / TTL / local-deletion-time values that the written
/// fields are delta-encoded against.
stats: StatisticsMetadata,
}
impl DataWriter {
pub fn new(stats: StatisticsMetadata) -> Self {
Self {
buffer: Vec::new(),
sink: None,
data_path: None,
position: 0,
stats,
}
}
pub fn with_sink(stats: StatisticsMetadata, data_path: PathBuf) -> Self {
Self {
buffer: Vec::new(),
sink: None,
data_path: Some(data_path),
position: 0,
stats,
}
}
/// Opens the output file on first use.
///
/// Does nothing when the sink is already open or when the writer has no
/// output path. Otherwise creates any missing parent directories, creates
/// (truncating) the file, and wraps it in a buffered writer.
///
/// # Errors
/// Propagates I/O errors from directory or file creation.
fn ensure_sink(&mut self) -> Result<()> {
    if self.sink.is_some() {
        return Ok(());
    }
    let Some(target) = self.data_path.as_deref() else {
        return Ok(());
    };
    if let Some(dir) = target.parent() {
        std::fs::create_dir_all(dir)?;
    }
    let handle = std::fs::File::create(target)?;
    let buffered = std::io::BufWriter::with_capacity(DATA_SINK_BUFFER_BYTES, handle);
    self.sink = Some(buffered);
    Ok(())
}
/// Moves the buffered bytes to the output file (streaming mode only).
///
/// In in-memory mode this is a no-op and the buffer keeps growing. In
/// streaming mode the buffer is written to the sink, `position` advances by
/// the number of bytes written, and the buffer is emptied for reuse.
///
/// # Errors
/// Propagates I/O errors from opening or writing the sink.
fn flush_partition(&mut self) -> Result<()> {
    if self.data_path.is_some() {
        self.ensure_sink()?;
        if let Some(out) = self.sink.as_mut() {
            out.write_all(&self.buffer)?;
        }
        let flushed = self.buffer.len() as u64;
        self.position += flushed;
        self.buffer.clear();
    }
    Ok(())
}
/// Replaces the statistics that subsequent writes are delta-encoded against.
pub fn update_stats(&mut self, stats: StatisticsMetadata) {
self.stats = stats;
}
/// Serializes one complete partition and returns the byte offset at which it
/// starts in the output (bytes already flushed plus bytes still buffered).
///
/// Layout produced, in order: partition header, an optional static row,
/// clustering rows interleaved with range-tombstone markers in sorted order,
/// and the end-of-partition byte. In streaming mode the buffer is flushed to
/// the sink before returning.
///
/// * `key` - decorated partition key written into the header.
/// * `mutations` - mutations belonging to this partition.
/// * `schema` - table schema used to classify and order columns.
/// * `partition_tombstone` - optional partition-level deletion; its
///   `deletion_time` also acts as a floor below which data is shadowed.
/// * `range_tombstones` - range deletions; each yields an open and a close marker.
///
/// # Errors
/// Propagates any serialization or I/O error from the helpers it calls.
pub fn write_partition(
&mut self,
key: &DecoratedKey,
mutations: &[Mutation],
schema: &TableSchema,
partition_tombstone: Option<&PartitionTombstone>,
range_tombstones: &[RangeTombstone],
) -> Result<u64> {
let partition_offset = self.position + self.buffer.len() as u64;
let header_start = self.buffer.len();
self.write_partition_header(key, partition_tombstone)?;
// Every row/marker records the size of the item before it; the first one
// sees the size of the partition header.
let mut prev_unfiltered_size = (self.buffer.len() - header_start) as u64;
let partition_floor = partition_tombstone.map(|pt| pt.deletion_time);
let schema_has_static = schema.columns.iter().any(|c| c.is_static);
// A table with static columns always gets a static row, even an empty one.
if schema_has_static {
let merged = collect_static_operations(mutations, schema, partition_floor);
// Mutations newer than the partition tombstone that carry a static operation.
let unshadowed_static = |m: &&Mutation| {
partition_floor.is_none_or(|floor| m.timestamp_micros > floor)
&& has_static_operation(m, schema)
};
if merged.is_empty() {
prev_unfiltered_size =
self.write_empty_static_row(prev_unfiltered_size, schema)? as u64;
} else {
// Newest unshadowed static timestamp; falls back to the first
// mutation's timestamp, or 0 when there are no mutations.
let latest_ts = mutations
.iter()
.filter(unshadowed_static)
.map(|m| m.timestamp_micros)
.max()
.unwrap_or(mutations.first().map(|m| m.timestamp_micros).unwrap_or(0));
// TTL of the newest unshadowed static mutation.
let ttl = mutations
.iter()
.filter(unshadowed_static)
.max_by_key(|m| m.timestamp_micros)
.and_then(|m| m.ttl_seconds);
// Synthetic mutation carrying the merged static operations so the
// regular static-row writer can be reused.
let synthetic = Mutation {
table: mutations
.first()
.map(|m| m.table.clone())
.unwrap_or_else(|| TableId {
keyspace: schema.keyspace.clone(),
table: schema.table.clone(),
}),
partition_key: mutations
.first()
.map(|m| m.partition_key.clone())
.unwrap_or_else(|| PartitionKey {
columns: Vec::new(),
}),
clustering_key: None,
operations: merged,
timestamp_micros: latest_ts,
ttl_seconds: ttl,
partition_tombstone: None,
range_tombstones: Vec::new(),
};
prev_unfiltered_size =
self.write_static_row_with_prev_size(&synthetic, schema, prev_unfiltered_size)?
as u64;
}
}
// Static operations were already emitted above, so they are skipped in the
// clustering rows whenever the schema has static columns.
let rows = self.merge_clustering_rows(
mutations,
schema,
schema_has_static,
partition_floor,
range_tombstones,
);
// A partition body item: either a merged row or one side of a range tombstone.
enum PartitionItem<'a> {
Row(RowWrite<'a>),
Marker {
bound: &'a ClusteringBound,
is_open: bool,
deletion_time: i64,
local_deletion_time: i32,
},
}
let mut items: Vec<PartitionItem> = rows.into_iter().map(PartitionItem::Row).collect();
for rt in range_tombstones {
items.push(PartitionItem::Marker {
bound: &rt.start,
is_open: true,
deletion_time: rt.deletion_time,
local_deletion_time: rt.local_deletion_time,
});
items.push(PartitionItem::Marker {
bound: &rt.end,
is_open: false,
deletion_time: rt.deletion_time,
local_deletion_time: rt.local_deletion_time,
});
}
// Sort key: (class, clustering key, weight).
// class  - Bottom bounds (-1) sort first, Top bounds (1) last.
// weight - relative to a row (0) with the same key: an inclusive open or an
//          exclusive close sorts before it (-1), an inclusive close or an
//          exclusive open sorts after it (1).
fn sort_class<'a, 'b>(item: &'b PartitionItem<'a>) -> (i8, Option<&'b ClusteringKey>, i8) {
match item {
PartitionItem::Row(row) => (0, row.clustering_key, 0),
PartitionItem::Marker { bound, is_open, .. } => match bound {
ClusteringBound::Inclusive(ck) => (0, Some(ck), if *is_open { -1 } else { 1 }),
ClusteringBound::Exclusive(ck) => (0, Some(ck), if *is_open { 1 } else { -1 }),
ClusteringBound::Bottom => (-1, None, 0),
ClusteringBound::Top => (1, None, 0),
},
}
}
items.sort_by(|a, b| {
let (class_a, ck_a, weight_a) = sort_class(a);
let (class_b, ck_b, weight_b) = sort_class(b);
class_a
.cmp(&class_b)
.then_with(|| match (ck_a, ck_b) {
// Schema-aware comparison; falls back to the type's own `cmp`
// if the schema-aware one reports an error.
(Some(x), Some(y)) => x.compare(y, schema).unwrap_or_else(|_| x.cmp(y)),
_ => std::cmp::Ordering::Equal,
})
.then(weight_a.cmp(&weight_b))
});
for item in items {
prev_unfiltered_size = match item {
PartitionItem::Row(row) => {
self.write_merged_row_with_prev_size(&row, schema, prev_unfiltered_size)? as u64
}
PartitionItem::Marker {
bound,
is_open,
deletion_time,
local_deletion_time,
} => self.write_range_bound(
bound,
is_open,
deletion_time,
local_deletion_time,
schema,
prev_unfiltered_size,
)? as u64,
};
}
self.buffer.push(END_OF_PARTITION);
self.flush_partition()?;
Ok(partition_offset)
}
/// Writes a static row that carries no cells and returns its encoded size.
///
/// The row consists of the extended-flags marker, the static marker byte,
/// the body size, `prev_size`, and a column subset in which no static column
/// is marked present.
///
/// # Errors
/// Propagates errors from encoding the column subset.
fn write_empty_static_row(&mut self, prev_size: u64, schema: &TableSchema) -> Result<usize> {
    let row_start = self.buffer.len();
    self.buffer
        .extend_from_slice(&[ROW_HAS_EXTENDED_FLAGS, EXTENDED_IS_STATIC]);

    let statics = self.static_columns(schema);
    let nothing_present = std::collections::HashSet::<&str>::new();
    let mut payload = Vec::new();
    self.write_column_subset(&mut payload, &statics, &nothing_present)?;

    // The recorded size covers the prev-size vint plus the payload.
    let body_size = unsigned_len(prev_size) as u64 + payload.len() as u64;
    encode_unsigned(body_size, &mut self.buffer);
    encode_unsigned(prev_size, &mut self.buffer);
    self.buffer.extend_from_slice(&payload);

    Ok(self.buffer.len() - row_start)
}
pub fn finish(self) -> Result<Vec<u8>> {
if self.data_path.is_some() {
return Err(Error::InvalidInput(
"DataWriter::finish() called on a streaming writer; use finish_streaming()"
.to_string(),
));
}
Ok(self.buffer)
}
/// Consumes a streaming writer, flushes everything to the output file, and
/// returns the total number of bytes written.
///
/// # Errors
/// Returns `Error::InvalidInput` for an in-memory writer, and propagates I/O
/// errors from the final write or flush.
pub fn finish_streaming(mut self) -> Result<u64> {
    let is_streaming = self.data_path.is_some();
    if !is_streaming {
        return Err(Error::InvalidInput(
            "finish_streaming() called on an in-memory DataWriter".to_string(),
        ));
    }
    // Push any bytes still buffered, then drain the buffered file writer.
    self.flush_partition()?;
    if let Some(mut out) = self.sink.take() {
        out.flush()?;
    }
    let total_bytes = self.position;
    Ok(total_bytes)
}
fn write_partition_header(
&mut self,
key: &DecoratedKey,
tombstone: Option<&PartitionTombstone>,
) -> Result<()> {
if key.key.len() > 65535 {
return Err(Error::InvalidInput(format!(
"Partition key too large: {} bytes (max 65535)",
key.key.len()
)));
}
self.buffer
.write_all(&(key.key.len() as u16).to_be_bytes())?;
self.buffer.extend_from_slice(&key.key);
if let Some(ts) = tombstone {
self.buffer
.write_all(&ts.local_deletion_time.to_be_bytes())?;
self.buffer.write_all(&ts.deletion_time.to_be_bytes())?;
} else {
self.buffer.write_all(&i32::MAX.to_be_bytes())?;
self.buffer.write_all(&i64::MIN.to_be_bytes())?;
}
Ok(())
}
/// Writes a single mutation as a row with a previous-item size of zero,
/// discarding the encoded size.
#[allow(dead_code)]
fn write_row(&mut self, mutation: &Mutation, schema: &TableSchema) -> Result<()> {
    self.write_row_with_prev_size(mutation, schema, 0)
        .map(|_written| ())
}
/// Merges a single mutation into a row and writes it.
///
/// Returns the number of bytes written, or 0 when the mutation produces no
/// row at all.
fn write_row_with_prev_size(
    &mut self,
    mutation: &Mutation,
    schema: &TableSchema,
    prev_size: u64,
) -> Result<usize> {
    let Some(merged) = Self::merge_row_group(&[mutation], schema, false, None) else {
        return Ok(0);
    };
    self.write_merged_row_with_prev_size(&merged, schema, prev_size)
}
/// Groups the non-static mutations into runs of adjacent mutations sharing a
/// clustering key and merges each run into at most one row.
///
/// For every run the shadowing floor starts at `partition_floor` and is
/// raised to the deletion time of any range tombstone covering the run's
/// clustering key. Runs that merge to nothing are dropped.
///
/// Only *adjacent* mutations are grouped, so the result has one row per key
/// only if the input is already ordered by clustering key.
fn merge_clustering_rows<'a>(
    &self,
    mutations: &'a [Mutation],
    schema: &TableSchema,
    skip_static_ops: bool,
    partition_floor: Option<i64>,
    range_tombstones: &[RangeTombstone],
) -> Vec<RowWrite<'a>> {
    let candidates: Vec<&'a Mutation> = mutations
        .iter()
        .filter(|m| !is_static_row_mutation(m, schema))
        .collect();

    let mut merged_rows = Vec::new();
    let mut remaining: &[&'a Mutation] = &candidates;
    while let Some(&head) = remaining.first() {
        // Length of the run of mutations whose key equals the head's key.
        let run_len = 1 + remaining[1..]
            .iter()
            .take_while(|m| m.clustering_key == head.clustering_key)
            .count();
        let (group, rest) = remaining.split_at(run_len);

        let clustering_key = head.clustering_key.as_ref();
        let shadow_floor = range_tombstones
            .iter()
            .filter(|rt| range_tombstone_covers(*rt, clustering_key, schema))
            .map(|rt| rt.deletion_time)
            .fold(partition_floor, |floor, deleted_at| {
                Some(floor.map_or(deleted_at, |current| current.max(deleted_at)))
            });

        merged_rows.extend(Self::merge_row_group(
            group,
            schema,
            skip_static_ops,
            shadow_floor,
        ));
        remaining = rest;
    }
    merged_rows
}
/// Reconciles a group of mutations that target the same row into a single
/// `RowWrite`, or returns `None` when nothing is left to write.
///
/// * `group` - mutations for one row; the first one supplies the clustering key.
/// * `schema` - used to recognise static operations.
/// * `skip_static_ops` - when true, operations on static columns are ignored.
/// * `shadow_floor` - timestamp of an enclosing deletion; mutations at or
///   below it are dropped.
///
/// The order of the returned `ops` is unspecified because they are collected
/// from a hash map.
fn merge_row_group<'a>(
group: &[&'a Mutation],
schema: &TableSchema,
skip_static_ops: bool,
shadow_floor: Option<i64>,
) -> Option<RowWrite<'a>> {
use crate::storage::write_engine::mutation::CellOperation;
// Pass 1: find the newest row deletion that is not itself shadowed.
// The tuple is (deletion timestamp in micros, deletion time in whole seconds).
let mut row_deletion: Option<(i64, i32)> = None;
for m in group {
let has_delete_row = m
.operations
.iter()
.any(|op| matches!(op, CellOperation::DeleteRow));
if has_delete_row
&& shadow_floor.is_none_or(|floor| m.timestamp_micros > floor)
&& row_deletion.is_none_or(|(ts, _)| m.timestamp_micros >= ts)
{
row_deletion = Some((m.timestamp_micros, (m.timestamp_micros / 1_000_000) as i32));
}
}
// Effective deletion point: the later of the row deletion and the floor.
let deletion_ts = match (row_deletion.map(|(ts, _)| ts), shadow_floor) {
(Some(a), Some(b)) => Some(a.max(b)),
(a, b) => a.or(b),
};
// Pass 2: keep one winning operation per column and track row liveness
// as (timestamp, ttl).
let mut cells: std::collections::HashMap<&'a str, MergedOp<'a>> =
std::collections::HashMap::new();
let mut liveness: Option<(i64, Option<u32>)> = None;
for m in group {
// Anything at or before the effective deletion is shadowed.
if deletion_ts.is_some_and(|dts| m.timestamp_micros <= dts) {
continue;
}
let mut contributes_liveness = false;
for op in &m.operations {
let column = match op {
CellOperation::Write { column, .. }
| CellOperation::WriteWithTtl { column, .. }
| CellOperation::Delete { column } => column.as_str(),
CellOperation::DeleteRow => continue,
};
if skip_static_ops && is_static_operation(op, schema) {
continue;
}
// Only writes (not cell deletes) make the row live.
if matches!(
op,
CellOperation::Write { .. } | CellOperation::WriteWithTtl { .. }
) {
contributes_liveness = true;
}
let candidate = MergedOp {
op,
timestamp_micros: m.timestamp_micros,
row_ttl_seconds: m.ttl_seconds,
};
match cells.entry(column) {
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(candidate);
}
std::collections::hash_map::Entry::Occupied(mut entry) => {
let existing = entry.get();
let candidate_is_tombstone =
matches!(candidate.op, CellOperation::Delete { .. });
// Newer timestamp wins. On a tie a delete beats a write, and
// between two operations of the same kind the later one in
// iteration order replaces the earlier.
let wins = candidate.timestamp_micros > existing.timestamp_micros
|| (candidate.timestamp_micros == existing.timestamp_micros
&& (candidate_is_tombstone
|| !matches!(existing.op, CellOperation::Delete { .. })));
if wins {
entry.insert(candidate);
}
}
}
}
// A mutation with no operations and no tombstones still makes the row live.
let pure_pk_insert = m.operations.is_empty()
&& m.partition_tombstone.is_none()
&& m.range_tombstones.is_empty();
if (contributes_liveness || pure_pk_insert)
&& liveness.is_none_or(|(ts, _)| m.timestamp_micros >= ts)
{
liveness = Some((m.timestamp_micros, m.ttl_seconds));
}
}
let ops: Vec<MergedOp<'a>> = cells.into_values().collect();
// No cells, no row deletion and no liveness: there is no row to emit.
if ops.is_empty() && row_deletion.is_none() && liveness.is_none() {
return None;
}
Some(RowWrite {
clustering_key: group[0].clustering_key.as_ref(),
liveness_ts: liveness.map(|(ts, _)| ts),
ttl_seconds: liveness.and_then(|(_, ttl)| ttl),
row_deletion,
ops,
})
}
/// Serializes a merged row into the buffer and returns its encoded size.
///
/// Emits, in order: the flags byte, the clustering prefix (when the row has
/// a clustering key), the body size, `prev_size`, and the row body.
///
/// # Errors
/// Propagates errors from encoding the clustering prefix or the row body.
fn write_merged_row_with_prev_size(
&mut self,
row: &RowWrite<'_>,
schema: &TableSchema,
prev_size: u64,
) -> Result<usize> {
use crate::storage::write_engine::mutation::CellOperation;
let start_len = self.buffer.len();
let mut flags = 0u8;
if row.row_deletion.is_some() {
flags |= ROW_HAS_DELETION; }
// The TTL flag is only meaningful together with a liveness timestamp.
if row.liveness_ts.is_some() {
flags |= ROW_HAS_TIMESTAMP;
if row.ttl_seconds.is_some() {
flags |= ROW_HAS_TTL;
}
}
// "All columns" is claimed only for a non-deleted row in which every
// operation is a non-null write and the operation count equals the number
// of regular columns; otherwise a column bitmap is written in the body.
if row.row_deletion.is_none() {
let all_writes = row.ops.iter().all(|mop| {
matches!(
mop.op,
CellOperation::Write { .. } | CellOperation::WriteWithTtl { .. }
)
});
let has_nulls = row.ops.iter().any(|mop| match mop.op {
CellOperation::Write { value, .. } | CellOperation::WriteWithTtl { value, .. } => {
matches!(value, Value::Null)
}
_ => false,
});
let regular_column_count = self.regular_columns(schema).len();
if all_writes && !has_nulls && row.ops.len() == regular_column_count {
flags |= ROW_HAS_ALL_COLUMNS;
}
}
// Set the complex-deletion flag when any operation touches a column that
// `is_complex_column` classifies as complex.
let has_complex = row.ops.iter().any(|mop| {
let col_name = match mop.op {
CellOperation::Write { column, .. }
| CellOperation::WriteWithTtl { column, .. }
| CellOperation::Delete { column } => Some(column.as_str()),
_ => None,
};
col_name.is_some_and(|name| {
schema
.columns
.iter()
.find(|c| c.name == name)
.map(|c| is_complex_column(&c.data_type))
.unwrap_or(false)
})
});
if has_complex {
flags |= ROW_HAS_COMPLEX_DELETION;
}
self.buffer.push(flags);
if let Some(clustering_key) = row.clustering_key {
self.write_clustering_prefix(clustering_key, schema)?;
}
let row_body = self.build_merged_row_body(row, schema, flags)?;
// The recorded size covers the prev-size vint plus the body.
let prev_size_vint_len = unsigned_len(prev_size);
let row_body_size = prev_size_vint_len as u64 + row_body.len() as u64;
let mut row_size_buf = Vec::new();
encode_unsigned(row_body_size, &mut row_size_buf);
self.buffer.extend_from_slice(&row_size_buf);
encode_unsigned(prev_size, &mut self.buffer);
self.buffer.extend_from_slice(&row_body);
Ok(self.buffer.len() - start_len)
}
/// Writes `mutation` as a static row with a previous-item size of zero,
/// discarding the encoded size.
pub fn write_static_row(&mut self, mutation: &Mutation, schema: &TableSchema) -> Result<()> {
    self.write_static_row_with_prev_size(mutation, schema, 0)
        .map(|_written| ())
}
/// Serializes `mutation` as a static row and returns its encoded size.
///
/// Emits the flags byte, the static marker byte, the body size, `prev_size`
/// and the row body. The timestamp flag is always set. A row containing a
/// `DeleteRow` operation gets the deletion flag and never the TTL or
/// all-columns flags.
///
/// # Errors
/// Propagates errors from building the row body.
fn write_static_row_with_prev_size(
    &mut self,
    mutation: &Mutation,
    schema: &TableSchema,
    prev_size: u64,
) -> Result<usize> {
    use crate::storage::write_engine::mutation::CellOperation;

    let row_start = self.buffer.len();
    let operations = &mutation.operations;
    let deletes_row = operations
        .iter()
        .any(|op| matches!(op, CellOperation::DeleteRow));

    let mut flags = ROW_HAS_EXTENDED_FLAGS | ROW_HAS_TIMESTAMP;
    if deletes_row {
        flags |= ROW_HAS_DELETION;
    } else {
        if mutation.ttl_seconds.is_some() {
            flags |= ROW_HAS_TTL;
        }
        // "All columns" requires every operation to be a non-null write and
        // exactly one operation per static column.
        let only_non_null_writes = operations.iter().all(|op| match op {
            CellOperation::Write { value, .. } | CellOperation::WriteWithTtl { value, .. } => {
                !matches!(value, Value::Null)
            }
            _ => false,
        });
        let static_total = schema.columns.iter().filter(|c| c.is_static).count();
        if only_non_null_writes && operations.len() == static_total {
            flags |= ROW_HAS_ALL_COLUMNS;
        }
    }

    self.buffer.extend_from_slice(&[flags, EXTENDED_IS_STATIC]);
    let payload = self.build_static_row_body(mutation, schema, flags)?;

    // The recorded size covers the prev-size vint plus the payload.
    let body_size = unsigned_len(prev_size) as u64 + payload.len() as u64;
    encode_unsigned(body_size, &mut self.buffer);
    encode_unsigned(prev_size, &mut self.buffer);
    self.buffer.extend_from_slice(&payload);

    Ok(self.buffer.len() - row_start)
}
/// Builds the body of a static row according to `flags`.
///
/// Fields are appended in this order, each only when its flag is set:
/// timestamp delta; TTL delta and local-deletion-time delta; row deletion
/// (timestamp delta and local-deletion-time delta). A deleted row ends after
/// an optional empty column subset and carries no cells. Otherwise the
/// column bitmap (unless all columns are present) and the cells follow.
///
/// # Errors
/// Returns `Error::InvalidInput` when the TTL or computed local deletion
/// time is below the corresponding minimum in `stats`, and propagates errors
/// from the helpers.
fn build_static_row_body(
&self,
mutation: &Mutation,
schema: &TableSchema,
flags: u8,
) -> Result<Vec<u8>> {
let mut body = Vec::new();
if (flags & ROW_HAS_TIMESTAMP) != 0 {
let timestamp_delta = (mutation.timestamp_micros - self.stats.min_timestamp) as u64;
encode_unsigned(timestamp_delta, &mut body);
}
if (flags & ROW_HAS_TTL) != 0 {
if let Some(ttl) = mutation.ttl_seconds {
let ttl_delta = ttl as i64 - self.stats.min_ttl as i64;
if ttl_delta < 0 {
return Err(Error::InvalidInput(format!(
"TTL {} is less than min_ttl {}",
ttl, self.stats.min_ttl
)));
}
encode_unsigned(ttl_delta as u64, &mut body);
let local_deletion_time = self.expiring_local_deletion_time(ttl)?;
let ldt_delta =
(local_deletion_time as i64) - (self.stats.min_local_deletion_time as i64);
if ldt_delta < 0 {
return Err(Error::InvalidInput(format!(
"Local deletion time {} is less than min_local_deletion_time {}",
local_deletion_time, self.stats.min_local_deletion_time
)));
}
encode_unsigned(ldt_delta as u64, &mut body);
}
}
if (flags & ROW_HAS_DELETION) != 0 {
// The deletion reuses the mutation timestamp; the local deletion time is
// that timestamp truncated to whole seconds.
let ts_delta = (mutation.timestamp_micros - self.stats.min_timestamp) as u64;
encode_unsigned(ts_delta, &mut body);
let local_deletion_time = (mutation.timestamp_micros / 1_000_000) as i32;
let ldt_delta =
local_deletion_time.wrapping_sub(self.stats.min_local_deletion_time) as u32;
encode_unsigned(ldt_delta as u64, &mut body);
if (flags & ROW_HAS_ALL_COLUMNS) == 0 {
// A deleted row lists no columns as present.
let static_columns = self.static_columns(schema);
let empty_present: std::collections::HashSet<&str> =
std::collections::HashSet::new();
self.write_column_subset(&mut body, &static_columns, &empty_present)?;
}
return Ok(body);
}
if (flags & ROW_HAS_ALL_COLUMNS) == 0 {
self.write_static_column_bitmap(&mut body, mutation, schema)?;
}
self.write_static_cells(&mut body, mutation, schema)?;
Ok(body)
}
/// Encodes which static columns this mutation supplies a cell for.
///
/// A column counts as present when the mutation writes a non-null value to
/// it or deletes it. Null writes and row deletions contribute nothing.
fn write_static_column_bitmap(
    &self,
    buf: &mut Vec<u8>,
    mutation: &Mutation,
    schema: &TableSchema,
) -> Result<()> {
    use crate::storage::write_engine::mutation::CellOperation;

    let mut present: std::collections::HashSet<&str> = std::collections::HashSet::new();
    for op in &mutation.operations {
        match op {
            CellOperation::Write { column, value }
            | CellOperation::WriteWithTtl { column, value, .. } => {
                if !matches!(value, Value::Null) {
                    present.insert(column.as_str());
                }
            }
            CellOperation::Delete { column } => {
                present.insert(column.as_str());
            }
            CellOperation::DeleteRow => {}
        }
    }

    let statics = self.static_columns(schema);
    self.write_column_subset(buf, &statics, &present)
}
/// Writes the cells of a static row, in the order given by `sorted_operations`.
///
/// Only operations on static columns are emitted: non-null writes become
/// regular or expiring cells, deletes become tombstone cells whose local
/// deletion time is the mutation timestamp in whole seconds. Null writes and
/// row deletions are skipped.
fn write_static_cells(
    &self,
    buf: &mut Vec<u8>,
    mutation: &Mutation,
    schema: &TableSchema,
) -> Result<()> {
    use crate::storage::write_engine::mutation::CellOperation;

    let static_names: std::collections::HashSet<_> = schema
        .columns
        .iter()
        .filter(|c| c.is_static)
        .map(|c| &c.name)
        .collect();
    let written_at = mutation.timestamp_micros;

    for op in self.sorted_operations(mutation, &self.static_columns(schema)) {
        match op {
            CellOperation::Write { column, value }
                if static_names.contains(column) && !matches!(value, Value::Null) =>
            {
                self.write_cell(buf, column, value, written_at)?;
            }
            CellOperation::WriteWithTtl {
                column,
                value,
                ttl_seconds,
            } if static_names.contains(column) && !matches!(value, Value::Null) => {
                self.write_cell_with_ttl(buf, column, value, written_at, *ttl_seconds)?;
            }
            CellOperation::Delete { column } if static_names.contains(column) => {
                let deleted_at_seconds = (written_at / 1_000_000) as i32;
                self.write_tombstone_cell(buf, column, written_at, deleted_at_seconds)?;
            }
            // Non-static columns, null writes and row deletions emit no cell.
            _ => {}
        }
    }
    Ok(())
}
/// Test helper: builds a row body straight from a single mutation.
///
/// When the mutation merges to nothing, an empty row that is live at the
/// mutation's timestamp and TTL is used instead.
#[cfg(test)]
fn build_row_body(
    &self,
    mutation: &Mutation,
    schema: &TableSchema,
    flags: u8,
) -> Result<Vec<u8>> {
    let merged = match Self::merge_row_group(&[mutation], schema, false, None) {
        Some(row) => row,
        None => RowWrite {
            clustering_key: mutation.clustering_key.as_ref(),
            liveness_ts: Some(mutation.timestamp_micros),
            ttl_seconds: mutation.ttl_seconds,
            row_deletion: None,
            ops: Vec::new(),
        },
    };
    self.build_merged_row_body(&merged, schema, flags)
}
/// Builds the body of a merged clustering row according to `flags`.
///
/// Fields are appended in this order, each only when its flag is set:
/// liveness timestamp delta; TTL delta and local-deletion-time delta; row
/// deletion (timestamp delta and local-deletion-time delta). The column
/// bitmap follows unless all columns are present, and finally the cells.
///
/// # Errors
/// Returns `Error::InvalidInput` when a flag is set but the row lacks the
/// corresponding value, or when the TTL or computed local deletion time is
/// below the corresponding minimum in `stats`. Propagates helper errors.
fn build_merged_row_body(
&self,
row: &RowWrite<'_>,
schema: &TableSchema,
flags: u8,
) -> Result<Vec<u8>> {
let mut body = Vec::new();
if (flags & ROW_HAS_TIMESTAMP) != 0 {
let liveness_ts = row.liveness_ts.ok_or_else(|| {
Error::InvalidInput(
"ROW_HAS_TIMESTAMP set but row has no liveness timestamp".to_string(),
)
})?;
let timestamp_delta = (liveness_ts - self.stats.min_timestamp) as u64;
encode_unsigned(timestamp_delta, &mut body);
}
if (flags & ROW_HAS_TTL) != 0 {
// If the flag is set but the row has no TTL, nothing is written here.
if let Some(ttl) = row.ttl_seconds {
let ttl_delta = ttl as i64 - self.stats.min_ttl as i64;
if ttl_delta < 0 {
return Err(Error::InvalidInput(format!(
"TTL {} is less than min_ttl {}",
ttl, self.stats.min_ttl
)));
}
encode_unsigned(ttl_delta as u64, &mut body);
let local_deletion_time = self.expiring_local_deletion_time(ttl)?;
let ldt_delta =
(local_deletion_time as i64) - (self.stats.min_local_deletion_time as i64);
if ldt_delta < 0 {
return Err(Error::InvalidInput(format!(
"Local deletion time {} is less than min_local_deletion_time {}",
local_deletion_time, self.stats.min_local_deletion_time
)));
}
encode_unsigned(ldt_delta as u64, &mut body);
}
}
if (flags & ROW_HAS_DELETION) != 0 {
let (deletion_ts, local_deletion_time) = row.row_deletion.ok_or_else(|| {
Error::InvalidInput("ROW_HAS_DELETION set but row has no deletion time".to_string())
})?;
let ts_delta = (deletion_ts - self.stats.min_timestamp) as u64;
encode_unsigned(ts_delta, &mut body);
// Wrapping subtraction: a deletion time below the minimum is encoded as
// a wrapped 32-bit value instead of being rejected.
let ldt_delta =
local_deletion_time.wrapping_sub(self.stats.min_local_deletion_time) as u32;
encode_unsigned(ldt_delta as u64, &mut body);
}
if (flags & ROW_HAS_ALL_COLUMNS) == 0 {
self.write_merged_column_bitmap(&mut body, &row.ops, schema)?;
}
self.write_merged_cells(&mut body, row, schema)?;
Ok(body)
}
fn write_clustering_prefix(
&mut self,
clustering_key: &crate::storage::write_engine::mutation::ClusteringKey,
schema: &TableSchema,
) -> Result<()> {
let mut header = 0u64;
for (i, (_, value)) in clustering_key.columns.iter().enumerate() {
let state = match value {
Value::Null => 2, _ => 0, };
header |= (state as u64) << (i * 2);
}
encode_unsigned(header, &mut self.buffer);
for (i, (_, value)) in clustering_key.columns.iter().enumerate() {
if !matches!(value, Value::Null) {
if i >= schema.clustering_keys.len() {
return Err(Error::Schema(format!(
"Clustering key has more columns than schema: {} > {}",
i + 1,
schema.clustering_keys.len()
)));
}
let cluster_col = &schema.clustering_keys[i];
let comparator = ComparatorType::from_data_type(&cluster_col.data_type)?;
let value_bytes = serialize_value_for_clustering(value, &comparator)?;
self.buffer.extend_from_slice(&value_bytes);
}
}
Ok(())
}
/// Test helper: encodes which regular columns `mutation` supplies a cell for.
///
/// A column counts as present when the mutation writes a non-null value to
/// it or deletes it. Null writes and row deletions contribute nothing.
#[cfg(test)]
fn write_column_bitmap(
    &self,
    buf: &mut Vec<u8>,
    mutation: &Mutation,
    schema: &TableSchema,
) -> Result<()> {
    use crate::storage::write_engine::mutation::CellOperation;

    let present_columns: std::collections::HashSet<&str> = mutation
        .operations
        .iter()
        .filter_map(|op| match op {
            CellOperation::Write { column, value }
            | CellOperation::WriteWithTtl { column, value, .. }
                if !matches!(value, Value::Null) =>
            {
                Some(column.as_str())
            }
            CellOperation::Delete { column } => Some(column.as_str()),
            _ => None,
        })
        .collect();
    let regular_columns = self.regular_columns(schema);
    // The source had a mangled `&reg` here; the argument is a plain borrow.
    self.write_column_subset(buf, &regular_columns, &present_columns)
}
/// Encodes which regular columns the merged operations supply a cell for.
///
/// A column counts as present when its winning operation is a non-null write
/// or a delete. Null writes and row deletions contribute nothing.
fn write_merged_column_bitmap(
    &self,
    buf: &mut Vec<u8>,
    ops: &[MergedOp<'_>],
    schema: &TableSchema,
) -> Result<()> {
    use crate::storage::write_engine::mutation::CellOperation;

    let present_columns: std::collections::HashSet<&str> = ops
        .iter()
        .filter_map(|mop| match mop.op {
            CellOperation::Write { column, value }
            | CellOperation::WriteWithTtl { column, value, .. }
                if !matches!(value, Value::Null) =>
            {
                Some(column.as_str())
            }
            CellOperation::Delete { column } => Some(column.as_str()),
            _ => None,
        })
        .collect();
    let regular_columns = self.regular_columns(schema);
    // The source had a mangled `&reg` here; the argument is a plain borrow.
    self.write_column_subset(buf, &regular_columns, &present_columns)
}
/// Returns the schema's regular columns, i.e. those that are neither static
/// nor part of the partition or clustering key, in the order produced by
/// `ordered_columns`.
fn regular_columns<'a>(&self, schema: &'a TableSchema) -> Vec<&'a Column> {
    self.ordered_columns(schema, |candidate| {
        !(candidate.is_static
            || schema.is_partition_key(&candidate.name)
            || schema.is_clustering_key(&candidate.name))
    })
}
/// Returns the schema's static columns in the order produced by
/// `ordered_columns`.
fn static_columns<'a>(&self, schema: &'a TableSchema) -> Vec<&'a Column> {
    let is_static = |candidate: &Column| candidate.is_static;
    self.ordered_columns(schema, is_static)
}
/// Writes the cells of a merged row in the order given by `sorted_merged_ops`.
///
/// Per operation:
/// * `Write` - null values are skipped; complex columns go through
///   `write_complex_column`; simple columns are written with the row TTL,
///   an explicit TTL, the row timestamp, or an explicit timestamp depending
///   on how the operation's TTL and timestamp compare with the row's.
/// * `WriteWithTtl` - as `Write`, but always with the operation's own TTL.
/// * `Delete` - a complex-column deletion or a tombstone cell.
/// * `DeleteRow` - emits nothing; row deletion is encoded in the row header.
///
/// # Errors
/// Propagates errors from the individual cell writers.
fn write_merged_cells(
&self,
buf: &mut Vec<u8>,
row: &RowWrite<'_>,
schema: &TableSchema,
) -> Result<()> {
use crate::storage::write_engine::mutation::CellOperation;
for mop in self.sorted_merged_ops(&row.ops, schema) {
match mop.op {
CellOperation::Write { column, value } => {
// Nulls are represented by absence from the column bitmap.
if matches!(value, Value::Null) {
continue;
}
// Columns missing from the schema are treated as simple.
let is_complex = schema
.columns
.iter()
.find(|c| c.name == *column)
.map(|c| is_complex_column(&c.data_type))
.unwrap_or(false);
if is_complex {
let col = schema
.columns
.iter()
.find(|c| c.name == *column)
.ok_or_else(|| {
Error::Schema(format!(
"Complex column '{}' not found in schema",
column
))
})?;
// NOTE(review): the TTL passed here is None even when
// `mop.row_ttl_seconds` is set, so a mutation-level TTL is not
// applied to complex columns -- confirm this is intended.
self.write_complex_column(buf, col, value, mop.timestamp_micros, None)?;
} else if let Some(ttl_seconds) = mop.row_ttl_seconds {
// The cell may lean on the row's TTL only when both its TTL and
// its timestamp equal the row's liveness values.
if row.ttl_seconds == Some(ttl_seconds)
&& row.liveness_ts == Some(mop.timestamp_micros)
{
self.write_cell_with_row_ttl(
buf,
column,
value,
mop.timestamp_micros,
ttl_seconds,
)?;
} else {
self.write_cell_with_ttl(
buf,
column,
value,
mop.timestamp_micros,
ttl_seconds,
)?;
}
} else if row.liveness_ts == Some(mop.timestamp_micros) {
// Same timestamp as the row: the cell can reference the row timestamp.
self.write_cell(buf, column, value, mop.timestamp_micros)?;
} else {
self.write_cell_explicit_ts(buf, column, value, mop.timestamp_micros)?;
}
}
CellOperation::WriteWithTtl {
column,
value,
ttl_seconds,
} => {
if matches!(value, Value::Null) {
continue;
}
let is_complex = schema
.columns
.iter()
.find(|c| c.name == *column)
.map(|c| is_complex_column(&c.data_type))
.unwrap_or(false);
if is_complex {
let col = schema
.columns
.iter()
.find(|c| c.name == *column)
.ok_or_else(|| {
Error::Schema(format!(
"Complex column '{}' not found in schema",
column
))
})?;
self.write_complex_column(
buf,
col,
value,
mop.timestamp_micros,
Some(*ttl_seconds),
)?;
} else {
self.write_cell_with_ttl(
buf,
column,
value,
mop.timestamp_micros,
*ttl_seconds,
)?;
}
}
CellOperation::Delete { column } => {
let is_complex = schema
.columns
.iter()
.find(|c| c.name == *column)
.map(|c| is_complex_column(&c.data_type))
.unwrap_or(false);
if is_complex {
self.write_complex_column_deletion(buf, mop.timestamp_micros)?;
} else {
// Local deletion time is the operation timestamp in whole seconds.
let local_deletion_time = (mop.timestamp_micros / 1_000_000) as i32;
self.write_tombstone_cell(
buf,
column,
mop.timestamp_micros,
local_deletion_time,
)?;
}
}
CellOperation::DeleteRow => {
}
}
}
Ok(())
}
fn write_complex_column(
&self,
buf: &mut Vec<u8>,
column: &Column,
value: &Value,
timestamp_micros: i64,
ttl_seconds: Option<u32>,
) -> Result<()> {
let ts_delta = i64::MIN.wrapping_sub(self.stats.min_timestamp) as u64;
encode_unsigned(ts_delta, buf);
let ldt_delta = i32::MAX.wrapping_sub(self.stats.min_local_deletion_time) as u32;
encode_unsigned(ldt_delta as u64, buf);
let dt = column.data_type.to_lowercase();
if dt.starts_with("set<") || dt.starts_with("org.apache.cassandra.db.marshal.settype(") {
self.write_set_complex_cells(buf, value, timestamp_micros, ttl_seconds)?;
} else if dt.starts_with("map<")
|| dt.starts_with("org.apache.cassandra.db.marshal.maptype(")
{
self.write_map_complex_cells(buf, value, timestamp_micros, ttl_seconds)?;
} else if dt.starts_with("list<")
|| dt.starts_with("org.apache.cassandra.db.marshal.listtype(")
{
self.write_list_complex_cells(buf, value, timestamp_micros, ttl_seconds)?;
} else {
return Err(Error::InvalidInput(format!(
"Column '{}' has type '{}' which is not a recognized complex column type",
column.name, column.data_type
)));
}
Ok(())
}
/// Writes a deleted complex column: the deletion timestamp delta, the
/// local-deletion-time delta (timestamp in whole seconds), and a trailing
/// zero where the collection writers place their cell count.
fn write_complex_column_deletion(
    &self,
    buf: &mut Vec<u8>,
    timestamp_micros: i64,
) -> Result<()> {
    let stats = &self.stats;
    let marked_at_delta = (timestamp_micros - stats.min_timestamp) as u64;
    let deleted_at_seconds = (timestamp_micros / 1_000_000) as i32;
    let seconds_delta = deleted_at_seconds.wrapping_sub(stats.min_local_deletion_time) as u32;

    for field in [marked_at_delta, seconds_delta as u64, 0u64] {
        encode_unsigned(field, buf);
    }
    Ok(())
}
/// Writes the leading part of one cell inside a complex column.
///
/// * Without a TTL only the flags byte is written, with the
///   "use row timestamp" bit added to `base_flags`.
/// * With a TTL the flags byte carries the "expiring" bit and is followed by
///   the timestamp delta, the local-deletion-time delta (current wall-clock
///   seconds plus the TTL) and the TTL delta.
///
/// The expiry time is computed in 64-bit arithmetic and clamped to
/// `i32::MAX`, so neither a wall clock past the 32-bit range nor an
/// oversized TTL can wrap into a deletion time in the past.
///
/// # Errors
/// Returns `Error::Storage` when the system clock is before the Unix epoch,
/// and `Error::InvalidInput` when the expiry time or TTL is below the
/// corresponding minimum in `stats`.
fn write_complex_cell_header(
    &self,
    buf: &mut Vec<u8>,
    base_flags: u8,
    timestamp_micros: i64,
    ttl_seconds: Option<u32>,
) -> Result<()> {
    let Some(ttl) = ttl_seconds else {
        // NOTE(review): the cell always defers to the row timestamp here,
        // even if `timestamp_micros` differs from the row's liveness
        // timestamp -- confirm this is intended.
        buf.push(base_flags | CELL_USE_ROW_TIMESTAMP);
        return Ok(());
    };

    buf.push(base_flags | CELL_IS_EXPIRING);
    let timestamp_delta = (timestamp_micros - self.stats.min_timestamp) as u64;
    encode_unsigned(timestamp_delta, buf);

    // NOTE(review): the row-level paths obtain this value from
    // `expiring_local_deletion_time`; consider sharing that helper here.
    let now_seconds = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| Error::Storage(format!("System time error: {}", e)))?
        .as_secs();
    let now_seconds = i64::try_from(now_seconds).unwrap_or(i64::MAX);
    let local_deletion_time = now_seconds
        .saturating_add(i64::from(ttl))
        .min(i64::from(i32::MAX));

    let ldt_delta = local_deletion_time - (self.stats.min_local_deletion_time as i64);
    if ldt_delta < 0 {
        return Err(Error::InvalidInput(format!(
            "Complex cell: local deletion time {} is less than min_local_deletion_time {}",
            local_deletion_time, self.stats.min_local_deletion_time
        )));
    }
    encode_unsigned(ldt_delta as u64, buf);

    let ttl_delta = (ttl as i64) - (self.stats.min_ttl as i64);
    if ttl_delta < 0 {
        return Err(Error::InvalidInput(format!(
            "Complex cell: TTL {} is less than min_ttl {}",
            ttl, self.stats.min_ttl
        )));
    }
    encode_unsigned(ttl_delta as u64, buf);
    Ok(())
}
/// Writes the cells of a set column: the element count, then one
/// empty-valued cell per element whose path is the serialized element.
///
/// Elements are emitted in ascending byte order of their serialized form.
///
/// # Errors
/// Returns `Error::InvalidInput` when `value` is not a set, and propagates
/// element-serialization and header errors.
fn write_set_complex_cells(
    &self,
    buf: &mut Vec<u8>,
    value: &Value,
    timestamp_micros: i64,
    ttl_seconds: Option<u32>,
) -> Result<()> {
    let Value::Set(members) = value else {
        return Err(Error::InvalidInput(format!(
            "Expected Set value for complex SET column, got {:?}",
            value
        )));
    };

    let mut paths: Vec<Vec<u8>> = Vec::new();
    for member in members.iter() {
        paths.push(serialize_collection_element(member, "SET")?);
    }
    paths.sort();

    encode_unsigned(paths.len() as u64, buf);
    for path in &paths {
        // Set cells carry no value; the element lives in the cell path.
        self.write_complex_cell_header(buf, CELL_HAS_EMPTY_VALUE, timestamp_micros, ttl_seconds)?;
        encode_unsigned(path.len() as u64, buf);
        buf.extend_from_slice(path);
    }
    Ok(())
}
/// Writes the cells of a map column: the entry count, then one cell per
/// entry whose path is the serialized key and whose value is the serialized
/// map value.
///
/// Entries are emitted in ascending byte order of their serialized keys;
/// entries with equal keys keep their input order.
///
/// # Errors
/// Returns `Error::InvalidInput` when `value` is not a map or a key is null,
/// and propagates serialization and header errors.
fn write_map_complex_cells(
    &self,
    buf: &mut Vec<u8>,
    value: &Value,
    timestamp_micros: i64,
    ttl_seconds: Option<u32>,
) -> Result<()> {
    let Value::Map(entries) = value else {
        return Err(Error::InvalidInput(format!(
            "Expected Map value for complex MAP column, got {:?}",
            value
        )));
    };

    let mut encoded: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
    for (key, val) in entries.iter() {
        if matches!(key, Value::Null) {
            return Err(Error::InvalidInput(
                "MAP keys cannot be null (CQL semantics)".to_string(),
            ));
        }
        let key_bytes = serialize_value(key)?;
        let val_bytes = serialize_value(val)?;
        encoded.push((key_bytes, val_bytes));
    }
    encoded.sort_by(|(left, _), (right, _)| left.cmp(right));

    encode_unsigned(encoded.len() as u64, buf);
    for (key_bytes, val_bytes) in &encoded {
        self.write_complex_cell_header(buf, 0, timestamp_micros, ttl_seconds)?;
        encode_unsigned(key_bytes.len() as u64, buf);
        buf.extend_from_slice(key_bytes);
        encode_unsigned(val_bytes.len() as u64, buf);
        buf.extend_from_slice(val_bytes);
    }
    Ok(())
}
/// Writes the cells of a list column: the element count, then one cell per
/// element in input order.
///
/// Each cell's path is a 16-byte value produced by
/// `generate_list_cell_path_timeuuid` from the write timestamp and the
/// element's index; the cell value is the serialized element.
///
/// # Errors
/// Returns `Error::InvalidInput` when `value` is not a list or an element is
/// null (the count and any earlier cells have already been appended), and
/// propagates serialization and header errors.
fn write_list_complex_cells(
    &self,
    buf: &mut Vec<u8>,
    value: &Value,
    timestamp_micros: i64,
    ttl_seconds: Option<u32>,
) -> Result<()> {
    let Value::List(items) = value else {
        return Err(Error::InvalidInput(format!(
            "Expected List value for complex LIST column, got {:?}",
            value
        )));
    };

    encode_unsigned(items.len() as u64, buf);
    for (index, item) in items.iter().enumerate() {
        if let Value::Null = item {
            return Err(Error::InvalidInput(
                "LIST elements cannot be null (CQL semantics)".to_string(),
            ));
        }
        self.write_complex_cell_header(buf, 0, timestamp_micros, ttl_seconds)?;

        let path = generate_list_cell_path_timeuuid(timestamp_micros, index as u64);
        encode_unsigned(16u64, buf);
        buf.extend_from_slice(&path);

        let item_bytes = serialize_value(item)?;
        encode_unsigned(item_bytes.len() as u64, buf);
        buf.extend_from_slice(&item_bytes);
    }
    Ok(())
}
/// Appends a simple cell that inherits the row timestamp.
///
/// The cell is a flags byte (`CELL_USE_ROW_TIMESTAMP`, plus
/// `CELL_HAS_EMPTY_VALUE` for an empty `Value::Text`) followed, for non-empty
/// values, by the serialized value; the value gets an unsigned-vint length
/// prefix when `cell_value_uses_length_prefix` says so.
///
/// Because the cell always carries `CELL_USE_ROW_TIMESTAMP`, no per-cell
/// timestamp is ever written and the `_timestamp` argument is not consulted.
///
/// The value is serialized and validated before anything is appended, so a
/// failed call leaves `buf` untouched.
///
/// # Errors
/// `Error::InvalidInput` for `Value::Null` or an oversized value; errors from
/// `serialize_value` are propagated.
fn write_cell(
    &self,
    buf: &mut Vec<u8>,
    column: &str,
    value: &Value,
    _timestamp: i64,
) -> Result<()> {
    if matches!(value, Value::Null) {
        return Err(Error::InvalidInput(format!(
            "NULL values should not be written as cells (column: {}). They are represented by absence in the bitmap.",
            column
        )));
    }

    // An empty string is represented by the flag alone; no value bytes follow.
    if matches!(value, Value::Text(s) if s.is_empty()) {
        buf.push(CELL_USE_ROW_TIMESTAMP | CELL_HAS_EMPTY_VALUE);
        return Ok(());
    }

    let value_bytes = serialize_value(value)?;
    if value_bytes.len() > i64::MAX as usize {
        return Err(Error::InvalidInput(format!(
            "Value too large for column '{}': {} bytes (max {})",
            column,
            value_bytes.len(),
            i64::MAX
        )));
    }

    buf.push(CELL_USE_ROW_TIMESTAMP);
    if cell_value_uses_length_prefix(value) {
        encode_unsigned(value_bytes.len() as u64, buf);
    }
    buf.extend_from_slice(&value_bytes);
    Ok(())
}
/// Appends a simple cell that carries its own timestamp.
///
/// The cell is a flags byte (`0`, or `CELL_HAS_EMPTY_VALUE` for an empty
/// `Value::Text`), the timestamp as an unsigned-vint delta from
/// `stats.min_timestamp`, and, for non-empty values, the serialized value
/// (vint-length-prefixed when `cell_value_uses_length_prefix` says so).
///
/// The value is serialized and validated before anything is appended, so a
/// failed call leaves `buf` untouched.
///
/// # Errors
/// `Error::InvalidInput` for `Value::Null` or an oversized value; errors from
/// `serialize_value` are propagated.
fn write_cell_explicit_ts(
    &self,
    buf: &mut Vec<u8>,
    column: &str,
    value: &Value,
    timestamp: i64,
) -> Result<()> {
    if matches!(value, Value::Null) {
        return Err(Error::InvalidInput(format!(
            "NULL values should not be written as cells (column: {}). They are represented by absence in the bitmap.",
            column
        )));
    }

    let is_empty_text = matches!(value, Value::Text(s) if s.is_empty());

    // Do all fallible work first; `None` means "flag only, no value bytes".
    let value_bytes = if is_empty_text {
        None
    } else {
        let bytes = serialize_value(value)?;
        if bytes.len() > i64::MAX as usize {
            return Err(Error::InvalidInput(format!(
                "Value too large for column '{}': {} bytes (max {})",
                column,
                bytes.len(),
                i64::MAX
            )));
        }
        Some(bytes)
    };

    let flags = if is_empty_text { CELL_HAS_EMPTY_VALUE } else { 0u8 };
    buf.push(flags);
    let timestamp_delta = (timestamp - self.stats.min_timestamp) as u64;
    encode_unsigned(timestamp_delta, buf);

    if let Some(bytes) = value_bytes {
        if cell_value_uses_length_prefix(value) {
            encode_unsigned(bytes.len() as u64, buf);
        }
        buf.extend_from_slice(&bytes);
    }
    Ok(())
}
/// Appends an expiring simple cell with its own timestamp, deletion time and TTL.
///
/// The cell is a flags byte (`CELL_IS_EXPIRING`, plus `CELL_HAS_EMPTY_VALUE`
/// for an empty `Value::Text`) followed by three unsigned vints — the
/// timestamp delta from `stats.min_timestamp`, the local-deletion-time delta
/// from `stats.min_local_deletion_time`, and the TTL delta from
/// `stats.min_ttl` — and, for non-empty values, the serialized value
/// (vint-length-prefixed when `cell_value_uses_length_prefix` says so).
///
/// Every check and the value serialization run before the first byte is
/// appended, so a failed call leaves `buf` untouched.
///
/// # Errors
/// `Error::InvalidInput` for `Value::Null`, for a deletion time or TTL below
/// the corresponding minimum in `stats`, or for an oversized value; errors from
/// `expiring_local_deletion_time` and `serialize_value` are propagated.
fn write_cell_with_ttl(
    &self,
    buf: &mut Vec<u8>,
    column: &str,
    value: &Value,
    timestamp: i64,
    ttl_seconds: u32,
) -> Result<()> {
    if matches!(value, Value::Null) {
        return Err(Error::InvalidInput(format!(
            "NULL values should not be written as cells (column: {}). They are represented by absence in the bitmap.",
            column
        )));
    }
    let local_deletion_time = self.expiring_local_deletion_time(ttl_seconds)?;

    // Deltas are written unsigned, so a value below the recorded minimum cannot be encoded.
    let ldt_delta = (local_deletion_time as i64) - (self.stats.min_local_deletion_time as i64);
    if ldt_delta < 0 {
        return Err(Error::InvalidInput(format!(
            "Local deletion time {} is less than min_local_deletion_time {}",
            local_deletion_time, self.stats.min_local_deletion_time
        )));
    }
    let ttl_delta = (ttl_seconds as i64) - (self.stats.min_ttl as i64);
    if ttl_delta < 0 {
        return Err(Error::InvalidInput(format!(
            "TTL {} is less than min_ttl {}",
            ttl_seconds, self.stats.min_ttl
        )));
    }

    let is_empty_text = matches!(value, Value::Text(s) if s.is_empty());
    let value_bytes = if is_empty_text {
        None
    } else {
        let bytes = serialize_value(value)?;
        if bytes.len() > i64::MAX as usize {
            return Err(Error::InvalidInput(format!(
                "Value too large for column '{}': {} bytes (max {})",
                column,
                bytes.len(),
                i64::MAX
            )));
        }
        Some(bytes)
    };

    // Nothing below can fail: emit the cell in one go.
    let mut flags = CELL_IS_EXPIRING;
    if is_empty_text {
        flags |= CELL_HAS_EMPTY_VALUE;
    }
    buf.push(flags);
    let timestamp_delta = (timestamp - self.stats.min_timestamp) as u64;
    encode_unsigned(timestamp_delta, buf);
    encode_unsigned(ldt_delta as u64, buf);
    encode_unsigned(ttl_delta as u64, buf);

    if let Some(bytes) = value_bytes {
        if cell_value_uses_length_prefix(value) {
            encode_unsigned(bytes.len() as u64, buf);
        }
        buf.extend_from_slice(&bytes);
    }
    Ok(())
}
/// Appends an expiring simple cell that inherits both timestamp and TTL from its row.
///
/// The cell is a flags byte
/// (`CELL_IS_EXPIRING | CELL_USE_ROW_TIMESTAMP | CELL_USE_ROW_TTL`, plus
/// `CELL_HAS_EMPTY_VALUE` for an empty `Value::Text`) followed, for non-empty
/// values, by the serialized value (vint-length-prefixed when
/// `cell_value_uses_length_prefix` says so). No per-cell timestamp or TTL
/// fields are written, which is why `_timestamp` and `_ttl_seconds` are unused.
///
/// The value is serialized and validated before anything is appended, so a
/// failed call leaves `buf` untouched.
///
/// # Errors
/// `Error::InvalidInput` for `Value::Null` or an oversized value; errors from
/// `serialize_value` are propagated.
fn write_cell_with_row_ttl(
    &self,
    buf: &mut Vec<u8>,
    column: &str,
    value: &Value,
    _timestamp: i64,
    _ttl_seconds: u32,
) -> Result<()> {
    if matches!(value, Value::Null) {
        return Err(Error::InvalidInput(format!(
            "NULL values should not be written as cells (column: {}). They are represented by absence in the bitmap.",
            column
        )));
    }

    let base_flags = CELL_IS_EXPIRING | CELL_USE_ROW_TIMESTAMP | CELL_USE_ROW_TTL;

    // An empty string is represented by the flag alone; no value bytes follow.
    if matches!(value, Value::Text(s) if s.is_empty()) {
        buf.push(base_flags | CELL_HAS_EMPTY_VALUE);
        return Ok(());
    }

    let value_bytes = serialize_value(value)?;
    if value_bytes.len() > i64::MAX as usize {
        return Err(Error::InvalidInput(format!(
            "Value too large for column '{}': {} bytes (max {})",
            column,
            value_bytes.len(),
            i64::MAX
        )));
    }

    buf.push(base_flags);
    if cell_value_uses_length_prefix(value) {
        encode_unsigned(value_bytes.len() as u64, buf);
    }
    buf.extend_from_slice(&value_bytes);
    Ok(())
}
/// Computes the local deletion time (seconds since the Unix epoch) of a cell
/// written now with the given TTL.
///
/// The sum is computed in 64 bits and clamped to `i32::MAX`, so neither a TTL
/// above `i32::MAX` nor a wall clock beyond the 32-bit range can wrap into a
/// negative (already expired) deletion time.
///
/// # Errors
/// `Error::Storage` if the system clock reports a time before the Unix epoch.
fn expiring_local_deletion_time(&self, ttl_seconds: u32) -> Result<i32> {
    let now_seconds = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| Error::Storage(format!("System time error: {}", e)))?
        .as_secs();
    let expires_at = now_seconds.saturating_add(u64::from(ttl_seconds));
    // Saturate instead of truncating: the on-disk field is a signed 32-bit value.
    Ok(i32::try_from(expires_at).unwrap_or(i32::MAX))
}
/// Appends a deleted (tombstone) simple cell.
///
/// The cell is a flags byte (`CELL_IS_DELETED | CELL_HAS_EMPTY_VALUE`) followed
/// by two unsigned vints: the timestamp delta from `stats.min_timestamp` and
/// the local-deletion-time delta from `stats.min_local_deletion_time`. No value
/// bytes are written.
///
/// The deletion time is validated before anything is appended, so a failed
/// call leaves `buf` untouched.
///
/// # Errors
/// `Error::InvalidInput` if `local_deletion_time` is below
/// `stats.min_local_deletion_time`.
fn write_tombstone_cell(
    &self,
    buf: &mut Vec<u8>,
    _column: &str,
    timestamp: i64,
    local_deletion_time: i32,
) -> Result<()> {
    // The delta is written unsigned, so a value below the recorded minimum cannot be encoded.
    let deletion_time_delta =
        (local_deletion_time as i64) - (self.stats.min_local_deletion_time as i64);
    if deletion_time_delta < 0 {
        return Err(Error::InvalidInput(format!(
            "Local deletion time {} is less than min_local_deletion_time {}",
            local_deletion_time, self.stats.min_local_deletion_time
        )));
    }

    buf.push(CELL_IS_DELETED | CELL_HAS_EMPTY_VALUE);
    let timestamp_delta = (timestamp - self.stats.min_timestamp) as u64;
    encode_unsigned(timestamp_delta, buf);
    encode_unsigned(deletion_time_delta as u64, buf);
    Ok(())
}
fn write_range_bound(
&mut self,
bound: &ClusteringBound,
is_open: bool,
deletion_time: i64,
local_deletion_time: i32,
schema: &TableSchema,
prev_size: u64,
) -> Result<usize> {
let start_len = self.buffer.len();
self.buffer.push(IS_MARKER);
let (bound_kind, clustering) = match (is_open, bound) {
(true, ClusteringBound::Inclusive(ck)) => (INCL_START_BOUND, Some(ck)),
(true, ClusteringBound::Exclusive(ck)) => (EXCL_START_BOUND, Some(ck)),
(false, ClusteringBound::Inclusive(ck)) => (INCL_END_BOUND, Some(ck)),
(false, ClusteringBound::Exclusive(ck)) => (EXCL_END_BOUND, Some(ck)),
(true, ClusteringBound::Bottom | ClusteringBound::Top) => (INCL_START_BOUND, None),
(false, ClusteringBound::Bottom | ClusteringBound::Top) => (INCL_END_BOUND, None),
};
self.buffer.push(bound_kind);
let cluster_count = clustering.map_or(0, |ck| ck.columns.len());
if cluster_count > u16::MAX as usize {
return Err(Error::InvalidInput(format!(
"Range tombstone bound has too many clustering values: {}",
cluster_count
)));
}
self.buffer
.write_all(&(cluster_count as u16).to_be_bytes())?;
if let Some(ck) = clustering {
self.write_clustering_prefix(ck, schema)?;
}
let mut deletion = Vec::new();
let ts_delta = (deletion_time - self.stats.min_timestamp) as u64;
encode_unsigned(ts_delta, &mut deletion);
let ldt_delta =
(local_deletion_time as i64 - self.stats.min_local_deletion_time as i64) as u64;
encode_unsigned(ldt_delta, &mut deletion);
let body_size = unsigned_len(prev_size) as u64 + deletion.len() as u64;
encode_unsigned(body_size, &mut self.buffer);
encode_unsigned(prev_size, &mut self.buffer);
self.buffer.extend_from_slice(&deletion);
Ok(self.buffer.len() - start_len)
}
/// Returns the writer's logical position: the `position` counter plus the
/// number of bytes currently held in the in-memory buffer.
pub fn position(&self) -> u64 {
    let buffered = self.buffer.len() as u64;
    self.position + buffered
}
/// Test-only accessor: number of bytes currently held in the in-memory buffer.
#[cfg(test)]
pub(crate) fn scratch_len(&self) -> usize {
    let scratch = &self.buffer;
    scratch.len()
}
/// Test-only accessor: the raw `position` counter, excluding buffered bytes.
#[cfg(test)]
pub(crate) fn flushed_position(&self) -> u64 {
    let flushed = self.position;
    flushed
}
/// Returns the schema columns accepted by `predicate`, sorted by
/// `column_order_key`.
fn ordered_columns<'a, F>(&self, schema: &'a TableSchema, predicate: F) -> Vec<&'a Column>
where
    F: Fn(&Column) -> bool,
{
    let mut selected: Vec<&'a Column> = Vec::new();
    for candidate in &schema.columns {
        if predicate(candidate) {
            selected.push(candidate);
        }
    }
    selected.sort_by_key(|column| column_order_key(column));
    selected
}
/// Returns the mutation's operations ordered by the position of their target
/// column in `columns`.
///
/// Operations on columns not present in `columns` sort after all known columns
/// (`usize::MAX - 1`), and `DeleteRow` sorts last (`usize::MAX`). The sort is
/// stable, so operations with the same rank keep their original relative order.
fn sorted_operations<'a>(
    &self,
    mutation: &'a Mutation,
    columns: &[&Column],
) -> Vec<&'a crate::storage::write_engine::mutation::CellOperation> {
    use crate::storage::write_engine::mutation::CellOperation;

    let mut rank_by_name: std::collections::HashMap<&str, usize> =
        std::collections::HashMap::new();
    for (rank, column) in columns.iter().enumerate() {
        rank_by_name.insert(column.name.as_str(), rank);
    }

    let rank_of = |operation: &CellOperation| -> usize {
        match operation {
            CellOperation::DeleteRow => usize::MAX,
            CellOperation::Write { column, .. }
            | CellOperation::WriteWithTtl { column, .. }
            | CellOperation::Delete { column } => rank_by_name
                .get(column.as_str())
                .copied()
                .unwrap_or(usize::MAX - 1),
        }
    };

    let mut ordered: Vec<&'a CellOperation> = mutation.operations.iter().collect();
    ordered.sort_by_key(|operation| rank_of(*operation));
    ordered
}
/// Returns references to `ops` ordered by the position of each operation's
/// target column among `self.regular_columns(schema)`.
///
/// Operations on unknown columns sort after all known columns
/// (`usize::MAX - 1`), and `DeleteRow` sorts last (`usize::MAX`). The sort is
/// stable, so entries with the same rank keep their original relative order.
fn sorted_merged_ops<'a, 'b>(
    &self,
    ops: &'b [MergedOp<'a>],
    schema: &TableSchema,
) -> Vec<&'b MergedOp<'a>> {
    use crate::storage::write_engine::mutation::CellOperation;

    let columns = self.regular_columns(schema);
    let mut rank_by_name: std::collections::HashMap<&str, usize> =
        std::collections::HashMap::new();
    for (rank, column) in columns.iter().enumerate() {
        rank_by_name.insert(column.name.as_str(), rank);
    }

    let rank_of = |operation: &CellOperation| -> usize {
        match operation {
            CellOperation::DeleteRow => usize::MAX,
            CellOperation::Write { column, .. }
            | CellOperation::WriteWithTtl { column, .. }
            | CellOperation::Delete { column } => rank_by_name
                .get(column.as_str())
                .copied()
                .unwrap_or(usize::MAX - 1),
        }
    };

    let mut ordered: Vec<&'b MergedOp<'a>> = Vec::with_capacity(ops.len());
    ordered.extend(ops.iter());
    ordered.sort_by_key(|merged| rank_of(merged.op));
    ordered
}
/// Encodes which of `columns` are present in a row.
///
/// * Nothing missing: a single unsigned vint `0`.
/// * Fewer than 64 columns: one unsigned vint whose bit `i` is set when
///   `columns[i]` is missing.
/// * Otherwise: the number of missing columns as an unsigned vint, followed by
///   the indices of the present columns when fewer than half are present, or
///   the indices of the missing columns when not.
fn write_column_subset(
    &self,
    buf: &mut Vec<u8>,
    columns: &[&Column],
    present_columns: &std::collections::HashSet<&str>,
) -> Result<()> {
    let total = columns.len();
    let (present, missing): (Vec<usize>, Vec<usize>) = (0..total)
        .partition(|&idx| present_columns.contains(columns[idx].name.as_str()));

    if missing.is_empty() {
        encode_unsigned(0, buf);
        return Ok(());
    }

    if total < 64 {
        let bitmap = missing
            .iter()
            .fold(0u64, |bits, &idx| bits | (1u64 << idx));
        encode_unsigned(bitmap, buf);
        return Ok(());
    }

    encode_unsigned((total - present.len()) as u64, buf);
    // List whichever side is described by the condition below.
    let listed = if present.len() < total / 2 {
        &present
    } else {
        &missing
    };
    for &idx in listed {
        encode_unsigned(idx as u64, buf);
    }
    Ok(())
}
}
/// Reports whether a column type string denotes a non-frozen list, set or map.
///
/// The check is case-insensitive and prefix-based. It recognizes both the CQL
/// spelling (`list<`, `set<`, `map<`) and the
/// `org.apache.cassandra.db.marshal.*Type(` spelling. Anything starting with a
/// frozen wrapper is never complex.
fn is_complex_column(data_type: &str) -> bool {
    const FROZEN_PREFIXES: [&str; 2] = [
        "frozen<",
        "org.apache.cassandra.db.marshal.frozentype(",
    ];
    const COLLECTION_PREFIXES: [&str; 6] = [
        "list<",
        "set<",
        "map<",
        "org.apache.cassandra.db.marshal.listtype(",
        "org.apache.cassandra.db.marshal.settype(",
        "org.apache.cassandra.db.marshal.maptype(",
    ];

    let lowered = data_type.to_lowercase();
    let starts_with_any =
        |prefixes: &[&str]| prefixes.iter().any(|prefix| lowered.starts_with(*prefix));

    !starts_with_any(&FROZEN_PREFIXES) && starts_with_any(&COLLECTION_PREFIXES)
}
/// A borrowed cell operation paired with the timestamp and optional TTL that
/// accompany it.
///
/// NOTE(review): presumably produced while merging several mutations into one
/// row, so each operation keeps the metadata of the mutation it came from —
/// confirm against the code that builds it.
struct MergedOp<'a> {
/// The operation itself, borrowed from its owner.
op: &'a crate::storage::write_engine::mutation::CellOperation,
/// Timestamp associated with the operation, in microseconds.
timestamp_micros: i64,
/// TTL in seconds associated with the operation, if any.
row_ttl_seconds: Option<u32>,
}
/// Everything gathered for writing a single row: its clustering key, row-level
/// metadata and the cell operations to emit.
///
/// NOTE(review): field meanings below are inferred from names and types; the
/// code that fills and consumes this struct is outside this view.
struct RowWrite<'a> {
/// Clustering key of the row, or `None` when the row has none.
clustering_key: Option<&'a crate::storage::write_engine::mutation::ClusteringKey>,
/// Row liveness timestamp, if any (presumably microseconds — confirm).
liveness_ts: Option<i64>,
/// Row-level TTL in seconds, if any.
ttl_seconds: Option<u32>,
/// Row deletion, if any (presumably `(timestamp, local_deletion_time)` — confirm).
row_deletion: Option<(i64, i32)>,
/// Cell operations belonging to this row.
ops: Vec<MergedOp<'a>>,
}
/// Sort key for columns: non-complex columns first (`false < true`), then by
/// column name.
fn column_order_key(column: &Column) -> (bool, &str) {
    let is_complex = is_complex_column(&column.data_type);
    let name = column.name.as_str();
    (is_complex, name)
}
/// Builds the 16-byte version-1 (time-based) UUID used as the cell path of a
/// list element.
///
/// The UUID timestamp is `timestamp_micros * 10 + element_index` (in 100 ns
/// units) shifted by the offset between the UUID epoch and the Unix epoch, so
/// elements written with the same timestamp get strictly increasing paths in
/// list order. The clock-sequence bytes are `0x00 0x80` and the node is all
/// zeros, which makes the result fully deterministic.
///
/// All arithmetic wraps modulo 2^64. This keeps pre-1970 (negative)
/// timestamps correct — the epoch offset brings the sum back into range — and
/// avoids an overflow panic in builds with overflow checks enabled.
///
/// NOTE(review): with `CLOCK_SEQ = 0x0080` the high byte of the clock sequence
/// is `0x00`, so the RFC 4122 variant bits in byte 8 are not set. The bytes are
/// kept as they were; confirm whether `0x8000` was intended.
fn generate_list_cell_path_timeuuid(timestamp_micros: i64, element_index: u64) -> [u8; 16] {
    /// 100 ns intervals between 1582-10-15 (UUID epoch) and 1970-01-01.
    const UUID_EPOCH_OFFSET: u64 = 0x01B2_1DD2_1381_4000;
    const CLOCK_SEQ: u16 = 0x80;
    const NODE: [u8; 6] = [0x00; 6];

    let ts_100ns = (timestamp_micros as u64)
        .wrapping_mul(10)
        .wrapping_add(element_index);
    let uuid_ts = ts_100ns.wrapping_add(UUID_EPOCH_OFFSET);

    let time_low = (uuid_ts & 0xFFFF_FFFF) as u32;
    let time_mid = ((uuid_ts >> 32) & 0xFFFF) as u16;
    // Top nibble carries the UUID version (1).
    let time_hi_and_version = ((uuid_ts >> 48) & 0x0FFF) as u16 | 0x1000;

    let mut uuid = [0u8; 16];
    uuid[0..4].copy_from_slice(&time_low.to_be_bytes());
    uuid[4..6].copy_from_slice(&time_mid.to_be_bytes());
    uuid[6..8].copy_from_slice(&time_hi_and_version.to_be_bytes());
    uuid[8..10].copy_from_slice(&CLOCK_SEQ.to_be_bytes());
    uuid[10..16].copy_from_slice(&NODE);
    uuid
}
fn len_as_i32(len: usize) -> Result<i32> {
i32::try_from(len).map_err(|_| {
Error::InvalidInput(format!(
"Length {} exceeds maximum i32 for collection encoding",
len
))
})
}
fn serialize_collection_element(value: &Value, collection_kind: &str) -> Result<Vec<u8>> {
if matches!(value, Value::Null) {
return Err(Error::InvalidInput(format!(
"{} elements cannot be null (CQL semantics)",
collection_kind
)));
}
serialize_value(value)
}
/// Serializes a `Value` into its raw byte representation (no outer length
/// prefix is added here; callers add one where their format requires it).
///
/// Scalars are written big-endian. Collections and tuples are written
/// recursively with 4-byte big-endian signed lengths.
///
/// # Errors
/// `Error::InvalidInput` for variants not handled below, for null collection
/// elements or map keys, and when a length does not fit in an `i32`; errors
/// from `TypeSerializer::serialize_udt` are propagated.
fn serialize_value(value: &Value) -> Result<Vec<u8>> {
match value {
// Null serializes to zero bytes.
Value::Null => Ok(Vec::new()),
Value::Boolean(b) => Ok(vec![if *b { 1 } else { 0 }]),
Value::TinyInt(n) => Ok(vec![*n as u8]),
Value::SmallInt(n) => Ok(n.to_be_bytes().to_vec()),
Value::Integer(n) => Ok(n.to_be_bytes().to_vec()),
Value::BigInt(n) => Ok(n.to_be_bytes().to_vec()),
Value::Counter(n) => Ok(n.to_be_bytes().to_vec()),
// Floating-point values are written as the big-endian bytes of their bit pattern.
Value::Float32(f) => Ok(f.to_bits().to_be_bytes().to_vec()),
Value::Float(f) => Ok(f.to_bits().to_be_bytes().to_vec()),
Value::Text(s) => Ok(s.as_bytes().to_vec()),
Value::Blob(bytes) => Ok(bytes.clone()),
Value::Timestamp(millis) => Ok(millis.to_be_bytes().to_vec()),
Value::Date(days) => {
// Shift the signed day count by `i32::MIN` so it is stored as an unsigned
// 32-bit value.
let stored = days.wrapping_sub(i32::MIN) as u32;
Ok(stored.to_be_bytes().to_vec())
}
Value::Time(nanos) => Ok(nanos.to_be_bytes().to_vec()),
Value::Uuid(bytes) => Ok(bytes.to_vec()),
// Inet and Varint already hold their serialized bytes.
Value::Inet(bytes) => Ok(bytes.clone()),
Value::Varint(bytes) => Ok(bytes.clone()),
Value::Decimal { scale, unscaled } => {
// Big-endian scale followed by the unscaled bytes as given.
let mut result = Vec::new();
result.extend_from_slice(&scale.to_be_bytes());
result.extend_from_slice(unscaled);
Ok(result)
}
Value::Duration {
months,
days,
nanos,
} => {
// Three signed vints: months, days, nanoseconds.
let mut result = Vec::new();
encode_signed(*months as i64, &mut result);
encode_signed(*days as i64, &mut result);
encode_signed(*nanos, &mut result);
Ok(result)
}
Value::Udt(udt_value) => {
// No schema is passed in, so a UDT definition is rebuilt from the value
// itself, with each field's type inferred from the field's value.
let mut schema =
UdtTypeDef::new(udt_value.keyspace.clone(), udt_value.type_name.clone());
for field in &udt_value.fields {
let field_type = infer_cql_type_from_value(field.value.as_ref());
schema = schema.with_field(field.name.clone(), field_type, true);
}
let serializer = TypeSerializer::new();
serializer.serialize_udt(value, &schema)
}
Value::List(elements) | Value::Set(elements) => {
// Element count, then each element with its own length; null elements
// are rejected by `serialize_collection_element`.
let mut buf = Vec::new();
buf.extend_from_slice(&len_as_i32(elements.len())?.to_be_bytes());
for elem in elements {
let elem_bytes = serialize_collection_element(elem, "Collection")?;
buf.extend_from_slice(&len_as_i32(elem_bytes.len())?.to_be_bytes());
buf.extend_from_slice(&elem_bytes);
}
Ok(buf)
}
Value::Map(entries) => {
// Entry count, then length-prefixed key and value for each entry, in the
// order the entries are stored.
let mut buf = Vec::new();
buf.extend_from_slice(&len_as_i32(entries.len())?.to_be_bytes());
for (key, val) in entries {
if matches!(key, Value::Null) {
return Err(Error::InvalidInput(
"MAP keys cannot be null (CQL semantics)".to_string(),
));
}
let key_bytes = serialize_value(key)?;
buf.extend_from_slice(&len_as_i32(key_bytes.len())?.to_be_bytes());
buf.extend_from_slice(&key_bytes);
let val_bytes = serialize_value(val)?;
buf.extend_from_slice(&len_as_i32(val_bytes.len())?.to_be_bytes());
buf.extend_from_slice(&val_bytes);
}
Ok(buf)
}
Value::Tuple(fields) => {
// No field count is written; a null field is encoded as length -1 with
// no payload.
let mut buf = Vec::new();
for field in fields {
match field {
Value::Null => buf.extend_from_slice(&(-1i32).to_be_bytes()),
other => {
let field_bytes = serialize_value(other)?;
buf.extend_from_slice(&len_as_i32(field_bytes.len())?.to_be_bytes());
buf.extend_from_slice(&field_bytes);
}
}
}
Ok(buf)
}
// A frozen wrapper serializes exactly like the value it wraps.
Value::Frozen(inner) => serialize_value(inner),
// Any remaining variant has no serialized form here.
_ => Err(Error::InvalidInput(format!(
"Unsupported value type for serialization: {:?}",
value
))),
}
}
fn infer_cql_type_from_value(value: Option<&Value>) -> CqlType {
match value {
None | Some(Value::Null) => CqlType::Text, Some(Value::Boolean(_)) => CqlType::Boolean,
Some(Value::TinyInt(_)) => CqlType::TinyInt,
Some(Value::SmallInt(_)) => CqlType::SmallInt,
Some(Value::Integer(_)) => CqlType::Int,
Some(Value::BigInt(_)) => CqlType::BigInt,
Some(Value::Float32(_)) => CqlType::Float,
Some(Value::Float(_)) => CqlType::Double,
Some(Value::Text(_)) => CqlType::Text,
Some(Value::Blob(_)) => CqlType::Blob,
Some(Value::Timestamp(_)) => CqlType::Timestamp,
Some(Value::Date(_)) => CqlType::Date,
Some(Value::Time(_)) => CqlType::Time,
Some(Value::Uuid(_)) => CqlType::Uuid,
Some(Value::Inet(_)) => CqlType::Inet,
Some(Value::Varint(_)) => CqlType::Varint,
Some(Value::Decimal { .. }) => CqlType::Decimal,
Some(Value::Duration { .. }) => CqlType::Duration,
Some(Value::Counter(_)) => CqlType::Counter,
Some(Value::List(elements)) => CqlType::List(Box::new(
elements
.first()
.map(|elem| infer_cql_type_from_value(Some(elem)))
.unwrap_or(CqlType::Text),
)),
Some(Value::Set(elements)) => CqlType::Set(Box::new(
elements
.first()
.map(|elem| infer_cql_type_from_value(Some(elem)))
.unwrap_or(CqlType::Text),
)),
Some(Value::Map(entries)) => {
let (key_type, value_type) = entries
.first()
.map(|(key, value)| {
(
infer_cql_type_from_value(Some(key)),
infer_cql_type_from_value(Some(value)),
)
})
.unwrap_or((CqlType::Text, CqlType::Text));
CqlType::Map(Box::new(key_type), Box::new(value_type))
}
Some(Value::Tuple(fields)) => CqlType::Tuple(
fields
.iter()
.map(|field| infer_cql_type_from_value(Some(field)))
.collect(),
),
Some(Value::Udt(udt)) => CqlType::Udt(
udt.type_name.clone(),
udt.fields
.iter()
.map(|field| {
(
field.name.clone(),
infer_cql_type_from_value(field.value.as_ref()),
)
})
.collect(),
),
Some(Value::Frozen(inner)) => {
CqlType::Frozen(Box::new(infer_cql_type_from_value(Some(inner))))
}
Some(Value::Tombstone(_)) => CqlType::Text, Some(Value::Json(_)) => CqlType::Text, }
}
/// Reports whether a cell value must be preceded by its length.
///
/// Boolean, Integer, BigInt, Float32, Float, Timestamp and Uuid values are
/// written without a length prefix; every other variant is length-prefixed.
fn cell_value_uses_length_prefix(value: &Value) -> bool {
    match value {
        Value::Boolean(_)
        | Value::Integer(_)
        | Value::BigInt(_)
        | Value::Float32(_)
        | Value::Float(_)
        | Value::Timestamp(_)
        | Value::Uuid(_) => false,
        _ => true,
    }
}
/// Reports whether a mutation addresses only the static row.
///
/// That requires a mutation without a clustering key, a schema with at least
/// one static column, and every operation to either target a static column or
/// be a `DeleteRow`. Operations on columns unknown to the schema count as
/// non-static.
fn is_static_row_mutation(mutation: &Mutation, schema: &TableSchema) -> bool {
    use crate::storage::write_engine::mutation::CellOperation;

    if mutation.clustering_key.is_some() {
        return false;
    }
    let schema_has_static = schema.columns.iter().any(|column| column.is_static);
    if !schema_has_static {
        return false;
    }

    for operation in mutation.operations.iter() {
        let targets_static = match operation {
            CellOperation::DeleteRow => true,
            CellOperation::Write { column, .. }
            | CellOperation::WriteWithTtl { column, .. }
            | CellOperation::Delete { column } => {
                // Only the first schema column with a matching name is consulted.
                match schema
                    .columns
                    .iter()
                    .find(|candidate| candidate.name == *column)
                {
                    Some(found) => found.is_static,
                    None => false,
                }
            }
        };
        if !targets_static {
            return false;
        }
    }
    true
}
/// Reports whether an operation targets a static column of `schema`.
///
/// `DeleteRow` and operations on columns unknown to the schema are not static.
fn is_static_operation(
    op: &crate::storage::write_engine::mutation::CellOperation,
    schema: &TableSchema,
) -> bool {
    use crate::storage::write_engine::mutation::CellOperation;

    let target = match op {
        CellOperation::DeleteRow => return false,
        CellOperation::Write { column, .. }
        | CellOperation::WriteWithTtl { column, .. }
        | CellOperation::Delete { column } => column,
    };

    // Only the first schema column with a matching name is consulted.
    let definition = schema.columns.iter().find(|c| c.name == *target);
    match definition {
        Some(found) => found.is_static,
        None => false,
    }
}
/// Reports whether at least one operation of the mutation targets a static column.
fn has_static_operation(mutation: &Mutation, schema: &TableSchema) -> bool {
    for op in &mutation.operations {
        if is_static_operation(op, schema) {
            return true;
        }
    }
    false
}
fn collect_static_operations(
mutations: &[Mutation],
schema: &TableSchema,
shadow_floor: Option<i64>,
) -> Vec<crate::storage::write_engine::mutation::CellOperation> {
use std::collections::HashMap;
let mut best: HashMap<String, (i64, crate::storage::write_engine::mutation::CellOperation)> =
HashMap::new();
for mutation in mutations {
if shadow_floor.is_some_and(|floor| mutation.timestamp_micros <= floor) {
continue;
}
for op in &mutation.operations {
if !is_static_operation(op, schema) {
continue;
}
let col_name = match op {
crate::storage::write_engine::mutation::CellOperation::Write { column, .. }
| crate::storage::write_engine::mutation::CellOperation::WriteWithTtl {
column,
..
}
| crate::storage::write_engine::mutation::CellOperation::Delete { column } => {
column.clone()
}
crate::storage::write_engine::mutation::CellOperation::DeleteRow => continue,
};
let entry = best.entry(col_name).or_insert((i64::MIN, op.clone()));
if mutation.timestamp_micros >= entry.0 {
*entry = (mutation.timestamp_micros, op.clone());
}
}
}
best.into_values().map(|(_, op)| op).collect()
}
/// Reports whether a clustering key falls inside a range tombstone's bounds.
///
/// A missing clustering key is never covered. Keys are compared with the
/// schema-aware `compare`; if that fails, the key's `Ord` implementation is
/// used instead. A `Top` start or a `Bottom` end never covers anything.
fn range_tombstone_covers(
    rt: &RangeTombstone,
    clustering_key: Option<&ClusteringKey>,
    schema: &TableSchema,
) -> bool {
    use std::cmp::Ordering;

    let ck = match clustering_key {
        Some(key) => key,
        None => return false,
    };

    // Ordering of the row's key relative to a bound's key.
    let relative_to = |bound: &ClusteringKey| -> Ordering {
        match ck.compare(bound, schema) {
            Ok(ordering) => ordering,
            Err(_) => ck.cmp(bound),
        }
    };

    let after_start = match &rt.start {
        ClusteringBound::Bottom => true,
        ClusteringBound::Top => false,
        ClusteringBound::Inclusive(start) => {
            matches!(relative_to(start), Ordering::Equal | Ordering::Greater)
        }
        ClusteringBound::Exclusive(start) => matches!(relative_to(start), Ordering::Greater),
    };
    let before_end = match &rt.end {
        ClusteringBound::Top => true,
        ClusteringBound::Bottom => false,
        ClusteringBound::Inclusive(end) => {
            matches!(relative_to(end), Ordering::Less | Ordering::Equal)
        }
        ClusteringBound::Exclusive(end) => matches!(relative_to(end), Ordering::Less),
    };

    after_start && before_end
}
fn serialize_value_for_clustering(value: &Value, comparator: &ComparatorType) -> Result<Vec<u8>> {
match (value, comparator) {
(Value::Boolean(b), ComparatorType::Boolean) => Ok(vec![if *b { 1 } else { 0 }]),
(Value::TinyInt(n), ComparatorType::TinyInt) => Ok(n.to_be_bytes().to_vec()),
(Value::SmallInt(n), ComparatorType::SmallInt) => Ok(n.to_be_bytes().to_vec()),
(Value::Integer(n), ComparatorType::Int) => Ok(n.to_be_bytes().to_vec()),
(Value::BigInt(n), ComparatorType::BigInt) => Ok(n.to_be_bytes().to_vec()),
(Value::Float32(f), ComparatorType::Float32) => Ok(f.to_bits().to_be_bytes().to_vec()),
(Value::Float(f), ComparatorType::Float) => Ok(f.to_bits().to_be_bytes().to_vec()),
(Value::Timestamp(millis), ComparatorType::Timestamp) => Ok(millis.to_be_bytes().to_vec()),
(Value::Date(days), ComparatorType::Date) => {
let stored = days.wrapping_sub(i32::MIN) as u32;
let mut result = Vec::new();
encode_unsigned(4, &mut result);
result.extend_from_slice(&stored.to_be_bytes());
Ok(result)
}
(Value::Uuid(bytes), ComparatorType::Uuid) => Ok(bytes.to_vec()),
(Value::Text(s), ComparatorType::Text) => {
let bytes = s.as_bytes();
let mut result = Vec::new();
encode_unsigned(bytes.len() as u64, &mut result);
result.extend_from_slice(bytes);
Ok(result)
}
(Value::Blob(bytes), ComparatorType::Blob) => {
let mut result = Vec::new();
encode_unsigned(bytes.len() as u64, &mut result);
result.extend_from_slice(bytes);
Ok(result)
}
(Value::Frozen(inner), _) => {
let bytes = serialize_value(inner)?;
let mut result = Vec::new();
encode_unsigned(bytes.len() as u64, &mut result);
result.extend_from_slice(&bytes);
Ok(result)
}
_ => Err(Error::InvalidInput(format!(
"Type mismatch or unsupported clustering type: value={:?}, comparator={:?}",
value, comparator
))),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::{
ClusteringColumn, ClusteringOrder, Column, CqlType, KeyColumn, TableSchema,
};
use crate::storage::serialization::types::TypeSerializer;
use crate::storage::write_engine::mutation::{
CellOperation, ClusteringKey, PartitionKey, TableId,
};
use crate::types::UdtValue;
use std::collections::HashMap;
/// Schema fixture: `test_ks.test_table` with an `int` partition key `id`, no
/// clustering keys, and two nullable regular columns (`name text`, `age int`).
fn create_test_schema() -> TableSchema {
    // Both regular columns differ only in name and type.
    let regular_column = |name: &str, data_type: &str| Column {
        name: String::from(name),
        data_type: String::from(data_type),
        nullable: true,
        default: None,
        is_static: false,
    };
    let partition_key = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    };

    TableSchema {
        keyspace: String::from("test_ks"),
        table: String::from("test_table"),
        partition_keys: vec![partition_key],
        clustering_keys: Vec::new(),
        columns: vec![regular_column("name", "text"), regular_column("age", "int")],
        comments: HashMap::new(),
    }
}
/// Statistics fixture: minimum timestamp 1_000_000, minimum TTL and minimum
/// local deletion time both 0.
fn create_test_stats() -> StatisticsMetadata {
    let mut fixture = StatisticsMetadata::new();
    fixture.min_local_deletion_time = 0;
    fixture.min_ttl = 0;
    fixture.min_timestamp = 1_000_000;
    fixture
}
/// UDT definition fixture `test_ks.address` with text fields `street` and `city`.
fn phase3_address_schema() -> UdtTypeDef {
    let definition = UdtTypeDef::new(String::from("test_ks"), String::from("address"));
    let definition = definition.with_field(String::from("street"), CqlType::Text, true);
    definition.with_field(String::from("city"), CqlType::Text, true)
}
/// UDT definition fixture `test_ks.person`: a text `name`, a list of frozen
/// `phone_number` UDTs, and a frozen `address` UDT.
fn phase3_person_schema() -> UdtTypeDef {
    let frozen_phone = CqlType::Frozen(Box::new(CqlType::Udt(
        String::from("phone_number"),
        vec![],
    )));
    let phone_list = CqlType::List(Box::new(frozen_phone));
    let frozen_address = CqlType::Frozen(Box::new(CqlType::Udt(String::from("address"), vec![])));

    let definition = UdtTypeDef::new(String::from("test_ks"), String::from("person"));
    let definition = definition.with_field(String::from("name"), CqlType::Text, true);
    let definition = definition.with_field(String::from("phone_numbers"), phone_list, true);
    definition.with_field(String::from("home_address"), frozen_address, true)
}
/// UDT definition fixture `test_ks.company`: a text `name`, a list of frozen
/// `person` UDTs, and a map from text to a frozen list of frozen `person` UDTs.
fn phase3_company_schema() -> UdtTypeDef {
    // Built fresh on each call because the type appears twice below.
    let frozen_person =
        || CqlType::Frozen(Box::new(CqlType::Udt(String::from("person"), vec![])));

    let employees_type = CqlType::List(Box::new(frozen_person()));
    let department_members = CqlType::Frozen(Box::new(CqlType::List(Box::new(frozen_person()))));
    let departments_type = CqlType::Map(Box::new(CqlType::Text), Box::new(department_members));

    let definition = UdtTypeDef::new(String::from("test_ks"), String::from("company"));
    let definition = definition.with_field(String::from("name"), CqlType::Text, true);
    let definition = definition.with_field(String::from("employees"), employees_type, true);
    definition.with_field(String::from("departments"), departments_type, true)
}
/// UDT value fixture: an `address` with street "Main St" and city "Seattle".
fn phase3_address_value() -> UdtValue {
    let street = Value::Text(String::from("Main St"));
    let city = Value::Text(String::from("Seattle"));

    let address = UdtValue::new(String::from("address"), String::from("test_ks"));
    let address = address.with_field(String::from("street"), Some(street));
    address.with_field(String::from("city"), Some(city))
}
/// UDT value fixture: a `phone_number` labelled "mobile" with number "+1-555-0101".
fn phase3_phone_value() -> UdtValue {
    let label = Value::Text(String::from("mobile"));
    let number = Value::Text(String::from("+1-555-0101"));

    let phone = UdtValue::new(String::from("phone_number"), String::from("test_ks"));
    let phone = phone.with_field(String::from("label"), Some(label));
    phone.with_field(String::from("number"), Some(number))
}
/// UDT value fixture: a `person` with the given name, one frozen phone number
/// and a frozen home address.
fn phase3_person_value(name: &str) -> UdtValue {
    let display_name = Value::Text(name.to_string());
    let phones = Value::List(vec![Value::Frozen(Box::new(Value::Udt(
        phase3_phone_value(),
    )))]);
    let home = Value::Frozen(Box::new(Value::Udt(phase3_address_value())));

    let person = UdtValue::new(String::from("person"), String::from("test_ks"));
    let person = person.with_field(String::from("name"), Some(display_name));
    let person = person.with_field(String::from("phone_numbers"), Some(phones));
    person.with_field(String::from("home_address"), Some(home))
}
/// UDT value fixture: company "Acme" with employee "Alice", who is also the
/// only member of the "platform" department.
fn phase3_company_value() -> UdtValue {
    let alice = phase3_person_value("Alice");

    let employees = Value::List(vec![Value::Frozen(Box::new(Value::Udt(alice.clone())))]);
    let platform_team = Value::Frozen(Box::new(Value::List(vec![Value::Frozen(Box::new(
        Value::Udt(alice),
    ))])));
    let departments = Value::Map(vec![(Value::Text(String::from("platform")), platform_team)]);

    let company = UdtValue::new(String::from("company"), String::from("test_ks"));
    let company = company.with_field(
        String::from("name"),
        Some(Value::Text(String::from("Acme"))),
    );
    let company = company.with_field(String::from("employees"), Some(employees));
    company.with_field(String::from("departments"), Some(departments))
}
/// Schema fixture with a static column: `test_ks.test_table` with an `int`
/// partition key `id`, an ascending `int` clustering key `ck`, a static text
/// column `static_val` and a regular text column `regular_val`.
fn create_static_test_schema() -> TableSchema {
    // Both columns are nullable text; only the name and static flag differ.
    let text_column = |name: &str, is_static: bool| Column {
        name: String::from(name),
        data_type: String::from("text"),
        nullable: true,
        default: None,
        is_static,
    };
    let partition_key = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    };
    let clustering_key = ClusteringColumn {
        name: String::from("ck"),
        data_type: String::from("int"),
        position: 0,
        order: ClusteringOrder::Asc,
    };

    TableSchema {
        keyspace: String::from("test_ks"),
        table: String::from("test_table"),
        partition_keys: vec![partition_key],
        clustering_keys: vec![clustering_key],
        columns: vec![
            text_column("static_val", true),
            text_column("regular_val", false),
        ],
        comments: HashMap::new(),
    }
}
/// A freshly constructed writer reports position 0.
#[test]
fn test_data_writer_new() {
    let fresh_writer = DataWriter::new(create_test_stats());
    assert_eq!(fresh_writer.position(), 0);
}
/// The partition header is the 2-byte key length, the key bytes, then the
/// "no deletion" markers (`i32::MAX` followed by `i64::MIN`).
#[test]
fn test_write_partition_header() {
    let mut writer = DataWriter::new(create_test_stats());
    let key = DecoratedKey::new(12345, vec![0x00, 0x00, 0x00, 0x2A]);

    writer.write_partition_header(&key, None).unwrap();
    let bytes = writer.finish().unwrap();

    // Key length.
    assert_eq!(&bytes[0..2], &[0x00, 0x04]);
    // Key bytes.
    assert_eq!(&bytes[2..6], &[0x00, 0x00, 0x00, 0x2A]);
    // Deletion fields for a live partition.
    assert_eq!(&bytes[6..10], &i32::MAX.to_be_bytes());
    assert_eq!(&bytes[10..18], &i64::MIN.to_be_bytes());
}
/// Writing both regular columns produces a row whose flags carry the
/// timestamp and all-columns bits.
#[test]
fn test_write_simple_row() {
    let schema = create_test_schema();
    let mut writer = DataWriter::new(create_test_stats());

    let operations = vec![
        CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text("Alice".to_string()),
        },
        CellOperation::Write {
            column: "age".to_string(),
            value: Value::Integer(30),
        },
    ];
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        operations,
        1001000,
        None,
    );

    writer.write_row(&mutation, &schema).unwrap();
    let bytes = writer.finish().unwrap();
    assert!(!bytes.is_empty());

    let row_flags = bytes[0];
    assert_eq!(
        row_flags & ROW_HAS_TIMESTAMP,
        ROW_HAS_TIMESTAMP,
        "Should have timestamp"
    );
    assert_eq!(
        row_flags & ROW_HAS_ALL_COLUMNS,
        ROW_HAS_ALL_COLUMNS,
        "Should have all columns"
    );
}
/// A row with a timestamp clustering key is written with the timestamp flag set.
#[test]
fn test_write_row_with_clustering() {
    let schema = {
        let mut base = create_test_schema();
        base.clustering_keys = vec![ClusteringColumn {
            name: "ts".to_string(),
            data_type: "timestamp".to_string(),
            position: 0,
            order: ClusteringOrder::Asc,
        }];
        base
    };
    let mut writer = DataWriter::new(create_test_stats());

    let clustering = ClusteringKey::single("ts", Value::Timestamp(1234567890));
    let operations = vec![CellOperation::Write {
        column: "name".to_string(),
        value: Value::Text("Bob".to_string()),
    }];
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        Some(clustering),
        operations,
        1001000,
        None,
    );

    writer.write_row(&mutation, &schema).unwrap();
    let bytes = writer.finish().unwrap();
    assert!(!bytes.is_empty());

    let row_flags = bytes[0];
    assert_eq!(row_flags & ROW_HAS_TIMESTAMP, ROW_HAS_TIMESTAMP);
}
/// Writing a whole partition starts at offset 0 and ends with the
/// end-of-partition marker.
#[test]
fn test_write_partition_complete() {
    let schema = create_test_schema();
    let mut writer = DataWriter::new(create_test_stats());
    let key = DecoratedKey::new(12345, vec![0x00, 0x00, 0x00, 0x01]);

    // Two writes to the same column of the same partition at different timestamps.
    let table_id = TableId::new("test_ks", "test_table");
    let pk = PartitionKey::single("id", Value::Integer(1));
    let write_name = |text: &str| {
        vec![CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text(text.to_string()),
        }]
    };
    let first = Mutation::new(
        table_id.clone(),
        pk.clone(),
        None,
        write_name("Alice"),
        1001000,
        None,
    );
    let second = Mutation::new(table_id, pk, None, write_name("Bob"), 1002000, None);
    let mutations = vec![first, second];

    let offset = writer
        .write_partition(&key, &mutations, &schema, None, &[])
        .unwrap();
    assert_eq!(offset, 0);

    let bytes = writer.finish().unwrap();
    assert!(!bytes.is_empty());
    assert_eq!(bytes[bytes.len() - 1], END_OF_PARTITION);
}
/// Regression test for fix #644: row timestamp and TTL deltas are encoded as
/// unsigned vints, not ZigZag.
#[test]
fn test_delta_encoding_unsigned_vint_fix_644() {
    let stats = {
        let mut base = create_test_stats();
        base.min_timestamp = 1_000_000;
        base.min_ttl = 3_600;
        base.min_local_deletion_time = 0;
        base
    };
    let writer = DataWriter::new(stats.clone());
    let schema = create_test_schema();

    // Timestamp is 5000 above the minimum; TTL is 3600 above the minimum.
    let operations = vec![CellOperation::Write {
        column: "name".to_string(),
        value: Value::Text("Test".to_string()),
    }];
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        operations,
        1_005_000,
        Some(7200),
    );

    let row_body = writer
        .build_row_body(&mutation, &schema, ROW_HAS_TIMESTAMP | ROW_HAS_TTL)
        .unwrap();
    assert!(!row_body.is_empty(), "row body must be non-empty");

    assert_eq!(
        &row_body[0..2],
        &[0x93u8, 0x88u8],
        "Fix #644: timestamp delta=5000 must encode as unsigned VInt [0x93, 0x88], \
         not ZigZag [0xA7, 0x10]. Reader uses parse_vuint (unsigned), so ZigZag would \
         double the delta on readback (5000 → decoded as 10000)."
    );
    assert_eq!(
        &row_body[2..4],
        &[0x8Eu8, 0x10u8],
        "Fix #644: TTL delta=3600 must encode as unsigned VInt [0x8E, 0x10], \
         not ZigZag [0x9C, 0x20]. This is the first of two HAS_TTL fields."
    );
}
/// Smoke test: a row with timestamp and TTL above the recorded minimums
/// produces a non-empty body.
#[test]
fn test_delta_encoding() {
    let stats = {
        let mut base = create_test_stats();
        base.min_timestamp = 1000000;
        base.min_ttl = 3600;
        base
    };
    let writer = DataWriter::new(stats.clone());
    let schema = create_test_schema();

    let operations = vec![CellOperation::Write {
        column: "name".to_string(),
        value: Value::Text("Test".to_string()),
    }];
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        operations,
        1005000,
        Some(7200),
    );

    let row_body = writer
        .build_row_body(&mutation, &schema, ROW_HAS_TIMESTAMP | ROW_HAS_TTL)
        .unwrap();
    assert!(!row_body.is_empty());
}
/// Checks `serialize_value` output for a boolean, an int, text, a bigint and
/// NULL against the exact expected byte sequences.
#[test]
fn test_serialize_value_types() {
    // (input value, expected serialized bytes)
    let cases: Vec<(Value, Vec<u8>)> = vec![
        (Value::Boolean(true), vec![1]),
        (Value::Integer(42), vec![0x00, 0x00, 0x00, 0x2A]),
        (Value::Text("hello".to_string()), b"hello".to_vec()),
        (
            Value::BigInt(9223372036854775807),
            vec![0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF],
        ),
        (Value::Null, Vec::<u8>::new()),
    ];

    for (value, expected) in &cases {
        let serialized = serialize_value(value).unwrap();
        assert_eq!(&serialized, expected);
    }
}
/// Writes the column bitmap for a mutation that only sets `name` and expects
/// the single byte `0x01`.
#[test]
fn test_column_bitmap() {
    let writer = DataWriter::new(create_test_stats());
    let schema = create_test_schema();

    let only_name = CellOperation::Write {
        column: "name".to_string(),
        value: Value::Text("Alice".to_string()),
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![only_name],
        1001000,
        None,
    );

    let mut bitmap = Vec::new();
    writer
        .write_column_bitmap(&mut bitmap, &mutation, &schema)
        .unwrap();
    assert_eq!(bitmap, vec![0x01]);
}
/// A 256-byte partition key is accepted by `write_partition_header`, while a
/// 65536-byte key is rejected with an error mentioning "too large".
#[test]
fn test_partition_key_size_limit() {
    // Accepted: 256-byte key.
    let mut small_writer = DataWriter::new(create_test_stats());
    let small_key = DecoratedKey::new(12345, vec![0xFF; 256]);
    let accepted = small_writer.write_partition_header(&small_key, None);
    assert!(accepted.is_ok());

    // Rejected: 65536-byte key.
    let mut big_writer = DataWriter::new(create_test_stats());
    let big_key = DecoratedKey::new(12345, vec![0xFF; 65536]);
    let rejected = big_writer.write_partition_header(&big_key, None);
    assert!(rejected.is_err());
    let message = rejected.unwrap_err().to_string();
    assert!(message.contains("too large"));
}
/// A tombstone cell must set `CELL_IS_DELETED`, must not set
/// `CELL_USE_ROW_TIMESTAMP`, and must carry more than the flags byte.
#[test]
fn test_write_tombstone_cell() {
    let mut stats = create_test_stats();
    stats.min_timestamp = 1000000;
    stats.min_local_deletion_time = 1700000000;
    let writer = DataWriter::new(stats);

    let mut cell = Vec::new();
    writer
        .write_tombstone_cell(&mut cell, "deleted_col", 1001000, 1700000010)
        .unwrap();

    assert!(!cell.is_empty());
    let cell_flags = cell[0];
    assert_eq!(
        cell_flags & CELL_IS_DELETED,
        CELL_IS_DELETED,
        "Should have IS_DELETED flag"
    );
    assert_eq!(
        cell_flags & CELL_USE_ROW_TIMESTAMP,
        0,
        "Should NOT have USE_ROW_TIMESTAMP flag"
    );
    assert!(
        cell.len() > 1,
        "Should have timestamp and deletion_time deltas"
    );
}
/// Int and bigint clustering values serialize to their raw big-endian bytes
/// (4 and 8 bytes respectively).
#[test]
fn test_serialize_clustering_value_fixed_width() {
    let int_bytes =
        serialize_value_for_clustering(&Value::Integer(42), &ComparatorType::Int).unwrap();
    assert_eq!(int_bytes, vec![0x00, 0x00, 0x00, 0x2A]);

    let bigint_bytes =
        serialize_value_for_clustering(&Value::BigInt(1000), &ComparatorType::BigInt).unwrap();
    assert_eq!(
        bigint_bytes,
        vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8]
    );
}
/// A text clustering value is emitted as a one-byte length (`0x04`) followed
/// by the raw string bytes.
#[test]
fn test_serialize_clustering_value_variable_width() {
    let text = Value::Text("test".to_string());
    let encoded = serialize_value_for_clustering(&text, &ComparatorType::Text).unwrap();

    assert!(!encoded.is_empty());
    assert_eq!(encoded[0], 0x04);
    assert_eq!(&encoded[1..], b"test");
}
/// A date clustering value must be five bytes long and start with the length
/// byte `0x04`.
#[test]
fn test_serialize_clustering_date_includes_length_prefix() {
    let encoded = serialize_value_for_clustering(&Value::Date(0), &ComparatorType::Date).unwrap();

    let length_prefix = encoded[0];
    assert_eq!(
        length_prefix, 0x04,
        "date clustering values should be length-prefixed"
    );

    let total_len = encoded.len();
    assert_eq!(
        total_len,
        5,
        "date clustering value should be 1-byte length + 4-byte payload"
    );
}
/// A frozen `list<text>` clustering value must equal a one-byte length
/// followed by the `serialize_value` encoding of the inner list.
#[test]
fn test_serialize_clustering_frozen_list_text() {
    let inner_list = || Value::List(vec![Value::Text("solo".to_string())]);

    let frozen_value = Value::Frozen(Box::new(inner_list()));
    let list_comparator = ComparatorType::List(Box::new(ComparatorType::Text));
    let frozen_comparator = ComparatorType::Frozen(Box::new(list_comparator));
    let actual = serialize_value_for_clustering(&frozen_value, &frozen_comparator).unwrap();

    // Expected layout: [len as u8] ++ serialized inner list.
    let payload = serialize_value(&inner_list()).unwrap();
    let mut expected = Vec::new();
    expected.push(payload.len() as u8);
    expected.extend_from_slice(&payload);

    assert_eq!(actual, expected);
}
/// Distinguishes NULL from the empty string in `write_cell`:
/// NULL is rejected with an error, an empty string is written with
/// `CELL_HAS_EMPTY_VALUE`, and a non-empty string is written without it.
#[test]
fn test_null_vs_empty_string() {
    let writer = DataWriter::new(create_test_stats());

    // NULL: rejected.
    let mut scratch = Vec::new();
    let null_result = writer.write_cell(&mut scratch, "test_col", &Value::Null, 1001000);
    assert!(null_result.is_err(), "NULL values should return error");
    let null_message = null_result.unwrap_err().to_string();
    assert!(null_message.contains("NULL values should not be written"));

    // Empty string: written, flagged as empty.
    let mut empty_cell = Vec::new();
    let empty_text = Value::Text(String::new());
    writer
        .write_cell(&mut empty_cell, "test_col", &empty_text, 1001000)
        .unwrap();
    assert!(!empty_cell.is_empty());
    let empty_flags = empty_cell[0];
    assert_eq!(
        empty_flags & CELL_HAS_EMPTY_VALUE,
        CELL_HAS_EMPTY_VALUE,
        "Empty string should have HAS_EMPTY_VALUE flag"
    );

    // Non-empty string: written, not flagged as empty.
    let mut text_cell = Vec::new();
    let some_text = Value::Text("test".to_string());
    writer
        .write_cell(&mut text_cell, "test_col", &some_text, 1001000)
        .unwrap();
    let text_flags = text_cell[0];
    assert_eq!(
        text_flags & CELL_HAS_EMPTY_VALUE,
        0,
        "Non-empty string should NOT have HAS_EMPTY_VALUE flag"
    );

    // The empty-string cell consists of the flags byte only.
    assert_eq!(
        empty_cell,
        vec![CELL_USE_ROW_TIMESTAMP | CELL_HAS_EMPTY_VALUE]
    );
}
/// An int cell is written as the flags byte followed directly by the four
/// value bytes, with no length prefix in between.
#[test]
fn test_fixed_width_cell_omits_length_prefix() {
    let writer = DataWriter::new(create_test_stats());

    let mut cell = Vec::new();
    let forty_two = Value::Integer(42);
    writer
        .write_cell(&mut cell, "value", &forty_two, 1001000)
        .unwrap();

    let expected = vec![CELL_USE_ROW_TIMESTAMP, 0x00, 0x00, 0x00, 0x2A];
    assert_eq!(cell, expected);
}
/// A text cell is written as the flags byte, a length byte (`0x03`) and then
/// the string bytes.
#[test]
fn test_variable_width_cell_keeps_length_prefix() {
    let writer = DataWriter::new(create_test_stats());

    let mut cell = Vec::new();
    let abc = Value::Text("abc".to_string());
    writer
        .write_cell(&mut cell, "value", &abc, 1001000)
        .unwrap();

    let expected = vec![CELL_USE_ROW_TIMESTAMP, 0x03, b'a', b'b', b'c'];
    assert_eq!(cell, expected);
}
/// Writing a 1000-character text value through `write_cell` must succeed.
#[test]
fn test_value_length_bounds_check() {
    let writer = DataWriter::new(create_test_stats());

    let payload = Value::Text("x".repeat(1000));
    let mut cell = Vec::new();
    let outcome = writer.write_cell(&mut cell, "test_col", &payload, 1001000);

    assert!(outcome.is_ok(), "Reasonable-sized values should succeed");
}
/// `write_tombstone_cell` accepts a local deletion time at or above
/// `stats.min_local_deletion_time` and rejects one below it.
#[test]
fn test_tombstone_requires_deletion_time() {
    let mut stats = create_test_stats();
    stats.min_timestamp = 1000000;
    stats.min_local_deletion_time = 1700000000;
    let writer = DataWriter::new(stats);

    // Deletion time above the configured minimum.
    let mut ok_cell = Vec::new();
    let accepted = writer.write_tombstone_cell(&mut ok_cell, "deleted_col", 1001000, 1700000010);
    assert!(accepted.is_ok(), "Valid deletion_time should succeed");

    // Deletion time below the configured minimum.
    let mut bad_cell = Vec::new();
    let rejected = writer.write_tombstone_cell(&mut bad_cell, "deleted_col", 1001000, 1600000000);
    assert!(rejected.is_err(), "deletion_time < min should fail");
    let message = rejected.unwrap_err().to_string();
    assert!(message.contains("less than min_local_deletion_time"));
}
/// A NULL write for `age` must be treated as a missing column: the bitmap for
/// a mutation writing `name` plus a NULL `age` is expected to be `0x01`.
#[test]
fn test_column_bitmap_skips_nulls() {
    let writer = DataWriter::new(create_test_stats());
    let schema = create_test_schema();

    let write_name = CellOperation::Write {
        column: "name".to_string(),
        value: Value::Text("Alice".to_string()),
    };
    let write_null_age = CellOperation::Write {
        column: "age".to_string(),
        value: Value::Null,
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![write_name, write_null_age],
        1001000,
        None,
    );

    let mut bitmap = Vec::new();
    writer
        .write_column_bitmap(&mut bitmap, &mutation, &schema)
        .unwrap();
    assert_eq!(
        bitmap,
        vec![0x01],
        "Bitmap should encode age as missing (bit 0)"
    );
}
/// A row containing a NULL write must not advertise `ROW_HAS_ALL_COLUMNS` in
/// its flags byte.
#[test]
fn test_row_with_null_values() {
    let mut writer = DataWriter::new(create_test_stats());
    let schema = create_test_schema();

    let write_name = CellOperation::Write {
        column: "name".to_string(),
        value: Value::Text("Alice".to_string()),
    };
    let write_null_age = CellOperation::Write {
        column: "age".to_string(),
        value: Value::Null,
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![write_name, write_null_age],
        1001000,
        None,
    );

    writer.write_row(&mutation, &schema).unwrap();
    let output = writer.finish().unwrap();
    assert!(!output.is_empty());

    let row_flags = output[0];
    assert_eq!(
        row_flags & ROW_HAS_ALL_COLUMNS,
        0,
        "Row with NULL should NOT have HAS_ALL_COLUMNS flag"
    );
}
/// Writes two partitions back to back and checks that the first starts at
/// offset 0, the second starts at a larger offset, and the finished buffer
/// ends with `END_OF_PARTITION`.
#[test]
fn test_multiple_partitions() {
    let stats = create_test_stats();
    let mut writer = DataWriter::new(stats);
    let schema = create_test_schema();
    let table_id = TableId::new("test_ks", "test_table");

    // First partition.
    let key1 = DecoratedKey::new(100, vec![0x00, 0x00, 0x00, 0x01]);
    let pk1 = PartitionKey::single("id", Value::Integer(1));
    let mutations1 = vec![Mutation::new(
        table_id.clone(),
        pk1,
        None,
        vec![CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text("Alice".to_string()),
        }],
        1001000,
        None,
    )];
    let offset1 = writer
        .write_partition(&key1, &mutations1, &schema, None, &[])
        .unwrap();
    assert_eq!(offset1, 0);

    // Second partition.
    let key2 = DecoratedKey::new(200, vec![0x00, 0x00, 0x00, 0x02]);
    let pk2 = PartitionKey::single("id", Value::Integer(2));
    let mutations2 = vec![Mutation::new(
        table_id,
        pk2,
        None,
        vec![CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text("Bob".to_string()),
        }],
        1002000,
        None,
    )];
    let offset2 = writer
        .write_partition(&key2, &mutations2, &schema, None, &[])
        .unwrap();
    // Asserted once (the original repeated this check after `finish`).
    assert!(
        offset2 > offset1,
        "Second partition should start after first"
    );

    let bytes = writer.finish().unwrap();
    assert!(!bytes.is_empty());
    assert_eq!(
        bytes[bytes.len() - 1],
        END_OF_PARTITION,
        "File should end with END_OF_PARTITION"
    );
}
/// A pure row deletion must set `ROW_HAS_DELETION`, leave `ROW_HAS_TIMESTAMP`
/// and `ROW_HAS_ALL_COLUMNS` clear, and end its body with a columns subset of
/// `0b11`.
#[test]
fn test_row_tombstone() {
    let mut writer = DataWriter::new(create_test_stats());
    let schema = create_test_schema();

    let delete_row = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![CellOperation::DeleteRow],
        1001000,
        None,
    );
    writer.write_row(&delete_row, &schema).unwrap();

    let output = writer.finish().unwrap();
    assert!(!output.is_empty());

    // Byte 0 holds the row flags.
    let row_flags = output[0];
    assert_eq!(
        row_flags & ROW_HAS_DELETION,
        ROW_HAS_DELETION,
        "Should have HAS_DELETION flag"
    );
    assert_eq!(
        row_flags & ROW_HAS_TIMESTAMP,
        0,
        "Pure row tombstone must not have HAS_TIMESTAMP"
    );
    assert_eq!(
        row_flags & ROW_HAS_ALL_COLUMNS,
        0,
        "Row tombstone must not claim all columns"
    );

    // Byte 1 is read as the row size; the body is taken to follow it.
    let row_size = usize::from(output[1]);
    assert!(
        row_size >= 4,
        "Row tombstone body must include the columns subset (got row_size={})",
        row_size
    );
    let body_end = 2 + row_size;
    let last_body_byte = output[body_end - 1];
    assert_eq!(
        last_body_byte,
        0b11,
        "Columns subset must mark every regular column missing"
    );
}
/// Writes a partition header carrying a partition tombstone and decodes the
/// key length (bytes 0..2), local deletion time (bytes 6..10) and deletion
/// timestamp (bytes 10..18) back out of the buffer.
#[test]
fn test_partition_tombstone() {
    use crate::storage::write_engine::mutation::PartitionTombstone;

    let mut writer = DataWriter::new(create_test_stats());
    let key = DecoratedKey::new(12345, vec![0x00, 0x00, 0x00, 0x2A]);
    let tombstone = PartitionTombstone {
        deletion_time: 1001000,
        local_deletion_time: 1700000010,
    };
    writer
        .write_partition_header(&key, Some(&tombstone))
        .unwrap();

    let output = writer.finish().unwrap();
    assert_eq!(&output[0..2], &[0x00, 0x04], "Key length (u16 BE)");

    // Local deletion time: big-endian i32.
    let mut ldt_raw = [0u8; 4];
    ldt_raw.copy_from_slice(&output[6..10]);
    let local_deletion_time = i32::from_be_bytes(ldt_raw);
    assert_eq!(
        local_deletion_time, 1700000010,
        "Local deletion time should match"
    );

    // Deletion timestamp: big-endian i64.
    let mut ts_raw = [0u8; 8];
    ts_raw.copy_from_slice(&output[10..18]);
    let deletion_timestamp = i64::from_be_bytes(ts_raw);
    assert_eq!(
        deletion_timestamp, 1001000,
        "Deletion timestamp should match"
    );
}
/// Writes an inclusive start bound followed by an inclusive end bound and
/// checks the marker flag, the bound kinds and the clustering value count of
/// the opening bound.
#[test]
fn test_range_tombstone_inclusive_bounds() {
    use crate::storage::write_engine::mutation::{ClusteringBound, RangeTombstone};

    let mut schema = create_test_schema();
    schema.clustering_keys = vec![ClusteringColumn {
        name: "ts".to_string(),
        data_type: "timestamp".to_string(),
        position: 0,
        order: ClusteringOrder::Asc,
    }];
    let mut writer = DataWriter::new(create_test_stats());

    let lower = ClusteringKey::single("ts", Value::Timestamp(1000));
    let upper = ClusteringKey::single("ts", Value::Timestamp(2000));
    let tombstone = RangeTombstone {
        start: ClusteringBound::Inclusive(lower),
        end: ClusteringBound::Inclusive(upper),
        deletion_time: 1001000,
        local_deletion_time: 1700000010,
    };

    // Opening bound; the returned size is where the closing bound begins.
    let open_size = writer
        .write_range_bound(
            &tombstone.start,
            true,
            tombstone.deletion_time,
            tombstone.local_deletion_time,
            &schema,
            0,
        )
        .unwrap();
    // Closing bound.
    writer
        .write_range_bound(
            &tombstone.end,
            false,
            tombstone.deletion_time,
            tombstone.local_deletion_time,
            &schema,
            open_size as u64,
        )
        .unwrap();

    let output = writer.finish().unwrap();
    assert!(!output.is_empty());

    assert_eq!(output[0], IS_MARKER, "Should have IS_MARKER flag");
    assert_eq!(
        output[1], INCL_START_BOUND,
        "Should have INCL_START_BOUND kind (ordinal 1)"
    );
    let value_count = u16::from_be_bytes([output[2], output[3]]);
    assert_eq!(value_count, 1, "Bound carries one clustering value");

    assert_eq!(output[open_size], IS_MARKER);
    assert_eq!(
        output[open_size + 1],
        INCL_END_BOUND,
        "Should have INCL_END_BOUND kind (ordinal 6)"
    );
}
/// Writes an exclusive start bound followed by an exclusive end bound and
/// checks the marker flag plus both bound kinds.
#[test]
fn test_range_tombstone_exclusive_bounds() {
    use crate::storage::write_engine::mutation::{ClusteringBound, RangeTombstone};

    let mut schema = create_test_schema();
    schema.clustering_keys = vec![ClusteringColumn {
        name: "ts".to_string(),
        data_type: "timestamp".to_string(),
        position: 0,
        order: ClusteringOrder::Asc,
    }];
    let mut writer = DataWriter::new(create_test_stats());

    let lower = ClusteringKey::single("ts", Value::Timestamp(1000));
    let upper = ClusteringKey::single("ts", Value::Timestamp(2000));
    let tombstone = RangeTombstone {
        start: ClusteringBound::Exclusive(lower),
        end: ClusteringBound::Exclusive(upper),
        deletion_time: 1001000,
        local_deletion_time: 1700000010,
    };

    // Opening bound; the returned size is where the closing bound begins.
    let open_size = writer
        .write_range_bound(
            &tombstone.start,
            true,
            tombstone.deletion_time,
            tombstone.local_deletion_time,
            &schema,
            0,
        )
        .unwrap();
    // Closing bound.
    writer
        .write_range_bound(
            &tombstone.end,
            false,
            tombstone.deletion_time,
            tombstone.local_deletion_time,
            &schema,
            open_size as u64,
        )
        .unwrap();

    let output = writer.finish().unwrap();
    assert!(!output.is_empty());

    assert_eq!(output[0], IS_MARKER, "Should have IS_MARKER flag");
    assert_eq!(
        output[1], EXCL_START_BOUND,
        "Should have EXCL_START_BOUND kind (ordinal 7)"
    );
    assert_eq!(
        output[open_size + 1],
        EXCL_END_BOUND,
        "Should have EXCL_END_BOUND kind (ordinal 0)"
    );
}
/// Writes `Bottom` as the opening bound and `Top` as the closing bound and
/// checks that they are emitted as inclusive start/end bounds with no
/// clustering values on the opening bound.
#[test]
fn test_range_tombstone_bottom_top_bounds() {
    use crate::storage::write_engine::mutation::{ClusteringBound, RangeTombstone};

    let mut schema = create_test_schema();
    schema.clustering_keys = vec![ClusteringColumn {
        name: "ts".to_string(),
        data_type: "timestamp".to_string(),
        position: 0,
        order: ClusteringOrder::Asc,
    }];
    let mut writer = DataWriter::new(create_test_stats());

    let tombstone = RangeTombstone {
        start: ClusteringBound::Bottom,
        end: ClusteringBound::Top,
        deletion_time: 1001000,
        local_deletion_time: 1700000010,
    };

    // Opening bound; the returned size is where the closing bound begins.
    let open_size = writer
        .write_range_bound(
            &tombstone.start,
            true,
            tombstone.deletion_time,
            tombstone.local_deletion_time,
            &schema,
            0,
        )
        .unwrap();
    // Closing bound.
    writer
        .write_range_bound(
            &tombstone.end,
            false,
            tombstone.deletion_time,
            tombstone.local_deletion_time,
            &schema,
            open_size as u64,
        )
        .unwrap();

    let output = writer.finish().unwrap();
    assert!(!output.is_empty());

    assert_eq!(output[0], IS_MARKER, "Should have IS_MARKER flag");
    assert_eq!(
        output[1], INCL_START_BOUND,
        "Bottom should serialize as INCL_START_BOUND"
    );
    let value_count = u16::from_be_bytes([output[2], output[3]]);
    assert_eq!(value_count, 0, "Bottom carries no clustering values");
    assert_eq!(output[open_size + 1], INCL_END_BOUND);
}
/// Writes a full partition containing one clustered row and one range
/// tombstone; the partition must start at offset 0 and the buffer must begin
/// with the two-byte key length `[0x00, 0x04]`.
#[test]
fn test_complete_partition_with_range_tombstone() {
    use crate::storage::write_engine::mutation::{ClusteringBound, RangeTombstone};

    let mut schema = create_test_schema();
    schema.clustering_keys = vec![ClusteringColumn {
        name: "ts".to_string(),
        data_type: "timestamp".to_string(),
        position: 0,
        order: ClusteringOrder::Asc,
    }];
    let mut writer = DataWriter::new(create_test_stats());
    let key = DecoratedKey::new(12345, vec![0x00, 0x00, 0x00, 0x01]);

    // One row at ts = 1000.
    let row = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        Some(ClusteringKey::single("ts", Value::Timestamp(1000))),
        vec![CellOperation::Write {
            column: "name".to_string(),
            value: Value::Text("Alice".to_string()),
        }],
        1001000,
        None,
    );
    let mutations = vec![row];

    // One range tombstone over ts in [500, 1500].
    let lower = ClusteringKey::single("ts", Value::Timestamp(500));
    let upper = ClusteringKey::single("ts", Value::Timestamp(1500));
    let range_tombstones = vec![RangeTombstone {
        start: ClusteringBound::Inclusive(lower),
        end: ClusteringBound::Inclusive(upper),
        deletion_time: 1002000,
        local_deletion_time: 1700000020,
    }];

    let partition_offset = writer
        .write_partition(&key, &mutations, &schema, None, &range_tombstones)
        .unwrap();
    assert_eq!(partition_offset, 0);

    let output = writer.finish().unwrap();
    assert!(!output.is_empty());
    assert_eq!(&output[0..2], &[0x00, 0x04], "Key length (u16 BE)");
}
/// A cell written with its own TTL must set `CELL_IS_EXPIRING`, must not set
/// `CELL_USE_ROW_TIMESTAMP` or `CELL_USE_ROW_TTL`, and must be longer than
/// ten bytes.
#[test]
fn test_write_cell_with_ttl() {
    let mut stats = create_test_stats();
    stats.min_timestamp = 1000000;
    stats.min_local_deletion_time = 1700000000;
    stats.min_ttl = 3600;
    let writer = DataWriter::new(stats);

    let payload = Value::Text("test".to_string());
    let mut cell = Vec::new();
    writer
        .write_cell_with_ttl(&mut cell, "test_col", &payload, 1001000, 7200)
        .unwrap();
    assert!(!cell.is_empty());

    let cell_flags = cell[0];
    assert_eq!(
        cell_flags & CELL_IS_EXPIRING,
        CELL_IS_EXPIRING,
        "Should have IS_EXPIRING flag"
    );
    assert_eq!(
        cell_flags & CELL_USE_ROW_TIMESTAMP,
        0,
        "Should NOT have USE_ROW_TIMESTAMP flag"
    );
    assert_eq!(
        cell_flags & CELL_USE_ROW_TTL,
        0,
        "Should NOT have USE_ROW_TTL flag"
    );
    assert!(cell.len() > 10, "Should have all TTL cell fields");
}
/// A row mixing one TTL cell (`name`) and one plain cell (`age`) must carry
/// both `ROW_HAS_TIMESTAMP` and `ROW_HAS_ALL_COLUMNS` in its flags byte.
#[test]
fn test_row_with_ttl_cells() {
    let mut stats = create_test_stats();
    stats.min_timestamp = 1000000;
    stats.min_local_deletion_time = 1700000000;
    stats.min_ttl = 3600;
    let mut writer = DataWriter::new(stats);
    let schema = create_test_schema();

    let name_with_ttl = CellOperation::WriteWithTtl {
        column: "name".to_string(),
        value: Value::Text("Alice".to_string()),
        ttl_seconds: 7200,
    };
    let plain_age = CellOperation::Write {
        column: "age".to_string(),
        value: Value::Integer(30),
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![name_with_ttl, plain_age],
        1001000,
        None,
    );

    writer.write_row(&mutation, &schema).unwrap();
    let output = writer.finish().unwrap();
    assert!(!output.is_empty());

    let row_flags = output[0];
    assert_eq!(
        row_flags & ROW_HAS_TIMESTAMP,
        ROW_HAS_TIMESTAMP,
        "Should have timestamp"
    );
    assert_eq!(
        row_flags & ROW_HAS_ALL_COLUMNS,
        ROW_HAS_ALL_COLUMNS,
        "Should have all columns"
    );
}
/// Smoke test: a row whose two cells carry different TTLs (3600 and 7200)
/// can be written and produces non-empty output.
#[test]
fn test_row_with_multiple_ttl_cells() {
    let mut stats = create_test_stats();
    stats.min_timestamp = 1000000;
    stats.min_local_deletion_time = 1700000000;
    stats.min_ttl = 1800;
    let mut writer = DataWriter::new(stats);
    let schema = create_test_schema();

    let name_cell = CellOperation::WriteWithTtl {
        column: "name".to_string(),
        value: Value::Text("Alice".to_string()),
        ttl_seconds: 3600,
    };
    let age_cell = CellOperation::WriteWithTtl {
        column: "age".to_string(),
        value: Value::Integer(30),
        ttl_seconds: 7200,
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![name_cell, age_cell],
        1001000,
        None,
    );

    writer.write_row(&mutation, &schema).unwrap();
    let output = writer.finish().unwrap();
    assert!(!output.is_empty());
}
/// A row with a plain `name` cell and a TTL `age` cell must still set
/// `ROW_HAS_TIMESTAMP` in its flags byte.
#[test]
fn test_mixed_ttl_and_regular_cells() {
    let mut stats = create_test_stats();
    stats.min_timestamp = 1000000;
    stats.min_local_deletion_time = 1700000000;
    stats.min_ttl = 3600;
    let mut writer = DataWriter::new(stats);
    let schema = create_test_schema();

    let plain_name = CellOperation::Write {
        column: "name".to_string(),
        value: Value::Text("Alice".to_string()),
    };
    let age_with_ttl = CellOperation::WriteWithTtl {
        column: "age".to_string(),
        value: Value::Integer(30),
        ttl_seconds: 7200,
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![plain_name, age_with_ttl],
        1001000,
        None,
    );

    writer.write_row(&mutation, &schema).unwrap();
    let output = writer.finish().unwrap();
    assert!(!output.is_empty());

    let row_flags = output[0];
    assert_eq!(row_flags & ROW_HAS_TIMESTAMP, ROW_HAS_TIMESTAMP);
}
/// Writing a cell through `write_cell_with_ttl` with a TTL of zero still
/// succeeds and still sets `CELL_IS_EXPIRING`.
#[test]
fn test_ttl_zero_special_case() {
    let mut stats = create_test_stats();
    stats.min_timestamp = 1000000;
    stats.min_local_deletion_time = 1700000000;
    stats.min_ttl = 0;
    let writer = DataWriter::new(stats);

    let payload = Value::Text("test".to_string());
    let mut cell = Vec::new();
    writer
        .write_cell_with_ttl(&mut cell, "test_col", &payload, 1001000, 0)
        .unwrap();

    assert!(!cell.is_empty());
    let cell_flags = cell[0];
    assert_eq!(cell_flags & CELL_IS_EXPIRING, CELL_IS_EXPIRING);
}
/// After feeding TTLs 3600, 7200, 1800 and 0 into `update_ttl`, the tracked
/// minimum must be 1800 and the maximum 7200.
#[test]
fn test_ttl_statistics_tracking() {
    let mut stats = StatisticsMetadata::new();
    for ttl in [3600, 7200, 1800, 0] {
        stats.update_ttl(ttl);
    }

    assert_eq!(stats.min_ttl, 1800, "min_ttl should be 1800");
    assert_eq!(stats.max_ttl, 7200, "max_ttl should be 7200");
}
/// `write_cell_with_ttl` must reject a NULL value with an error whose text
/// contains "NULL values should not be written".
#[test]
fn test_ttl_cell_with_null_value() {
    let writer = DataWriter::new(create_test_stats());

    let mut cell = Vec::new();
    let outcome = writer.write_cell_with_ttl(&mut cell, "test_col", &Value::Null, 1001000, 3600);

    assert!(outcome.is_err(), "NULL values should return error");
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("NULL values should not be written"));
}
/// Smoke test: writing a TTL cell with non-zero minimum stats succeeds and
/// produces a non-empty buffer.
#[test]
fn test_ttl_cell_local_deletion_time_calculation() {
    let mut stats = create_test_stats();
    stats.min_timestamp = 1000000;
    stats.min_local_deletion_time = 1700000000;
    stats.min_ttl = 3600;
    let writer = DataWriter::new(stats);

    let payload = Value::Text("test".to_string());
    let mut cell = Vec::new();
    writer
        .write_cell_with_ttl(&mut cell, "test_col", &payload, 1001000, 7200)
        .unwrap();

    assert!(!cell.is_empty());
}
/// With a row-level TTL, the row must set `ROW_HAS_TTL` and exactly two bytes
/// of the output must equal the combined
/// `CELL_IS_EXPIRING | CELL_USE_ROW_TIMESTAMP | CELL_USE_ROW_TTL` flags value.
#[test]
fn test_row_ttl_uses_row_ttl_cell_flags() {
    let mut stats = create_test_stats();
    stats.min_timestamp = 1001000;
    stats.min_ttl = 7200;
    stats.min_local_deletion_time = 1;
    let mut writer = DataWriter::new(stats);
    let schema = create_test_schema();

    let write_name = CellOperation::Write {
        column: "name".to_string(),
        value: Value::Text("Alice".to_string()),
    };
    let write_age = CellOperation::Write {
        column: "age".to_string(),
        value: Value::Integer(30),
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![write_name, write_age],
        1001000,
        Some(7200),
    );

    writer.write_row(&mutation, &schema).unwrap();
    let output = writer.finish().unwrap();
    assert_eq!(output[0] & ROW_HAS_TTL, ROW_HAS_TTL);

    // Count every output byte equal to the inherited-TTL cell flags value.
    let inherited_flags = CELL_IS_EXPIRING | CELL_USE_ROW_TIMESTAMP | CELL_USE_ROW_TTL;
    let mut flag_count = 0;
    for byte in output.iter() {
        if *byte == inherited_flags {
            flag_count += 1;
        }
    }
    assert_eq!(flag_count, 2, "expected both cells to inherit row TTL");
}
/// When a partition contains a static-column mutation and a clustered
/// mutation, the first row after the partition header must have
/// `ROW_HAS_EXTENDED_FLAGS` set and an extended flags byte equal to
/// `EXTENDED_IS_STATIC`.
#[test]
fn test_write_partition_emits_static_row_before_regular_rows() {
    let mut writer = DataWriter::new(create_test_stats());
    let schema = create_static_test_schema();
    let key = DecoratedKey::new(1, vec![0, 0, 0, 1]);

    // No clustering key: targets the static column.
    let static_mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![CellOperation::Write {
            column: "static_val".to_string(),
            value: Value::Text("static".to_string()),
        }],
        1001000,
        None,
    );
    // Clustered row: targets the regular column.
    let regular_mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        Some(ClusteringKey::single("ck", Value::Integer(1))),
        vec![CellOperation::Write {
            column: "regular_val".to_string(),
            value: Value::Text("regular".to_string()),
        }],
        1002000,
        None,
    );

    let mutations = [static_mutation, regular_mutation];
    writer
        .write_partition(&key, &mutations, &schema, None, &[])
        .unwrap();
    let output = writer.finish().unwrap();

    // Offset of the first row: 2 + key bytes + 4 + 8.
    let first_row = 2 + key.key.len() + 4 + 8;
    assert_eq!(
        output[first_row] & ROW_HAS_EXTENDED_FLAGS,
        ROW_HAS_EXTENDED_FLAGS
    );
    assert_eq!(output[first_row + 1], EXTENDED_IS_STATIC);
}
/// With exactly 64 regular columns and only `col_000` and `col_063` written,
/// the column subset is expected to be the bytes `[62, 0, 63]`.
#[test]
fn test_column_subset_exactly_64_regular_columns_uses_large_subset_encoding() {
    let writer = DataWriter::new(create_test_stats());

    // col_000 ..= col_063, all regular text columns.
    let mut columns: Vec<Column> = Vec::with_capacity(64);
    for idx in 0..64 {
        columns.push(Column {
            name: format!("col_{:03}", idx),
            data_type: "text".to_string(),
            nullable: true,
            default: None,
            is_static: false,
        });
    }
    let id_column = KeyColumn {
        name: "id".to_string(),
        data_type: "int".to_string(),
        position: 0,
    };
    let schema = TableSchema {
        keyspace: "test_ks".to_string(),
        table: "test_table".to_string(),
        partition_keys: vec![id_column],
        clustering_keys: vec![],
        columns,
        comments: HashMap::new(),
    };

    let write_first = CellOperation::Write {
        column: "col_000".to_string(),
        value: Value::Text("first".to_string()),
    };
    let write_last = CellOperation::Write {
        column: "col_063".to_string(),
        value: Value::Text("last".to_string()),
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![write_first, write_last],
        1001000,
        None,
    );

    let mut subset = Vec::new();
    writer
        .write_column_bitmap(&mut subset, &mutation, &schema)
        .unwrap();
    assert_eq!(subset, vec![62, 0, 63]);
}
/// With 65 static columns and every one written except index 17, the static
/// column subset is expected to be the bytes `[1, 17]`.
#[test]
fn test_column_subset_65_static_columns_uses_missing_indexes_when_present_majority() {
    let writer = DataWriter::new(create_test_stats());

    // scol_000 ..= scol_064, all static text columns.
    let mut columns: Vec<Column> = Vec::with_capacity(65);
    for idx in 0..65 {
        columns.push(Column {
            name: format!("scol_{:03}", idx),
            data_type: "text".to_string(),
            nullable: true,
            default: None,
            is_static: true,
        });
    }
    let schema = TableSchema {
        keyspace: "test_ks".to_string(),
        table: "test_table".to_string(),
        partition_keys: vec![KeyColumn {
            name: "id".to_string(),
            data_type: "int".to_string(),
            position: 0,
        }],
        clustering_keys: vec![ClusteringColumn {
            name: "ck".to_string(),
            data_type: "int".to_string(),
            position: 0,
            order: ClusteringOrder::Asc,
        }],
        columns,
        comments: HashMap::new(),
    };

    // Write every static column except index 17.
    let operations: Vec<CellOperation> = (0..65)
        .filter(|idx| *idx != 17)
        .map(|idx| CellOperation::Write {
            column: format!("scol_{:03}", idx),
            value: Value::Text(format!("value-{}", idx)),
        })
        .collect();
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        operations,
        1001000,
        None,
    );

    let mut subset = Vec::new();
    writer
        .write_static_column_bitmap(&mut subset, &mutation, &schema)
        .unwrap();
    assert_eq!(subset, vec![1, 17]);
}
/// With four regular columns and only `col_1` written, the column subset is
/// expected to be the single byte `0b1101`.
#[test]
fn test_column_subset_under_64_regular_columns_uses_bitmap() {
    let writer = DataWriter::new(create_test_stats());

    // col_0 ..= col_3, all regular text columns.
    let mut columns: Vec<Column> = Vec::with_capacity(4);
    for idx in 0..4 {
        columns.push(Column {
            name: format!("col_{idx}"),
            data_type: "text".to_string(),
            nullable: true,
            default: None,
            is_static: false,
        });
    }
    let id_column = KeyColumn {
        name: "id".to_string(),
        data_type: "int".to_string(),
        position: 0,
    };
    let schema = TableSchema {
        keyspace: "test_ks".to_string(),
        table: "test_table".to_string(),
        partition_keys: vec![id_column],
        clustering_keys: vec![],
        columns,
        comments: HashMap::new(),
    };

    let write_col_1 = CellOperation::Write {
        column: "col_1".to_string(),
        value: Value::Text("present".to_string()),
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![write_col_1],
        1001000,
        None,
    );

    let mut subset = Vec::new();
    writer
        .write_column_bitmap(&mut subset, &mutation, &schema)
        .unwrap();
    assert_eq!(subset, vec![0b1101]);
}
/// `regular_columns` must return `m_simple`, `z_simple`, `a_complex` in that
/// order for a schema declaring them as `z_simple` (text), `a_complex`
/// (`set<text>`) and `m_simple` (int).
#[test]
fn test_regular_columns_sort_simple_before_complex() {
    let writer = DataWriter::new(create_test_stats());

    // Helper building a nullable, non-static column of the given type.
    let regular = |name: &str, data_type: &str| Column {
        name: name.to_string(),
        data_type: data_type.to_string(),
        nullable: true,
        default: None,
        is_static: false,
    };
    let schema = TableSchema {
        keyspace: "test_ks".to_string(),
        table: "test_table".to_string(),
        partition_keys: vec![KeyColumn {
            name: "id".to_string(),
            data_type: "int".to_string(),
            position: 0,
        }],
        clustering_keys: vec![],
        columns: vec![
            regular("z_simple", "text"),
            regular("a_complex", "set<text>"),
            regular("m_simple", "int"),
        ],
        comments: HashMap::new(),
    };

    let ordered = writer.regular_columns(&schema);
    let mut names = Vec::new();
    for column in ordered.iter() {
        names.push(column.name.as_str());
    }
    assert_eq!(names, vec!["m_simple", "z_simple", "a_complex"]);
}
/// `static_columns` must return `m_static_simple`, `z_static_simple`,
/// `a_static_complex` in that order for a schema declaring them as
/// `z_static_simple` (text), `a_static_complex` (`set<text>`) and
/// `m_static_simple` (int).
#[test]
fn test_static_columns_sort_simple_before_complex() {
    let writer = DataWriter::new(create_test_stats());

    // Helper building a nullable static column of the given type.
    let static_col = |name: &str, data_type: &str| Column {
        name: name.to_string(),
        data_type: data_type.to_string(),
        nullable: true,
        default: None,
        is_static: true,
    };
    let schema = TableSchema {
        keyspace: "test_ks".to_string(),
        table: "test_table".to_string(),
        partition_keys: vec![KeyColumn {
            name: "id".to_string(),
            data_type: "int".to_string(),
            position: 0,
        }],
        clustering_keys: vec![ClusteringColumn {
            name: "ck".to_string(),
            data_type: "int".to_string(),
            position: 0,
            order: ClusteringOrder::Asc,
        }],
        columns: vec![
            static_col("z_static_simple", "text"),
            static_col("a_static_complex", "set<text>"),
            static_col("m_static_simple", "int"),
        ],
        comments: HashMap::new(),
    };

    let ordered = writer.static_columns(&schema);
    let mut names = Vec::new();
    for column in ordered.iter() {
        names.push(column.name.as_str());
    }
    assert_eq!(
        names,
        vec!["m_static_simple", "z_static_simple", "a_static_complex"]
    );
}
/// With 65 regular columns and all of them written, the column subset is
/// expected to be the single byte `0`.
#[test]
fn test_write_column_bitmap_zero_when_all_columns_present() {
    let writer = DataWriter::new(create_test_stats());

    // Build col_000 ..= col_064 and one write per column in a single pass.
    let mut columns: Vec<Column> = Vec::with_capacity(65);
    let mut operations = Vec::with_capacity(65);
    for idx in 0..65 {
        columns.push(Column {
            name: format!("col_{:03}", idx),
            data_type: "text".to_string(),
            nullable: true,
            default: None,
            is_static: false,
        });
        operations.push(CellOperation::Write {
            column: format!("col_{:03}", idx),
            value: Value::Text(format!("value-{}", idx)),
        });
    }
    let schema = TableSchema {
        keyspace: "test_ks".to_string(),
        table: "test_table".to_string(),
        partition_keys: vec![KeyColumn {
            name: "id".to_string(),
            data_type: "int".to_string(),
            position: 0,
        }],
        clustering_keys: vec![],
        columns,
        comments: HashMap::new(),
    };

    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        operations,
        1001000,
        None,
    );

    let mut subset = Vec::new();
    writer
        .write_column_bitmap(&mut subset, &mutation, &schema)
        .unwrap();
    assert_eq!(subset, vec![0]);
}
/// A three-element int list serializes to a 4-byte count followed by three
/// (4-byte length, 4-byte value) entries; the count and first entry are
/// checked explicitly.
#[test]
fn test_serialize_list() {
    let elements: Vec<Value> = (1..=3).map(Value::Integer).collect();
    let encoded = serialize_value(&Value::List(elements)).unwrap();

    assert_eq!(encoded.len(), 4 + 3 * 8);

    let count = 3i32.to_be_bytes();
    let first_len = 4i32.to_be_bytes();
    let first_value = 1i32.to_be_bytes();
    assert_eq!(&encoded[0..4], &count);
    assert_eq!(&encoded[4..8], &first_len);
    assert_eq!(&encoded[8..12], &first_value);
}
/// An empty list serializes to just the 4-byte big-endian count `0`.
#[test]
fn test_serialize_empty_list() {
    let encoded = serialize_value(&Value::List(Vec::new())).unwrap();

    assert_eq!(encoded.len(), 4);
    let zero_count = 0i32.to_be_bytes();
    assert_eq!(&encoded[0..4], &zero_count);
}
/// A one-element int list serializes to count `1`, element length `4` and the
/// big-endian value `42`.
#[test]
fn test_serialize_single_element_list() {
    let encoded = serialize_value(&Value::List(vec![Value::Integer(42)])).unwrap();

    let expected: Vec<u8> = vec![
        0x00, 0x00, 0x00, 0x01, // element count
        0x00, 0x00, 0x00, 0x04, // element length
        0x00, 0x00, 0x00, 0x2A, // element value
    ];
    assert_eq!(encoded, expected);
}
/// A two-element text set starts with count `2`, then the first element's
/// length `5` and its bytes `alpha`.
#[test]
fn test_serialize_set() {
    let members = vec![
        Value::Text("alpha".to_string()),
        Value::Text("beta".to_string()),
    ];
    let encoded = serialize_value(&Value::Set(members)).unwrap();

    let count = 2i32.to_be_bytes();
    let first_len = 5i32.to_be_bytes();
    assert_eq!(&encoded[0..4], &count);
    assert_eq!(&encoded[4..8], &first_len);
    assert_eq!(&encoded[8..13], b"alpha");
}
/// A one-element text set serializes to count `1`, element length `5` and the
/// bytes `alpha`.
#[test]
fn test_serialize_single_element_set() {
    let encoded = serialize_value(&Value::Set(vec![Value::Text("alpha".to_string())])).unwrap();

    let expected: Vec<u8> = vec![
        0x00, 0x00, 0x00, 0x01, // element count
        0x00, 0x00, 0x00, 0x05, // element length
        b'a', b'l', b'p', b'h', b'a', // element bytes
    ];
    assert_eq!(encoded, expected);
}
/// An empty set serializes to exactly the 4-byte big-endian count `0`.
#[test]
fn test_serialize_empty_set() {
    let encoded = serialize_value(&Value::Set(Vec::new())).unwrap();
    let expected = 0i32.to_be_bytes().to_vec();
    assert_eq!(encoded, expected);
}
/// A one-entry `text -> int` map serializes to count `1`, then the key as
/// (length 4, `key1`) and the value as (length 4, `100`).
#[test]
fn test_serialize_map() {
    let entry = (Value::Text("key1".to_string()), Value::Integer(100));
    let encoded = serialize_value(&Value::Map(vec![entry])).unwrap();

    let count = 1i32.to_be_bytes();
    let four = 4i32.to_be_bytes();
    let hundred = 100i32.to_be_bytes();
    assert_eq!(&encoded[0..4], &count);
    assert_eq!(&encoded[4..8], &four);
    assert_eq!(&encoded[8..12], b"key1");
    assert_eq!(&encoded[12..16], &four);
    assert_eq!(&encoded[16..20], &hundred);
}
#[test]
/// An empty map must serialize to exactly four bytes holding a zero count.
fn test_serialize_empty_map() {
    let encoded = serialize_value(&Value::Map(Vec::new())).unwrap();
    assert_eq!(encoded.len(), 4);
    assert_eq!(&encoded[..4], &0i32.to_be_bytes());
}
#[test]
/// Serializes an (int, text, null) tuple and checks that each field is
/// length-prefixed, with the null field encoded as length -1.
fn test_serialize_tuple() {
    let fields = vec![
        Value::Integer(42),
        Value::Text(String::from("hello")),
        Value::Null,
    ];
    let encoded = serialize_value(&Value::Tuple(fields)).unwrap();
    // Field 0: 4-byte int.
    assert_eq!(&encoded[..4], &4i32.to_be_bytes());
    assert_eq!(&encoded[4..8], &42i32.to_be_bytes());
    // Field 1: 5-byte text.
    assert_eq!(&encoded[8..12], &5i32.to_be_bytes());
    assert_eq!(&encoded[12..17], b"hello");
    // Field 2: null marker.
    assert_eq!(&encoded[17..21], &(-1i32).to_be_bytes());
}
#[test]
/// Pins the exact bytes of a one-field text tuple: length prefix then payload.
fn test_serialize_single_element_tuple() {
    let encoded =
        serialize_value(&Value::Tuple(vec![Value::Text(String::from("solo"))])).unwrap();
    let mut expected = 4i32.to_be_bytes().to_vec();
    expected.extend_from_slice(b"solo");
    assert_eq!(encoded, expected);
}
#[test]
/// Wrapping a list in `Value::Frozen` must not change its serialized bytes.
fn test_serialize_frozen() {
    let plain_bytes =
        serialize_value(&Value::List(vec![Value::Integer(10), Value::Integer(20)])).unwrap();
    let wrapped = Value::Frozen(Box::new(Value::List(vec![
        Value::Integer(10),
        Value::Integer(20),
    ])));
    let wrapped_bytes = serialize_value(&wrapped).unwrap();
    assert_eq!(wrapped_bytes, plain_bytes);
}
#[test]
/// A frozen one-element text list serializes identically to the bare list.
fn test_serialize_single_element_frozen() {
    let plain_bytes =
        serialize_value(&Value::List(vec![Value::Text(String::from("solo"))])).unwrap();
    let wrapped = Value::Frozen(Box::new(Value::List(vec![Value::Text(String::from(
        "solo",
    ))])));
    let wrapped_bytes = serialize_value(&wrapped).unwrap();
    assert_eq!(wrapped_bytes, plain_bytes);
}
#[test]
/// A map whose value is a frozen list serializes to a non-empty buffer that
/// starts with an entry count of one.
fn test_serialize_nested_collection() {
    let inner_list = Value::List(vec![Value::Integer(1), Value::Integer(2)]);
    let entry = (
        Value::Text(String::from("nums")),
        Value::Frozen(Box::new(inner_list)),
    );
    let encoded = serialize_value(&Value::Map(vec![entry])).unwrap();
    assert!(!encoded.is_empty());
    assert_eq!(&encoded[..4], &1i32.to_be_bytes());
}
#[test]
/// The schema-less `serialize_value` path must produce the same bytes as the
/// schema-aware `TypeSerializer::serialize_udt` for the company UDT fixture.
fn test_serialize_udt_with_nested_collections_matches_schema_aware_bytes() {
    let serializer = TypeSerializer::new();
    let company = Value::Udt(phase3_company_value());
    let actual = serialize_value(&company).unwrap();
    let schema_aware = serializer
        .serialize_udt(&company, &phase3_company_schema())
        .unwrap();
    assert_eq!(actual, schema_aware);
}
#[test]
/// A map from a non-ASCII text key to a frozen UDT must encode as: entry
/// count, length-prefixed key bytes, length-prefixed UDT bytes.
fn test_serialize_collection_containing_nested_udts() {
    /// Appends `payload` to `out`, preceded by its length as a big-endian i32.
    fn push_len_prefixed(out: &mut Vec<u8>, payload: &[u8]) {
        out.extend_from_slice(&(payload.len() as i32).to_be_bytes());
        out.extend_from_slice(payload);
    }

    let serializer = TypeSerializer::new();
    let company = phase3_company_value();
    let company_bytes = serializer
        .serialize_udt(&Value::Udt(company.clone()), &phase3_company_schema())
        .unwrap();

    let entry = (
        Value::Text("empresa_日本".to_string()),
        Value::Frozen(Box::new(Value::Udt(company))),
    );
    let actual = serialize_value(&Value::Map(vec![entry])).unwrap();

    let mut expected = 1i32.to_be_bytes().to_vec();
    push_len_prefixed(&mut expected, "empresa_日本".as_bytes());
    push_len_prefixed(&mut expected, &company_bytes);
    assert_eq!(actual, expected);
}
#[test]
/// A tuple of (text, frozen list, frozen map-of-UDT, frozen UDT) must encode
/// as the concatenation of each field's length-prefixed serialization.
fn test_serialize_tuple_with_collection_fields_and_udt() {
    /// Appends `payload` to `out`, preceded by its length as a big-endian i32.
    fn push_len_prefixed(out: &mut Vec<u8>, payload: &[u8]) {
        out.extend_from_slice(&(payload.len() as i32).to_be_bytes());
        out.extend_from_slice(payload);
    }

    let serializer = TypeSerializer::new();
    let address = phase3_address_value();
    let person = phase3_person_value("Tuple User");

    // Reference encodings of the UDT fields via the schema-aware serializer.
    let address_bytes = serializer
        .serialize_udt(&Value::Udt(address.clone()), &phase3_address_schema())
        .unwrap();
    let person_bytes = serializer
        .serialize_udt(&Value::Udt(person.clone()), &phase3_person_schema())
        .unwrap();

    let numbers = || {
        Value::List(vec![
            Value::Integer(3),
            Value::Integer(5),
            Value::Integer(8),
        ])
    };
    let home_map = Value::Map(vec![(
        Value::Text(String::from("home")),
        Value::Frozen(Box::new(Value::Udt(address))),
    )]);
    let tuple = Value::Tuple(vec![
        Value::Text(String::from("phase3")),
        Value::Frozen(Box::new(numbers())),
        Value::Frozen(Box::new(home_map)),
        Value::Frozen(Box::new(Value::Udt(person))),
    ]);
    let actual = serialize_value(&tuple).unwrap();

    // Expected bytes for the list and map fields.
    let list_bytes = serialize_value(&numbers()).unwrap();
    let mut map_bytes = 1i32.to_be_bytes().to_vec();
    push_len_prefixed(&mut map_bytes, b"home");
    push_len_prefixed(&mut map_bytes, &address_bytes);

    let mut expected = Vec::new();
    push_len_prefixed(&mut expected, b"phase3");
    push_len_prefixed(&mut expected, &list_bytes);
    push_len_prefixed(&mut expected, &map_bytes);
    push_len_prefixed(&mut expected, &person_bytes);
    assert_eq!(actual, expected);
}
#[test]
/// A map -> frozen list -> frozen map -> frozen list nesting must serialize
/// to a non-empty buffer whose outer entry count is one.
fn test_serialize_high_complexity_nested_collection() {
    // Build the value from the innermost layer outwards.
    let innermost = Value::List(vec![Value::Integer(1), Value::Integer(2)]);
    let inner_map = Value::Map(vec![(
        Value::Text(String::from("inner")),
        Value::Frozen(Box::new(innermost)),
    )]);
    let middle_list = Value::List(vec![Value::Frozen(Box::new(inner_map))]);
    let outer_map = Value::Map(vec![(
        Value::Text(String::from("outer")),
        Value::Frozen(Box::new(middle_list)),
    )]);

    let encoded = serialize_value(&outer_map).unwrap();
    assert!(!encoded.is_empty());
    assert_eq!(&encoded[..4], &1i32.to_be_bytes());
}
#[test]
/// `is_complex_column` must accept non-frozen set/list/map types in both CQL
/// and marshal-class spellings (any letter case shown here), and reject
/// frozen collections and scalar types.
fn test_is_complex_column() {
    let complex_types = [
        "set<int>",
        "list<text>",
        "map<text, int>",
        "SET<INT>",
        "List<Text>",
        "Map<Text, Int>",
        "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.Int32Type)",
        "org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.UTF8Type)",
        "org.apache.cassandra.db.marshal.MapType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.Int32Type)",
    ];
    for ty in complex_types {
        assert!(is_complex_column(ty), "{ty}");
    }

    let simple_types = [
        "frozen<set<int>>",
        "frozen<list<text>>",
        "frozen<map<text, int>>",
        "FROZEN<SET<INT>>",
        "org.apache.cassandra.db.marshal.FrozenType(org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.Int32Type))",
        "int",
        "text",
        "uuid",
        "timestamp",
    ];
    for ty in simple_types {
        assert!(!is_complex_column(ty), "{ty}");
    }
}
#[test]
/// Generates list cell-path TimeUUIDs for indices 0, 1 and 2 at the same
/// timestamp and checks that every one is 16 bytes long, carries UUID
/// version 1, and that they sort in index order.
fn test_generate_list_cell_path_timeuuid() {
    let ts = 1_704_067_200_000_000i64;
    let uuid0 = generate_list_cell_path_timeuuid(ts, 0);
    let uuid1 = generate_list_cell_path_timeuuid(ts, 1);
    let uuid2 = generate_list_cell_path_timeuuid(ts, 2);
    // Check shape and version on all three UUIDs, including the last one.
    for uuid in [&uuid0, &uuid1, &uuid2] {
        assert_eq!(uuid.len(), 16);
        // The high nibble of byte 6 is the UUID version.
        assert_eq!(uuid[6] & 0xF0, 0x10, "Should be UUID version 1");
    }
    assert!(uuid0 < uuid1, "UUID0 should be less than UUID1");
    assert!(uuid1 < uuid2, "UUID1 should be less than UUID2");
}
#[test]
/// Writes a two-element `set<text>` without TTL and checks that exactly two
/// cells are emitted, each flagged USE_ROW_TIMESTAMP | HAS_EMPTY_VALUE.
fn test_write_set_complex_column() {
    let stats = create_test_stats();
    let writer = DataWriter::new(stats);
    let column = Column {
        name: "tags".to_string(),
        data_type: "set<text>".to_string(),
        nullable: true,
        default: None,
        is_static: false,
    };
    let value = Value::Set(vec![
        Value::Text("alpha".to_string()),
        Value::Text("beta".to_string()),
    ]);
    let mut buf = Vec::new();
    writer
        .write_complex_column(&mut buf, &column, &value, 1001000, None)
        .unwrap();
    assert!(!buf.is_empty());
    // Decode the flags structurally instead of counting matching bytes anywhere
    // in the buffer, which could also match header or payload bytes.
    let cell_flags = parse_complex_cell_flags(&buf);
    assert_eq!(
        cell_flags,
        vec![CELL_USE_ROW_TIMESTAMP | CELL_HAS_EMPTY_VALUE; 2],
        "Should have 2 SET cells with USE_ROW_TIMESTAMP | HAS_EMPTY_VALUE flags"
    );
}
#[test]
/// Writes a two-entry `map<text, int>` without TTL and checks that exactly
/// two cells are emitted, each flagged with USE_ROW_TIMESTAMP only.
fn test_write_map_complex_column() {
    let stats = create_test_stats();
    let writer = DataWriter::new(stats);
    let column = Column {
        name: "props".to_string(),
        data_type: "map<text, int>".to_string(),
        nullable: true,
        default: None,
        is_static: false,
    };
    let value = Value::Map(vec![
        (Value::Text("key1".to_string()), Value::Integer(100)),
        (Value::Text("key2".to_string()), Value::Integer(200)),
    ]);
    let mut buf = Vec::new();
    writer
        .write_complex_column(&mut buf, &column, &value, 1001000, None)
        .unwrap();
    assert!(!buf.is_empty());
    // Decode the flags structurally: a raw scan for 0x08 could also match
    // length, header, or value bytes.
    let cell_flags = parse_complex_cell_flags(&buf);
    assert_eq!(
        cell_flags,
        vec![CELL_USE_ROW_TIMESTAMP; 2],
        "Should have 2 MAP cells with USE_ROW_TIMESTAMP flags"
    );
}
#[test]
/// Writes a two-element `list<int>` without TTL and checks that exactly two
/// cells are emitted with USE_ROW_TIMESTAMP, and that the buffer contains at
/// least two bytes equal to 16 (the expected TimeUUID cell-path length).
fn test_write_list_complex_column() {
    let stats = create_test_stats();
    let writer = DataWriter::new(stats);
    let column = Column {
        name: "items".to_string(),
        data_type: "list<int>".to_string(),
        nullable: true,
        default: None,
        is_static: false,
    };
    let value = Value::List(vec![Value::Integer(10), Value::Integer(20)]);
    let mut buf = Vec::new();
    writer
        .write_complex_column(&mut buf, &column, &value, 1001000, None)
        .unwrap();
    assert!(!buf.is_empty());
    // Decode the flags structurally: the TimeUUID cell paths contain arbitrary
    // bytes, so a raw scan for 0x08 could miscount.
    let cell_flags = parse_complex_cell_flags(&buf);
    assert_eq!(
        cell_flags,
        vec![CELL_USE_ROW_TIMESTAMP; 2],
        "Should have 2 LIST cells with USE_ROW_TIMESTAMP flags"
    );
    // Lower-bound check only: each list cell path is expected to be
    // announced by a length byte of 16.
    let timeuuid_len_count = buf.iter().filter(|&&b| b == 0x10).count();
    assert!(
        timeuuid_len_count >= 2,
        "Should have TimeUUID path length (16) for each list cell"
    );
}
#[test]
/// Writing a row whose only column is a frozen set must not set the
/// HAS_COMPLEX_DELETION bit in the row flags byte.
fn test_frozen_collection_not_complex() {
    let id_key = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    };
    let frozen_tags = Column {
        name: String::from("frozen_tags"),
        data_type: String::from("frozen<set<text>>"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let schema = TableSchema {
        keyspace: String::from("test_ks"),
        table: String::from("test_table"),
        partition_keys: vec![id_key],
        clustering_keys: Vec::new(),
        columns: vec![frozen_tags],
        comments: HashMap::new(),
    };

    let mut writer = DataWriter::new(create_test_stats());
    let frozen_set = Value::Frozen(Box::new(Value::Set(vec![
        Value::Text(String::from("a")),
        Value::Text(String::from("b")),
    ])));
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![CellOperation::Write {
            column: String::from("frozen_tags"),
            value: frozen_set,
        }],
        1001000,
        None,
    );

    writer.write_row(&mutation, &schema).unwrap();
    let row_bytes = writer.finish().unwrap();
    assert!(!row_bytes.is_empty());
    // The first byte of the written row is its flags byte.
    let row_flags = row_bytes[0];
    assert_eq!(
        row_flags & ROW_HAS_COMPLEX_DELETION,
        0,
        "Frozen collection should NOT have HAS_COMPLEX_DELETION flag"
    );
}
#[test]
/// A row writing both a text column and a non-frozen set column must set
/// HAS_COMPLEX_DELETION, HAS_TIMESTAMP and HAS_ALL_COLUMNS in its flags byte.
fn test_mixed_simple_and_complex_columns() {
    let id_key = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    };
    let name_column = Column {
        name: String::from("name"),
        data_type: String::from("text"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let tags_column = Column {
        name: String::from("tags"),
        data_type: String::from("set<text>"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let schema = TableSchema {
        keyspace: String::from("test_ks"),
        table: String::from("test_table"),
        partition_keys: vec![id_key],
        clustering_keys: Vec::new(),
        columns: vec![name_column, tags_column],
        comments: HashMap::new(),
    };

    let mut writer = DataWriter::new(create_test_stats());
    let write_name = CellOperation::Write {
        column: String::from("name"),
        value: Value::Text(String::from("Alice")),
    };
    let write_tags = CellOperation::Write {
        column: String::from("tags"),
        value: Value::Set(vec![
            Value::Text(String::from("admin")),
            Value::Text(String::from("user")),
        ]),
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![write_name, write_tags],
        1001000,
        None,
    );

    writer.write_row(&mutation, &schema).unwrap();
    let row_bytes = writer.finish().unwrap();
    assert!(!row_bytes.is_empty());
    // The first byte of the written row is its flags byte.
    let row_flags = row_bytes[0];
    assert_eq!(
        row_flags & ROW_HAS_COMPLEX_DELETION,
        ROW_HAS_COMPLEX_DELETION,
        "Row with non-frozen SET should have HAS_COMPLEX_DELETION flag"
    );
    assert_eq!(
        row_flags & ROW_HAS_TIMESTAMP,
        ROW_HAS_TIMESTAMP,
        "Should have timestamp"
    );
    assert_eq!(
        row_flags & ROW_HAS_ALL_COLUMNS,
        ROW_HAS_ALL_COLUMNS,
        "Should have all columns"
    );
}
#[test]
/// Set elements supplied out of order must appear in sorted order in the
/// written complex-column bytes.
fn test_set_canonical_ordering() {
    let writer = DataWriter::new(create_test_stats());
    let column = Column {
        name: String::from("tags"),
        data_type: String::from("set<text>"),
        nullable: true,
        default: None,
        is_static: false,
    };
    // Deliberately unsorted input.
    let value = Value::Set(
        ["zebra", "alpha", "mango"]
            .iter()
            .map(|s| Value::Text(s.to_string()))
            .collect(),
    );
    let mut buf = Vec::new();
    writer
        .write_complex_column(&mut buf, &column, &value, 1001000, None)
        .unwrap();

    // Locate each element by searching the lossily-decoded buffer text.
    let text = String::from_utf8_lossy(&buf);
    let alpha_at = text.find("alpha").expect("alpha should be in output");
    let mango_at = text.find("mango").expect("mango should be in output");
    let zebra_at = text.find("zebra").expect("zebra should be in output");
    assert!(
        alpha_at < mango_at && mango_at < zebra_at,
        "SET elements should be in sorted order: alpha({}) < mango({}) < zebra({})",
        alpha_at,
        mango_at,
        zebra_at
    );
}
#[test]
/// Map entries supplied out of key order must appear sorted by key in the
/// written complex-column bytes.
fn test_map_canonical_ordering() {
    let writer = DataWriter::new(create_test_stats());
    let column = Column {
        name: String::from("props"),
        data_type: String::from("map<text, int>"),
        nullable: true,
        default: None,
        is_static: false,
    };
    // Deliberately unsorted input.
    let entries = vec![
        (Value::Text(String::from("z_key")), Value::Integer(1)),
        (Value::Text(String::from("a_key")), Value::Integer(2)),
    ];
    let value = Value::Map(entries);
    let mut buf = Vec::new();
    writer
        .write_complex_column(&mut buf, &column, &value, 1001000, None)
        .unwrap();

    // Locate each key by searching the lossily-decoded buffer text.
    let text = String::from_utf8_lossy(&buf);
    let a_at = text.find("a_key").expect("a_key should be in output");
    let z_at = text.find("z_key").expect("z_key should be in output");
    assert!(
        a_at < z_at,
        "MAP entries should be sorted by key: a_key({}) < z_key({})",
        a_at,
        z_at
    );
}
#[test]
/// Writing a `Value::List` into a `set<text>` column must return an error.
fn test_set_rejects_list_value() {
    let writer = DataWriter::new(create_test_stats());
    let set_column = Column {
        name: String::from("tags"),
        data_type: String::from("set<text>"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let wrong_kind = Value::List(vec![Value::Text(String::from("x"))]);
    let mut sink = Vec::new();
    let outcome = writer.write_complex_column(&mut sink, &set_column, &wrong_kind, 1001000, None);
    assert!(outcome.is_err(), "SET column should reject Value::List");
}
#[test]
/// Writing a `Value::Set` into a `list<text>` column must return an error.
fn test_list_rejects_set_value() {
    let writer = DataWriter::new(create_test_stats());
    let list_column = Column {
        name: String::from("items"),
        data_type: String::from("list<text>"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let wrong_kind = Value::Set(vec![Value::Text(String::from("x"))]);
    let mut sink = Vec::new();
    let outcome = writer.write_complex_column(&mut sink, &list_column, &wrong_kind, 1001000, None);
    assert!(outcome.is_err(), "LIST column should reject Value::Set");
}
#[test]
/// A complex-column deletion must produce a non-empty encoding whose final
/// byte is zero (a cell count of zero).
fn test_complex_column_deletion() {
    let writer = DataWriter::new(create_test_stats());
    let mut encoded = Vec::new();
    writer
        .write_complex_column_deletion(&mut encoded, 1001000)
        .unwrap();
    assert!(!encoded.is_empty());
    // Safe after the non-empty assertion above.
    let trailing = *encoded.last().unwrap();
    assert_eq!(trailing, 0x00, "Last byte should be cell_count = 0");
}
#[test]
/// A `WriteWithTtl` on a non-frozen set column must set HAS_COMPLEX_DELETION
/// in the row flags byte.
fn test_write_with_ttl_complex_column() {
    let id_key = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    };
    let tags_column = Column {
        name: String::from("tags"),
        data_type: String::from("set<text>"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let schema = TableSchema {
        keyspace: String::from("test_ks"),
        table: String::from("test_table"),
        partition_keys: vec![id_key],
        clustering_keys: Vec::new(),
        columns: vec![tags_column],
        comments: HashMap::new(),
    };

    let mut writer = DataWriter::new(create_test_stats());
    let ttl_write = CellOperation::WriteWithTtl {
        column: String::from("tags"),
        value: Value::Set(vec![
            Value::Text(String::from("a")),
            Value::Text(String::from("b")),
        ]),
        ttl_seconds: 3600,
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![ttl_write],
        1001000,
        None,
    );

    writer.write_row(&mutation, &schema).unwrap();
    let row_bytes = writer.finish().unwrap();
    assert!(!row_bytes.is_empty());
    // The first byte of the written row is its flags byte.
    let row_flags = row_bytes[0];
    assert_eq!(
        row_flags & ROW_HAS_COMPLEX_DELETION,
        ROW_HAS_COMPLEX_DELETION,
        "WriteWithTtl on SET should set HAS_COMPLEX_DELETION"
    );
}
#[test]
/// Deleting a non-frozen set column must set HAS_COMPLEX_DELETION in the row
/// flags byte.
fn test_delete_complex_column() {
    let id_key = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    };
    let tags_column = Column {
        name: String::from("tags"),
        data_type: String::from("set<text>"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let schema = TableSchema {
        keyspace: String::from("test_ks"),
        table: String::from("test_table"),
        partition_keys: vec![id_key],
        clustering_keys: Vec::new(),
        columns: vec![tags_column],
        comments: HashMap::new(),
    };

    let mut writer = DataWriter::new(create_test_stats());
    let delete_tags = CellOperation::Delete {
        column: String::from("tags"),
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![delete_tags],
        1001000,
        None,
    );

    writer.write_row(&mutation, &schema).unwrap();
    let row_bytes = writer.finish().unwrap();
    assert!(!row_bytes.is_empty());
    // The first byte of the written row is its flags byte.
    let row_flags = row_bytes[0];
    assert_eq!(
        row_flags & ROW_HAS_COMPLEX_DELETION,
        ROW_HAS_COMPLEX_DELETION,
        "Delete on SET should set HAS_COMPLEX_DELETION"
    );
}
#[test]
/// A set column declared with the marshal-class type string (rather than CQL
/// syntax) must still be treated as complex when a row is written.
fn test_internal_type_string_complex_column() {
    let id_key = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    };
    let tags_column = Column {
        name: String::from("tags"),
        data_type: String::from(
            "org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.UTF8Type)",
        ),
        nullable: true,
        default: None,
        is_static: false,
    };
    let schema = TableSchema {
        keyspace: String::from("test_ks"),
        table: String::from("test_table"),
        partition_keys: vec![id_key],
        clustering_keys: Vec::new(),
        columns: vec![tags_column],
        comments: HashMap::new(),
    };

    let mut writer = DataWriter::new(create_test_stats());
    let write_tags = CellOperation::Write {
        column: String::from("tags"),
        value: Value::Set(vec![Value::Text(String::from("test"))]),
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![write_tags],
        1001000,
        None,
    );

    writer.write_row(&mutation, &schema).unwrap();
    let row_bytes = writer.finish().unwrap();
    // The first byte of the written row is its flags byte.
    let row_flags = row_bytes[0];
    assert_eq!(
        row_flags & ROW_HAS_COMPLEX_DELETION,
        ROW_HAS_COMPLEX_DELETION,
        "Internal type string should be recognized as complex column"
    );
}
/// Test helper: walks a complex-column encoding and returns the flags byte of
/// every cell, in order.
///
/// Layout consumed: two unsigned vints (skipped; presumably the
/// complex-deletion header — confirm against the writer), one unsigned vint
/// cell count, then per cell:
/// * one flags byte (collected);
/// * three unsigned vints, only when `CELL_IS_EXPIRING` is set (skipped;
///   presumably timestamp, local deletion time and TTL — confirm);
/// * a vint-length-prefixed cell path (skipped);
/// * a vint-length-prefixed value, unless `CELL_HAS_EMPTY_VALUE` is set.
///
/// Panics on a truncated or malformed buffer (plain slice indexing).
fn parse_complex_cell_flags(buf: &[u8]) -> Vec<u8> {
    /// Decodes one unsigned vint starting at `*pos` and advances `*pos` past it.
    ///
    /// The number of leading one-bits in the first byte is the number of
    /// continuation bytes; the remaining low bits of the first byte are the
    /// most significant bits of the value.
    fn read_uvint(buf: &[u8], pos: &mut usize) -> u64 {
        let first = buf[*pos];
        *pos += 1;
        let extra = first.leading_ones();
        // With 7 or 8 continuation bytes the first byte carries no value bits,
        // so the mask must be zero. `wrapping_shr` would wrap a shift of 8 to
        // 0 and keep the prefix bits; `checked_shr` returns `None` instead.
        let mask = 0xFF_u8.checked_shr(extra + 1).unwrap_or(0);
        let mut v = u64::from(first & mask);
        for _ in 0..extra {
            v = (v << 8) | u64::from(buf[*pos]);
            *pos += 1;
        }
        v
    }

    let mut pos = 0usize;
    // Skip the two header vints that precede the cell count.
    read_uvint(buf, &mut pos);
    read_uvint(buf, &mut pos);
    let cell_count = read_uvint(buf, &mut pos) as usize;
    let mut flags_out = Vec::with_capacity(cell_count);
    for _ in 0..cell_count {
        let flags = buf[pos];
        pos += 1;
        flags_out.push(flags);
        if (flags & CELL_IS_EXPIRING) != 0 {
            // Expiring cells carry three extra vints before the path.
            read_uvint(buf, &mut pos);
            read_uvint(buf, &mut pos);
            read_uvint(buf, &mut pos);
        }
        let path_len = read_uvint(buf, &mut pos) as usize;
        pos += path_len;
        if (flags & CELL_HAS_EMPTY_VALUE) == 0 {
            let value_len = read_uvint(buf, &mut pos) as usize;
            pos += value_len;
        }
    }
    flags_out
}
#[test]
/// A two-element `set<text>` written with a TTL must yield two cells, each
/// flagged exactly IS_EXPIRING | HAS_EMPTY_VALUE and never USE_ROW_TIMESTAMP.
fn test_set_complex_column_with_ttl() {
    let writer = DataWriter::new(create_test_stats());
    let column = Column {
        name: String::from("tags"),
        data_type: String::from("set<text>"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let value = Value::Set(
        ["alpha", "beta"]
            .iter()
            .map(|s| Value::Text(s.to_string()))
            .collect(),
    );
    let mut encoded = Vec::new();
    writer
        .write_complex_column(&mut encoded, &column, &value, 1001000, Some(3600))
        .unwrap();

    let flags_per_cell = parse_complex_cell_flags(&encoded);
    let wanted = CELL_IS_EXPIRING | CELL_HAS_EMPTY_VALUE;
    assert_eq!(
        flags_per_cell.len(),
        2,
        "SET with 2 elements should produce 2 cells"
    );
    assert!(
        flags_per_cell.iter().all(|&flags| flags == wanted),
        "SET with TTL: all cells should have IS_EXPIRING | HAS_EMPTY_VALUE (0x06), got: {:?}",
        flags_per_cell
    );
    let any_row_timestamp = flags_per_cell
        .iter()
        .any(|&flags| (flags & CELL_USE_ROW_TIMESTAMP) != 0);
    assert!(
        !any_row_timestamp,
        "SET with TTL should NOT have USE_ROW_TIMESTAMP on any cell, got: {:?}",
        flags_per_cell
    );
}
#[test]
/// A one-entry `map<text, int>` written with a TTL must yield a single cell
/// that has IS_EXPIRING set and HAS_EMPTY_VALUE clear.
fn test_map_complex_column_with_ttl() {
    let writer = DataWriter::new(create_test_stats());
    let column = Column {
        name: String::from("props"),
        data_type: String::from("map<text, int>"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let entry = (Value::Text(String::from("key1")), Value::Integer(100));
    let value = Value::Map(vec![entry]);
    let mut encoded = Vec::new();
    writer
        .write_complex_column(&mut encoded, &column, &value, 1001000, Some(7200))
        .unwrap();

    let flags_per_cell = parse_complex_cell_flags(&encoded);
    assert_eq!(
        flags_per_cell.len(),
        1,
        "MAP with 1 entry should produce 1 cell"
    );
    let only_cell = flags_per_cell[0];
    assert_eq!(
        only_cell & CELL_IS_EXPIRING,
        CELL_IS_EXPIRING,
        "MAP with TTL: cell should have IS_EXPIRING flag set, got flags byte: 0x{:02X}",
        only_cell
    );
    assert_eq!(
        only_cell & CELL_HAS_EMPTY_VALUE,
        0,
        "MAP with TTL: cell should NOT have HAS_EMPTY_VALUE, got flags byte: 0x{:02X}",
        only_cell
    );
}
#[test]
/// Writes the same three-element `list<int>` with and without a TTL and
/// checks that the TTL encoding is larger, that both produce three cells,
/// and that IS_EXPIRING is set on all TTL cells and on no non-TTL cell.
fn test_list_complex_column_with_ttl() {
    let stats = create_test_stats();
    let expiring_writer = DataWriter::new(stats.clone());
    let plain_writer = DataWriter::new(stats);
    let column = Column {
        name: String::from("items"),
        data_type: String::from("list<int>"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let value = Value::List((1..=3).map(Value::Integer).collect());

    let mut expiring_bytes = Vec::new();
    expiring_writer
        .write_complex_column(&mut expiring_bytes, &column, &value, 1001000, Some(1800))
        .unwrap();
    let mut plain_bytes = Vec::new();
    plain_writer
        .write_complex_column(&mut plain_bytes, &column, &value, 1001000, None)
        .unwrap();

    assert!(
        expiring_bytes.len() > plain_bytes.len(),
        "LIST with TTL ({} bytes) should be larger than without TTL ({} bytes)",
        expiring_bytes.len(),
        plain_bytes.len()
    );

    let expiring_flags = parse_complex_cell_flags(&expiring_bytes);
    assert_eq!(
        expiring_flags.len(),
        3,
        "LIST with 3 elements should produce 3 cells"
    );
    let every_cell_expires = expiring_flags
        .iter()
        .all(|&flags| (flags & CELL_IS_EXPIRING) != 0);
    assert!(
        every_cell_expires,
        "LIST with TTL: all cells should have IS_EXPIRING flag set, got: {:?}",
        expiring_flags
    );

    let plain_flags = parse_complex_cell_flags(&plain_bytes);
    assert_eq!(plain_flags.len(), 3);
    let no_cell_expires = plain_flags
        .iter()
        .all(|&flags| (flags & CELL_IS_EXPIRING) == 0);
    assert!(
        no_cell_expires,
        "LIST without TTL: no cells should have IS_EXPIRING flag, got: {:?}",
        plain_flags
    );
}
#[test]
/// A one-element `set<text>` written without TTL must yield a single cell
/// flagged exactly USE_ROW_TIMESTAMP | HAS_EMPTY_VALUE.
fn test_complex_column_no_ttl_uses_row_timestamp() {
    let stats = create_test_stats();
    let writer = DataWriter::new(stats);
    let column = Column {
        name: "tags".to_string(),
        data_type: "set<text>".to_string(),
        nullable: true,
        default: None,
        is_static: false,
    };
    let value = Value::Set(vec![Value::Text("x".to_string())]);
    let mut buf = Vec::new();
    writer
        .write_complex_column(&mut buf, &column, &value, 1001000, None)
        .unwrap();
    let expected_flags = CELL_USE_ROW_TIMESTAMP | CELL_HAS_EMPTY_VALUE;
    // Decode the flags structurally instead of counting matching bytes anywhere
    // in the buffer, which could also match header or payload bytes.
    let cell_flags = parse_complex_cell_flags(&buf);
    assert_eq!(
        cell_flags,
        vec![expected_flags],
        "Without TTL, SET cells should use USE_ROW_TIMESTAMP"
    );
}
#[test]
/// When one column is deleted and the other written, the column bitmap must
/// be a single zero byte: a delete counts as the column being present.
fn test_bitmap_includes_deleted_columns() {
    let id_key = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    };
    let age_column = Column {
        name: String::from("age"),
        data_type: String::from("int"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let name_column = Column {
        name: String::from("name"),
        data_type: String::from("text"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let schema = TableSchema {
        keyspace: String::from("test_ks"),
        table: String::from("test_table"),
        partition_keys: vec![id_key],
        clustering_keys: Vec::new(),
        columns: vec![age_column, name_column],
        comments: HashMap::new(),
    };

    let writer = DataWriter::new(create_test_stats());
    let operations = vec![
        CellOperation::Delete {
            column: String::from("age"),
        },
        CellOperation::Write {
            column: String::from("name"),
            value: Value::Text(String::from("Alice")),
        },
    ];
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        operations,
        1001000,
        None,
    );

    let mut bitmap = Vec::new();
    writer
        .write_column_bitmap(&mut bitmap, &mutation, &schema)
        .unwrap();
    assert_eq!(bitmap.len(), 1, "Bitmap should be a single byte");
    assert_eq!(
        bitmap[0], 0,
        "Bitmap should be 0 (all columns present) when both write and delete cover all columns"
    );
}
#[test]
/// When only `age` is touched (by a delete), the bitmap must mark `name` as
/// missing (bit 1) while `age` stays present (bit 0 clear), giving value 2.
fn test_bitmap_delete_only_column_is_present() {
    let id_key = KeyColumn {
        name: String::from("id"),
        data_type: String::from("int"),
        position: 0,
    };
    let age_column = Column {
        name: String::from("age"),
        data_type: String::from("int"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let name_column = Column {
        name: String::from("name"),
        data_type: String::from("text"),
        nullable: true,
        default: None,
        is_static: false,
    };
    let schema = TableSchema {
        keyspace: String::from("test_ks"),
        table: String::from("test_table"),
        partition_keys: vec![id_key],
        clustering_keys: Vec::new(),
        columns: vec![age_column, name_column],
        comments: HashMap::new(),
    };

    let writer = DataWriter::new(create_test_stats());
    let delete_age = CellOperation::Delete {
        column: String::from("age"),
    };
    let mutation = Mutation::new(
        TableId::new("test_ks", "test_table"),
        PartitionKey::single("id", Value::Integer(1)),
        None,
        vec![delete_age],
        1001000,
        None,
    );

    let mut bitmap = Vec::new();
    writer
        .write_column_bitmap(&mut bitmap, &mutation, &schema)
        .unwrap();
    assert_eq!(bitmap.len(), 1);
    assert_eq!(
        bitmap[0], 2,
        "Bitmap should mark 'name' as missing (bit 1) but 'age' as present (bit 0)"
    );
}
/// Builds the fixture shared by the streaming-writer tests: 16 partitions,
/// each with one mutation writing the `name` column.
///
/// Partition `i` uses token `i`, the big-endian bytes of `i` as its key, and
/// write timestamp `1_001_000 + i`.
fn streaming_test_partitions() -> Vec<(DecoratedKey, Vec<Mutation>)> {
    let table_id = TableId::new("test_ks", "test_table");
    let mut partitions = Vec::with_capacity(16);
    for i in 0..16u32 {
        let write_name = CellOperation::Write {
            column: String::from("name"),
            value: Value::Text(format!("partition-{i}")),
        };
        let mutation = Mutation::new(
            table_id.clone(),
            PartitionKey::single("id", Value::Integer(i as i32)),
            None,
            vec![write_name],
            1_001_000 + i as i64,
            None,
        );
        let decorated = DecoratedKey::new(i as i64, i.to_be_bytes().to_vec());
        partitions.push((decorated, vec![mutation]));
    }
    partitions
}
#[test]
/// Writes the same partitions through an in-memory writer and a file-backed
/// streaming writer, then checks that the partition offsets, the bytes on
/// disk, and the reported size all match the in-memory result.
fn test_streaming_writer_byte_identical_to_in_memory() {
    let schema = create_test_schema();
    let partitions = streaming_test_partitions();

    // Reference run: everything buffered in memory.
    let mut mem_writer = DataWriter::new(create_test_stats());
    let mem_offsets: Vec<_> = partitions
        .iter()
        .map(|(key, mutations)| {
            mem_writer
                .write_partition(key, mutations, &schema, None, &[])
                .unwrap()
        })
        .collect();
    let expected_bytes = mem_writer.finish().unwrap();

    // Streaming run: same input, written to a file in a temp directory.
    let dir = tempfile::tempdir().unwrap();
    let data_path = dir.path().join("nb-1-big-Data.db");
    let mut stream_writer = DataWriter::with_sink(create_test_stats(), data_path.clone());
    let stream_offsets: Vec<_> = partitions
        .iter()
        .map(|(key, mutations)| {
            stream_writer
                .write_partition(key, mutations, &schema, None, &[])
                .unwrap()
        })
        .collect();
    let data_size = stream_writer.finish_streaming().unwrap();

    assert_eq!(
        stream_offsets, mem_offsets,
        "streaming partition offsets must equal in-memory offsets"
    );
    let on_disk = std::fs::read(&data_path).unwrap();
    assert_eq!(
        on_disk, expected_bytes,
        "streamed Data.db must be byte-identical to in-memory Data.db"
    );
    assert_eq!(
        data_size as usize,
        expected_bytes.len(),
        "finish_streaming() data_size must equal file length"
    );
    // Every offset must point at a 2-byte key length of 4.
    for off in stream_offsets.iter().copied() {
        let start = off as usize;
        assert_eq!(
            &on_disk[start..start + 2],
            &[0x00, 0x04],
            "offset {off} must land on a partition's key-length prefix"
        );
    }
}
#[test]
/// With a file sink, the writer's scratch buffer must be empty after each
/// partition and the flushed position must grow strictly; the final size
/// must equal the last flushed position and exceed any single partition.
fn test_streaming_writer_bounds_memory_to_one_partition() {
    let schema = create_test_schema();
    let partitions = streaming_test_partitions();
    let dir = tempfile::tempdir().unwrap();
    let mut writer =
        DataWriter::with_sink(create_test_stats(), dir.path().join("nb-1-big-Data.db"));

    let mut prev_flushed = 0u64;
    let mut max_partition_size = 0usize;
    for (i, (key, mutations)) in partitions.iter().enumerate() {
        let before = writer.flushed_position();
        writer
            .write_partition(key, mutations, &schema, None, &[])
            .unwrap();
        assert_eq!(
            writer.scratch_len(),
            0,
            "scratch must be cleared after partition {i} (bounded memory)"
        );
        let after = writer.flushed_position();
        assert!(
            after > before,
            "flushed position must grow after writing partition {i}"
        );
        // Track the largest number of bytes flushed for one partition.
        let written = (after - before) as usize;
        if written > max_partition_size {
            max_partition_size = written;
        }
        assert!(after > prev_flushed);
        prev_flushed = after;
    }

    let total = writer.finish_streaming().unwrap();
    assert_eq!(
        total, prev_flushed,
        "total size must equal last flushed pos"
    );
    assert!(
        (max_partition_size as u64) < total,
        "largest single partition ({max_partition_size}) must be smaller than the full file ({total})"
    );
}
}