Skip to main content

nodedb_columnar/
format.rs

//! Columnar segment binary format definitions.
//!
//! All multi-byte integers are little-endian. The segment is self-describing:
//! the footer contains all metadata needed to read any column without
//! scanning the entire file.
6
7use serde::{Deserialize, Serialize};
8
/// Four-byte signature at the very start of every NodeDB columnar segment
/// (ASCII `NDBS`).
pub const MAGIC: [u8; 4] = [b'N', b'D', b'B', b'S'];

/// Format major version written by this crate. Readers refuse any segment whose
/// major version is greater than this value.
pub const VERSION_MAJOR: u8 = 1;

/// Format minor version written by this crate. A segment with the same major
/// version but a newer minor version is still accepted; footer fields this
/// reader does not know about are ignored.
pub const VERSION_MINOR: u8 = 0;

/// Byte-order marker stored in the header. `0x01` means little-endian, the only
/// byte order NodeDB writes.
pub const ENDIANNESS_LE: u8 = 0x01;

/// Number of rows grouped into one block. Each block is compressed, and can be
/// decompressed, independently of the others. 1024 trades compression ratio
/// (larger blocks compress better) against random-access granularity (smaller
/// blocks waste less work on filtered scans).
pub const BLOCK_SIZE: usize = 1024;

/// Length in bytes of the segment header:
/// magic (4) + major (1) + minor (1) + endianness (1) = 7.
pub const HEADER_SIZE: usize = MAGIC.len() + 3;
29
/// Fixed-size preamble that marks a byte stream as a NodeDB columnar segment.
///
/// Encoded layout (7 bytes): magic (4 bytes), major version, minor version,
/// endianness marker.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SegmentHeader {
    /// Signature bytes; a valid segment carries `MAGIC`.
    pub magic: [u8; 4],
    /// Format major version the segment was written with.
    pub version_major: u8,
    /// Format minor version the segment was written with.
    pub version_minor: u8,
    /// Byte-order marker (`ENDIANNESS_LE` for segments written by NodeDB).
    pub endianness: u8,
}
38
39impl SegmentHeader {
40    /// Create a header with the current format version.
41    pub fn current() -> Self {
42        Self {
43            magic: MAGIC,
44            version_major: VERSION_MAJOR,
45            version_minor: VERSION_MINOR,
46            endianness: ENDIANNESS_LE,
47        }
48    }
49
50    /// Serialize the header to bytes.
51    pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
52        let mut buf = [0u8; HEADER_SIZE];
53        buf[0..4].copy_from_slice(&self.magic);
54        buf[4] = self.version_major;
55        buf[5] = self.version_minor;
56        buf[6] = self.endianness;
57        buf
58    }
59
60    /// Parse a header from bytes. Returns None if magic/version is invalid.
61    pub fn from_bytes(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
62        if data.len() < HEADER_SIZE {
63            return Err(crate::error::ColumnarError::TruncatedSegment {
64                expected: HEADER_SIZE,
65                got: data.len(),
66            });
67        }
68
69        let mut magic = [0u8; 4];
70        magic.copy_from_slice(&data[0..4]);
71        if magic != MAGIC {
72            return Err(crate::error::ColumnarError::InvalidMagic(magic));
73        }
74
75        let version_major = data[4];
76        let version_minor = data[5];
77
78        // Reject incompatible major version.
79        if version_major > VERSION_MAJOR {
80            return Err(crate::error::ColumnarError::IncompatibleVersion {
81                reader_major: VERSION_MAJOR,
82                segment_major: version_major,
83                segment_minor: version_minor,
84            });
85        }
86
87        Ok(Self {
88            magic,
89            version_major,
90            version_minor,
91            endianness: data[6],
92        })
93    }
94}
95
96/// Per-block statistics for a single column. Enables predicate pushdown:
97/// skip blocks where `WHERE price > 100` and block's `max_price < 100`.
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct BlockStats {
100    /// Minimum value in this block (encoded as f64 for uniformity;
101    /// i64 values are cast losslessly, strings use 0.0).
102    pub min: f64,
103    /// Maximum value in this block.
104    pub max: f64,
105    /// Number of null values in this block.
106    pub null_count: u32,
107    /// Number of rows in this block (≤ BLOCK_SIZE, last block may be smaller).
108    pub row_count: u32,
109}
110
111impl BlockStats {
112    /// Create stats for a block with no meaningful min/max (e.g., string columns).
113    pub fn non_numeric(null_count: u32, row_count: u32) -> Self {
114        Self {
115            min: f64::NAN,
116            max: f64::NAN,
117            null_count,
118            row_count,
119        }
120    }
121}
122
123/// Metadata for a single column within the segment footer.
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct ColumnMeta {
126    /// Column name (matches schema definition).
127    pub name: String,
128    /// Byte offset of this column's first block from the start of the segment
129    /// (after the header).
130    pub offset: u64,
131    /// Total byte length of all blocks for this column.
132    pub length: u64,
133    /// Codec used for this column's blocks.
134    pub codec: nodedb_codec::ColumnCodec,
135    /// Number of blocks for this column.
136    pub block_count: u32,
137    /// Per-block statistics (one entry per block).
138    pub block_stats: Vec<BlockStats>,
139}
140
141/// Segment footer: contains all metadata needed to read any column.
142///
143/// Serialized as MessagePack at the end of the segment, followed by
144/// a 4-byte footer length (u32 LE) and a 4-byte CRC32C of the
145/// serialized footer bytes.
146///
147/// ```text
148/// [...column blocks...][footer_msgpack][footer_len: u32 LE][footer_crc: u32 LE]
149/// ```
150///
151/// To read: seek to end - 8, read footer_len + footer_crc, seek to
152/// end - 8 - footer_len, read + verify CRC, deserialize footer.
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct SegmentFooter {
155    /// xxHash64 of the schema definition (for schema compatibility checks).
156    pub schema_hash: u64,
157    /// Number of columns in this segment.
158    pub column_count: u32,
159    /// Total number of rows across all blocks.
160    pub row_count: u64,
161    /// Profile that wrote this segment (0 = plain, 1 = timeseries, 2 = spatial).
162    pub profile_tag: u8,
163    /// Per-column metadata (offsets, codecs, block stats).
164    pub columns: Vec<ColumnMeta>,
165}
166
167impl SegmentFooter {
168    /// Serialize the footer to bytes with trailing length + CRC32C.
169    pub fn to_bytes(&self) -> Result<Vec<u8>, crate::error::ColumnarError> {
170        let footer_msgpack = rmp_serde::to_vec_named(self)
171            .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))?;
172
173        let footer_len = footer_msgpack.len() as u32;
174        let footer_crc = crc32c::crc32c(&footer_msgpack);
175
176        let mut buf = Vec::with_capacity(footer_msgpack.len() + 8);
177        buf.extend_from_slice(&footer_msgpack);
178        buf.extend_from_slice(&footer_len.to_le_bytes());
179        buf.extend_from_slice(&footer_crc.to_le_bytes());
180        Ok(buf)
181    }
182
183    /// Parse a footer from the tail of a segment byte slice.
184    ///
185    /// Reads footer_len and CRC from the last 8 bytes, then deserializes
186    /// the footer and validates the CRC.
187    pub fn from_segment_tail(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
188        if data.len() < 8 {
189            return Err(crate::error::ColumnarError::TruncatedSegment {
190                expected: 8,
191                got: data.len(),
192            });
193        }
194
195        let tail = &data[data.len() - 8..];
196        let footer_len =
197            u32::from_le_bytes(tail[0..4].try_into().expect("4 bytes from slice")) as usize;
198        let stored_crc = u32::from_le_bytes(tail[4..8].try_into().expect("4 bytes from slice"));
199
200        let footer_start = data.len().checked_sub(8 + footer_len).ok_or(
201            crate::error::ColumnarError::TruncatedSegment {
202                expected: 8 + footer_len,
203                got: data.len(),
204            },
205        )?;
206
207        let footer_bytes = &data[footer_start..footer_start + footer_len];
208        let computed_crc = crc32c::crc32c(footer_bytes);
209
210        if computed_crc != stored_crc {
211            return Err(crate::error::ColumnarError::FooterCrcMismatch {
212                stored: stored_crc,
213                computed: computed_crc,
214            });
215        }
216
217        rmp_serde::from_slice(footer_bytes)
218            .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
219    }
220}
221
#[cfg(test)]
mod tests {
    use super::*;

    /// Footer with no columns, used by tests that only care about framing.
    fn empty_footer() -> SegmentFooter {
        SegmentFooter {
            schema_hash: 0,
            column_count: 0,
            row_count: 0,
            profile_tag: 0,
            columns: vec![],
        }
    }

    /// Builds an 8-byte trailer with the given length and CRC fields.
    fn trailer(footer_len: u32, crc: u32) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(8);
        bytes.extend_from_slice(&footer_len.to_le_bytes());
        bytes.extend_from_slice(&crc.to_le_bytes());
        bytes
    }

    #[test]
    fn header_roundtrip() {
        let header = SegmentHeader::current();
        let bytes = header.to_bytes();
        let parsed = SegmentHeader::from_bytes(&bytes).expect("valid header");
        assert_eq!(parsed, header);
    }

    #[test]
    fn header_invalid_magic() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[0] = b'X';
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes),
            Err(crate::error::ColumnarError::InvalidMagic(_))
        ));
    }

    #[test]
    fn header_truncated() {
        let bytes = SegmentHeader::current().to_bytes();
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes[..HEADER_SIZE - 1]),
            Err(crate::error::ColumnarError::TruncatedSegment { expected, got })
                if expected == HEADER_SIZE && got == HEADER_SIZE - 1
        ));
        assert!(matches!(
            SegmentHeader::from_bytes(&[]),
            Err(crate::error::ColumnarError::TruncatedSegment { .. })
        ));
    }

    #[test]
    fn header_incompatible_major() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[4] = VERSION_MAJOR + 1; // Future major version.
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes),
            Err(crate::error::ColumnarError::IncompatibleVersion { .. })
        ));
    }

    #[test]
    fn header_compatible_minor() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[5] = VERSION_MINOR + 5; // Future minor version, same major.
        let parsed = SegmentHeader::from_bytes(&bytes).expect("compatible minor");
        assert_eq!(parsed.version_major, VERSION_MAJOR);
        assert_eq!(parsed.version_minor, VERSION_MINOR + 5);
    }

    #[test]
    fn footer_roundtrip() {
        let footer = SegmentFooter {
            schema_hash: 0xDEAD_BEEF_CAFE_1234,
            column_count: 3,
            row_count: 2048,
            profile_tag: 0,
            columns: vec![
                ColumnMeta {
                    name: "id".into(),
                    offset: 7,
                    length: 512,
                    codec: nodedb_codec::ColumnCodec::DeltaFastLanesLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats {
                            min: 1.0,
                            max: 1024.0,
                            null_count: 0,
                            row_count: 1024,
                        },
                        BlockStats {
                            min: 1025.0,
                            max: 2048.0,
                            null_count: 0,
                            row_count: 1024,
                        },
                    ],
                },
                ColumnMeta {
                    name: "name".into(),
                    offset: 519,
                    length: 256,
                    codec: nodedb_codec::ColumnCodec::FsstLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::non_numeric(0, 1024),
                        BlockStats::non_numeric(5, 1024),
                    ],
                },
                ColumnMeta {
                    name: "score".into(),
                    offset: 775,
                    length: 128,
                    codec: nodedb_codec::ColumnCodec::AlpFastLanesLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats {
                            min: 0.0,
                            max: 100.0,
                            null_count: 10,
                            row_count: 1024,
                        },
                        BlockStats {
                            min: 0.5,
                            max: 99.5,
                            null_count: 3,
                            row_count: 1024,
                        },
                    ],
                },
            ],
        };

        // Serialize footer to bytes (with length + CRC trailer).
        let footer_bytes = footer.to_bytes().expect("serialize");

        // Simulate a full segment: header + dummy data + footer.
        let mut segment = Vec::new();
        segment.extend_from_slice(&SegmentHeader::current().to_bytes());
        segment.extend_from_slice(&vec![0u8; 896]); // Dummy column data.
        segment.extend_from_slice(&footer_bytes);

        let parsed = SegmentFooter::from_segment_tail(&segment).expect("parse footer");
        assert_eq!(parsed.schema_hash, footer.schema_hash);
        assert_eq!(parsed.column_count, 3);
        assert_eq!(parsed.row_count, 2048);
        assert_eq!(parsed.profile_tag, 0);
        assert_eq!(parsed.columns.len(), 3);
        assert_eq!(parsed.columns[0].name, "id");
        assert_eq!(parsed.columns[1].name, "name");
        assert_eq!(parsed.columns[2].name, "score");
        assert_eq!(parsed.columns[2].offset, 775);
        assert_eq!(parsed.columns[0].block_stats[1].max, 2048.0);
        assert_eq!(parsed.columns[1].block_stats[1].null_count, 5);
        // Non-numeric stats use NaN bounds; they must survive serialization.
        assert!(parsed.columns[1].block_stats[0].min.is_nan());
        assert!(parsed.columns[1].block_stats[0].max.is_nan());
    }

    #[test]
    fn footer_trailer_layout() {
        let bytes = empty_footer().to_bytes().expect("serialize");
        let n = bytes.len();
        let footer_len =
            u32::from_le_bytes(bytes[n - 8..n - 4].try_into().expect("4 bytes")) as usize;
        assert_eq!(footer_len, n - 8, "length field must cover exactly the payload");
    }

    #[test]
    fn footer_crc_mismatch() {
        let mut bytes = empty_footer().to_bytes().expect("serialize");
        // Corrupt the CRC.
        let len = bytes.len();
        bytes[len - 1] ^= 0xFF;

        assert!(matches!(
            SegmentFooter::from_segment_tail(&bytes),
            Err(crate::error::ColumnarError::FooterCrcMismatch { .. })
        ));
    }

    #[test]
    fn footer_payload_corruption_detected() {
        let mut bytes = empty_footer().to_bytes().expect("serialize");
        // Corrupt the first MessagePack byte; the CRC must catch it.
        bytes[0] ^= 0xFF;

        assert!(matches!(
            SegmentFooter::from_segment_tail(&bytes),
            Err(crate::error::ColumnarError::FooterCrcMismatch { .. })
        ));
    }

    #[test]
    fn footer_tail_shorter_than_trailer() {
        assert!(matches!(
            SegmentFooter::from_segment_tail(&[0u8; 7]),
            Err(crate::error::ColumnarError::TruncatedSegment { expected: 8, got: 7 })
        ));
        assert!(matches!(
            SegmentFooter::from_segment_tail(&[]),
            Err(crate::error::ColumnarError::TruncatedSegment { expected: 8, got: 0 })
        ));
    }

    #[test]
    fn footer_len_exceeds_segment() {
        // The trailer claims a 100-byte footer, but no bytes precede it.
        let bytes = trailer(100, 0);
        assert!(matches!(
            SegmentFooter::from_segment_tail(&bytes),
            Err(crate::error::ColumnarError::TruncatedSegment { expected: 108, got: 8 })
        ));
    }

    #[test]
    fn footer_len_max_is_rejected_without_panicking() {
        let bytes = trailer(u32::MAX, 0);
        assert!(matches!(
            SegmentFooter::from_segment_tail(&bytes),
            Err(crate::error::ColumnarError::TruncatedSegment { .. })
        ));
    }

    #[test]
    fn block_stats_predicate_skip() {
        let stats = BlockStats {
            min: 10.0,
            max: 50.0,
            null_count: 0,
            row_count: 1024,
        };

        use crate::predicate::ScanPredicate;

        // WHERE x > 60 → can skip (max=50 ≤ 60).
        assert!(ScanPredicate::gt(0, 60.0).can_skip_block(&stats));
        // WHERE x > 40 → cannot skip (max=50 > 40).
        assert!(!ScanPredicate::gt(0, 40.0).can_skip_block(&stats));
        // WHERE x < 5 → can skip (min=10 ≥ 5).
        assert!(ScanPredicate::lt(0, 5.0).can_skip_block(&stats));
        // WHERE x = 100 → can skip (100 > max=50).
        assert!(ScanPredicate::eq(0, 100.0).can_skip_block(&stats));
        // WHERE x = 30 → cannot skip (10 ≤ 30 ≤ 50).
        assert!(!ScanPredicate::eq(0, 30.0).can_skip_block(&stats));
    }
}
388}