1use serde::{Deserialize, Serialize};
8
9pub const MAGIC: [u8; 4] = *b"NDBS";
11
12pub const VERSION_MAJOR: u8 = 1;
14
15pub const VERSION_MINOR: u8 = 0;
18
19pub const ENDIANNESS_LE: u8 = 0x01;
21
22pub const BLOCK_SIZE: usize = 1024;
26
27pub const HEADER_SIZE: usize = 7;
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
32pub struct SegmentHeader {
33 pub magic: [u8; 4],
34 pub version_major: u8,
35 pub version_minor: u8,
36 pub endianness: u8,
37}
38
39impl SegmentHeader {
40 pub fn current() -> Self {
42 Self {
43 magic: MAGIC,
44 version_major: VERSION_MAJOR,
45 version_minor: VERSION_MINOR,
46 endianness: ENDIANNESS_LE,
47 }
48 }
49
50 pub fn to_bytes(&self) -> [u8; HEADER_SIZE] {
52 let mut buf = [0u8; HEADER_SIZE];
53 buf[0..4].copy_from_slice(&self.magic);
54 buf[4] = self.version_major;
55 buf[5] = self.version_minor;
56 buf[6] = self.endianness;
57 buf
58 }
59
60 pub fn from_bytes(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
62 if data.len() < HEADER_SIZE {
63 return Err(crate::error::ColumnarError::TruncatedSegment {
64 expected: HEADER_SIZE,
65 got: data.len(),
66 });
67 }
68
69 let mut magic = [0u8; 4];
70 magic.copy_from_slice(&data[0..4]);
71 if magic != MAGIC {
72 return Err(crate::error::ColumnarError::InvalidMagic(magic));
73 }
74
75 let version_major = data[4];
76 let version_minor = data[5];
77
78 if version_major > VERSION_MAJOR {
80 return Err(crate::error::ColumnarError::IncompatibleVersion {
81 reader_major: VERSION_MAJOR,
82 segment_major: version_major,
83 segment_minor: version_minor,
84 });
85 }
86
87 Ok(Self {
88 magic,
89 version_major,
90 version_minor,
91 endianness: data[6],
92 })
93 }
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct BlockStats {
100 pub min: f64,
103 pub max: f64,
105 pub null_count: u32,
107 pub row_count: u32,
109}
110
111impl BlockStats {
112 pub fn non_numeric(null_count: u32, row_count: u32) -> Self {
114 Self {
115 min: f64::NAN,
116 max: f64::NAN,
117 null_count,
118 row_count,
119 }
120 }
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct ColumnMeta {
126 pub name: String,
128 pub offset: u64,
131 pub length: u64,
133 pub codec: nodedb_codec::ColumnCodec,
135 pub block_count: u32,
137 pub block_stats: Vec<BlockStats>,
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct SegmentFooter {
155 pub schema_hash: u64,
157 pub column_count: u32,
159 pub row_count: u64,
161 pub profile_tag: u8,
163 pub columns: Vec<ColumnMeta>,
165}
166
167impl SegmentFooter {
168 pub fn to_bytes(&self) -> Result<Vec<u8>, crate::error::ColumnarError> {
170 let footer_msgpack = rmp_serde::to_vec_named(self)
171 .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))?;
172
173 let footer_len = footer_msgpack.len() as u32;
174 let footer_crc = crc32c::crc32c(&footer_msgpack);
175
176 let mut buf = Vec::with_capacity(footer_msgpack.len() + 8);
177 buf.extend_from_slice(&footer_msgpack);
178 buf.extend_from_slice(&footer_len.to_le_bytes());
179 buf.extend_from_slice(&footer_crc.to_le_bytes());
180 Ok(buf)
181 }
182
183 pub fn from_segment_tail(data: &[u8]) -> Result<Self, crate::error::ColumnarError> {
188 if data.len() < 8 {
189 return Err(crate::error::ColumnarError::TruncatedSegment {
190 expected: 8,
191 got: data.len(),
192 });
193 }
194
195 let tail = &data[data.len() - 8..];
196 let footer_len =
197 u32::from_le_bytes(tail[0..4].try_into().expect("4 bytes from slice")) as usize;
198 let stored_crc = u32::from_le_bytes(tail[4..8].try_into().expect("4 bytes from slice"));
199
200 let footer_start = data.len().checked_sub(8 + footer_len).ok_or(
201 crate::error::ColumnarError::TruncatedSegment {
202 expected: 8 + footer_len,
203 got: data.len(),
204 },
205 )?;
206
207 let footer_bytes = &data[footer_start..footer_start + footer_len];
208 let computed_crc = crc32c::crc32c(footer_bytes);
209
210 if computed_crc != stored_crc {
211 return Err(crate::error::ColumnarError::FooterCrcMismatch {
212 stored: stored_crc,
213 computed: computed_crc,
214 });
215 }
216
217 rmp_serde::from_slice(footer_bytes)
218 .map_err(|e| crate::error::ColumnarError::Serialization(e.to_string()))
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225
226 #[test]
227 fn header_roundtrip() {
228 let header = SegmentHeader::current();
229 let bytes = header.to_bytes();
230 let parsed = SegmentHeader::from_bytes(&bytes).expect("valid header");
231 assert_eq!(parsed, header);
232 }
233
234 #[test]
235 fn header_invalid_magic() {
236 let mut bytes = SegmentHeader::current().to_bytes();
237 bytes[0] = b'X';
238 assert!(matches!(
239 SegmentHeader::from_bytes(&bytes),
240 Err(crate::error::ColumnarError::InvalidMagic(_))
241 ));
242 }
243
244 #[test]
245 fn header_incompatible_major() {
246 let mut bytes = SegmentHeader::current().to_bytes();
247 bytes[4] = VERSION_MAJOR + 1; assert!(matches!(
249 SegmentHeader::from_bytes(&bytes),
250 Err(crate::error::ColumnarError::IncompatibleVersion { .. })
251 ));
252 }
253
254 #[test]
255 fn header_compatible_minor() {
256 let mut bytes = SegmentHeader::current().to_bytes();
257 bytes[5] = VERSION_MINOR + 5; let parsed = SegmentHeader::from_bytes(&bytes).expect("compatible minor");
259 assert_eq!(parsed.version_major, VERSION_MAJOR);
260 assert_eq!(parsed.version_minor, VERSION_MINOR + 5);
261 }
262
263 #[test]
264 fn footer_roundtrip() {
265 let footer = SegmentFooter {
266 schema_hash: 0xDEAD_BEEF_CAFE_1234,
267 column_count: 3,
268 row_count: 2048,
269 profile_tag: 0,
270 columns: vec![
271 ColumnMeta {
272 name: "id".into(),
273 offset: 7,
274 length: 512,
275 codec: nodedb_codec::ColumnCodec::DeltaFastLanesLz4,
276 block_count: 2,
277 block_stats: vec![
278 BlockStats {
279 min: 1.0,
280 max: 1024.0,
281 null_count: 0,
282 row_count: 1024,
283 },
284 BlockStats {
285 min: 1025.0,
286 max: 2048.0,
287 null_count: 0,
288 row_count: 1024,
289 },
290 ],
291 },
292 ColumnMeta {
293 name: "name".into(),
294 offset: 519,
295 length: 256,
296 codec: nodedb_codec::ColumnCodec::FsstLz4,
297 block_count: 2,
298 block_stats: vec![
299 BlockStats::non_numeric(0, 1024),
300 BlockStats::non_numeric(5, 1024),
301 ],
302 },
303 ColumnMeta {
304 name: "score".into(),
305 offset: 775,
306 length: 128,
307 codec: nodedb_codec::ColumnCodec::AlpFastLanesLz4,
308 block_count: 2,
309 block_stats: vec![
310 BlockStats {
311 min: 0.0,
312 max: 100.0,
313 null_count: 10,
314 row_count: 1024,
315 },
316 BlockStats {
317 min: 0.5,
318 max: 99.5,
319 null_count: 3,
320 row_count: 1024,
321 },
322 ],
323 },
324 ],
325 };
326
327 let footer_bytes = footer.to_bytes().expect("serialize");
329
330 let mut segment = Vec::new();
332 segment.extend_from_slice(&SegmentHeader::current().to_bytes());
333 segment.extend_from_slice(&vec![0u8; 896]); segment.extend_from_slice(&footer_bytes);
335
336 let parsed = SegmentFooter::from_segment_tail(&segment).expect("parse footer");
337 assert_eq!(parsed.schema_hash, footer.schema_hash);
338 assert_eq!(parsed.column_count, 3);
339 assert_eq!(parsed.row_count, 2048);
340 assert_eq!(parsed.columns.len(), 3);
341 assert_eq!(parsed.columns[0].name, "id");
342 assert_eq!(parsed.columns[1].name, "name");
343 assert_eq!(parsed.columns[2].name, "score");
344 }
345
346 #[test]
347 fn footer_crc_mismatch() {
348 let footer = SegmentFooter {
349 schema_hash: 0,
350 column_count: 0,
351 row_count: 0,
352 profile_tag: 0,
353 columns: vec![],
354 };
355 let mut bytes = footer.to_bytes().expect("serialize");
356 let len = bytes.len();
358 bytes[len - 1] ^= 0xFF;
359
360 assert!(matches!(
361 SegmentFooter::from_segment_tail(&bytes),
362 Err(crate::error::ColumnarError::FooterCrcMismatch { .. })
363 ));
364 }
365
366 #[test]
367 fn block_stats_predicate_skip() {
368 let stats = BlockStats {
369 min: 10.0,
370 max: 50.0,
371 null_count: 0,
372 row_count: 1024,
373 };
374
375 use crate::predicate::ScanPredicate;
376
377 assert!(ScanPredicate::gt(0, 60.0).can_skip_block(&stats));
379 assert!(!ScanPredicate::gt(0, 40.0).can_skip_block(&stats));
381 assert!(ScanPredicate::lt(0, 5.0).can_skip_block(&stats));
383 assert!(ScanPredicate::eq(0, 100.0).can_skip_block(&stats));
385 assert!(!ScanPredicate::eq(0, 30.0).can_skip_block(&stats));
387 }
388}