Skip to main content

nodedb_columnar/
format.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Columnar segment binary format definitions.
4//!
5//! All multi-byte integers are little-endian. The segment is self-describing:
6//! the footer contains all metadata needed to read any column without
7//! scanning the entire file.
8
9use serde::{Deserialize, Serialize};
10use zerompk::{FromMessagePack, ToMessagePack};
11
/// Four-byte signature that opens every NodeDB columnar segment.
pub const MAGIC: [u8; 4] = *b"NDBS";

/// Major format version produced by this build. Readers refuse segments whose
/// major version is greater than this.
pub const VERSION_MAJOR: u8 = 1;

/// Minor format version produced by this build. A segment with the same major
/// but a newer minor is still readable (unknown footer fields are ignored).
pub const VERSION_MINOR: u8 = 1;

/// Byte-order marker value meaning little-endian. NodeDB always writes
/// little-endian segments.
pub const ENDIANNESS_LE: u8 = 0x01;

/// Number of rows in one block; every block is compressed, and can be
/// decompressed, independently of the others.
///
/// 1024 (2^10) trades compression ratio (bigger blocks compress better)
/// against random-access granularity (smaller blocks waste less work on
/// filtered scans).
pub const BLOCK_SIZE: usize = 1 << 10;

/// Byte length of the segment header: the magic followed by one byte each for
/// the major version, the minor version and the endianness marker
/// (4 + 1 + 1 + 1 = 7).
pub const HEADER_SIZE: usize = MAGIC.len() + 3;
32
33/// Segment header: identifies the file as a NodeDB columnar segment.
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub struct SegmentHeader {
36    pub magic: [u8; 4],
37    pub version_major: u8,
38    pub version_minor: u8,
39    pub endianness: u8,
40}
41
42impl SegmentHeader {
43    /// Create a header with the current format version.
44    pub fn current() -> Self {
45        Self {
46            magic: MAGIC,
47            version_major: VERSION_MAJOR,
48            version_minor: VERSION_MINOR,
49            endianness: ENDIANNESS_LE,
50        }
51    }
52
53    /// Serialize the header to bytes.
54    pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
55        let mut buf = [0u8; HEADER_SIZE];
56        buf[0..4].copy_from_slice(&self.magic);
57        buf[4] = self.version_major;
58        buf[5] = self.version_minor;
59        buf[6] = self.endianness;
60        buf
61    }
62
63    /// Parse a header from bytes. Returns None if magic/version is invalid.
64    pub fn from_bytes(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
65        if data.len() < HEADER_SIZE {
66            return Err(crate::error::ColumnarError::TruncatedSegment {
67                expected: HEADER_SIZE,
68                got: data.len(),
69            });
70        }
71
72        let mut magic = [0u8; 4];
73        magic.copy_from_slice(&data[0..4]);
74        if magic != MAGIC {
75            return Err(crate::error::ColumnarError::InvalidMagic(magic));
76        }
77
78        let version_major = data[4];
79        let version_minor = data[5];
80
81        // Reject incompatible major version.
82        if version_major > VERSION_MAJOR {
83            return Err(crate::error::ColumnarError::IncompatibleVersion {
84                reader_major: VERSION_MAJOR,
85                segment_major: version_major,
86                segment_minor: version_minor,
87            });
88        }
89
90        Ok(Self {
91            magic,
92            version_major,
93            version_minor,
94            endianness: data[6],
95        })
96    }
97}
98
99/// On-disk representation of a Bloom filter, bundled with the parameters
100/// that produced it.
101///
102/// Storing `k` and `m` alongside the bytes allows any future reader to
103/// interpret the filter without relying on compile-time constants.
104#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
105pub struct BloomFilter {
106    /// Number of independent hash functions used when building and querying
107    /// the filter.
108    pub k: u8,
109    /// Size of the bit array in bits. The `bytes` field holds `(m + 7) / 8`
110    /// bytes.
111    pub m: u32,
112    /// Packed bit array.
113    pub bytes: Vec<u8>,
114}
115
116/// Per-block statistics for a single column. Enables predicate pushdown:
117/// skip blocks where `WHERE price > 100` and block's `max_price < 100`.
118#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
119pub struct BlockStats {
120    /// Minimum value in this block (encoded as f64 for uniformity;
121    /// i64 values are cast losslessly for small values; strings use NaN).
122    ///
123    /// For i64/timestamp columns, prefer `min_i64` when available — f64 cannot
124    /// represent all i64 values exactly (values outside ±2^53 may be rounded).
125    pub min: f64,
126    /// Maximum value in this block (see `min` for caveats on i64 precision).
127    pub max: f64,
128    /// Number of null values in this block.
129    pub null_count: u32,
130    /// Number of rows in this block (≤ BLOCK_SIZE, last block may be smaller).
131    pub row_count: u32,
132    /// Lexicographic minimum for string columns (truncated to 32 bytes).
133    /// `None` for numeric, bool, binary, vector, and other non-string columns.
134    #[serde(default, skip_serializing_if = "Option::is_none")]
135    pub str_min: Option<String>,
136    /// Lexicographic maximum for string columns (truncated to 32 bytes).
137    /// `None` for non-string columns.
138    #[serde(default, skip_serializing_if = "Option::is_none")]
139    pub str_max: Option<String>,
140    /// Bloom filter for equality-predicate skipping on string columns.
141    ///
142    /// Carries the filter parameters (`k`, `m`) alongside the bytes so that
143    /// readers never depend on compile-time constants to interpret the filter.
144    /// `None` when there are no non-null string values in the block, or when
145    /// cardinality is too low to justify the overhead.
146    #[serde(default, skip_serializing_if = "Option::is_none")]
147    pub bloom: Option<BloomFilter>,
148    /// Exact integer minimum for i64/timestamp columns.
149    ///
150    /// Set alongside `min` (which holds the lossy f64 cast) so that predicates
151    /// with integral values outside ±2^53 can compare losslessly. `None` for
152    /// all non-integer column types and for segments written before minor v1.
153    #[serde(default, skip_serializing_if = "Option::is_none")]
154    pub min_i64: Option<i64>,
155    /// Exact integer maximum for i64/timestamp columns (see `min_i64`).
156    #[serde(default, skip_serializing_if = "Option::is_none")]
157    pub max_i64: Option<i64>,
158}
159
160impl BlockStats {
161    /// Create stats for a numeric block with known min/max.
162    pub fn numeric(min: f64, max: f64, null_count: u32, row_count: u32) -> Self {
163        Self {
164            min,
165            max,
166            null_count,
167            row_count,
168            str_min: None,
169            str_max: None,
170            bloom: None,
171            min_i64: None,
172            max_i64: None,
173        }
174    }
175
176    /// Create stats for an i64 or timestamp column block.
177    ///
178    /// Populates both the lossless `min_i64`/`max_i64` fields AND the lossy
179    /// `min`/`max` f64 fields so that the f64 path remains a valid fallback
180    /// for non-integral predicate values.
181    pub fn integer(min: i64, max: i64, null_count: u32, row_count: u32) -> Self {
182        Self {
183            min: min as f64,
184            max: max as f64,
185            null_count,
186            row_count,
187            str_min: None,
188            str_max: None,
189            bloom: None,
190            min_i64: Some(min),
191            max_i64: Some(max),
192        }
193    }
194
195    /// Create stats for a block with no meaningful numeric min/max (non-numeric columns).
196    pub fn non_numeric(null_count: u32, row_count: u32) -> Self {
197        Self {
198            min: f64::NAN,
199            max: f64::NAN,
200            null_count,
201            row_count,
202            str_min: None,
203            str_max: None,
204            bloom: None,
205            min_i64: None,
206            max_i64: None,
207        }
208    }
209
210    /// Create stats for a string block with lexicographic bounds and bloom filter.
211    pub fn string_block(
212        null_count: u32,
213        row_count: u32,
214        str_min: Option<String>,
215        str_max: Option<String>,
216        bloom: Option<BloomFilter>,
217    ) -> Self {
218        Self {
219            min: f64::NAN,
220            max: f64::NAN,
221            null_count,
222            row_count,
223            str_min,
224            str_max,
225            bloom,
226            min_i64: None,
227            max_i64: None,
228        }
229    }
230}
231
232/// Metadata for a single column within the segment footer.
233#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
234pub struct ColumnMeta {
235    /// Column name (matches schema definition).
236    pub name: String,
237    /// Byte offset of this column's first block from the start of the segment
238    /// (after the header).
239    pub offset: u64,
240    /// Total byte length of all blocks for this column.
241    pub length: u64,
242    /// Codec used for this column's blocks.
243    ///
244    /// Always a concrete, resolved codec — never `Auto`.
245    pub codec: nodedb_codec::ResolvedColumnCodec,
246    /// Number of blocks for this column.
247    pub block_count: u32,
248    /// Per-block statistics (one entry per block).
249    pub block_stats: Vec<BlockStats>,
250    /// Dictionary for dict-encoded columns (`None` for non-dict columns).
251    ///
252    /// Strings are stored in ID order: `dictionary[0]` is ID 0, etc.
253    /// The actual column data is stored as i64 IDs via `DeltaFastLanesLz4`.
254    #[serde(default, skip_serializing_if = "Option::is_none")]
255    pub dictionary: Option<Vec<String>>,
256}
257
258/// Segment footer: contains all metadata needed to read any column.
259///
260/// Serialized as MessagePack at the end of the segment, followed by
261/// a 4-byte footer length (u32 LE) and a 4-byte CRC32C of the
262/// serialized footer bytes.
263///
264/// ```text
265/// [...column blocks...][footer_msgpack][footer_len: u32 LE][footer_crc: u32 LE]
266/// ```
267///
268/// To read: seek to end - 8, read footer_len + footer_crc, seek to
269/// end - 8 - footer_len, read + verify CRC, deserialize footer.
270#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
271pub struct SegmentFooter {
272    /// xxHash64 of the schema definition (for schema compatibility checks).
273    pub schema_hash: u64,
274    /// Number of columns in this segment.
275    pub column_count: u32,
276    /// Total number of rows across all blocks.
277    pub row_count: u64,
278    /// Profile that wrote this segment (0 = plain, 1 = timeseries, 2 = spatial).
279    pub profile_tag: u8,
280    /// Per-column metadata (offsets, codecs, block stats).
281    pub columns: Vec<ColumnMeta>,
282}
283
284impl SegmentFooter {
285    /// Serialize the footer to bytes with trailing length + CRC32C.
286    pub fn to_bytes(&self) -> Result<Vec<u8>, crate::error::ColumnarError> {
287        let footer_msgpack = zerompk::to_msgpack_vec(self)
288            .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))?;
289
290        let footer_len = footer_msgpack.len() as u32;
291        let footer_crc = crc32c::crc32c(&footer_msgpack);
292
293        // no-governor: cold footer serialize; fixed overhead (msgpack + 8-byte trailer), governed at write call site
294        let mut buf = Vec::with_capacity(footer_msgpack.len() + 8);
295        buf.extend_from_slice(&footer_msgpack);
296        buf.extend_from_slice(&footer_len.to_le_bytes());
297        buf.extend_from_slice(&footer_crc.to_le_bytes());
298        Ok(buf)
299    }
300
301    /// Parse a footer from the tail of a segment byte slice.
302    ///
303    /// Reads footer_len and CRC from the last 8 bytes, then deserializes
304    /// the footer and validates the CRC.
305    pub fn from_segment_tail(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
306        if data.len() < 8 {
307            return Err(crate::error::ColumnarError::TruncatedSegment {
308                expected: 8,
309                got: data.len(),
310            });
311        }
312
313        let tail = &data[data.len() - 8..];
314        let footer_len = u32::from_le_bytes(tail[0..4].try_into().map_err(|_| {
315            crate::error::ColumnarError::Corruption {
316                segment_id: None,
317                reason: "footer length field: expected 4 bytes at segment tail - 8".into(),
318                offset: Some((data.len() - 8) as u64),
319            }
320        })?) as usize;
321        let stored_crc = u32::from_le_bytes(tail[4..8].try_into().map_err(|_| {
322            crate::error::ColumnarError::Corruption {
323                segment_id: None,
324                reason: "footer CRC field: expected 4 bytes at segment tail - 4".into(),
325                offset: Some((data.len() - 4) as u64),
326            }
327        })?);
328
329        let footer_start = data.len().checked_sub(8 + footer_len).ok_or(
330            crate::error::ColumnarError::TruncatedSegment {
331                expected: 8 + footer_len,
332                got: data.len(),
333            },
334        )?;
335
336        let footer_bytes = &data[footer_start..footer_start + footer_len];
337        let computed_crc = crc32c::crc32c(footer_bytes);
338
339        if computed_crc != stored_crc {
340            return Err(crate::error::ColumnarError::FooterCrcMismatch {
341                stored: stored_crc,
342                computed: computed_crc,
343            });
344        }
345
346        zerompk::from_msgpack(footer_bytes)
347            .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
348    }
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn header_roundtrip() {
        let header = SegmentHeader::current();
        let bytes = header.to_bytes();
        let parsed = SegmentHeader::from_bytes(&bytes).expect("valid header");
        assert_eq!(parsed, header);
    }

    #[test]
    fn header_invalid_magic() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[0] = b'X';
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes),
            Err(crate::error::ColumnarError::InvalidMagic(_))
        ));
    }

    #[test]
    fn header_incompatible_major() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[4] = VERSION_MAJOR + 1; // Future major version.
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes),
            Err(crate::error::ColumnarError::IncompatibleVersion { .. })
        ));
    }

    #[test]
    fn header_compatible_minor() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[5] = VERSION_MINOR + 5; // Future minor version, same major.
        let parsed = SegmentHeader::from_bytes(&bytes).expect("compatible minor");
        assert_eq!(parsed.version_major, VERSION_MAJOR);
        assert_eq!(parsed.version_minor, VERSION_MINOR + 5);
    }

    #[test]
    fn header_truncated() {
        let bytes = SegmentHeader::current().to_bytes();
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes[..HEADER_SIZE - 1]),
            Err(crate::error::ColumnarError::TruncatedSegment { expected, got })
                if expected == HEADER_SIZE && got == HEADER_SIZE - 1
        ));
    }

    #[test]
    fn footer_roundtrip() {
        let footer = SegmentFooter {
            schema_hash: 0xDEAD_BEEF_CAFE_1234,
            column_count: 3,
            row_count: 2048,
            profile_tag: 0,
            columns: vec![
                ColumnMeta {
                    name: "id".into(),
                    offset: 7,
                    length: 512,
                    codec: nodedb_codec::ResolvedColumnCodec::DeltaFastLanesLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::numeric(1.0, 1024.0, 0, 1024),
                        BlockStats::numeric(1025.0, 2048.0, 0, 1024),
                    ],
                    dictionary: None,
                },
                ColumnMeta {
                    name: "name".into(),
                    offset: 519,
                    length: 256,
                    codec: nodedb_codec::ResolvedColumnCodec::FsstLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::non_numeric(0, 1024),
                        BlockStats::non_numeric(5, 1024),
                    ],
                    dictionary: None,
                },
                ColumnMeta {
                    name: "score".into(),
                    offset: 775,
                    length: 128,
                    codec: nodedb_codec::ResolvedColumnCodec::AlpFastLanesLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::numeric(0.0, 100.0, 10, 1024),
                        BlockStats::numeric(0.5, 99.5, 3, 1024),
                    ],
                    dictionary: None,
                },
            ],
        };

        // Serialize footer to bytes (with length + CRC trailer).
        let footer_bytes = footer.to_bytes().expect("serialize");

        // Simulate a full segment: header + dummy data + footer.
        let mut segment = Vec::new();
        segment.extend_from_slice(&SegmentHeader::current().to_bytes());
        segment.extend_from_slice(&[0u8; 896]); // Dummy column data.
        segment.extend_from_slice(&footer_bytes);

        let parsed = SegmentFooter::from_segment_tail(&segment).expect("parse footer");
        assert_eq!(parsed.schema_hash, footer.schema_hash);
        assert_eq!(parsed.column_count, 3);
        assert_eq!(parsed.row_count, 2048);
        assert_eq!(parsed.columns.len(), 3);
        assert_eq!(parsed.columns[0].name, "id");
        assert_eq!(parsed.columns[1].name, "name");
        assert_eq!(parsed.columns[2].name, "score");
    }

    #[test]
    fn footer_crc_mismatch() {
        let footer = SegmentFooter {
            schema_hash: 0,
            column_count: 0,
            row_count: 0,
            profile_tag: 0,
            columns: vec![],
        };
        let mut bytes = footer.to_bytes().expect("serialize");
        // Corrupt the CRC.
        let len = bytes.len();
        bytes[len - 1] ^= 0xFF;

        assert!(matches!(
            SegmentFooter::from_segment_tail(&bytes),
            Err(crate::error::ColumnarError::FooterCrcMismatch { .. })
        ));
    }

    #[test]
    fn footer_tail_shorter_than_trailer() {
        // Seven bytes cannot hold the 8-byte length + CRC trailer.
        assert!(matches!(
            SegmentFooter::from_segment_tail(&[0u8; 7]),
            Err(crate::error::ColumnarError::TruncatedSegment { expected: 8, got: 7 })
        ));
    }

    #[test]
    fn footer_len_exceeds_segment() {
        // Trailer claims a 100-byte footer, but nothing precedes the trailer.
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&100u32.to_le_bytes());
        bytes.extend_from_slice(&0u32.to_le_bytes());

        assert!(matches!(
            SegmentFooter::from_segment_tail(&bytes),
            Err(crate::error::ColumnarError::TruncatedSegment { expected: 108, got: 8 })
        ));
    }

    #[test]
    fn block_stats_predicate_skip() {
        let stats = BlockStats::numeric(10.0, 50.0, 0, 1024);

        use crate::predicate::ScanPredicate;

        // WHERE x > 60 → can skip (max=50 ≤ 60).
        assert!(ScanPredicate::gt(0, 60.0).can_skip_block(&stats));
        // WHERE x > 40 → cannot skip (max=50 > 40).
        assert!(!ScanPredicate::gt(0, 40.0).can_skip_block(&stats));
        // WHERE x < 5 → can skip (min=10 ≥ 5).
        assert!(ScanPredicate::lt(0, 5.0).can_skip_block(&stats));
        // WHERE x = 100 → can skip (100 > max=50).
        assert!(ScanPredicate::eq(0, 100.0).can_skip_block(&stats));
        // WHERE x = 30 → cannot skip (10 ≤ 30 ≤ 50).
        assert!(!ScanPredicate::eq(0, 30.0).can_skip_block(&stats));
    }

    // Already compiled only under `cfg(test)` via the enclosing module.
    mod golden {
        use super::*;

        /// Asserts magic bytes, version_major == 1, version_minor == 1, and
        /// that the footer CRC round-trips cleanly.
        #[test]
        fn golden_columnar_segment_format() {
            // Header golden: magic at [0..4], major at [4], minor at [5].
            let header = SegmentHeader::current();
            let bytes = header.to_bytes();
            assert_eq!(&bytes[0..4], b"NDBS", "magic mismatch");
            assert_eq!(bytes[4], VERSION_MAJOR, "major version mismatch");
            assert_eq!(bytes[5], VERSION_MINOR, "minor version mismatch");
            assert_eq!(bytes[4], 1u8, "expected VERSION_MAJOR == 1");
            assert_eq!(bytes[5], 1u8, "expected VERSION_MINOR == 1");

            // Footer golden: serialize, then re-parse, asserting CRC consistency.
            let footer = SegmentFooter {
                schema_hash: 0xAB_CD_EF_01,
                column_count: 1,
                row_count: 128,
                profile_tag: 0,
                columns: vec![ColumnMeta {
                    name: "v".into(),
                    offset: 0,
                    length: 64,
                    codec: nodedb_codec::ResolvedColumnCodec::Lz4,
                    block_count: 1,
                    block_stats: vec![BlockStats::non_numeric(0, 128)],
                    dictionary: None,
                }],
            };
            let footer_bytes = footer.to_bytes().expect("serialize");
            // Layout: [msgpack_body][footer_len u32 LE][crc u32 LE]
            let n = footer_bytes.len();
            let stored_crc = u32::from_le_bytes([
                footer_bytes[n - 4],
                footer_bytes[n - 3],
                footer_bytes[n - 2],
                footer_bytes[n - 1],
            ]);
            let body_len = u32::from_le_bytes([
                footer_bytes[n - 8],
                footer_bytes[n - 7],
                footer_bytes[n - 6],
                footer_bytes[n - 5],
            ]) as usize;
            // CRC is computed over the msgpack body only (bytes 0..body_len).
            let recomputed = crc32c::crc32c(&footer_bytes[..body_len]);
            assert_eq!(stored_crc, recomputed, "footer CRC mismatch");

            // Round-trip via from_segment_tail.
            let mut segment = Vec::new();
            segment.extend_from_slice(&bytes);
            segment.extend_from_slice(&[0u8; 64]);
            segment.extend_from_slice(&footer_bytes);
            let parsed = SegmentFooter::from_segment_tail(&segment).expect("parse");
            assert_eq!(parsed.schema_hash, footer.schema_hash);
            assert_eq!(parsed.row_count, 128);
        }
    }
}