1use serde::{Deserialize, Serialize};
10use zerompk::{FromMessagePack, ToMessagePack};
11
/// Four-byte magic prefix that opens every columnar segment (ASCII `"NDBS"`).
pub const MAGIC: [u8; 4] = [b'N', b'D', b'B', b'S'];

/// Segment format major version written by this build.
///
/// `SegmentHeader::from_bytes` rejects segments whose major version is newer.
pub const VERSION_MAJOR: u8 = 1;

/// Segment format minor version written by this build.
///
/// A segment with a newer minor version (same major) is still accepted by the reader.
pub const VERSION_MINOR: u8 = 1;

/// Endianness marker byte stored in the header for little-endian segments.
pub const ENDIANNESS_LE: u8 = 0x01;

/// Block granularity. NOTE(review): presumably rows per block (the tests build
/// 1024-row blocks) — confirm against the writer.
pub const BLOCK_SIZE: usize = 1024;

/// Encoded header length in bytes: the magic followed by three single-byte
/// fields (major version, minor version, endianness marker).
pub const HEADER_SIZE: usize = MAGIC.len() + 3;
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub struct SegmentHeader {
36 pub magic: [u8; 4],
37 pub version_major: u8,
38 pub version_minor: u8,
39 pub endianness: u8,
40}
41
42impl SegmentHeader {
43 pub fn current() -> Self {
45 Self {
46 magic: MAGIC,
47 version_major: VERSION_MAJOR,
48 version_minor: VERSION_MINOR,
49 endianness: ENDIANNESS_LE,
50 }
51 }
52
53 pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
55 let mut buf = [0u8; HEADER_SIZE];
56 buf[0..4].copy_from_slice(&self.magic);
57 buf[4] = self.version_major;
58 buf[5] = self.version_minor;
59 buf[6] = self.endianness;
60 buf
61 }
62
63 pub fn from_bytes(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
65 if data.len() < HEADER_SIZE {
66 return Err(crate::error::ColumnarError::TruncatedSegment {
67 expected: HEADER_SIZE,
68 got: data.len(),
69 });
70 }
71
72 let mut magic = [0u8; 4];
73 magic.copy_from_slice(&data[0..4]);
74 if magic != MAGIC {
75 return Err(crate::error::ColumnarError::InvalidMagic(magic));
76 }
77
78 let version_major = data[4];
79 let version_minor = data[5];
80
81 if version_major > VERSION_MAJOR {
83 return Err(crate::error::ColumnarError::IncompatibleVersion {
84 reader_major: VERSION_MAJOR,
85 segment_major: version_major,
86 segment_minor: version_minor,
87 });
88 }
89
90 Ok(Self {
91 magic,
92 version_major,
93 version_minor,
94 endianness: data[6],
95 })
96 }
97}
98
99#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
105pub struct BloomFilter {
106 pub k: u8,
109 pub m: u32,
112 pub bytes: Vec<u8>,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
119pub struct BlockStats {
120 pub min: f64,
126 pub max: f64,
128 pub null_count: u32,
130 pub row_count: u32,
132 #[serde(default, skip_serializing_if = "Option::is_none")]
135 pub str_min: Option<String>,
136 #[serde(default, skip_serializing_if = "Option::is_none")]
139 pub str_max: Option<String>,
140 #[serde(default, skip_serializing_if = "Option::is_none")]
147 pub bloom: Option<BloomFilter>,
148 #[serde(default, skip_serializing_if = "Option::is_none")]
154 pub min_i64: Option<i64>,
155 #[serde(default, skip_serializing_if = "Option::is_none")]
157 pub max_i64: Option<i64>,
158}
159
160impl BlockStats {
161 pub fn numeric(min: f64, max: f64, null_count: u32, row_count: u32) -> Self {
163 Self {
164 min,
165 max,
166 null_count,
167 row_count,
168 str_min: None,
169 str_max: None,
170 bloom: None,
171 min_i64: None,
172 max_i64: None,
173 }
174 }
175
176 pub fn integer(min: i64, max: i64, null_count: u32, row_count: u32) -> Self {
182 Self {
183 min: min as f64,
184 max: max as f64,
185 null_count,
186 row_count,
187 str_min: None,
188 str_max: None,
189 bloom: None,
190 min_i64: Some(min),
191 max_i64: Some(max),
192 }
193 }
194
195 pub fn non_numeric(null_count: u32, row_count: u32) -> Self {
197 Self {
198 min: f64::NAN,
199 max: f64::NAN,
200 null_count,
201 row_count,
202 str_min: None,
203 str_max: None,
204 bloom: None,
205 min_i64: None,
206 max_i64: None,
207 }
208 }
209
210 pub fn string_block(
212 null_count: u32,
213 row_count: u32,
214 str_min: Option<String>,
215 str_max: Option<String>,
216 bloom: Option<BloomFilter>,
217 ) -> Self {
218 Self {
219 min: f64::NAN,
220 max: f64::NAN,
221 null_count,
222 row_count,
223 str_min,
224 str_max,
225 bloom,
226 min_i64: None,
227 max_i64: None,
228 }
229 }
230}
231
232#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
234pub struct ColumnMeta {
235 pub name: String,
237 pub offset: u64,
240 pub length: u64,
242 pub codec: nodedb_codec::ResolvedColumnCodec,
246 pub block_count: u32,
248 pub block_stats: Vec<BlockStats>,
250 #[serde(default, skip_serializing_if = "Option::is_none")]
255 pub dictionary: Option<Vec<String>>,
256}
257
258#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
271pub struct SegmentFooter {
272 pub schema_hash: u64,
274 pub column_count: u32,
276 pub row_count: u64,
278 pub profile_tag: u8,
280 pub columns: Vec<ColumnMeta>,
282}
283
284impl SegmentFooter {
285 pub fn to_bytes(&self) -> Result<Vec<u8>, crate::error::ColumnarError> {
287 let footer_msgpack = zerompk::to_msgpack_vec(self)
288 .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))?;
289
290 let footer_len = footer_msgpack.len() as u32;
291 let footer_crc = crc32c::crc32c(&footer_msgpack);
292
293 let mut buf = Vec::with_capacity(footer_msgpack.len() + 8);
295 buf.extend_from_slice(&footer_msgpack);
296 buf.extend_from_slice(&footer_len.to_le_bytes());
297 buf.extend_from_slice(&footer_crc.to_le_bytes());
298 Ok(buf)
299 }
300
301 pub fn from_segment_tail(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
306 if data.len() < 8 {
307 return Err(crate::error::ColumnarError::TruncatedSegment {
308 expected: 8,
309 got: data.len(),
310 });
311 }
312
313 let tail = &data[data.len() - 8..];
314 let footer_len = u32::from_le_bytes(tail[0..4].try_into().map_err(|_| {
315 crate::error::ColumnarError::Corruption {
316 segment_id: None,
317 reason: "footer length field: expected 4 bytes at segment tail - 8".into(),
318 offset: Some((data.len() - 8) as u64),
319 }
320 })?) as usize;
321 let stored_crc = u32::from_le_bytes(tail[4..8].try_into().map_err(|_| {
322 crate::error::ColumnarError::Corruption {
323 segment_id: None,
324 reason: "footer CRC field: expected 4 bytes at segment tail - 4".into(),
325 offset: Some((data.len() - 4) as u64),
326 }
327 })?);
328
329 let footer_start = data.len().checked_sub(8 + footer_len).ok_or(
330 crate::error::ColumnarError::TruncatedSegment {
331 expected: 8 + footer_len,
332 got: data.len(),
333 },
334 )?;
335
336 let footer_bytes = &data[footer_start..footer_start + footer_len];
337 let computed_crc = crc32c::crc32c(footer_bytes);
338
339 if computed_crc != stored_crc {
340 return Err(crate::error::ColumnarError::FooterCrcMismatch {
341 stored: stored_crc,
342 computed: computed_crc,
343 });
344 }
345
346 zerompk::from_msgpack(footer_bytes)
347 .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
348 }
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn header_roundtrip() {
        let header = SegmentHeader::current();
        let bytes = header.to_bytes();
        let parsed = SegmentHeader::from_bytes(&bytes).expect("valid header");
        assert_eq!(parsed, header);
    }

    #[test]
    fn header_truncated() {
        let bytes = SegmentHeader::current().to_bytes();
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes[..HEADER_SIZE - 1]),
            Err(crate::error::ColumnarError::TruncatedSegment { expected, got })
                if expected == HEADER_SIZE && got == HEADER_SIZE - 1
        ));
    }

    #[test]
    fn header_invalid_magic() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[0] = b'X';
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes),
            Err(crate::error::ColumnarError::InvalidMagic(_))
        ));
    }

    #[test]
    fn header_incompatible_major() {
        let mut bytes = SegmentHeader::current().to_bytes();
        // A newer major version must be rejected.
        bytes[4] = VERSION_MAJOR + 1;
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes),
            Err(crate::error::ColumnarError::IncompatibleVersion { .. })
        ));
    }

    #[test]
    fn header_compatible_minor() {
        let mut bytes = SegmentHeader::current().to_bytes();
        // A newer minor version with the same major stays readable.
        bytes[5] = VERSION_MINOR + 5;
        let parsed = SegmentHeader::from_bytes(&bytes).expect("compatible minor");
        assert_eq!(parsed.version_major, VERSION_MAJOR);
        assert_eq!(parsed.version_minor, VERSION_MINOR + 5);
    }

    #[test]
    fn footer_roundtrip() {
        let footer = SegmentFooter {
            schema_hash: 0xDEAD_BEEF_CAFE_1234,
            column_count: 3,
            row_count: 2048,
            profile_tag: 0,
            columns: vec![
                ColumnMeta {
                    name: "id".into(),
                    offset: 7,
                    length: 512,
                    codec: nodedb_codec::ResolvedColumnCodec::DeltaFastLanesLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::numeric(1.0, 1024.0, 0, 1024),
                        BlockStats::numeric(1025.0, 2048.0, 0, 1024),
                    ],
                    dictionary: None,
                },
                ColumnMeta {
                    name: "name".into(),
                    offset: 519,
                    length: 256,
                    codec: nodedb_codec::ResolvedColumnCodec::FsstLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::non_numeric(0, 1024),
                        BlockStats::non_numeric(5, 1024),
                    ],
                    dictionary: None,
                },
                ColumnMeta {
                    name: "score".into(),
                    offset: 775,
                    length: 128,
                    codec: nodedb_codec::ResolvedColumnCodec::AlpFastLanesLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::numeric(0.0, 100.0, 10, 1024),
                        BlockStats::numeric(0.5, 99.5, 3, 1024),
                    ],
                    dictionary: None,
                },
            ],
        };

        let footer_bytes = footer.to_bytes().expect("serialize");

        // Header + 896 bytes of column payload (512 + 256 + 128) + footer.
        let mut segment = Vec::new();
        segment.extend_from_slice(&SegmentHeader::current().to_bytes());
        segment.extend_from_slice(&[0u8; 896]);
        segment.extend_from_slice(&footer_bytes);

        let parsed = SegmentFooter::from_segment_tail(&segment).expect("parse footer");
        assert_eq!(parsed.schema_hash, footer.schema_hash);
        assert_eq!(parsed.column_count, 3);
        assert_eq!(parsed.row_count, 2048);
        assert_eq!(parsed.columns.len(), 3);
        assert_eq!(parsed.columns[0].name, "id");
        assert_eq!(parsed.columns[1].name, "name");
        assert_eq!(parsed.columns[2].name, "score");
    }

    #[test]
    fn footer_crc_mismatch() {
        let footer = SegmentFooter {
            schema_hash: 0,
            column_count: 0,
            row_count: 0,
            profile_tag: 0,
            columns: vec![],
        };
        let mut bytes = footer.to_bytes().expect("serialize");
        let len = bytes.len();
        // Corrupt the stored CRC (last byte of the trailer).
        bytes[len - 1] ^= 0xFF;

        assert!(matches!(
            SegmentFooter::from_segment_tail(&bytes),
            Err(crate::error::ColumnarError::FooterCrcMismatch { .. })
        ));
    }

    #[test]
    fn footer_tail_too_short() {
        assert!(matches!(
            SegmentFooter::from_segment_tail(&[0u8; 7]),
            Err(crate::error::ColumnarError::TruncatedSegment { expected: 8, got: 7 })
        ));
    }

    #[test]
    fn footer_len_exceeds_segment() {
        // 4 bytes of "footer" but a trailer claiming 100 bytes.
        let mut data = vec![0u8; 4];
        data.extend_from_slice(&100u32.to_le_bytes());
        data.extend_from_slice(&0u32.to_le_bytes());
        assert!(matches!(
            SegmentFooter::from_segment_tail(&data),
            Err(crate::error::ColumnarError::TruncatedSegment { .. })
        ));
    }

    #[test]
    fn block_stats_integer_keeps_exact_bounds() {
        let stats = BlockStats::integer(-5, 7, 1, 10);
        assert_eq!(stats.min, -5.0);
        assert_eq!(stats.max, 7.0);
        assert_eq!(stats.min_i64, Some(-5));
        assert_eq!(stats.max_i64, Some(7));
        assert_eq!(stats.null_count, 1);
        assert_eq!(stats.row_count, 10);
        assert!(stats.str_min.is_none() && stats.bloom.is_none());

        // Bounds beyond f64's exact-integer range survive in the i64 fields.
        let big = BlockStats::integer(i64::MAX - 1, i64::MAX, 0, 2);
        assert_eq!(big.min_i64, Some(i64::MAX - 1));
        assert_eq!(big.max_i64, Some(i64::MAX));
    }

    #[test]
    fn block_stats_non_numeric_and_string() {
        let plain = BlockStats::non_numeric(3, 8);
        assert!(plain.min.is_nan() && plain.max.is_nan());
        assert!(plain.min_i64.is_none() && plain.str_min.is_none());

        let strings = BlockStats::string_block(2, 10, Some("a".into()), Some("z".into()), None);
        assert!(strings.min.is_nan() && strings.max.is_nan());
        assert_eq!(strings.str_min.as_deref(), Some("a"));
        assert_eq!(strings.str_max.as_deref(), Some("z"));
        assert_eq!(strings.null_count, 2);
        assert_eq!(strings.row_count, 10);
        assert!(strings.bloom.is_none() && strings.min_i64.is_none());
    }

    #[test]
    fn block_stats_predicate_skip() {
        use crate::predicate::ScanPredicate;

        let stats = BlockStats::numeric(10.0, 50.0, 0, 1024);

        assert!(ScanPredicate::gt(0, 60.0).can_skip_block(&stats));
        assert!(!ScanPredicate::gt(0, 40.0).can_skip_block(&stats));
        assert!(ScanPredicate::lt(0, 5.0).can_skip_block(&stats));
        assert!(ScanPredicate::eq(0, 100.0).can_skip_block(&stats));
        assert!(!ScanPredicate::eq(0, 30.0).can_skip_block(&stats));
    }

    // Already compiled only under `cfg(test)` via the parent module.
    mod golden {
        use super::*;

        #[test]
        fn golden_columnar_segment_format() {
            let header = SegmentHeader::current();
            let header_bytes = header.to_bytes();
            assert_eq!(&header_bytes[0..4], b"NDBS", "magic mismatch");
            assert_eq!(header_bytes[4], VERSION_MAJOR, "major version mismatch");
            assert_eq!(header_bytes[5], VERSION_MINOR, "minor version mismatch");
            assert_eq!(header_bytes[4], 1u8, "expected VERSION_MAJOR == 1");
            assert_eq!(header_bytes[5], 1u8, "expected VERSION_MINOR == 1");

            let footer = SegmentFooter {
                schema_hash: 0xAB_CD_EF_01,
                column_count: 1,
                row_count: 128,
                profile_tag: 0,
                columns: vec![ColumnMeta {
                    name: "v".into(),
                    offset: 0,
                    length: 64,
                    codec: nodedb_codec::ResolvedColumnCodec::Lz4,
                    block_count: 1,
                    block_stats: vec![BlockStats::non_numeric(0, 128)],
                    dictionary: None,
                }],
            };
            let footer_bytes = footer.to_bytes().expect("serialize");
            let n = footer_bytes.len();

            // Trailer layout: [body][len: u32 LE][crc32c(body): u32 LE].
            let stored_crc = u32::from_le_bytes([
                footer_bytes[n - 4],
                footer_bytes[n - 3],
                footer_bytes[n - 2],
                footer_bytes[n - 1],
            ]);
            let body_len = u32::from_le_bytes([
                footer_bytes[n - 8],
                footer_bytes[n - 7],
                footer_bytes[n - 6],
                footer_bytes[n - 5],
            ]) as usize;
            let recomputed = crc32c::crc32c(&footer_bytes[..body_len]);
            assert_eq!(stored_crc, recomputed, "footer CRC mismatch");

            let mut segment = Vec::new();
            segment.extend_from_slice(&header_bytes);
            segment.extend_from_slice(&[0u8; 64]);
            segment.extend_from_slice(&footer_bytes);
            let parsed = SegmentFooter::from_segment_tail(&segment).expect("parse");
            assert_eq!(parsed.schema_hash, footer.schema_hash);
            assert_eq!(parsed.row_count, 128);
        }
    }
}