1use std::sync::Arc;
6
7use nodedb_codec::{ColumnCodec, ColumnTypeHint, ResolvedColumnCodec};
8use nodedb_mem::{EngineId, MemoryGovernor};
9use nodedb_types::columnar::{ColumnType, ColumnarSchema};
10
11use crate::error::ColumnarError;
12use crate::format::{ColumnMeta, HEADER_SIZE, SegmentFooter, SegmentHeader};
13use crate::memtable::ColumnData;
14
15use super::block::encode_column_blocks;
16use super::encode::compute_schema_hash;
17
/// Profile tag for segments written without a workload-specific profile.
///
/// This is the tag `SegmentWriter::plain` passes to `SegmentWriter::new`.
pub const PROFILE_PLAIN: u8 = 0;

/// Profile tag for time-series segments.
///
/// `select_codec_for_profile` special-cases this tag for `Float64` and
/// timestamp columns.
pub const PROFILE_TIMESERIES: u8 = 1;

/// Profile tag for spatial segments.
///
/// NOTE(review): nothing in this file special-cases this tag during codec
/// selection; it is only recorded in the footer like any other tag.
pub const PROFILE_SPATIAL: u8 = 2;
22
23pub struct SegmentWriter {
29 profile_tag: u8,
30 governor: Option<Arc<MemoryGovernor>>,
33}
34
35impl SegmentWriter {
36 pub fn new(profile_tag: u8) -> Self {
38 Self {
39 profile_tag,
40 governor: None,
41 }
42 }
43
44 pub fn with_governor(profile_tag: u8, governor: Arc<MemoryGovernor>) -> Self {
46 Self {
47 profile_tag,
48 governor: Some(governor),
49 }
50 }
51
52 pub fn plain() -> Self {
54 Self::new(PROFILE_PLAIN)
55 }
56
57 pub fn write_segment(
66 &self,
67 schema: &ColumnarSchema,
68 columns: &[ColumnData],
69 row_count: usize,
70 kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
71 ) -> Result<Vec<u8>, ColumnarError> {
72 if row_count == 0 {
73 return Err(ColumnarError::EmptyMemtable);
74 }
75 if columns.len() != schema.columns.len() {
76 return Err(ColumnarError::SchemaMismatch {
77 expected: schema.columns.len(),
78 got: columns.len(),
79 });
80 }
81
82 let mut buf = Vec::new();
83
84 buf.extend_from_slice(&SegmentHeader::current().to_bytes());
86
87 let _metas_guard = self
89 .governor
90 .as_ref()
91 .map(|g| {
92 g.reserve(
93 EngineId::Columnar,
94 columns.len() * std::mem::size_of::<ColumnMeta>(),
95 )
96 })
97 .transpose()?;
98 let mut column_metas = Vec::with_capacity(columns.len());
100
101 for (i, (col_def, col_data)) in schema.columns.iter().zip(columns.iter()).enumerate() {
102 let col_start = buf.len() as u64;
103
104 let codec = select_codec_for_profile(&col_def.column_type, self.profile_tag);
106
107 let block_stats = encode_column_blocks(
109 &mut buf,
110 col_data,
111 &col_def.column_type,
112 codec,
113 row_count,
114 self.governor.as_ref(),
115 )?;
116
117 let col_end = buf.len() as u64;
118
119 let (effective_codec, dictionary) = match col_data {
122 ColumnData::DictEncoded { dictionary, .. } => (
123 ResolvedColumnCodec::DeltaFastLanesLz4,
124 Some(dictionary.clone()),
125 ),
126 _ => (codec, None),
127 };
128
129 column_metas.push(ColumnMeta {
130 name: col_def.name.clone(),
131 offset: col_start - HEADER_SIZE as u64,
132 length: col_end - col_start,
133 codec: effective_codec,
134 block_count: block_stats.len() as u32,
135 block_stats,
136 dictionary,
137 });
138
139 let _ = i; }
141
142 let schema_hash = compute_schema_hash(schema);
144
145 let footer = SegmentFooter {
147 schema_hash,
148 column_count: schema.columns.len() as u32,
149 row_count: row_count as u64,
150 profile_tag: self.profile_tag,
151 columns: column_metas,
152 };
153 let footer_bytes = footer.to_bytes()?;
154 buf.extend_from_slice(&footer_bytes);
155
156 if let Some(key) = kek {
158 return crate::encrypt::encrypt_segment(key, &buf);
159 }
160
161 Ok(buf)
162 }
163}
164
165pub fn select_codec_for_profile(col_type: &ColumnType, profile_tag: u8) -> ResolvedColumnCodec {
173 if profile_tag == PROFILE_TIMESERIES && matches!(col_type, ColumnType::Float64) {
175 return ResolvedColumnCodec::Gorilla;
176 }
177 if profile_tag == PROFILE_TIMESERIES
179 && matches!(col_type, ColumnType::Timestamp | ColumnType::Timestamptz)
180 {
181 return ResolvedColumnCodec::DeltaFastLanesLz4;
182 }
183 select_codec(col_type)
184}
185
186fn select_codec(col_type: &ColumnType) -> ResolvedColumnCodec {
191 let hint = match col_type {
192 ColumnType::Int64 => ColumnTypeHint::Int64,
193 ColumnType::Float64 => ColumnTypeHint::Float64,
194 ColumnType::Timestamp | ColumnType::Timestamptz | ColumnType::SystemTimestamp => {
195 ColumnTypeHint::Timestamp
196 }
197 ColumnType::String | ColumnType::Geometry | ColumnType::Regex => ColumnTypeHint::String,
198 ColumnType::Bool
199 | ColumnType::Bytes
200 | ColumnType::Decimal { .. }
201 | ColumnType::Uuid
202 | ColumnType::Ulid
203 | ColumnType::Json
204 | ColumnType::Array
205 | ColumnType::Set
206 | ColumnType::Range
207 | ColumnType::Record => {
208 return ResolvedColumnCodec::Lz4;
209 }
210 ColumnType::Duration => ColumnTypeHint::Int64, ColumnType::Vector(_) => {
212 return ResolvedColumnCodec::Lz4;
213 }
214 _ => {
217 return ResolvedColumnCodec::Lz4;
218 }
219 };
220 nodedb_codec::detect_codec(ColumnCodec::Auto, hint)
224 .try_resolve()
225 .unwrap_or(ResolvedColumnCodec::Lz4)
226}
227
228#[cfg(test)]
229mod tests {
230 use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
231 use nodedb_types::value::Value;
232
233 use super::*;
234 use crate::format::{SegmentFooter, SegmentHeader};
235 use crate::memtable::ColumnarMemtable;
236
237 fn analytics_schema() -> ColumnarSchema {
238 ColumnarSchema::new(vec![
239 ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
240 ColumnDef::required("name", ColumnType::String),
241 ColumnDef::nullable("score", ColumnType::Float64),
242 ])
243 .expect("valid")
244 }
245
246 #[test]
251 fn auto_codec_resolves_to_concrete_before_write() {
252 let schema = ColumnarSchema::new(vec![
253 ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
254 ColumnDef::required("name", ColumnType::String),
255 ColumnDef::nullable("score", ColumnType::Float64),
256 ])
257 .expect("valid");
258
259 let mut mt = ColumnarMemtable::new(&schema);
260 for i in 0..10 {
261 mt.append_row(&[
262 Value::Integer(i),
263 Value::String(format!("item_{i}")),
264 Value::Float(i as f64 * 1.5),
265 ])
266 .expect("append");
267 }
268 let (schema, columns, row_count) = mt.drain();
269 let writer = SegmentWriter::plain();
270 let segment = writer
271 .write_segment(&schema, &columns, row_count, None)
272 .expect("write must succeed");
273
274 let footer = SegmentFooter::from_segment_tail(&segment).expect("valid footer");
275
276 for col in &footer.columns {
279 let encoded = zerompk::to_msgpack_vec(&col.codec).expect("serialize");
283 let discriminant_byte = *encoded.last().expect("non-empty");
286 assert_ne!(
287 discriminant_byte, 0,
288 "column '{}' has Auto discriminant (0) on disk — resolve was skipped",
289 col.name
290 );
291 }
292 }
293
294 #[test]
296 fn auto_codec_int64_resolves_to_non_raw() {
297 use nodedb_codec::ResolvedColumnCodec;
298
299 let schema = ColumnarSchema::new(vec![
300 ColumnDef::required("val", ColumnType::Int64).with_primary_key(),
301 ])
302 .expect("valid");
303
304 let mut mt = ColumnarMemtable::new(&schema);
305 for i in 0..10 {
306 mt.append_row(&[Value::Integer(i)]).expect("append");
307 }
308 let (schema, columns, row_count) = mt.drain();
309 let writer = SegmentWriter::plain();
310 let segment = writer
311 .write_segment(&schema, &columns, row_count, None)
312 .expect("write");
313 let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
314
315 let codec = footer.columns[0].codec;
317 assert_ne!(
318 codec,
319 ResolvedColumnCodec::Raw,
320 "Int64 should not resolve to Raw"
321 );
322 }
323
324 #[test]
325 fn write_segment_roundtrip() {
326 let schema = analytics_schema();
327 let mut mt = ColumnarMemtable::new(&schema);
328
329 for i in 0..100 {
330 mt.append_row(&[
331 Value::Integer(i),
332 Value::String(format!("user_{i}")),
333 if i % 3 == 0 {
334 Value::Null
335 } else {
336 Value::Float(i as f64 * 0.25)
337 },
338 ])
339 .expect("append");
340 }
341
342 let (schema, columns, row_count) = mt.drain();
343 let writer = SegmentWriter::plain();
344 let segment = writer
345 .write_segment(&schema, &columns, row_count, None)
346 .expect("write");
347
348 let header = SegmentHeader::from_bytes(&segment).expect("valid header");
350 assert_eq!(header.magic, *b"NDBS");
351 assert_eq!(header.version_major, 1);
352
353 let footer = SegmentFooter::from_segment_tail(&segment).expect("valid footer");
355 assert_eq!(footer.column_count, 3);
356 assert_eq!(footer.row_count, 100);
357 assert_eq!(footer.profile_tag, PROFILE_PLAIN);
358 assert_eq!(footer.columns.len(), 3);
359
360 assert_eq!(footer.columns[0].name, "id");
362 assert_eq!(footer.columns[1].name, "name");
363 assert_eq!(footer.columns[2].name, "score");
364
365 assert_eq!(footer.columns[0].block_count, 1);
367 assert_eq!(footer.columns[0].block_stats[0].row_count, 100);
368
369 assert_eq!(footer.columns[0].block_stats[0].min, 0.0);
371 assert_eq!(footer.columns[0].block_stats[0].max, 99.0);
372 assert_eq!(footer.columns[0].block_stats[0].null_count, 0);
373
374 assert_eq!(footer.columns[2].block_stats[0].null_count, 34);
376 }
377
378 #[test]
379 fn write_segment_multi_block() {
380 let schema =
381 ColumnarSchema::new(vec![ColumnDef::required("x", ColumnType::Int64)]).expect("valid");
382
383 let mut mt = ColumnarMemtable::new(&schema);
384 for i in 0..2500 {
385 mt.append_row(&[Value::Integer(i)]).expect("append");
386 }
387
388 let (schema, columns, row_count) = mt.drain();
389 let writer = SegmentWriter::plain();
390 let segment = writer
391 .write_segment(&schema, &columns, row_count, None)
392 .expect("write");
393
394 let footer = SegmentFooter::from_segment_tail(&segment).expect("valid footer");
395 assert_eq!(footer.row_count, 2500);
396
397 assert_eq!(footer.columns[0].block_count, 3);
399 assert_eq!(footer.columns[0].block_stats[0].row_count, 1024);
400 assert_eq!(footer.columns[0].block_stats[1].row_count, 1024);
401 assert_eq!(footer.columns[0].block_stats[2].row_count, 452);
402
403 assert_eq!(footer.columns[0].block_stats[0].min, 0.0);
405 assert_eq!(footer.columns[0].block_stats[0].max, 1023.0);
406 assert_eq!(footer.columns[0].block_stats[2].min, 2048.0);
408 assert_eq!(footer.columns[0].block_stats[2].max, 2499.0);
409 }
410
411 #[test]
412 fn write_segment_empty_rejected() {
413 let schema = analytics_schema();
414 let mt = ColumnarMemtable::new(&schema);
415 let (schema, columns, row_count) = {
416 let mut m = mt;
417 m.drain()
418 };
419 let writer = SegmentWriter::plain();
420 assert!(matches!(
421 writer.write_segment(&schema, &columns, row_count, None),
422 Err(ColumnarError::EmptyMemtable)
423 ));
424 }
425
426 #[test]
427 fn block_stats_predicate_pushdown() {
428 let schema = analytics_schema();
429 let mut mt = ColumnarMemtable::new(&schema);
430
431 for i in 0..50 {
432 mt.append_row(&[
433 Value::Integer(i + 100),
434 Value::String(format!("item_{i}")),
435 Value::Float(i as f64 + 10.0),
436 ])
437 .expect("append");
438 }
439
440 let (schema, columns, row_count) = mt.drain();
441 let writer = SegmentWriter::plain();
442 let segment = writer
443 .write_segment(&schema, &columns, row_count, None)
444 .expect("write");
445 let footer = SegmentFooter::from_segment_tail(&segment).expect("valid");
446
447 use crate::predicate::ScanPredicate;
448
449 let id_stats = &footer.columns[0].block_stats[0];
450 assert!(ScanPredicate::gt(0, 200.0).can_skip_block(id_stats)); assert!(!ScanPredicate::gt(0, 120.0).can_skip_block(id_stats)); assert!(ScanPredicate::lt(0, 50.0).can_skip_block(id_stats)); assert!(ScanPredicate::eq(0, 200.0).can_skip_block(id_stats)); assert!(!ScanPredicate::eq(0, 125.0).can_skip_block(id_stats)); }
457
458 #[test]
459 fn string_block_stats_zone_map() {
460 let schema = ColumnarSchema::new(vec![ColumnDef::required("tag", ColumnType::String)])
462 .expect("valid");
463
464 let mut mt = ColumnarMemtable::new(&schema);
465 let values: Vec<String> = (0..20).map(|i| format!("item_{i:02}")).collect();
468 for name in &values {
469 mt.append_row(&[Value::String(name.clone())])
470 .expect("append");
471 }
472 mt.append_row(&[Value::String("apple".into())])
474 .expect("append");
475 mt.append_row(&[Value::String("date".into())])
476 .expect("append");
477
478 let (schema, columns, row_count) = mt.drain();
479 let writer = SegmentWriter::plain();
480 let segment = writer
481 .write_segment(&schema, &columns, row_count, None)
482 .expect("write");
483 let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
484
485 let stats = &footer.columns[0].block_stats[0];
486 assert!(stats.str_min.is_some(), "str_min should be populated");
487 assert!(stats.str_max.is_some(), "str_max should be populated");
488 assert_eq!(stats.str_min.as_deref(), Some("apple"));
490 assert_eq!(stats.str_max.as_deref(), Some("item_19"));
491
492 assert!(
494 stats.bloom.is_some(),
495 "bloom should be populated for >16 distinct values"
496 );
497
498 use crate::predicate::ScanPredicate;
499
500 assert!(ScanPredicate::str_eq(0, "aaa").can_skip_block(stats));
502 assert!(ScanPredicate::str_eq(0, "zzz").can_skip_block(stats));
504 assert!(!ScanPredicate::str_eq(0, "date").can_skip_block(stats));
506 assert!(ScanPredicate::str_gt(0, "item_19").can_skip_block(stats));
508 assert!(ScanPredicate::str_lt(0, "apple").can_skip_block(stats));
510 }
511
512 #[test]
516 fn timestamp_large_value_roundtrip() {
517 use crate::predicate::ScanPredicate;
518
519 let schema = ColumnarSchema::new(vec![
520 ColumnDef::required("ts", ColumnType::Timestamp).with_primary_key(),
521 ])
522 .expect("valid schema");
523
524 let base: i64 = 10_413_792_000_000_000;
528 let target = base + 500_000; let mut mt = ColumnarMemtable::new(&schema);
531 for delta in 0..10i64 {
532 mt.append_row(&[Value::Integer(base + delta * 100_000)])
533 .expect("append");
534 }
535
536 let (schema, columns, row_count) = mt.drain();
537 let segment = SegmentWriter::plain()
538 .write_segment(&schema, &columns, row_count, None)
539 .expect("write");
540 let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
541
542 let stats = &footer.columns[0].block_stats[0];
543
544 assert!(
546 stats.min_i64.is_some(),
547 "min_i64 must be set for timestamp columns"
548 );
549 assert!(
550 stats.max_i64.is_some(),
551 "max_i64 must be set for timestamp columns"
552 );
553 assert_eq!(stats.min_i64.unwrap(), base);
554 assert_eq!(stats.max_i64.unwrap(), base + 9 * 100_000);
555
556 assert!(
558 !ScanPredicate::eq_i64(0, target).can_skip_block(stats),
559 "must not skip: target={target} is within the block range"
560 );
561
562 assert!(
564 ScanPredicate::eq_i64(0, base - 1).can_skip_block(stats),
565 "must skip: base-1 is below block min"
566 );
567
568 let min_f64 = base as f64;
571 let target_f64 = target as f64;
572 let max_f64 = (base + 9 * 100_000) as f64;
573 let _ = (min_f64, target_f64, max_f64); }
577
578 #[test]
579 fn integer_block_stats_have_exact_i64_fields() {
580 let schema = ColumnarSchema::new(vec![
582 ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
583 ])
584 .expect("valid");
585
586 let mut mt = ColumnarMemtable::new(&schema);
587 for i in 0..5i64 {
588 mt.append_row(&[Value::Integer(i64::MAX - 4 + i)])
589 .expect("append");
590 }
591
592 let (schema, columns, row_count) = mt.drain();
593 let segment = SegmentWriter::plain()
594 .write_segment(&schema, &columns, row_count, None)
595 .expect("write");
596 let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
597
598 let stats = &footer.columns[0].block_stats[0];
599 assert_eq!(stats.min_i64, Some(i64::MAX - 4));
600 assert_eq!(stats.max_i64, Some(i64::MAX));
601
602 use crate::predicate::ScanPredicate;
604 assert!(!ScanPredicate::eq_i64(0, i64::MAX - 2).can_skip_block(stats));
605 assert!(ScanPredicate::eq_i64(0, i64::MAX - 10).can_skip_block(stats));
607 }
608
609 #[test]
610 fn string_block_stats_bloom_rejects_absent_value() {
611 let schema = ColumnarSchema::new(vec![ColumnDef::required("label", ColumnType::String)])
612 .expect("valid");
613
614 let mut mt = ColumnarMemtable::new(&schema);
615 let values: Vec<String> = (0..20).map(|i| format!("val_{i:02}")).collect();
617 for name in &values {
618 mt.append_row(&[Value::String(name.clone())])
619 .expect("append");
620 }
621 mt.append_row(&[Value::String("alpha".into())])
623 .expect("append");
624 mt.append_row(&[Value::String("beta".into())])
625 .expect("append");
626 mt.append_row(&[Value::String("gamma".into())])
627 .expect("append");
628
629 let (schema, columns, row_count) = mt.drain();
630 let segment = SegmentWriter::plain()
631 .write_segment(&schema, &columns, row_count, None)
632 .expect("write");
633 let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
634 let stats = &footer.columns[0].block_stats[0];
635
636 use crate::predicate::{ScanPredicate, bloom_may_contain};
637
638 let bloom = stats
639 .bloom
640 .as_ref()
641 .expect("bloom present for >16 distinct");
642 assert!(bloom_may_contain(bloom, "alpha"));
643 assert!(bloom_may_contain(bloom, "beta"));
644 assert!(bloom_may_contain(bloom, "gamma"));
645
646 let delta_absent = !bloom_may_contain(bloom, "delta");
648 if delta_absent {
649 assert!(ScanPredicate::str_eq(0, "delta").can_skip_block(stats));
651 }
652 }
653}