1use crate::{COLUMN_BLOCK_MAGIC, COLUMN_BLOCK_VERSION_V1};
8
/// Size in bytes of the fixed frame header, spelled out field by field in the
/// order `encode_column_block_frame` writes them: magic (4), version (2), a
/// zero `u16` (2), chunk id (8), schema ref (8), row count (8), column count
/// (4), minimum timestamp (8), maximum timestamp (8).
pub const COLUMN_BLOCK_HEADER_LEN: usize = 4 + 2 + 2 + 8 + 8 + 8 + 4 + 8 + 8;
/// Size in bytes of one column directory entry: column id (4), logical type
/// (1), codec tag (1), followed by six `u64` fields — the offset and length of
/// the stream, the granule index and the granule bloom.
pub const COLUMN_BLOCK_DIR_ENTRY_LEN: usize = 4 + 1 + 1 + 6 * 8;
/// Size in bytes of the frame footer: directory offset (8), directory length
/// (8), CRC-32 (4), trailing magic (4).
pub const COLUMN_BLOCK_FOOTER_LEN: usize = 8 + 8 + 4 + 4;
12
/// Errors reported while decoding a column block frame or one of its blobs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnBlockFrameError {
    /// The input is shorter than a header plus a footer.
    Truncated,
    /// The first four bytes are not the expected magic; carries the bytes found.
    BadMagic([u8; 4]),
    /// The last four bytes are not the expected magic; carries the bytes found.
    BadTailMagic([u8; 4]),
    /// The header version field is not a supported version; carries the value found.
    UnsupportedVersion(u16),
    /// The directory, a region it references, or a granule blob is malformed
    /// or lies outside the permitted byte range.
    BadDirectory,
    /// The CRC-32 stored in the footer (`expected`) differs from the one
    /// computed over the frame contents (`actual`).
    ChecksumMismatch { expected: u32, actual: u32 },
}

impl std::fmt::Display for ColumnBlockFrameError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Fixed messages need no formatting machinery.
            Self::Truncated => f.write_str("column block truncated"),
            Self::BadDirectory => f.write_str("column directory entry out of range"),
            // Messages that embed the offending value.
            Self::BadMagic(m) => write!(f, "bad column block magic: {m:?}"),
            Self::BadTailMagic(m) => write!(f, "bad column block tail magic: {m:?}"),
            Self::UnsupportedVersion(v) => {
                write!(f, "unsupported column block version: {v}")
            }
            Self::ChecksumMismatch { expected, actual } => write!(
                f,
                "column block checksum mismatch: expected 0x{expected:08X}, got 0x{actual:08X}"
            ),
        }
    }
}

impl std::error::Error for ColumnBlockFrameError {}
40
/// One column's payload as passed to `encode_column_block_frame`.
///
/// The three byte slices are copied verbatim into the frame. An empty
/// `granule_index` or `granule_bloom` is recorded in the directory as offset 0
/// and length 0.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnBlockPart<'a> {
    /// Column identifier, written as a little-endian `u32` in the directory entry.
    pub column_id: u32,
    /// Logical type tag, written as a single byte; its meaning is not defined here.
    pub logical_type: u8,
    /// Codec tag, written as a single byte; its meaning is not defined here.
    pub codec_tag: u8,
    /// Column stream bytes.
    pub stream: &'a [u8],
    /// Granule index blob; an empty slice means the column has none.
    pub granule_index: &'a [u8],
    /// Granule bloom blob; an empty slice means the column has none.
    pub granule_bloom: &'a [u8],
}
50
/// One column as returned by `decode_column_block_frame`.
///
/// All slices borrow from the byte buffer that was decoded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnBlockColumn<'a> {
    /// Column identifier read from the directory entry.
    pub column_id: u32,
    /// Logical type tag byte read from the directory entry.
    pub logical_type: u8,
    /// Codec tag byte read from the directory entry.
    pub codec_tag: u8,
    /// Column stream bytes.
    pub stream: &'a [u8],
    /// Granule index blob, or `None` when the directory records length 0.
    pub granule_index: Option<&'a [u8]>,
    /// Granule bloom blob, or `None` when the directory records length 0.
    pub granule_bloom: Option<&'a [u8]>,
}
60
/// A decoded column block frame: the header fields plus one entry per column.
///
/// Produced by `decode_column_block_frame`; column slices borrow from the
/// decoded buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnBlockFrame<'a> {
    /// Chunk identifier from the header.
    pub chunk_id: u64,
    /// Schema reference from the header; its interpretation is not defined here.
    pub schema_ref: u64,
    /// Row count from the header.
    pub row_count: u64,
    /// Minimum timestamp from the header (nanoseconds, going by the field name).
    pub min_ts_ns: u64,
    /// Maximum timestamp from the header (nanoseconds, going by the field name).
    pub max_ts_ns: u64,
    /// Columns in directory order.
    pub columns: Vec<ColumnBlockColumn<'a>>,
}
70
/// The `min` and `max` value bytes stored for one granule of a granule index.
///
/// The index decoder fills each field with exactly `value_width` bytes; the
/// encoder writes whatever bytes are present, `min` first and then `max`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnBlockGranuleStats {
    /// Raw bytes of the granule's minimum value.
    pub min: Vec<u8>,
    /// Raw bytes of the granule's maximum value.
    pub max: Vec<u8>,
}
76
/// In-memory form of a granule index blob.
///
/// The blob starts with three little-endian `u32` values (`granule_size`,
/// `value_width`, granule count) followed by a `min`/`max` pair per granule.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnBlockGranuleIndex {
    /// Granule size as stored in the blob header.
    // NOTE(review): presumably the number of rows per granule — confirm with the writer.
    pub granule_size: u32,
    /// Width in bytes of each `min` and each `max` entry; the decoder rejects 0.
    pub value_width: u32,
    /// Per-granule statistics, in blob order.
    pub granules: Vec<ColumnBlockGranuleStats>,
}
83
/// Decoded view of a granule bloom blob; the bloom slices borrow from the blob.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnBlockGranuleBloom<'a> {
    /// Granule size as stored in the blob header.
    // NOTE(review): presumably the number of rows per granule — confirm with the writer.
    pub granule_size: u32,
    /// One slice per bloom. As produced by the decoder, each slice begins with
    /// a little-endian `u32` block count followed by 32 bytes per block.
    pub blooms: Vec<&'a [u8]>,
}
89
90pub fn column_block_crc32(data: &[u8]) -> u32 {
91 let mut hasher = crc32fast::Hasher::new();
92 hasher.update(data);
93 hasher.finalize()
94}
95
96pub fn encode_column_block_frame(
97 chunk_id: u64,
98 schema_ref: u64,
99 row_count: u64,
100 min_ts_ns: u64,
101 max_ts_ns: u64,
102 columns: &[ColumnBlockPart<'_>],
103) -> Vec<u8> {
104 let column_count = columns.len();
105 let dir_off = COLUMN_BLOCK_HEADER_LEN;
106 let dir_len = column_count * COLUMN_BLOCK_DIR_ENTRY_LEN;
107 let streams_off = dir_off + dir_len;
108 let granule_region_off =
109 streams_off as u64 + columns.iter().map(|c| c.stream.len() as u64).sum::<u64>();
110 let bloom_region_off = granule_region_off
111 + columns
112 .iter()
113 .map(|c| c.granule_index.len() as u64)
114 .sum::<u64>();
115
116 let mut out = Vec::with_capacity(
117 streams_off
118 + columns.iter().map(|c| c.stream.len()).sum::<usize>()
119 + columns.iter().map(|c| c.granule_index.len()).sum::<usize>()
120 + columns.iter().map(|c| c.granule_bloom.len()).sum::<usize>()
121 + COLUMN_BLOCK_FOOTER_LEN,
122 );
123
124 out.extend_from_slice(&COLUMN_BLOCK_MAGIC);
125 out.extend_from_slice(&COLUMN_BLOCK_VERSION_V1.to_le_bytes());
126 out.extend_from_slice(&0u16.to_le_bytes());
127 out.extend_from_slice(&chunk_id.to_le_bytes());
128 out.extend_from_slice(&schema_ref.to_le_bytes());
129 out.extend_from_slice(&row_count.to_le_bytes());
130 out.extend_from_slice(&(column_count as u32).to_le_bytes());
131 out.extend_from_slice(&min_ts_ns.to_le_bytes());
132 out.extend_from_slice(&max_ts_ns.to_le_bytes());
133 debug_assert_eq!(out.len(), COLUMN_BLOCK_HEADER_LEN);
134
135 let mut cursor = streams_off as u64;
136 let mut granule_cursor = granule_region_off;
137 let mut bloom_cursor = bloom_region_off;
138 for col in columns {
139 out.extend_from_slice(&col.column_id.to_le_bytes());
140 out.push(col.logical_type);
141 out.push(col.codec_tag);
142 out.extend_from_slice(&cursor.to_le_bytes());
143 out.extend_from_slice(&(col.stream.len() as u64).to_le_bytes());
144 if col.granule_index.is_empty() {
145 out.extend_from_slice(&0u64.to_le_bytes());
146 out.extend_from_slice(&0u64.to_le_bytes());
147 } else {
148 out.extend_from_slice(&granule_cursor.to_le_bytes());
149 out.extend_from_slice(&(col.granule_index.len() as u64).to_le_bytes());
150 granule_cursor += col.granule_index.len() as u64;
151 }
152 if col.granule_bloom.is_empty() {
153 out.extend_from_slice(&0u64.to_le_bytes());
154 out.extend_from_slice(&0u64.to_le_bytes());
155 } else {
156 out.extend_from_slice(&bloom_cursor.to_le_bytes());
157 out.extend_from_slice(&(col.granule_bloom.len() as u64).to_le_bytes());
158 bloom_cursor += col.granule_bloom.len() as u64;
159 }
160 cursor += col.stream.len() as u64;
161 }
162 debug_assert_eq!(out.len(), streams_off);
163
164 for col in columns {
165 out.extend_from_slice(col.stream);
166 }
167 debug_assert_eq!(out.len() as u64, granule_region_off);
168
169 for col in columns {
170 out.extend_from_slice(col.granule_index);
171 }
172 debug_assert_eq!(out.len() as u64, bloom_region_off);
173
174 for col in columns {
175 out.extend_from_slice(col.granule_bloom);
176 }
177
178 let crc = column_block_crc32(&out);
179 out.extend_from_slice(&(dir_off as u64).to_le_bytes());
180 out.extend_from_slice(&(dir_len as u64).to_le_bytes());
181 out.extend_from_slice(&crc.to_le_bytes());
182 out.extend_from_slice(&COLUMN_BLOCK_MAGIC);
183 out
184}
185
186pub fn peek_column_block_version(bytes: &[u8]) -> Option<u16> {
187 if bytes.len() < COLUMN_BLOCK_HEADER_LEN + COLUMN_BLOCK_FOOTER_LEN {
188 return None;
189 }
190 let magic: [u8; 4] = bytes[0..4].try_into().ok()?;
191 if magic != COLUMN_BLOCK_MAGIC {
192 return None;
193 }
194 Some(u16::from_le_bytes(bytes[4..6].try_into().ok()?))
195}
196
197pub fn decode_column_block_frame(
198 bytes: &[u8],
199) -> Result<ColumnBlockFrame<'_>, ColumnBlockFrameError> {
200 if bytes.len() < COLUMN_BLOCK_HEADER_LEN + COLUMN_BLOCK_FOOTER_LEN {
201 return Err(ColumnBlockFrameError::Truncated);
202 }
203 let magic: [u8; 4] = bytes[0..4].try_into().expect("header length checked");
204 if magic != COLUMN_BLOCK_MAGIC {
205 return Err(ColumnBlockFrameError::BadMagic(magic));
206 }
207 let version = u16::from_le_bytes(bytes[4..6].try_into().expect("version length checked"));
208 if version != COLUMN_BLOCK_VERSION_V1 {
209 return Err(ColumnBlockFrameError::UnsupportedVersion(version));
210 }
211
212 let footer_start = bytes.len() - COLUMN_BLOCK_FOOTER_LEN;
213 let tail_magic: [u8; 4] = bytes[bytes.len() - 4..]
214 .try_into()
215 .expect("footer length checked");
216 if tail_magic != COLUMN_BLOCK_MAGIC {
217 return Err(ColumnBlockFrameError::BadTailMagic(tail_magic));
218 }
219 let dir_off = u64::from_le_bytes(
220 bytes[footer_start..footer_start + 8]
221 .try_into()
222 .expect("footer length checked"),
223 ) as usize;
224 let dir_len = u64::from_le_bytes(
225 bytes[footer_start + 8..footer_start + 16]
226 .try_into()
227 .expect("footer length checked"),
228 ) as usize;
229 let stored_crc = u32::from_le_bytes(
230 bytes[footer_start + 16..footer_start + 20]
231 .try_into()
232 .expect("footer length checked"),
233 );
234 let actual_crc = column_block_crc32(&bytes[..footer_start]);
235 if actual_crc != stored_crc {
236 return Err(ColumnBlockFrameError::ChecksumMismatch {
237 expected: stored_crc,
238 actual: actual_crc,
239 });
240 }
241
242 let chunk_id = u64::from_le_bytes(bytes[8..16].try_into().expect("header length checked"));
243 let schema_ref = u64::from_le_bytes(bytes[16..24].try_into().expect("header length checked"));
244 let row_count = u64::from_le_bytes(bytes[24..32].try_into().expect("header length checked"));
245 let column_count =
246 u32::from_le_bytes(bytes[32..36].try_into().expect("header length checked")) as usize;
247 let min_ts_ns = u64::from_le_bytes(bytes[36..44].try_into().expect("header length checked"));
248 let max_ts_ns = u64::from_le_bytes(bytes[44..52].try_into().expect("header length checked"));
249
250 if dir_off != COLUMN_BLOCK_HEADER_LEN
251 || dir_len != column_count * COLUMN_BLOCK_DIR_ENTRY_LEN
252 || dir_off + dir_len > footer_start
253 {
254 return Err(ColumnBlockFrameError::BadDirectory);
255 }
256
257 let mut columns = Vec::with_capacity(column_count);
258 for i in 0..column_count {
259 let base = dir_off + i * COLUMN_BLOCK_DIR_ENTRY_LEN;
260 let column_id = u32::from_le_bytes(
261 bytes[base..base + 4]
262 .try_into()
263 .expect("directory length checked"),
264 );
265 let logical_type = bytes[base + 4];
266 let codec_tag = bytes[base + 5];
267 let stream_offset = u64::from_le_bytes(
268 bytes[base + 6..base + 14]
269 .try_into()
270 .expect("directory length checked"),
271 ) as usize;
272 let stream_len = u64::from_le_bytes(
273 bytes[base + 14..base + 22]
274 .try_into()
275 .expect("directory length checked"),
276 ) as usize;
277 let granule_off = u64::from_le_bytes(
278 bytes[base + 22..base + 30]
279 .try_into()
280 .expect("directory length checked"),
281 ) as usize;
282 let granule_len = u64::from_le_bytes(
283 bytes[base + 30..base + 38]
284 .try_into()
285 .expect("directory length checked"),
286 ) as usize;
287 let bloom_off = u64::from_le_bytes(
288 bytes[base + 38..base + 46]
289 .try_into()
290 .expect("directory length checked"),
291 ) as usize;
292 let bloom_len = u64::from_le_bytes(
293 bytes[base + 46..base + 54]
294 .try_into()
295 .expect("directory length checked"),
296 ) as usize;
297
298 let stream_end =
299 checked_region_end(stream_offset, stream_len, dir_off, dir_len, footer_start)?;
300 let granule_index = if granule_len == 0 {
301 None
302 } else {
303 let end = checked_region_end(granule_off, granule_len, dir_off, dir_len, footer_start)?;
304 Some(&bytes[granule_off..end])
305 };
306 let granule_bloom = if bloom_len == 0 {
307 None
308 } else {
309 let end = checked_region_end(bloom_off, bloom_len, dir_off, dir_len, footer_start)?;
310 Some(&bytes[bloom_off..end])
311 };
312
313 columns.push(ColumnBlockColumn {
314 column_id,
315 logical_type,
316 codec_tag,
317 stream: &bytes[stream_offset..stream_end],
318 granule_index,
319 granule_bloom,
320 });
321 }
322
323 Ok(ColumnBlockFrame {
324 chunk_id,
325 schema_ref,
326 row_count,
327 min_ts_ns,
328 max_ts_ns,
329 columns,
330 })
331}
332
333fn checked_region_end(
334 offset: usize,
335 len: usize,
336 dir_off: usize,
337 dir_len: usize,
338 footer_start: usize,
339) -> Result<usize, ColumnBlockFrameError> {
340 let end = offset
341 .checked_add(len)
342 .ok_or(ColumnBlockFrameError::BadDirectory)?;
343 if offset < dir_off + dir_len || end > footer_start {
344 return Err(ColumnBlockFrameError::BadDirectory);
345 }
346 Ok(end)
347}
348
349pub fn encode_column_block_granule_index_blob(index: &ColumnBlockGranuleIndex) -> Vec<u8> {
350 let w = index.value_width as usize;
351 let mut out = Vec::with_capacity(12 + index.granules.len() * w * 2);
352 out.extend_from_slice(&index.granule_size.to_le_bytes());
353 out.extend_from_slice(&index.value_width.to_le_bytes());
354 out.extend_from_slice(&(index.granules.len() as u32).to_le_bytes());
355 for g in &index.granules {
356 out.extend_from_slice(&g.min);
357 out.extend_from_slice(&g.max);
358 }
359 out
360}
361
362pub fn decode_column_block_granule_index_blob(
363 bytes: &[u8],
364) -> Result<ColumnBlockGranuleIndex, ColumnBlockFrameError> {
365 if bytes.len() < 12 {
366 return Err(ColumnBlockFrameError::BadDirectory);
367 }
368 let granule_size =
369 u32::from_le_bytes(bytes[0..4].try_into().expect("index header length checked"));
370 let value_width =
371 u32::from_le_bytes(bytes[4..8].try_into().expect("index header length checked"));
372 let count = u32::from_le_bytes(
373 bytes[8..12]
374 .try_into()
375 .expect("index header length checked"),
376 ) as usize;
377 let w = value_width as usize;
378 if w == 0 {
379 return Err(ColumnBlockFrameError::BadDirectory);
380 }
381 let need = 12usize
382 .checked_add(
383 count
384 .checked_mul(w * 2)
385 .ok_or(ColumnBlockFrameError::BadDirectory)?,
386 )
387 .ok_or(ColumnBlockFrameError::BadDirectory)?;
388 if bytes.len() < need {
389 return Err(ColumnBlockFrameError::BadDirectory);
390 }
391 let mut granules = Vec::with_capacity(count);
392 let mut cur = 12;
393 for _ in 0..count {
394 let min = bytes[cur..cur + w].to_vec();
395 cur += w;
396 let max = bytes[cur..cur + w].to_vec();
397 cur += w;
398 granules.push(ColumnBlockGranuleStats { min, max });
399 }
400 Ok(ColumnBlockGranuleIndex {
401 granule_size,
402 value_width,
403 granules,
404 })
405}
406
/// Serializes granule blooms into a blob.
///
/// The blob is `granule_size` and the number of blooms, both as little-endian
/// `u32`, followed by every bloom's bytes back to back in the given order.
/// The bloom payloads are copied as-is and are not inspected.
pub fn encode_column_block_granule_bloom_blob(granule_size: u32, blooms: &[&[u8]]) -> Vec<u8> {
    let payload_len: usize = blooms.iter().map(|bloom| bloom.len()).sum();
    let bloom_count = blooms.len() as u32;

    let mut blob = Vec::with_capacity(8 + payload_len);
    blob.extend_from_slice(&granule_size.to_le_bytes());
    blob.extend_from_slice(&bloom_count.to_le_bytes());
    blooms
        .iter()
        .for_each(|bloom| blob.extend_from_slice(bloom));
    blob
}
416
417pub fn decode_column_block_granule_bloom_blob(
418 bytes: &[u8],
419) -> Result<ColumnBlockGranuleBloom<'_>, ColumnBlockFrameError> {
420 if bytes.len() < 8 {
421 return Err(ColumnBlockFrameError::BadDirectory);
422 }
423 let granule_size =
424 u32::from_le_bytes(bytes[0..4].try_into().expect("bloom header length checked"));
425 let count =
426 u32::from_le_bytes(bytes[4..8].try_into().expect("bloom header length checked")) as usize;
427 let mut blooms = Vec::with_capacity(count);
428 let mut cur = 8;
429 for _ in 0..count {
430 if cur + 4 > bytes.len() {
431 return Err(ColumnBlockFrameError::BadDirectory);
432 }
433 let num_blocks = u32::from_le_bytes(
434 bytes[cur..cur + 4]
435 .try_into()
436 .expect("bloom block-count length checked"),
437 ) as usize;
438 let bloom_len = 4usize
439 .checked_add(
440 num_blocks
441 .checked_mul(32)
442 .ok_or(ColumnBlockFrameError::BadDirectory)?,
443 )
444 .ok_or(ColumnBlockFrameError::BadDirectory)?;
445 let end = cur
446 .checked_add(bloom_len)
447 .ok_or(ColumnBlockFrameError::BadDirectory)?;
448 if end > bytes.len() {
449 return Err(ColumnBlockFrameError::BadDirectory);
450 }
451 blooms.push(&bytes[cur..end]);
452 cur = end;
453 }
454 Ok(ColumnBlockGranuleBloom {
455 granule_size,
456 blooms,
457 })
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// A single-column frame survives an encode/decode round trip and its
    /// version can be peeked.
    #[test]
    fn frame_round_trips_header_directory_and_slices() {
        let stream: &[u8] = b"stream";
        let granule: &[u8] = b"granule";
        let bloom: &[u8] = b"\x01\0\0\0................................";
        let part = ColumnBlockPart {
            column_id: 3,
            logical_type: 2,
            codec_tag: 4,
            stream,
            granule_index: granule,
            granule_bloom: bloom,
        };
        let encoded = encode_column_block_frame(42, 7, 10, 1, 99, &[part]);

        assert_eq!(
            peek_column_block_version(&encoded),
            Some(COLUMN_BLOCK_VERSION_V1)
        );

        let decoded = decode_column_block_frame(&encoded).unwrap();
        assert_eq!(decoded.chunk_id, 42);

        let column = &decoded.columns[0];
        assert_eq!(column.column_id, 3);
        assert_eq!(column.stream, stream);
        assert_eq!(column.granule_index, Some(granule));
        assert_eq!(column.granule_bloom, Some(bloom));
    }

    /// Corrupting a header byte is caught by the footer checksum.
    #[test]
    fn frame_rejects_checksum_mismatch() {
        let mut encoded = encode_column_block_frame(1, 0, 0, 0, 0, &[]);
        let last_header_byte = COLUMN_BLOCK_HEADER_LEN - 1;
        encoded[last_header_byte] ^= 0xFF;

        let outcome = decode_column_block_frame(&encoded);
        assert!(matches!(
            outcome,
            Err(ColumnBlockFrameError::ChecksumMismatch { .. })
        ));
    }

    /// A granule index decodes back to the value it was encoded from.
    #[test]
    fn granule_index_blob_round_trips() {
        let stats = ColumnBlockGranuleStats {
            min: vec![1, 2],
            max: vec![3, 4],
        };
        let index = ColumnBlockGranuleIndex {
            granule_size: 128,
            value_width: 2,
            granules: vec![stats],
        };

        let blob = encode_column_block_granule_index_blob(&index);
        let decoded = decode_column_block_granule_index_blob(&blob).unwrap();
        assert_eq!(decoded, index);
    }

    /// A bloom with one 32-byte block is returned as a single 36-byte slice.
    #[test]
    fn granule_bloom_blob_slices_each_bloom_payload() {
        let mut one = vec![0u8; 36];
        one[0] = 1;

        let blob = encode_column_block_granule_bloom_blob(64, &[&one[..]]);
        let decoded = decode_column_block_granule_bloom_blob(&blob).unwrap();

        assert_eq!(decoded.granule_size, 64);
        assert_eq!(decoded.blooms, vec![&one[..]]);
    }
}