Skip to main content

nodedb_columnar/
format.rs

1//! Columnar segment binary format definitions.
2//!
3//! All multi-byte integers are little-endian. The segment is self-describing:
4//! the footer contains all metadata needed to read any column without
5//! scanning the entire file.
6
7use serde::{Deserialize, Serialize};
8use zerompk::{FromMessagePack, ToMessagePack};
9
/// Four-byte signature every NodeDB columnar segment starts with (`"NDBS"`).
pub const MAGIC: [u8; 4] = [b'N', b'D', b'B', b'S'];

/// Format major version written by this crate. A reader refuses any segment
/// whose major version is greater than this value.
pub const VERSION_MAJOR: u8 = 1;

/// Format minor version written by this crate. Segments sharing the reader's
/// major version but carrying a newer minor are still accepted; the format
/// contract is that unknown footer fields are ignored.
pub const VERSION_MINOR: u8 = 0;

/// Byte-order marker stored in the header. NodeDB always writes
/// little-endian, encoded as `0x01`.
pub const ENDIANNESS_LE: u8 = 0x01;

/// Number of rows per block (1024). Blocks are compressed independently and
/// can be decompressed on their own. Larger blocks compress better; smaller
/// blocks waste less work when a filtered scan only needs a few rows. 1024
/// is the compromise.
pub const BLOCK_SIZE: usize = 1 << 10;

/// Byte length of the segment header:
/// magic (4) + major (1) + minor (1) + endianness (1).
pub const HEADER_SIZE: usize = 4 + 1 + 1 + 1;
30
/// Fixed-size preamble that marks a byte stream as a NodeDB columnar segment.
///
/// The field order mirrors the on-disk byte order used by
/// `SegmentHeader::to_bytes`: magic, major, minor, endianness.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SegmentHeader {
    /// Signature bytes; equal to `MAGIC` (`b"NDBS"`) in a valid segment.
    pub magic: [u8; 4],
    /// Format major version the segment was written with.
    pub version_major: u8,
    /// Format minor version the segment was written with.
    pub version_minor: u8,
    /// Byte-order marker (`ENDIANNESS_LE` for segments written by this crate).
    pub endianness: u8,
}
39
40impl SegmentHeader {
41    /// Create a header with the current format version.
42    pub fn current() -> Self {
43        Self {
44            magic: MAGIC,
45            version_major: VERSION_MAJOR,
46            version_minor: VERSION_MINOR,
47            endianness: ENDIANNESS_LE,
48        }
49    }
50
51    /// Serialize the header to bytes.
52    pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
53        let mut buf = [0u8; HEADER_SIZE];
54        buf[0..4].copy_from_slice(&self.magic);
55        buf[4] = self.version_major;
56        buf[5] = self.version_minor;
57        buf[6] = self.endianness;
58        buf
59    }
60
61    /// Parse a header from bytes. Returns None if magic/version is invalid.
62    pub fn from_bytes(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
63        if data.len() < HEADER_SIZE {
64            return Err(crate::error::ColumnarError::TruncatedSegment {
65                expected: HEADER_SIZE,
66                got: data.len(),
67            });
68        }
69
70        let mut magic = [0u8; 4];
71        magic.copy_from_slice(&data[0..4]);
72        if magic != MAGIC {
73            return Err(crate::error::ColumnarError::InvalidMagic(magic));
74        }
75
76        let version_major = data[4];
77        let version_minor = data[5];
78
79        // Reject incompatible major version.
80        if version_major > VERSION_MAJOR {
81            return Err(crate::error::ColumnarError::IncompatibleVersion {
82                reader_major: VERSION_MAJOR,
83                segment_major: version_major,
84                segment_minor: version_minor,
85            });
86        }
87
88        Ok(Self {
89            magic,
90            version_major,
91            version_minor,
92            endianness: data[6],
93        })
94    }
95}
96
97/// Per-block statistics for a single column. Enables predicate pushdown:
98/// skip blocks where `WHERE price > 100` and block's `max_price < 100`.
99#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
100pub struct BlockStats {
101    /// Minimum value in this block (encoded as f64 for uniformity;
102    /// i64 values are cast losslessly, strings use NaN).
103    pub min: f64,
104    /// Maximum value in this block.
105    pub max: f64,
106    /// Number of null values in this block.
107    pub null_count: u32,
108    /// Number of rows in this block (≤ BLOCK_SIZE, last block may be smaller).
109    pub row_count: u32,
110    /// Lexicographic minimum for string columns (truncated to 32 bytes).
111    /// `None` for numeric, bool, binary, vector, and other non-string columns.
112    #[serde(default, skip_serializing_if = "Option::is_none")]
113    pub str_min: Option<String>,
114    /// Lexicographic maximum for string columns (truncated to 32 bytes).
115    /// `None` for non-string columns.
116    #[serde(default, skip_serializing_if = "Option::is_none")]
117    pub str_max: Option<String>,
118    /// Bloom filter bytes for equality-predicate skipping on string columns
119    /// (256 bytes = 2048 bits, 3 FNV-variant hash functions).
120    /// `None` when there are no non-null string values in the block.
121    #[serde(default, skip_serializing_if = "Option::is_none")]
122    pub bloom: Option<Vec<u8>>,
123}
124
125impl BlockStats {
126    /// Create stats for a numeric block with known min/max.
127    pub fn numeric(min: f64, max: f64, null_count: u32, row_count: u32) -> Self {
128        Self {
129            min,
130            max,
131            null_count,
132            row_count,
133            str_min: None,
134            str_max: None,
135            bloom: None,
136        }
137    }
138
139    /// Create stats for a block with no meaningful numeric min/max (non-numeric columns).
140    pub fn non_numeric(null_count: u32, row_count: u32) -> Self {
141        Self {
142            min: f64::NAN,
143            max: f64::NAN,
144            null_count,
145            row_count,
146            str_min: None,
147            str_max: None,
148            bloom: None,
149        }
150    }
151
152    /// Create stats for a string block with lexicographic bounds and bloom filter.
153    pub fn string_block(
154        null_count: u32,
155        row_count: u32,
156        str_min: Option<String>,
157        str_max: Option<String>,
158        bloom: Option<Vec<u8>>,
159    ) -> Self {
160        Self {
161            min: f64::NAN,
162            max: f64::NAN,
163            null_count,
164            row_count,
165            str_min,
166            str_max,
167            bloom,
168        }
169    }
170}
171
172/// Metadata for a single column within the segment footer.
173#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
174pub struct ColumnMeta {
175    /// Column name (matches schema definition).
176    pub name: String,
177    /// Byte offset of this column's first block from the start of the segment
178    /// (after the header).
179    pub offset: u64,
180    /// Total byte length of all blocks for this column.
181    pub length: u64,
182    /// Codec used for this column's blocks.
183    pub codec: nodedb_codec::ColumnCodec,
184    /// Number of blocks for this column.
185    pub block_count: u32,
186    /// Per-block statistics (one entry per block).
187    pub block_stats: Vec<BlockStats>,
188    /// Dictionary for dict-encoded columns (`None` for non-dict columns).
189    ///
190    /// Strings are stored in ID order: `dictionary[0]` is ID 0, etc.
191    /// The actual column data is stored as i64 IDs via `DeltaFastLanesLz4`.
192    #[serde(default, skip_serializing_if = "Option::is_none")]
193    pub dictionary: Option<Vec<String>>,
194}
195
196/// Segment footer: contains all metadata needed to read any column.
197///
198/// Serialized as MessagePack at the end of the segment, followed by
199/// a 4-byte footer length (u32 LE) and a 4-byte CRC32C of the
200/// serialized footer bytes.
201///
202/// ```text
203/// [...column blocks...][footer_msgpack][footer_len: u32 LE][footer_crc: u32 LE]
204/// ```
205///
206/// To read: seek to end - 8, read footer_len + footer_crc, seek to
207/// end - 8 - footer_len, read + verify CRC, deserialize footer.
208#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
209pub struct SegmentFooter {
210    /// xxHash64 of the schema definition (for schema compatibility checks).
211    pub schema_hash: u64,
212    /// Number of columns in this segment.
213    pub column_count: u32,
214    /// Total number of rows across all blocks.
215    pub row_count: u64,
216    /// Profile that wrote this segment (0 = plain, 1 = timeseries, 2 = spatial).
217    pub profile_tag: u8,
218    /// Per-column metadata (offsets, codecs, block stats).
219    pub columns: Vec<ColumnMeta>,
220}
221
222impl SegmentFooter {
223    /// Serialize the footer to bytes with trailing length + CRC32C.
224    pub fn to_bytes(&self) -> Result<Vec<u8>, crate::error::ColumnarError> {
225        let footer_msgpack = zerompk::to_msgpack_vec(self)
226            .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))?;
227
228        let footer_len = footer_msgpack.len() as u32;
229        let footer_crc = crc32c::crc32c(&footer_msgpack);
230
231        let mut buf = Vec::with_capacity(footer_msgpack.len() + 8);
232        buf.extend_from_slice(&footer_msgpack);
233        buf.extend_from_slice(&footer_len.to_le_bytes());
234        buf.extend_from_slice(&footer_crc.to_le_bytes());
235        Ok(buf)
236    }
237
238    /// Parse a footer from the tail of a segment byte slice.
239    ///
240    /// Reads footer_len and CRC from the last 8 bytes, then deserializes
241    /// the footer and validates the CRC.
242    pub fn from_segment_tail(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
243        if data.len() < 8 {
244            return Err(crate::error::ColumnarError::TruncatedSegment {
245                expected: 8,
246                got: data.len(),
247            });
248        }
249
250        let tail = &data[data.len() - 8..];
251        let footer_len =
252            u32::from_le_bytes(tail[0..4].try_into().expect("4 bytes from slice")) as usize;
253        let stored_crc = u32::from_le_bytes(tail[4..8].try_into().expect("4 bytes from slice"));
254
255        let footer_start = data.len().checked_sub(8 + footer_len).ok_or(
256            crate::error::ColumnarError::TruncatedSegment {
257                expected: 8 + footer_len,
258                got: data.len(),
259            },
260        )?;
261
262        let footer_bytes = &data[footer_start..footer_start + footer_len];
263        let computed_crc = crc32c::crc32c(footer_bytes);
264
265        if computed_crc != stored_crc {
266            return Err(crate::error::ColumnarError::FooterCrcMismatch {
267                stored: stored_crc,
268                computed: computed_crc,
269            });
270        }
271
272        zerompk::from_msgpack(footer_bytes)
273            .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
274    }
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Footer with no columns, shared by the trailer/CRC tests.
    fn empty_footer() -> SegmentFooter {
        SegmentFooter {
            schema_hash: 0,
            column_count: 0,
            row_count: 0,
            profile_tag: 0,
            columns: vec![],
        }
    }

    #[test]
    fn header_roundtrip() {
        let header = SegmentHeader::current();
        let bytes = header.to_bytes();
        let parsed = SegmentHeader::from_bytes(&bytes).expect("valid header");
        assert_eq!(parsed, header);
    }

    #[test]
    fn header_ignores_trailing_bytes() {
        let mut bytes = SegmentHeader::current().to_bytes().to_vec();
        bytes.extend_from_slice(&[0xAB; 16]);
        let parsed = SegmentHeader::from_bytes(&bytes).expect("valid header");
        assert_eq!(parsed, SegmentHeader::current());
    }

    #[test]
    fn header_truncated() {
        let bytes = SegmentHeader::current().to_bytes();
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes[..HEADER_SIZE - 1]),
            Err(crate::error::ColumnarError::TruncatedSegment { .. })
        ));
        assert!(matches!(
            SegmentHeader::from_bytes(&[]),
            Err(crate::error::ColumnarError::TruncatedSegment { .. })
        ));
    }

    #[test]
    fn header_invalid_magic() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[0] = b'X';
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes),
            Err(crate::error::ColumnarError::InvalidMagic(_))
        ));
    }

    #[test]
    fn header_incompatible_major() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[4] = VERSION_MAJOR + 1; // Future major version.
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes),
            Err(crate::error::ColumnarError::IncompatibleVersion { .. })
        ));
    }

    #[test]
    fn header_compatible_minor() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[5] = VERSION_MINOR + 5; // Future minor version, same major.
        let parsed = SegmentHeader::from_bytes(&bytes).expect("compatible minor");
        assert_eq!(parsed.version_major, VERSION_MAJOR);
        assert_eq!(parsed.version_minor, VERSION_MINOR + 5);
    }

    #[test]
    fn footer_roundtrip() {
        let footer = SegmentFooter {
            schema_hash: 0xDEAD_BEEF_CAFE_1234,
            column_count: 3,
            row_count: 2048,
            profile_tag: 0,
            columns: vec![
                ColumnMeta {
                    name: "id".into(),
                    offset: 7,
                    length: 512,
                    codec: nodedb_codec::ColumnCodec::DeltaFastLanesLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::numeric(1.0, 1024.0, 0, 1024),
                        BlockStats::numeric(1025.0, 2048.0, 0, 1024),
                    ],
                    dictionary: None,
                },
                ColumnMeta {
                    name: "name".into(),
                    offset: 519,
                    length: 256,
                    codec: nodedb_codec::ColumnCodec::FsstLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::non_numeric(0, 1024),
                        BlockStats::non_numeric(5, 1024),
                    ],
                    dictionary: None,
                },
                ColumnMeta {
                    name: "score".into(),
                    offset: 775,
                    length: 128,
                    codec: nodedb_codec::ColumnCodec::AlpFastLanesLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::numeric(0.0, 100.0, 10, 1024),
                        BlockStats::numeric(0.5, 99.5, 3, 1024),
                    ],
                    dictionary: None,
                },
            ],
        };

        // Serialize footer to bytes (with length + CRC trailer).
        let footer_bytes = footer.to_bytes().expect("serialize");

        // Simulate a full segment: header + dummy data + footer.
        let mut segment = Vec::new();
        segment.extend_from_slice(&SegmentHeader::current().to_bytes());
        segment.extend_from_slice(&[0u8; 896]); // Dummy column data.
        segment.extend_from_slice(&footer_bytes);

        let parsed = SegmentFooter::from_segment_tail(&segment).expect("parse footer");
        assert_eq!(parsed.schema_hash, footer.schema_hash);
        assert_eq!(parsed.column_count, 3);
        assert_eq!(parsed.row_count, 2048);
        assert_eq!(parsed.profile_tag, 0);
        assert_eq!(parsed.columns.len(), 3);
        assert_eq!(parsed.columns[0].name, "id");
        assert_eq!(parsed.columns[1].name, "name");
        assert_eq!(parsed.columns[2].name, "score");
        assert_eq!(parsed.columns[1].offset, 519);
        assert_eq!(parsed.columns[1].block_stats[1].null_count, 5);
        assert_eq!(parsed.columns[2].block_stats[0].max, 100.0);
    }

    #[test]
    fn footer_trailer_layout() {
        let bytes = empty_footer().to_bytes().expect("serialize");
        let len = bytes.len();
        let footer_len =
            u32::from_le_bytes(bytes[len - 8..len - 4].try_into().expect("4 bytes")) as usize;
        let stored_crc = u32::from_le_bytes(bytes[len - 4..].try_into().expect("4 bytes"));
        assert_eq!(footer_len, len - 8);
        assert_eq!(stored_crc, crc32c::crc32c(&bytes[..footer_len]));
    }

    #[test]
    fn footer_crc_mismatch() {
        let mut bytes = empty_footer().to_bytes().expect("serialize");
        // Corrupt the CRC.
        let len = bytes.len();
        bytes[len - 1] ^= 0xFF;

        assert!(matches!(
            SegmentFooter::from_segment_tail(&bytes),
            Err(crate::error::ColumnarError::FooterCrcMismatch { .. })
        ));
    }

    #[test]
    fn footer_body_corruption_detected() {
        let mut bytes = empty_footer().to_bytes().expect("serialize");
        // Corrupt the MessagePack body; the stored CRC no longer matches.
        bytes[0] ^= 0xFF;

        assert!(matches!(
            SegmentFooter::from_segment_tail(&bytes),
            Err(crate::error::ColumnarError::FooterCrcMismatch { .. })
        ));
    }

    #[test]
    fn footer_tail_shorter_than_trailer() {
        assert!(matches!(
            SegmentFooter::from_segment_tail(&[0u8; 7]),
            Err(crate::error::ColumnarError::TruncatedSegment { .. })
        ));
    }

    #[test]
    fn footer_len_exceeds_data() {
        let mut bytes = empty_footer().to_bytes().expect("serialize");
        let len = bytes.len();
        // Claim a footer far larger than the buffer actually holds.
        bytes[len - 8..len - 4].copy_from_slice(&u32::MAX.to_le_bytes());

        assert!(matches!(
            SegmentFooter::from_segment_tail(&bytes),
            Err(crate::error::ColumnarError::TruncatedSegment { .. })
        ));
    }

    #[test]
    fn block_stats_predicate_skip() {
        let stats = BlockStats::numeric(10.0, 50.0, 0, 1024);

        use crate::predicate::ScanPredicate;

        // WHERE x > 60 → can skip (max=50 ≤ 60).
        assert!(ScanPredicate::gt(0, 60.0).can_skip_block(&stats));
        // WHERE x > 40 → cannot skip (max=50 > 40).
        assert!(!ScanPredicate::gt(0, 40.0).can_skip_block(&stats));
        // WHERE x < 5 → can skip (min=10 ≥ 5).
        assert!(ScanPredicate::lt(0, 5.0).can_skip_block(&stats));
        // WHERE x = 100 → can skip (100 > max=50).
        assert!(ScanPredicate::eq(0, 100.0).can_skip_block(&stats));
        // WHERE x = 30 → cannot skip (10 ≤ 30 ≤ 50).
        assert!(!ScanPredicate::eq(0, 30.0).can_skip_block(&stats));
    }
}