Skip to main content

nodedb_columnar/writer/
mod.rs

1//! Segment writer: drains a memtable into a compressed columnar segment.
2//!
3//! Encodes each column through the `nodedb-codec` pipeline in blocks of
4//! BLOCK_SIZE rows. Computes per-block statistics for predicate pushdown.
5//! Assembles the final segment: header + column blocks + footer with CRC32C.
6
7mod block;
8mod encode;
9mod stats;
10
11use nodedb_codec::{ColumnCodec, ColumnTypeHint};
12use nodedb_types::columnar::{ColumnType, ColumnarSchema};
13
14use crate::error::ColumnarError;
15use crate::format::{ColumnMeta, HEADER_SIZE, SegmentFooter, SegmentHeader};
16use crate::memtable::ColumnData;
17
18use self::block::encode_column_blocks;
19use self::encode::compute_schema_hash;
20
// Profile tag values recorded in the segment footer (`SegmentFooter::profile_tag`).

/// Plain (default) profile: codecs come purely from type-based auto-detection.
pub const PROFILE_PLAIN: u8 = 0x00;
/// Timeseries profile: enables the timeseries codec overrides in
/// `select_codec_for_profile`.
pub const PROFILE_TIMESERIES: u8 = 0x01;
/// Spatial profile tag. `select_codec_for_profile` applies no overrides for it.
pub const PROFILE_SPATIAL: u8 = 0x02;
25
/// Writes a drained memtable into a complete segment byte buffer.
///
/// The segment is self-contained: the header identifies the format, the
/// column blocks hold the compressed data, and the footer records per-column
/// offsets so any column can be located without scanning the whole buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SegmentWriter {
    /// Profile tag written to the footer and used for profile-aware codec
    /// selection (normally one of the `PROFILE_*` constants).
    profile_tag: u8,
}
34
35impl SegmentWriter {
36    /// Create a writer for the given profile.
37    pub fn new(profile_tag: u8) -> Self {
38        Self { profile_tag }
39    }
40
41    /// Create a writer for the plain (default) profile.
42    pub fn plain() -> Self {
43        Self::new(PROFILE_PLAIN)
44    }
45
46    /// Encode a drained memtable into a segment byte buffer.
47    ///
48    /// `schema` is the column schema, `columns` are the drained column data,
49    /// `row_count` is the total number of rows.
50    pub fn write_segment(
51        &self,
52        schema: &ColumnarSchema,
53        columns: &[ColumnData],
54        row_count: usize,
55    ) -> Result<Vec<u8>, ColumnarError> {
56        if row_count == 0 {
57            return Err(ColumnarError::EmptyMemtable);
58        }
59        if columns.len() != schema.columns.len() {
60            return Err(ColumnarError::SchemaMismatch {
61                expected: schema.columns.len(),
62                got: columns.len(),
63            });
64        }
65
66        let mut buf = Vec::new();
67
68        // 1. Write header.
69        buf.extend_from_slice(&SegmentHeader::current().to_bytes());
70
71        // 2. Encode each column's blocks.
72        let mut column_metas = Vec::with_capacity(columns.len());
73
74        for (i, (col_def, col_data)) in schema.columns.iter().zip(columns.iter()).enumerate() {
75            let col_start = buf.len() as u64;
76
77            // Select codec for this column type.
78            let codec = select_codec_for_profile(&col_def.column_type, self.profile_tag);
79
80            // Encode blocks.
81            let block_stats =
82                encode_column_blocks(&mut buf, col_data, &col_def.column_type, codec, row_count)?;
83
84            let col_end = buf.len() as u64;
85
86            // For DictEncoded columns, the codec stored in meta is DeltaFastLanesLz4 (IDs),
87            // and the dictionary strings are stored in the meta for reader reconstruction.
88            let (effective_codec, dictionary) = match col_data {
89                ColumnData::DictEncoded { dictionary, .. } => {
90                    (ColumnCodec::DeltaFastLanesLz4, Some(dictionary.clone()))
91                }
92                _ => (codec, None),
93            };
94
95            column_metas.push(ColumnMeta {
96                name: col_def.name.clone(),
97                offset: col_start - HEADER_SIZE as u64,
98                length: col_end - col_start,
99                codec: effective_codec,
100                block_count: block_stats.len() as u32,
101                block_stats,
102                dictionary,
103            });
104
105            let _ = i; // Satisfy clippy about unused index.
106        }
107
108        // 3. Compute schema hash (simple hash of column names + types).
109        let schema_hash = compute_schema_hash(schema);
110
111        // 4. Write footer.
112        let footer = SegmentFooter {
113            schema_hash,
114            column_count: schema.columns.len() as u32,
115            row_count: row_count as u64,
116            profile_tag: self.profile_tag,
117            columns: column_metas,
118        };
119        let footer_bytes = footer.to_bytes()?;
120        buf.extend_from_slice(&footer_bytes);
121
122        Ok(buf)
123    }
124}
125
126/// Select the best codec for a column type, with profile-aware overrides.
127///
128/// For timeseries profiles (tag=1), Float64 metric columns use Gorilla XOR
129/// encoding when the data is monotonic/slowly-changing. For other profiles,
130/// the standard auto-detection pipeline applies.
131pub fn select_codec_for_profile(col_type: &ColumnType, profile_tag: u8) -> ColumnCodec {
132    // Timeseries profile: prefer Gorilla for Float64 metrics.
133    if profile_tag == PROFILE_TIMESERIES && matches!(col_type, ColumnType::Float64) {
134        return ColumnCodec::Gorilla;
135    }
136    // Timeseries profile: delta-of-delta for timestamps.
137    if profile_tag == PROFILE_TIMESERIES && matches!(col_type, ColumnType::Timestamp) {
138        return ColumnCodec::DeltaFastLanesLz4;
139    }
140    select_codec(col_type)
141}
142
143/// Select the best codec for a column type using nodedb-codec's auto-detection.
144fn select_codec(col_type: &ColumnType) -> ColumnCodec {
145    let hint = match col_type {
146        ColumnType::Int64 => ColumnTypeHint::Int64,
147        ColumnType::Float64 => ColumnTypeHint::Float64,
148        ColumnType::Timestamp => ColumnTypeHint::Timestamp,
149        ColumnType::String | ColumnType::Geometry | ColumnType::Regex => ColumnTypeHint::String,
150        ColumnType::Bool
151        | ColumnType::Bytes
152        | ColumnType::Decimal
153        | ColumnType::Uuid
154        | ColumnType::Ulid
155        | ColumnType::Json
156        | ColumnType::Array
157        | ColumnType::Set
158        | ColumnType::Range
159        | ColumnType::Record => {
160            return ColumnCodec::Lz4;
161        }
162        ColumnType::Duration => ColumnTypeHint::Int64, // i64 microseconds
163        ColumnType::Vector(_) => {
164            return ColumnCodec::Lz4;
165        }
166    };
167    nodedb_codec::detect_codec(ColumnCodec::Auto, hint)
168}
169
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;
    use crate::format::{SegmentFooter, SegmentHeader};
    use crate::memtable::ColumnarMemtable;

    fn analytics_schema() -> ColumnarSchema {
        ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid")
    }

    #[test]
    fn write_segment_roundtrip() {
        let schema = analytics_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        for i in 0..100 {
            mt.append_row(&[
                Value::Integer(i),
                Value::String(format!("user_{i}")),
                if i % 3 == 0 {
                    Value::Null
                } else {
                    Value::Float(i as f64 * 0.25)
                },
            ])
            .expect("append");
        }

        let (schema, columns, row_count) = mt.drain();
        let writer = SegmentWriter::plain();
        let segment = writer
            .write_segment(&schema, &columns, row_count)
            .expect("write");

        // Verify header.
        let header = SegmentHeader::from_bytes(&segment).expect("valid header");
        assert_eq!(header.magic, *b"NDBS");
        assert_eq!(header.version_major, 1);

        // Verify footer.
        let footer = SegmentFooter::from_segment_tail(&segment).expect("valid footer");
        assert_eq!(footer.column_count, 3);
        assert_eq!(footer.row_count, 100);
        assert_eq!(footer.profile_tag, PROFILE_PLAIN);
        assert_eq!(footer.columns.len(), 3);

        // Verify column metadata.
        assert_eq!(footer.columns[0].name, "id");
        assert_eq!(footer.columns[1].name, "name");
        assert_eq!(footer.columns[2].name, "score");

        // Each column should have 1 block (100 rows < BLOCK_SIZE=1024).
        assert_eq!(footer.columns[0].block_count, 1);
        assert_eq!(footer.columns[0].block_stats[0].row_count, 100);

        // id: min=0, max=99.
        assert_eq!(footer.columns[0].block_stats[0].min, 0.0);
        assert_eq!(footer.columns[0].block_stats[0].max, 99.0);
        assert_eq!(footer.columns[0].block_stats[0].null_count, 0);

        // score: 34 nulls (rows 0, 3, ..., 99). Non-null values range from
        // 0.25 (row 1) to 98*0.25=24.5 (row 98; row 99 is null).
        assert_eq!(footer.columns[2].block_stats[0].null_count, 34);
    }

    #[test]
    fn write_segment_multi_block() {
        let schema =
            ColumnarSchema::new(vec![ColumnDef::required("x", ColumnType::Int64)]).expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        for i in 0..2500 {
            mt.append_row(&[Value::Integer(i)]).expect("append");
        }

        let (schema, columns, row_count) = mt.drain();
        let writer = SegmentWriter::plain();
        let segment = writer
            .write_segment(&schema, &columns, row_count)
            .expect("write");

        let footer = SegmentFooter::from_segment_tail(&segment).expect("valid footer");
        assert_eq!(footer.row_count, 2500);

        // 2500 rows / 1024 = 3 blocks (1024 + 1024 + 452).
        assert_eq!(footer.columns[0].block_count, 3);
        assert_eq!(footer.columns[0].block_stats[0].row_count, 1024);
        assert_eq!(footer.columns[0].block_stats[1].row_count, 1024);
        assert_eq!(footer.columns[0].block_stats[2].row_count, 452);

        // Block 0: min=0, max=1023.
        assert_eq!(footer.columns[0].block_stats[0].min, 0.0);
        assert_eq!(footer.columns[0].block_stats[0].max, 1023.0);
        // Block 2: min=2048, max=2499.
        assert_eq!(footer.columns[0].block_stats[2].min, 2048.0);
        assert_eq!(footer.columns[0].block_stats[2].max, 2499.0);
    }

    #[test]
    fn write_segment_empty_rejected() {
        let schema = analytics_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        let (schema, columns, row_count) = mt.drain();
        let writer = SegmentWriter::plain();
        assert!(matches!(
            writer.write_segment(&schema, &columns, row_count),
            Err(ColumnarError::EmptyMemtable)
        ));
    }

    #[test]
    fn write_segment_column_count_mismatch_rejected() {
        let schema = analytics_schema();
        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&[
            Value::Integer(1),
            Value::String("a".into()),
            Value::Float(1.0),
        ])
        .expect("append");

        let (schema, columns, row_count) = mt.drain();
        let writer = SegmentWriter::plain();
        // Pass only the first of the three drained columns.
        assert!(matches!(
            writer.write_segment(&schema, &columns[..1], row_count),
            Err(ColumnarError::SchemaMismatch {
                expected: 3,
                got: 1
            })
        ));
    }

    #[test]
    fn timeseries_profile_codec_overrides() {
        assert!(matches!(
            select_codec_for_profile(&ColumnType::Float64, PROFILE_TIMESERIES),
            ColumnCodec::Gorilla
        ));
        assert!(matches!(
            select_codec_for_profile(&ColumnType::Timestamp, PROFILE_TIMESERIES),
            ColumnCodec::DeltaFastLanesLz4
        ));
        // Types without a timeseries override fall back to the generic mapping.
        assert!(matches!(
            select_codec_for_profile(&ColumnType::Bool, PROFILE_TIMESERIES),
            ColumnCodec::Lz4
        ));
    }

    #[test]
    fn block_stats_predicate_pushdown() {
        let schema = analytics_schema();
        let mut mt = ColumnarMemtable::new(&schema);

        for i in 0..50 {
            mt.append_row(&[
                Value::Integer(i + 100),
                Value::String(format!("item_{i}")),
                Value::Float(i as f64 + 10.0),
            ])
            .expect("append");
        }

        let (schema, columns, row_count) = mt.drain();
        let writer = SegmentWriter::plain();
        let segment = writer
            .write_segment(&schema, &columns, row_count)
            .expect("write");
        let footer = SegmentFooter::from_segment_tail(&segment).expect("valid");

        use crate::predicate::ScanPredicate;

        let id_stats = &footer.columns[0].block_stats[0];
        // id: min=100, max=149.
        assert!(ScanPredicate::gt(0, 200.0).can_skip_block(id_stats)); // WHERE id > 200 → skip.
        assert!(!ScanPredicate::gt(0, 120.0).can_skip_block(id_stats)); // WHERE id > 120 → cannot skip.
        assert!(ScanPredicate::lt(0, 50.0).can_skip_block(id_stats)); // WHERE id < 50 → skip.
        assert!(ScanPredicate::eq(0, 200.0).can_skip_block(id_stats)); // WHERE id = 200 → skip.
        assert!(!ScanPredicate::eq(0, 125.0).can_skip_block(id_stats)); // WHERE id = 125 → cannot skip.
    }

    #[test]
    fn string_block_stats_zone_map() {
        // Write a segment with known string values, then verify str_min/str_max.
        let schema = ColumnarSchema::new(vec![ColumnDef::required("tag", ColumnType::String)])
            .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        // Insert > 16 distinct values to trigger bloom filter construction.
        // Lexicographic order: "apple" < "date" < "item_00" < ... < "item_19",
        // so "apple" and "item_19" are the zone-map boundaries.
        let values: Vec<String> = (0..20).map(|i| format!("item_{i:02}")).collect();
        for name in &values {
            mt.append_row(&[Value::String(name.clone())])
                .expect("append");
        }
        // Add known boundary values for zone-map assertions.
        mt.append_row(&[Value::String("apple".into())])
            .expect("append");
        mt.append_row(&[Value::String("date".into())])
            .expect("append");

        let (schema, columns, row_count) = mt.drain();
        let writer = SegmentWriter::plain();
        let segment = writer
            .write_segment(&schema, &columns, row_count)
            .expect("write");
        let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");

        let stats = &footer.columns[0].block_stats[0];
        assert!(stats.str_min.is_some(), "str_min should be populated");
        assert!(stats.str_max.is_some(), "str_max should be populated");
        // "apple" is lex smallest, "item_19" is lex largest (> "date").
        assert_eq!(stats.str_min.as_deref(), Some("apple"));
        assert_eq!(stats.str_max.as_deref(), Some("item_19"));

        // Bloom filter should be present (>16 distinct values).
        assert!(
            stats.bloom.is_some(),
            "bloom should be populated for >16 distinct values"
        );

        use crate::predicate::ScanPredicate;

        // WHERE tag = "aaa" → below "apple" → skip.
        assert!(ScanPredicate::str_eq(0, "aaa").can_skip_block(stats));
        // WHERE tag = "zzz" → above "item_19" → skip.
        assert!(ScanPredicate::str_eq(0, "zzz").can_skip_block(stats));
        // WHERE tag = "date" → in range [apple, item_19], inserted in bloom → cannot skip.
        assert!(!ScanPredicate::str_eq(0, "date").can_skip_block(stats));
        // WHERE tag > "item_19" → smax ≤ value → skip.
        assert!(ScanPredicate::str_gt(0, "item_19").can_skip_block(stats));
        // WHERE tag < "apple" → smin ≥ value → skip.
        assert!(ScanPredicate::str_lt(0, "apple").can_skip_block(stats));
    }

    #[test]
    fn string_block_stats_bloom_rejects_absent_value() {
        let schema = ColumnarSchema::new(vec![ColumnDef::required("label", ColumnType::String)])
            .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        // Insert > 16 distinct values to trigger bloom construction.
        let values: Vec<String> = (0..20).map(|i| format!("val_{i:02}")).collect();
        for name in &values {
            mt.append_row(&[Value::String(name.clone())])
                .expect("append");
        }
        // Add known values for bloom assertions.
        mt.append_row(&[Value::String("alpha".into())])
            .expect("append");
        mt.append_row(&[Value::String("beta".into())])
            .expect("append");
        mt.append_row(&[Value::String("gamma".into())])
            .expect("append");

        let (schema, columns, row_count) = mt.drain();
        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write");
        let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
        let stats = &footer.columns[0].block_stats[0];

        use crate::predicate::{ScanPredicate, bloom_may_contain};

        let bloom = stats
            .bloom
            .as_deref()
            .expect("bloom present for >16 distinct");
        assert!(bloom_may_contain(bloom, "alpha"));
        assert!(bloom_may_contain(bloom, "beta"));
        assert!(bloom_may_contain(bloom, "gamma"));

        // "delta" was not inserted. Bloom filters may yield false positives, so
        // only assert the skip when the bloom actually reports it absent.
        let delta_absent = !bloom_may_contain(bloom, "delta");
        if delta_absent {
            // "delta" is in [alpha, val_19] range → only bloom can skip this.
            assert!(ScanPredicate::str_eq(0, "delta").can_skip_block(stats));
        }
    }
}