1use serde::{Deserialize, Serialize};
8use zerompk::{FromMessagePack, ToMessagePack};
9
/// Four magic bytes (ASCII `NDBS`) that open every segment header.
///
/// [`SegmentHeader::from_bytes`] rejects input whose first four bytes differ.
pub const MAGIC: [u8; 4] = [b'N', b'D', b'B', b'S'];

/// Major format version written by [`SegmentHeader::current`].
///
/// [`SegmentHeader::from_bytes`] refuses segments whose major version is
/// greater than this value.
pub const VERSION_MAJOR: u8 = 1;

/// Minor format version written by [`SegmentHeader::current`].
///
/// The minor version is carried through header parsing without being checked.
pub const VERSION_MINOR: u8 = 0;

/// Value stored in the header's endianness byte by [`SegmentHeader::current`]
/// (the name marks it as the little-endian tag).
pub const ENDIANNESS_LE: u8 = 1;

/// Block size used by the format.
///
/// NOTE(review): not referenced in this part of the file; presumably the
/// number of rows per block — confirm against the writer.
pub const BLOCK_SIZE: usize = 1 << 10;

/// Encoded header length in bytes: the magic plus one byte each for the major
/// version, the minor version and the endianness tag.
pub const HEADER_SIZE: usize = MAGIC.len() + 3;
30
/// Fixed-size header found at the start of a segment.
///
/// Encoded as [`HEADER_SIZE`] bytes in field order: the four magic bytes, then
/// one byte each for the major version, minor version and endianness tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SegmentHeader {
    /// Magic bytes; equal to [`MAGIC`] for any header accepted by
    /// [`SegmentHeader::from_bytes`].
    pub magic: [u8; 4],
    /// Major format version of the segment.
    pub version_major: u8,
    /// Minor format version of the segment.
    pub version_minor: u8,
    /// Endianness tag; [`ENDIANNESS_LE`] for headers built by
    /// [`SegmentHeader::current`].
    pub endianness: u8,
}
39
40impl SegmentHeader {
41 pub fn current() -> Self {
43 Self {
44 magic: MAGIC,
45 version_major: VERSION_MAJOR,
46 version_minor: VERSION_MINOR,
47 endianness: ENDIANNESS_LE,
48 }
49 }
50
51 pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
53 let mut buf = [0u8; HEADER_SIZE];
54 buf[0..4].copy_from_slice(&self.magic);
55 buf[4] = self.version_major;
56 buf[5] = self.version_minor;
57 buf[6] = self.endianness;
58 buf
59 }
60
61 pub fn from_bytes(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
63 if data.len() < HEADER_SIZE {
64 return Err(crate::error::ColumnarError::TruncatedSegment {
65 expected: HEADER_SIZE,
66 got: data.len(),
67 });
68 }
69
70 let mut magic = [0u8; 4];
71 magic.copy_from_slice(&data[0..4]);
72 if magic != MAGIC {
73 return Err(crate::error::ColumnarError::InvalidMagic(magic));
74 }
75
76 let version_major = data[4];
77 let version_minor = data[5];
78
79 if version_major > VERSION_MAJOR {
81 return Err(crate::error::ColumnarError::IncompatibleVersion {
82 reader_major: VERSION_MAJOR,
83 segment_major: version_major,
84 segment_minor: version_minor,
85 });
86 }
87
88 Ok(Self {
89 magic,
90 version_major,
91 version_minor,
92 endianness: data[6],
93 })
94 }
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
100pub struct BlockStats {
101 pub min: f64,
104 pub max: f64,
106 pub null_count: u32,
108 pub row_count: u32,
110 #[serde(default, skip_serializing_if = "Option::is_none")]
113 pub str_min: Option<String>,
114 #[serde(default, skip_serializing_if = "Option::is_none")]
117 pub str_max: Option<String>,
118 #[serde(default, skip_serializing_if = "Option::is_none")]
122 pub bloom: Option<Vec<u8>>,
123}
124
125impl BlockStats {
126 pub fn numeric(min: f64, max: f64, null_count: u32, row_count: u32) -> Self {
128 Self {
129 min,
130 max,
131 null_count,
132 row_count,
133 str_min: None,
134 str_max: None,
135 bloom: None,
136 }
137 }
138
139 pub fn non_numeric(null_count: u32, row_count: u32) -> Self {
141 Self {
142 min: f64::NAN,
143 max: f64::NAN,
144 null_count,
145 row_count,
146 str_min: None,
147 str_max: None,
148 bloom: None,
149 }
150 }
151
152 pub fn string_block(
154 null_count: u32,
155 row_count: u32,
156 str_min: Option<String>,
157 str_max: Option<String>,
158 bloom: Option<Vec<u8>>,
159 ) -> Self {
160 Self {
161 min: f64::NAN,
162 max: f64::NAN,
163 null_count,
164 row_count,
165 str_min,
166 str_max,
167 bloom,
168 }
169 }
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
174pub struct ColumnMeta {
175 pub name: String,
177 pub offset: u64,
180 pub length: u64,
182 pub codec: nodedb_codec::ColumnCodec,
184 pub block_count: u32,
186 pub block_stats: Vec<BlockStats>,
188 #[serde(default, skip_serializing_if = "Option::is_none")]
193 pub dictionary: Option<Vec<String>>,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
209pub struct SegmentFooter {
210 pub schema_hash: u64,
212 pub column_count: u32,
214 pub row_count: u64,
216 pub profile_tag: u8,
218 pub columns: Vec<ColumnMeta>,
220}
221
222impl SegmentFooter {
223 pub fn to_bytes(&self) -> Result<Vec<u8>, crate::error::ColumnarError> {
225 let footer_msgpack = zerompk::to_msgpack_vec(self)
226 .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))?;
227
228 let footer_len = footer_msgpack.len() as u32;
229 let footer_crc = crc32c::crc32c(&footer_msgpack);
230
231 let mut buf = Vec::with_capacity(footer_msgpack.len() + 8);
232 buf.extend_from_slice(&footer_msgpack);
233 buf.extend_from_slice(&footer_len.to_le_bytes());
234 buf.extend_from_slice(&footer_crc.to_le_bytes());
235 Ok(buf)
236 }
237
238 pub fn from_segment_tail(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
243 if data.len() < 8 {
244 return Err(crate::error::ColumnarError::TruncatedSegment {
245 expected: 8,
246 got: data.len(),
247 });
248 }
249
250 let tail = &data[data.len() - 8..];
251 let footer_len =
252 u32::from_le_bytes(tail[0..4].try_into().expect("4 bytes from slice")) as usize;
253 let stored_crc = u32::from_le_bytes(tail[4..8].try_into().expect("4 bytes from slice"));
254
255 let footer_start = data.len().checked_sub(8 + footer_len).ok_or(
256 crate::error::ColumnarError::TruncatedSegment {
257 expected: 8 + footer_len,
258 got: data.len(),
259 },
260 )?;
261
262 let footer_bytes = &data[footer_start..footer_start + footer_len];
263 let computed_crc = crc32c::crc32c(footer_bytes);
264
265 if computed_crc != stored_crc {
266 return Err(crate::error::ColumnarError::FooterCrcMismatch {
267 stored: stored_crc,
268 computed: computed_crc,
269 });
270 }
271
272 zerompk::from_msgpack(footer_bytes)
273 .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
274 }
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built header survives an encode/decode cycle unchanged.
    #[test]
    fn header_roundtrip() {
        let header = SegmentHeader::current();
        let bytes = header.to_bytes();
        let parsed = SegmentHeader::from_bytes(&bytes).expect("valid header");
        assert_eq!(parsed, header);
    }

    /// Corrupting the first magic byte is reported as `InvalidMagic`.
    #[test]
    fn header_invalid_magic() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[0] = b'X';
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes),
            Err(crate::error::ColumnarError::InvalidMagic(_))
        ));
    }

    /// Input shorter than the header is reported as `TruncatedSegment`.
    #[test]
    fn header_truncated() {
        let bytes = SegmentHeader::current().to_bytes();
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes[..HEADER_SIZE - 1]),
            Err(crate::error::ColumnarError::TruncatedSegment { .. })
        ));
    }

    /// A major version newer than the reader's is rejected.
    #[test]
    fn header_incompatible_major() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[4] = VERSION_MAJOR + 1;
        assert!(matches!(
            SegmentHeader::from_bytes(&bytes),
            Err(crate::error::ColumnarError::IncompatibleVersion { .. })
        ));
    }

    /// A newer minor version under the same major is accepted as-is.
    #[test]
    fn header_compatible_minor() {
        let mut bytes = SegmentHeader::current().to_bytes();
        bytes[5] = VERSION_MINOR + 5;
        let parsed = SegmentHeader::from_bytes(&bytes).expect("compatible minor");
        assert_eq!(parsed.version_major, VERSION_MAJOR);
        assert_eq!(parsed.version_minor, VERSION_MINOR + 5);
    }

    /// A footer appended to a header and dummy column data parses back from
    /// the segment tail.
    #[test]
    fn footer_roundtrip() {
        let footer = SegmentFooter {
            schema_hash: 0xDEAD_BEEF_CAFE_1234,
            column_count: 3,
            row_count: 2048,
            profile_tag: 0,
            columns: vec![
                ColumnMeta {
                    name: "id".into(),
                    offset: 7,
                    length: 512,
                    codec: nodedb_codec::ColumnCodec::DeltaFastLanesLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::numeric(1.0, 1024.0, 0, 1024),
                        BlockStats::numeric(1025.0, 2048.0, 0, 1024),
                    ],
                    dictionary: None,
                },
                ColumnMeta {
                    name: "name".into(),
                    offset: 519,
                    length: 256,
                    codec: nodedb_codec::ColumnCodec::FsstLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::non_numeric(0, 1024),
                        BlockStats::non_numeric(5, 1024),
                    ],
                    dictionary: None,
                },
                ColumnMeta {
                    name: "score".into(),
                    offset: 775,
                    length: 128,
                    codec: nodedb_codec::ColumnCodec::AlpFastLanesLz4,
                    block_count: 2,
                    block_stats: vec![
                        BlockStats::numeric(0.0, 100.0, 10, 1024),
                        BlockStats::numeric(0.5, 99.5, 3, 1024),
                    ],
                    dictionary: None,
                },
            ],
        };

        let footer_bytes = footer.to_bytes().expect("serialize");

        let mut segment = Vec::new();
        segment.extend_from_slice(&SegmentHeader::current().to_bytes());
        // Zeroed stand-in for the column data: 512 + 256 + 128 bytes, matching
        // the lengths declared above. A stack array avoids a throwaway Vec.
        segment.extend_from_slice(&[0u8; 896]);
        segment.extend_from_slice(&footer_bytes);

        let parsed = SegmentFooter::from_segment_tail(&segment).expect("parse footer");
        assert_eq!(parsed.schema_hash, footer.schema_hash);
        assert_eq!(parsed.column_count, 3);
        assert_eq!(parsed.row_count, 2048);
        assert_eq!(parsed.columns.len(), 3);
        assert_eq!(parsed.columns[0].name, "id");
        assert_eq!(parsed.columns[1].name, "name");
        assert_eq!(parsed.columns[2].name, "score");
    }

    /// Flipping a byte of the stored checksum is reported as a CRC mismatch.
    #[test]
    fn footer_crc_mismatch() {
        let footer = SegmentFooter {
            schema_hash: 0,
            column_count: 0,
            row_count: 0,
            profile_tag: 0,
            columns: vec![],
        };
        let mut bytes = footer.to_bytes().expect("serialize");
        let len = bytes.len();
        // The final four bytes are the stored CRC; corrupt its last byte.
        bytes[len - 1] ^= 0xFF;

        assert!(matches!(
            SegmentFooter::from_segment_tail(&bytes),
            Err(crate::error::ColumnarError::FooterCrcMismatch { .. })
        ));
    }

    /// Input shorter than the 8-byte trailer is reported as truncated.
    #[test]
    fn footer_truncated_trailer() {
        assert!(matches!(
            SegmentFooter::from_segment_tail(&[0u8; 7]),
            Err(crate::error::ColumnarError::TruncatedSegment {
                expected: 8,
                got: 7
            })
        ));
    }

    /// A trailer declaring more payload than the data holds is reported as
    /// truncated rather than read out of bounds.
    #[test]
    fn footer_length_exceeds_data() {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&100u32.to_le_bytes()); // declared payload length
        bytes.extend_from_slice(&0u32.to_le_bytes()); // checksum (never reached)

        assert!(matches!(
            SegmentFooter::from_segment_tail(&bytes),
            Err(crate::error::ColumnarError::TruncatedSegment {
                expected: 108,
                got: 8
            })
        ));
    }

    /// Min/max statistics let range and equality predicates skip a block.
    #[test]
    fn block_stats_predicate_skip() {
        let stats = BlockStats::numeric(10.0, 50.0, 0, 1024);

        use crate::predicate::ScanPredicate;

        // `> 60` lies above the block's max of 50.
        assert!(ScanPredicate::gt(0, 60.0).can_skip_block(&stats));
        // `> 40` overlaps the block's range.
        assert!(!ScanPredicate::gt(0, 40.0).can_skip_block(&stats));
        // `< 5` lies below the block's min of 10.
        assert!(ScanPredicate::lt(0, 5.0).can_skip_block(&stats));
        // `== 100` lies outside [10, 50].
        assert!(ScanPredicate::eq(0, 100.0).can_skip_block(&stats));
        // `== 30` lies inside [10, 50].
        assert!(!ScanPredicate::eq(0, 30.0).can_skip_block(&stats));
    }
}