use std::sync::Arc;
use nodedb_codec::{ColumnCodec, ColumnTypeHint, ResolvedColumnCodec};
use nodedb_mem::{EngineId, MemoryGovernor};
use nodedb_types::columnar::{ColumnType, ColumnarSchema};
use crate::error::ColumnarError;
use crate::format::{ColumnMeta, HEADER_SIZE, SegmentFooter, SegmentHeader};
use crate::memtable::ColumnData;
use super::block::encode_column_blocks;
use super::encode::compute_schema_hash;
/// Profile tag for plain segments; this is the tag used by [`SegmentWriter::plain`].
/// Columns written under this tag get the type-based codec from `select_codec`.
pub const PROFILE_PLAIN: u8 = 0;
/// Profile tag for timeseries segments. Under this tag
/// [`select_codec_for_profile`] picks Gorilla for `Float64` columns and
/// DeltaFastLanesLz4 for `Timestamp` / `Timestamptz` columns.
pub const PROFILE_TIMESERIES: u8 = 1;
/// Profile tag for spatial segments. No codec override keyed on this tag is
/// visible in this file; it is only stored in the footer like any other tag.
pub const PROFILE_SPATIAL: u8 = 2;
/// Serializes drained column data into a single segment byte buffer
/// (header, encoded column blocks, footer).
pub struct SegmentWriter {
    /// Profile tag recorded in the segment footer and used to pick
    /// per-column codecs (see `select_codec_for_profile`).
    profile_tag: u8,
    /// Optional memory governor; when present, `write_segment` reserves
    /// budget from it and passes it on to the block encoder.
    governor: Option<Arc<MemoryGovernor>>,
}
impl SegmentWriter {
    /// Creates a writer for the given profile tag with no memory governor.
    pub fn new(profile_tag: u8) -> Self {
        Self {
            profile_tag,
            governor: None,
        }
    }

    /// Creates a writer for the given profile tag that reserves memory
    /// through `governor` while writing.
    pub fn with_governor(profile_tag: u8, governor: Arc<MemoryGovernor>) -> Self {
        Self {
            profile_tag,
            governor: Some(governor),
        }
    }

    /// Creates an ungoverned writer using [`PROFILE_PLAIN`].
    pub fn plain() -> Self {
        Self::new(PROFILE_PLAIN)
    }

    /// Encodes `columns` (one entry per schema column, in schema order) into
    /// a segment and returns its bytes.
    ///
    /// The buffer is laid out as: segment header, then the encoded blocks of
    /// each column in schema order, then the serialized footer. When `kek` is
    /// `Some`, the finished buffer is handed to `crate::encrypt::encrypt_segment`
    /// and that function's result is returned instead of the plain buffer.
    ///
    /// # Errors
    ///
    /// * [`ColumnarError::EmptyMemtable`] if `row_count` is zero.
    /// * [`ColumnarError::SchemaMismatch`] if `columns.len()` differs from
    ///   the number of columns in `schema`.
    /// * Any error produced by the governor reservation, block encoding,
    ///   footer serialization, or segment encryption.
    pub fn write_segment(
        &self,
        schema: &ColumnarSchema,
        columns: &[ColumnData],
        row_count: usize,
        kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
    ) -> Result<Vec<u8>, ColumnarError> {
        if row_count == 0 {
            return Err(ColumnarError::EmptyMemtable);
        }
        if columns.len() != schema.columns.len() {
            return Err(ColumnarError::SchemaMismatch {
                expected: schema.columns.len(),
                got: columns.len(),
            });
        }

        let mut buf = Vec::new();
        buf.extend_from_slice(&SegmentHeader::current().to_bytes());

        // Reserve budget for the per-column metadata vector before allocating
        // it. The guard is kept alive until the function returns.
        // NOTE(review): presumably the reservation is released when the guard
        // is dropped — confirm against `MemoryGovernor::reserve`.
        let _metas_guard = self
            .governor
            .as_ref()
            .map(|g| {
                g.reserve(
                    EngineId::Columnar,
                    columns.len() * std::mem::size_of::<ColumnMeta>(),
                )
            })
            .transpose()?;

        let mut column_metas = Vec::with_capacity(columns.len());
        for (col_def, col_data) in schema.columns.iter().zip(columns.iter()) {
            let col_start = buf.len() as u64;
            let codec = select_codec_for_profile(&col_def.column_type, self.profile_tag);
            let block_stats = encode_column_blocks(
                &mut buf,
                col_data,
                &col_def.column_type,
                codec,
                row_count,
                self.governor.as_ref(),
            )?;
            let col_end = buf.len() as u64;

            // Dictionary-encoded columns are always recorded with
            // DeltaFastLanesLz4 and carry a copy of their dictionary in the
            // footer metadata; every other column records the selected codec.
            let (effective_codec, dictionary) = match col_data {
                ColumnData::DictEncoded { dictionary, .. } => (
                    ResolvedColumnCodec::DeltaFastLanesLz4,
                    Some(dictionary.clone()),
                ),
                _ => (codec, None),
            };

            column_metas.push(ColumnMeta {
                name: col_def.name.clone(),
                // Offsets are stored relative to the end of the header.
                offset: col_start - HEADER_SIZE as u64,
                length: col_end - col_start,
                codec: effective_codec,
                block_count: block_stats.len() as u32,
                block_stats,
                dictionary,
            });
        }

        let footer = SegmentFooter {
            schema_hash: compute_schema_hash(schema),
            column_count: schema.columns.len() as u32,
            row_count: row_count as u64,
            profile_tag: self.profile_tag,
            columns: column_metas,
        };
        buf.extend_from_slice(&footer.to_bytes()?);

        match kek {
            Some(key) => crate::encrypt::encrypt_segment(key, &buf),
            None => Ok(buf),
        }
    }
}
/// Picks the codec for a column of `col_type` written under `profile_tag`.
///
/// Only [`PROFILE_TIMESERIES`] has overrides: `Float64` columns use Gorilla
/// and `Timestamp` / `Timestamptz` columns use DeltaFastLanesLz4. Every
/// other profile/type combination falls back to `select_codec`.
pub fn select_codec_for_profile(col_type: &ColumnType, profile_tag: u8) -> ResolvedColumnCodec {
    if profile_tag != PROFILE_TIMESERIES {
        return select_codec(col_type);
    }
    match col_type {
        ColumnType::Float64 => ResolvedColumnCodec::Gorilla,
        ColumnType::Timestamp | ColumnType::Timestamptz => ResolvedColumnCodec::DeltaFastLanesLz4,
        _ => select_codec(col_type),
    }
}
/// Picks the profile-independent codec for a column type.
///
/// Types that map to a [`ColumnTypeHint`] are resolved through
/// `nodedb_codec::detect_codec` with `ColumnCodec::Auto`; if that result
/// cannot be resolved to a concrete codec, Lz4 is used. All remaining types
/// use Lz4 directly.
fn select_codec(col_type: &ColumnType) -> ResolvedColumnCodec {
    // `None` means "no hint available: use Lz4 without consulting detection".
    let maybe_hint = match col_type {
        ColumnType::Int64 | ColumnType::Duration => Some(ColumnTypeHint::Int64),
        ColumnType::Float64 => Some(ColumnTypeHint::Float64),
        ColumnType::Timestamp | ColumnType::Timestamptz | ColumnType::SystemTimestamp => {
            Some(ColumnTypeHint::Timestamp)
        }
        ColumnType::String | ColumnType::Geometry | ColumnType::Regex => {
            Some(ColumnTypeHint::String)
        }
        ColumnType::Bool
        | ColumnType::Bytes
        | ColumnType::Decimal { .. }
        | ColumnType::Uuid
        | ColumnType::Ulid
        | ColumnType::Json
        | ColumnType::Array
        | ColumnType::Set
        | ColumnType::Range
        | ColumnType::Record
        | ColumnType::Vector(_) => None,
        // Any type not listed above also falls back to Lz4.
        _ => None,
    };

    match maybe_hint {
        Some(hint) => nodedb_codec::detect_codec(ColumnCodec::Auto, hint)
            .try_resolve()
            .unwrap_or(ResolvedColumnCodec::Lz4),
        None => ResolvedColumnCodec::Lz4,
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `SegmentWriter::write_segment`: codec resolution,
    //! footer contents, block splitting, and block-level statistics used for
    //! predicate pushdown.

    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;
    use crate::format::{SegmentFooter, SegmentHeader};
    use crate::memtable::ColumnarMemtable;
    use crate::predicate::{ScanPredicate, bloom_may_contain};

    /// Three-column schema shared by several tests:
    /// `id` (Int64, primary key), `name` (String), `score` (nullable Float64).
    fn analytics_schema() -> ColumnarSchema {
        ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid")
    }

    /// No column in a written footer may serialize with discriminant byte 0.
    #[test]
    fn auto_codec_resolves_to_concrete_before_write() {
        let schema = analytics_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        for i in 0..10 {
            mt.append_row(&[
                Value::Integer(i),
                Value::String(format!("item_{i}")),
                Value::Float(i as f64 * 1.5),
            ])
            .expect("append");
        }
        let (schema, columns, row_count) = mt.drain();

        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count, None)
            .expect("write must succeed");
        let footer = SegmentFooter::from_segment_tail(&segment).expect("valid footer");

        for col in &footer.columns {
            let encoded = zerompk::to_msgpack_vec(&col.codec).expect("serialize");
            let discriminant_byte = *encoded.last().expect("non-empty");
            assert_ne!(
                discriminant_byte, 0,
                "column '{}' has Auto discriminant (0) on disk — resolve was skipped",
                col.name
            );
        }
    }

    /// An Int64 column written with the plain profile must not end up as Raw.
    #[test]
    fn auto_codec_int64_resolves_to_non_raw() {
        use nodedb_codec::ResolvedColumnCodec;

        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("val", ColumnType::Int64).with_primary_key(),
        ])
        .expect("valid");
        let mut mt = ColumnarMemtable::new(&schema);
        for i in 0..10 {
            mt.append_row(&[Value::Integer(i)]).expect("append");
        }
        let (schema, columns, row_count) = mt.drain();

        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count, None)
            .expect("write");
        let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");

        let codec = footer.columns[0].codec;
        assert_ne!(
            codec,
            ResolvedColumnCodec::Raw,
            "Int64 should not resolve to Raw"
        );
    }

    /// Header and footer of a 100-row segment carry the expected metadata.
    #[test]
    fn write_segment_roundtrip() {
        let schema = analytics_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        for i in 0..100 {
            // Every third row (i = 0, 3, ..., 99 → 34 rows) has a null score.
            let score = if i % 3 == 0 {
                Value::Null
            } else {
                Value::Float(i as f64 * 0.25)
            };
            mt.append_row(&[Value::Integer(i), Value::String(format!("user_{i}")), score])
                .expect("append");
        }
        let (schema, columns, row_count) = mt.drain();

        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count, None)
            .expect("write");

        let header = SegmentHeader::from_bytes(&segment).expect("valid header");
        assert_eq!(header.magic, *b"NDBS");
        assert_eq!(header.version_major, 1);

        let footer = SegmentFooter::from_segment_tail(&segment).expect("valid footer");
        assert_eq!(footer.column_count, 3);
        assert_eq!(footer.row_count, 100);
        assert_eq!(footer.profile_tag, PROFILE_PLAIN);
        assert_eq!(footer.columns.len(), 3);
        assert_eq!(footer.columns[0].name, "id");
        assert_eq!(footer.columns[1].name, "name");
        assert_eq!(footer.columns[2].name, "score");

        let id_col = &footer.columns[0];
        assert_eq!(id_col.block_count, 1);
        assert_eq!(id_col.block_stats[0].row_count, 100);
        assert_eq!(id_col.block_stats[0].min, 0.0);
        assert_eq!(id_col.block_stats[0].max, 99.0);
        assert_eq!(id_col.block_stats[0].null_count, 0);

        assert_eq!(footer.columns[2].block_stats[0].null_count, 34);
    }

    /// 2500 rows are expected to be split into blocks of 1024, 1024 and 452 rows.
    #[test]
    fn write_segment_multi_block() {
        let schema =
            ColumnarSchema::new(vec![ColumnDef::required("x", ColumnType::Int64)]).expect("valid");
        let mut mt = ColumnarMemtable::new(&schema);
        for i in 0..2500 {
            mt.append_row(&[Value::Integer(i)]).expect("append");
        }
        let (schema, columns, row_count) = mt.drain();

        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count, None)
            .expect("write");
        let footer = SegmentFooter::from_segment_tail(&segment).expect("valid footer");
        assert_eq!(footer.row_count, 2500);

        let col = &footer.columns[0];
        assert_eq!(col.block_count, 3);
        assert_eq!(col.block_stats[0].row_count, 1024);
        assert_eq!(col.block_stats[1].row_count, 1024);
        assert_eq!(col.block_stats[2].row_count, 452);
        assert_eq!(col.block_stats[0].min, 0.0);
        assert_eq!(col.block_stats[0].max, 1023.0);
        assert_eq!(col.block_stats[2].min, 2048.0);
        assert_eq!(col.block_stats[2].max, 2499.0);
    }

    /// Writing the drained contents of an empty memtable is rejected.
    #[test]
    fn write_segment_empty_rejected() {
        let schema = analytics_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        let (schema, columns, row_count) = mt.drain();

        let writer = SegmentWriter::plain();
        assert!(matches!(
            writer.write_segment(&schema, &columns, row_count, None),
            Err(ColumnarError::EmptyMemtable)
        ));
    }

    /// Numeric block stats let predicates skip blocks that cannot match.
    #[test]
    fn block_stats_predicate_pushdown() {
        let schema = analytics_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        for i in 0..50 {
            mt.append_row(&[
                Value::Integer(i + 100),
                Value::String(format!("item_{i}")),
                Value::Float(i as f64 + 10.0),
            ])
            .expect("append");
        }
        let (schema, columns, row_count) = mt.drain();

        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count, None)
            .expect("write");
        let footer = SegmentFooter::from_segment_tail(&segment).expect("valid");

        // The `id` column holds the values 100..=149.
        let id_stats = &footer.columns[0].block_stats[0];
        assert!(ScanPredicate::gt(0, 200.0).can_skip_block(id_stats));
        assert!(!ScanPredicate::gt(0, 120.0).can_skip_block(id_stats));
        assert!(ScanPredicate::lt(0, 50.0).can_skip_block(id_stats));
        assert!(ScanPredicate::eq(0, 200.0).can_skip_block(id_stats));
        assert!(!ScanPredicate::eq(0, 125.0).can_skip_block(id_stats));
    }

    /// String columns get min/max bounds and a bloom filter in their block stats.
    #[test]
    fn string_block_stats_zone_map() {
        let schema = ColumnarSchema::new(vec![ColumnDef::required("tag", ColumnType::String)])
            .expect("valid");
        let mut mt = ColumnarMemtable::new(&schema);

        // item_00..item_19 plus two values that sort before them.
        let values: Vec<String> = (0..20).map(|i| format!("item_{i:02}")).collect();
        for name in &values {
            mt.append_row(&[Value::String(name.clone())])
                .expect("append");
        }
        mt.append_row(&[Value::String("apple".into())])
            .expect("append");
        mt.append_row(&[Value::String("date".into())])
            .expect("append");
        let (schema, columns, row_count) = mt.drain();

        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count, None)
            .expect("write");
        let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
        let stats = &footer.columns[0].block_stats[0];

        assert!(stats.str_min.is_some(), "str_min should be populated");
        assert!(stats.str_max.is_some(), "str_max should be populated");
        assert_eq!(stats.str_min.as_deref(), Some("apple"));
        assert_eq!(stats.str_max.as_deref(), Some("item_19"));
        assert!(
            stats.bloom.is_some(),
            "bloom should be populated for >16 distinct values"
        );

        assert!(ScanPredicate::str_eq(0, "aaa").can_skip_block(stats));
        assert!(ScanPredicate::str_eq(0, "zzz").can_skip_block(stats));
        assert!(!ScanPredicate::str_eq(0, "date").can_skip_block(stats));
        assert!(ScanPredicate::str_gt(0, "item_19").can_skip_block(stats));
        assert!(ScanPredicate::str_lt(0, "apple").can_skip_block(stats));
    }

    /// Timestamp stats keep exact i64 bounds for values above 2^53, where an
    /// f64 can no longer represent every integer.
    #[test]
    fn timestamp_large_value_roundtrip() {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("ts", ColumnType::Timestamp).with_primary_key(),
        ])
        .expect("valid schema");

        let base: i64 = 10_413_792_000_000_000;
        let target = base + 500_000;
        let mut mt = ColumnarMemtable::new(&schema);
        for delta in 0..10i64 {
            mt.append_row(&[Value::Integer(base + delta * 100_000)])
                .expect("append");
        }
        let (schema, columns, row_count) = mt.drain();

        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count, None)
            .expect("write");
        let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
        let stats = &footer.columns[0].block_stats[0];

        assert!(
            stats.min_i64.is_some(),
            "min_i64 must be set for timestamp columns"
        );
        assert!(
            stats.max_i64.is_some(),
            "max_i64 must be set for timestamp columns"
        );
        assert_eq!(stats.min_i64, Some(base));
        assert_eq!(stats.max_i64, Some(base + 9 * 100_000));

        assert!(
            !ScanPredicate::eq_i64(0, target).can_skip_block(stats),
            "must not skip: target={target} is within the block range"
        );
        assert!(
            ScanPredicate::eq_i64(0, base - 1).can_skip_block(stats),
            "must skip: base-1 is below block min"
        );
    }

    /// Int64 stats near `i64::MAX` are stored exactly in `min_i64` / `max_i64`.
    #[test]
    fn integer_block_stats_have_exact_i64_fields() {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
        ])
        .expect("valid");
        let mut mt = ColumnarMemtable::new(&schema);
        for i in 0..5i64 {
            mt.append_row(&[Value::Integer(i64::MAX - 4 + i)])
                .expect("append");
        }
        let (schema, columns, row_count) = mt.drain();

        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count, None)
            .expect("write");
        let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
        let stats = &footer.columns[0].block_stats[0];

        assert_eq!(stats.min_i64, Some(i64::MAX - 4));
        assert_eq!(stats.max_i64, Some(i64::MAX));
        assert!(!ScanPredicate::eq_i64(0, i64::MAX - 2).can_skip_block(stats));
        assert!(ScanPredicate::eq_i64(0, i64::MAX - 10).can_skip_block(stats));
    }

    /// The bloom filter reports every inserted value as possibly present, and
    /// a value it rules out lets `str_eq` skip the block.
    #[test]
    fn string_block_stats_bloom_rejects_absent_value() {
        let schema = ColumnarSchema::new(vec![ColumnDef::required("label", ColumnType::String)])
            .expect("valid");
        let mut mt = ColumnarMemtable::new(&schema);

        let values: Vec<String> = (0..20).map(|i| format!("val_{i:02}")).collect();
        for name in &values {
            mt.append_row(&[Value::String(name.clone())])
                .expect("append");
        }
        for extra in ["alpha", "beta", "gamma"] {
            mt.append_row(&[Value::String(extra.into())])
                .expect("append");
        }
        let (schema, columns, row_count) = mt.drain();

        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count, None)
            .expect("write");
        let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
        let stats = &footer.columns[0].block_stats[0];

        let bloom = stats
            .bloom
            .as_ref()
            .expect("bloom present for >16 distinct");
        assert!(bloom_may_contain(bloom, "alpha"));
        assert!(bloom_may_contain(bloom, "beta"));
        assert!(bloom_may_contain(bloom, "gamma"));

        // A bloom filter may report a false positive for "delta", so the skip
        // is only asserted when the filter actually rules the value out.
        if !bloom_may_contain(bloom, "delta") {
            assert!(ScanPredicate::str_eq(0, "delta").can_skip_block(stats));
        }
    }
}