Skip to main content

reddb_file/
column_block.rs

//! Persisted `RDCC` column-block envelope.
//!
//! This module owns the byte frame: header, column directory, stream /
//! granule-index / granule-bloom offsets, footer, and checksum. Runtime codec
//! selection and query pruning live outside this crate.
6
7use crate::{COLUMN_BLOCK_MAGIC, COLUMN_BLOCK_VERSION_V1};
8
/// Size in bytes of the fixed block header: magic (4), version (2),
/// reserved (2), chunk id (8), schema ref (8), row count (8), column count (4),
/// min timestamp (8), and max timestamp (8).
pub const COLUMN_BLOCK_HEADER_LEN: usize = 4 + 2 + 2 + 8 + 8 + 8 + 4 + 8 + 8;

/// Size in bytes of one column directory entry: column id (4), logical type
/// (1), codec tag (1), and three `(offset: u64, len: u64)` pairs for the
/// stream, granule index, and granule bloom.
pub const COLUMN_BLOCK_DIR_ENTRY_LEN: usize = 4 + 1 + 1 + 3 * (8 + 8);

/// Size in bytes of the footer: directory offset (8), directory length (8),
/// CRC32 (4), and trailing magic (4).
pub const COLUMN_BLOCK_FOOTER_LEN: usize = 8 + 8 + 4 + 4;
12
/// Errors reported while decoding an `RDCC` column block or one of its
/// granule index / granule bloom blobs.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ColumnBlockFrameError {
    /// The input is shorter than a header plus a footer.
    Truncated,
    /// The leading four bytes are not the column-block magic; holds what was found.
    BadMagic([u8; 4]),
    /// The trailing four bytes are not the column-block magic; holds what was found.
    BadTailMagic([u8; 4]),
    /// The header declares a version this decoder does not accept.
    UnsupportedVersion(u16),
    /// The directory, a region it references, or a granule blob is malformed
    /// or out of range.
    BadDirectory,
    /// The CRC32 stored in the footer (`expected`) differs from the one
    /// recomputed over the block (`actual`).
    ChecksumMismatch { expected: u32, actual: u32 },
}

impl std::fmt::Display for ColumnBlockFrameError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Truncated => f.write_str("column block truncated"),
            Self::BadMagic(found) => write!(f, "bad column block magic: {found:?}"),
            Self::BadTailMagic(found) => write!(f, "bad column block tail magic: {found:?}"),
            Self::UnsupportedVersion(version) => {
                write!(f, "unsupported column block version: {version}")
            }
            Self::BadDirectory => f.write_str("column directory entry out of range"),
            Self::ChecksumMismatch { expected, actual } => write!(
                f,
                "column block checksum mismatch: expected 0x{expected:08X}, got 0x{actual:08X}"
            ),
        }
    }
}

impl std::error::Error for ColumnBlockFrameError {}
40
/// One column as handed to [`encode_column_block_frame`].
///
/// An empty `granule_index` or `granule_bloom` slice is written to the
/// directory as offset 0 / length 0, i.e. "absent".
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ColumnBlockPart<'a> {
    /// Column identifier, stored verbatim in the directory entry.
    pub column_id: u32,
    /// Logical type tag, stored verbatim; not interpreted by this module.
    pub logical_type: u8,
    /// Codec tag, stored verbatim; not interpreted by this module.
    pub codec_tag: u8,
    /// Column payload bytes, copied into the stream region as-is.
    pub stream: &'a [u8],
    /// Granule index blob; empty means absent.
    pub granule_index: &'a [u8],
    /// Granule bloom blob; empty means absent.
    pub granule_bloom: &'a [u8],
}
50
/// One column as read back by [`decode_column_block_frame`]; every slice
/// borrows from the decoded buffer.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ColumnBlockColumn<'a> {
    /// Column identifier from the directory entry.
    pub column_id: u32,
    /// Logical type tag, passed through uninterpreted.
    pub logical_type: u8,
    /// Codec tag, passed through uninterpreted.
    pub codec_tag: u8,
    /// Column payload bytes.
    pub stream: &'a [u8],
    /// Granule index blob, or `None` when the directory recorded length 0.
    pub granule_index: Option<&'a [u8]>,
    /// Granule bloom blob, or `None` when the directory recorded length 0.
    pub granule_bloom: Option<&'a [u8]>,
}

/// A decoded column block: the header fields plus its columns in directory
/// order.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ColumnBlockFrame<'a> {
    /// `chunk_id` header field.
    pub chunk_id: u64,
    /// `schema_ref` header field.
    pub schema_ref: u64,
    /// `row_count` header field; the decoder does not cross-check it against
    /// the streams.
    pub row_count: u64,
    /// `min_ts_ns` header field, stored verbatim.
    pub min_ts_ns: u64,
    /// `max_ts_ns` header field, stored verbatim.
    pub max_ts_ns: u64,
    /// Decoded columns, one per directory entry.
    pub columns: Vec<ColumnBlockColumn<'a>>,
}
70
/// Min/max bounds for a single granule, each `value_width` bytes when decoded.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ColumnBlockGranuleStats {
    /// Lower bound bytes.
    pub min: Vec<u8>,
    /// Upper bound bytes.
    pub max: Vec<u8>,
}

/// A granule min/max index blob in owned form.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ColumnBlockGranuleIndex {
    /// `granule_size` header value; not interpreted by this module.
    pub granule_size: u32,
    /// Width in bytes of every `min` / `max` entry.
    pub value_width: u32,
    /// One entry per granule, in blob order.
    pub granules: Vec<ColumnBlockGranuleStats>,
}
83
/// A granule bloom blob split into one borrowed slice per bloom. When
/// produced by the decoder, each slice includes its 4-byte block-count prefix.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ColumnBlockGranuleBloom<'a> {
    /// `granule_size` header value; not interpreted by this module.
    pub granule_size: u32,
    /// Per-bloom byte slices, in blob order.
    pub blooms: Vec<&'a [u8]>,
}
89
90pub fn column_block_crc32(data: &[u8]) -> u32 {
91    let mut hasher = crc32fast::Hasher::new();
92    hasher.update(data);
93    hasher.finalize()
94}
95
96pub fn encode_column_block_frame(
97    chunk_id: u64,
98    schema_ref: u64,
99    row_count: u64,
100    min_ts_ns: u64,
101    max_ts_ns: u64,
102    columns: &[ColumnBlockPart<'_>],
103) -> Vec<u8> {
104    let column_count = columns.len();
105    let dir_off = COLUMN_BLOCK_HEADER_LEN;
106    let dir_len = column_count * COLUMN_BLOCK_DIR_ENTRY_LEN;
107    let streams_off = dir_off + dir_len;
108    let granule_region_off =
109        streams_off as u64 + columns.iter().map(|c| c.stream.len() as u64).sum::<u64>();
110    let bloom_region_off = granule_region_off
111        + columns
112            .iter()
113            .map(|c| c.granule_index.len() as u64)
114            .sum::<u64>();
115
116    let mut out = Vec::with_capacity(
117        streams_off
118            + columns.iter().map(|c| c.stream.len()).sum::<usize>()
119            + columns.iter().map(|c| c.granule_index.len()).sum::<usize>()
120            + columns.iter().map(|c| c.granule_bloom.len()).sum::<usize>()
121            + COLUMN_BLOCK_FOOTER_LEN,
122    );
123
124    out.extend_from_slice(&COLUMN_BLOCK_MAGIC);
125    out.extend_from_slice(&COLUMN_BLOCK_VERSION_V1.to_le_bytes());
126    out.extend_from_slice(&0u16.to_le_bytes());
127    out.extend_from_slice(&chunk_id.to_le_bytes());
128    out.extend_from_slice(&schema_ref.to_le_bytes());
129    out.extend_from_slice(&row_count.to_le_bytes());
130    out.extend_from_slice(&(column_count as u32).to_le_bytes());
131    out.extend_from_slice(&min_ts_ns.to_le_bytes());
132    out.extend_from_slice(&max_ts_ns.to_le_bytes());
133    debug_assert_eq!(out.len(), COLUMN_BLOCK_HEADER_LEN);
134
135    let mut cursor = streams_off as u64;
136    let mut granule_cursor = granule_region_off;
137    let mut bloom_cursor = bloom_region_off;
138    for col in columns {
139        out.extend_from_slice(&col.column_id.to_le_bytes());
140        out.push(col.logical_type);
141        out.push(col.codec_tag);
142        out.extend_from_slice(&cursor.to_le_bytes());
143        out.extend_from_slice(&(col.stream.len() as u64).to_le_bytes());
144        if col.granule_index.is_empty() {
145            out.extend_from_slice(&0u64.to_le_bytes());
146            out.extend_from_slice(&0u64.to_le_bytes());
147        } else {
148            out.extend_from_slice(&granule_cursor.to_le_bytes());
149            out.extend_from_slice(&(col.granule_index.len() as u64).to_le_bytes());
150            granule_cursor += col.granule_index.len() as u64;
151        }
152        if col.granule_bloom.is_empty() {
153            out.extend_from_slice(&0u64.to_le_bytes());
154            out.extend_from_slice(&0u64.to_le_bytes());
155        } else {
156            out.extend_from_slice(&bloom_cursor.to_le_bytes());
157            out.extend_from_slice(&(col.granule_bloom.len() as u64).to_le_bytes());
158            bloom_cursor += col.granule_bloom.len() as u64;
159        }
160        cursor += col.stream.len() as u64;
161    }
162    debug_assert_eq!(out.len(), streams_off);
163
164    for col in columns {
165        out.extend_from_slice(col.stream);
166    }
167    debug_assert_eq!(out.len() as u64, granule_region_off);
168
169    for col in columns {
170        out.extend_from_slice(col.granule_index);
171    }
172    debug_assert_eq!(out.len() as u64, bloom_region_off);
173
174    for col in columns {
175        out.extend_from_slice(col.granule_bloom);
176    }
177
178    let crc = column_block_crc32(&out);
179    out.extend_from_slice(&(dir_off as u64).to_le_bytes());
180    out.extend_from_slice(&(dir_len as u64).to_le_bytes());
181    out.extend_from_slice(&crc.to_le_bytes());
182    out.extend_from_slice(&COLUMN_BLOCK_MAGIC);
183    out
184}
185
186pub fn peek_column_block_version(bytes: &[u8]) -> Option<u16> {
187    if bytes.len() < COLUMN_BLOCK_HEADER_LEN + COLUMN_BLOCK_FOOTER_LEN {
188        return None;
189    }
190    let magic: [u8; 4] = bytes[0..4].try_into().ok()?;
191    if magic != COLUMN_BLOCK_MAGIC {
192        return None;
193    }
194    Some(u16::from_le_bytes(bytes[4..6].try_into().ok()?))
195}
196
197pub fn decode_column_block_frame(
198    bytes: &[u8],
199) -> Result<ColumnBlockFrame<'_>, ColumnBlockFrameError> {
200    if bytes.len() < COLUMN_BLOCK_HEADER_LEN + COLUMN_BLOCK_FOOTER_LEN {
201        return Err(ColumnBlockFrameError::Truncated);
202    }
203    let magic: [u8; 4] = bytes[0..4].try_into().expect("header length checked");
204    if magic != COLUMN_BLOCK_MAGIC {
205        return Err(ColumnBlockFrameError::BadMagic(magic));
206    }
207    let version = u16::from_le_bytes(bytes[4..6].try_into().expect("version length checked"));
208    if version != COLUMN_BLOCK_VERSION_V1 {
209        return Err(ColumnBlockFrameError::UnsupportedVersion(version));
210    }
211
212    let footer_start = bytes.len() - COLUMN_BLOCK_FOOTER_LEN;
213    let tail_magic: [u8; 4] = bytes[bytes.len() - 4..]
214        .try_into()
215        .expect("footer length checked");
216    if tail_magic != COLUMN_BLOCK_MAGIC {
217        return Err(ColumnBlockFrameError::BadTailMagic(tail_magic));
218    }
219    let dir_off = u64::from_le_bytes(
220        bytes[footer_start..footer_start + 8]
221            .try_into()
222            .expect("footer length checked"),
223    ) as usize;
224    let dir_len = u64::from_le_bytes(
225        bytes[footer_start + 8..footer_start + 16]
226            .try_into()
227            .expect("footer length checked"),
228    ) as usize;
229    let stored_crc = u32::from_le_bytes(
230        bytes[footer_start + 16..footer_start + 20]
231            .try_into()
232            .expect("footer length checked"),
233    );
234    let actual_crc = column_block_crc32(&bytes[..footer_start]);
235    if actual_crc != stored_crc {
236        return Err(ColumnBlockFrameError::ChecksumMismatch {
237            expected: stored_crc,
238            actual: actual_crc,
239        });
240    }
241
242    let chunk_id = u64::from_le_bytes(bytes[8..16].try_into().expect("header length checked"));
243    let schema_ref = u64::from_le_bytes(bytes[16..24].try_into().expect("header length checked"));
244    let row_count = u64::from_le_bytes(bytes[24..32].try_into().expect("header length checked"));
245    let column_count =
246        u32::from_le_bytes(bytes[32..36].try_into().expect("header length checked")) as usize;
247    let min_ts_ns = u64::from_le_bytes(bytes[36..44].try_into().expect("header length checked"));
248    let max_ts_ns = u64::from_le_bytes(bytes[44..52].try_into().expect("header length checked"));
249
250    if dir_off != COLUMN_BLOCK_HEADER_LEN
251        || dir_len != column_count * COLUMN_BLOCK_DIR_ENTRY_LEN
252        || dir_off + dir_len > footer_start
253    {
254        return Err(ColumnBlockFrameError::BadDirectory);
255    }
256
257    let mut columns = Vec::with_capacity(column_count);
258    for i in 0..column_count {
259        let base = dir_off + i * COLUMN_BLOCK_DIR_ENTRY_LEN;
260        let column_id = u32::from_le_bytes(
261            bytes[base..base + 4]
262                .try_into()
263                .expect("directory length checked"),
264        );
265        let logical_type = bytes[base + 4];
266        let codec_tag = bytes[base + 5];
267        let stream_offset = u64::from_le_bytes(
268            bytes[base + 6..base + 14]
269                .try_into()
270                .expect("directory length checked"),
271        ) as usize;
272        let stream_len = u64::from_le_bytes(
273            bytes[base + 14..base + 22]
274                .try_into()
275                .expect("directory length checked"),
276        ) as usize;
277        let granule_off = u64::from_le_bytes(
278            bytes[base + 22..base + 30]
279                .try_into()
280                .expect("directory length checked"),
281        ) as usize;
282        let granule_len = u64::from_le_bytes(
283            bytes[base + 30..base + 38]
284                .try_into()
285                .expect("directory length checked"),
286        ) as usize;
287        let bloom_off = u64::from_le_bytes(
288            bytes[base + 38..base + 46]
289                .try_into()
290                .expect("directory length checked"),
291        ) as usize;
292        let bloom_len = u64::from_le_bytes(
293            bytes[base + 46..base + 54]
294                .try_into()
295                .expect("directory length checked"),
296        ) as usize;
297
298        let stream_end =
299            checked_region_end(stream_offset, stream_len, dir_off, dir_len, footer_start)?;
300        let granule_index = if granule_len == 0 {
301            None
302        } else {
303            let end = checked_region_end(granule_off, granule_len, dir_off, dir_len, footer_start)?;
304            Some(&bytes[granule_off..end])
305        };
306        let granule_bloom = if bloom_len == 0 {
307            None
308        } else {
309            let end = checked_region_end(bloom_off, bloom_len, dir_off, dir_len, footer_start)?;
310            Some(&bytes[bloom_off..end])
311        };
312
313        columns.push(ColumnBlockColumn {
314            column_id,
315            logical_type,
316            codec_tag,
317            stream: &bytes[stream_offset..stream_end],
318            granule_index,
319            granule_bloom,
320        });
321    }
322
323    Ok(ColumnBlockFrame {
324        chunk_id,
325        schema_ref,
326        row_count,
327        min_ts_ns,
328        max_ts_ns,
329        columns,
330    })
331}
332
333fn checked_region_end(
334    offset: usize,
335    len: usize,
336    dir_off: usize,
337    dir_len: usize,
338    footer_start: usize,
339) -> Result<usize, ColumnBlockFrameError> {
340    let end = offset
341        .checked_add(len)
342        .ok_or(ColumnBlockFrameError::BadDirectory)?;
343    if offset < dir_off + dir_len || end > footer_start {
344        return Err(ColumnBlockFrameError::BadDirectory);
345    }
346    Ok(end)
347}
348
349pub fn encode_column_block_granule_index_blob(index: &ColumnBlockGranuleIndex) -> Vec<u8> {
350    let w = index.value_width as usize;
351    let mut out = Vec::with_capacity(12 + index.granules.len() * w * 2);
352    out.extend_from_slice(&index.granule_size.to_le_bytes());
353    out.extend_from_slice(&index.value_width.to_le_bytes());
354    out.extend_from_slice(&(index.granules.len() as u32).to_le_bytes());
355    for g in &index.granules {
356        out.extend_from_slice(&g.min);
357        out.extend_from_slice(&g.max);
358    }
359    out
360}
361
362pub fn decode_column_block_granule_index_blob(
363    bytes: &[u8],
364) -> Result<ColumnBlockGranuleIndex, ColumnBlockFrameError> {
365    if bytes.len() < 12 {
366        return Err(ColumnBlockFrameError::BadDirectory);
367    }
368    let granule_size =
369        u32::from_le_bytes(bytes[0..4].try_into().expect("index header length checked"));
370    let value_width =
371        u32::from_le_bytes(bytes[4..8].try_into().expect("index header length checked"));
372    let count = u32::from_le_bytes(
373        bytes[8..12]
374            .try_into()
375            .expect("index header length checked"),
376    ) as usize;
377    let w = value_width as usize;
378    if w == 0 {
379        return Err(ColumnBlockFrameError::BadDirectory);
380    }
381    let need = 12usize
382        .checked_add(
383            count
384                .checked_mul(w * 2)
385                .ok_or(ColumnBlockFrameError::BadDirectory)?,
386        )
387        .ok_or(ColumnBlockFrameError::BadDirectory)?;
388    if bytes.len() < need {
389        return Err(ColumnBlockFrameError::BadDirectory);
390    }
391    let mut granules = Vec::with_capacity(count);
392    let mut cur = 12;
393    for _ in 0..count {
394        let min = bytes[cur..cur + w].to_vec();
395        cur += w;
396        let max = bytes[cur..cur + w].to_vec();
397        cur += w;
398        granules.push(ColumnBlockGranuleStats { min, max });
399    }
400    Ok(ColumnBlockGranuleIndex {
401        granule_size,
402        value_width,
403        granules,
404    })
405}
406
/// Serializes per-granule bloom filters into a single blob.
///
/// Layout (little-endian): `granule_size: u32`, the bloom count as `u32`, then
/// each bloom's bytes verbatim. The blooms are not validated here. The decoder
/// expects each one to start with a `u32` block count followed by 32 bytes per
/// block. The bloom count is narrowed with an unchecked `as` cast.
pub fn encode_column_block_granule_bloom_blob(granule_size: u32, blooms: &[&[u8]]) -> Vec<u8> {
    let payload_len: usize = blooms.iter().map(|b| b.len()).sum();
    let mut out = Vec::with_capacity(8 + payload_len);
    for word in [granule_size, blooms.len() as u32] {
        out.extend_from_slice(&word.to_le_bytes());
    }
    blooms.iter().for_each(|bloom| out.extend_from_slice(bloom));
    out
}
416
417pub fn decode_column_block_granule_bloom_blob(
418    bytes: &[u8],
419) -> Result<ColumnBlockGranuleBloom<'_>, ColumnBlockFrameError> {
420    if bytes.len() < 8 {
421        return Err(ColumnBlockFrameError::BadDirectory);
422    }
423    let granule_size =
424        u32::from_le_bytes(bytes[0..4].try_into().expect("bloom header length checked"));
425    let count =
426        u32::from_le_bytes(bytes[4..8].try_into().expect("bloom header length checked")) as usize;
427    let mut blooms = Vec::with_capacity(count);
428    let mut cur = 8;
429    for _ in 0..count {
430        if cur + 4 > bytes.len() {
431            return Err(ColumnBlockFrameError::BadDirectory);
432        }
433        let num_blocks = u32::from_le_bytes(
434            bytes[cur..cur + 4]
435                .try_into()
436                .expect("bloom block-count length checked"),
437        ) as usize;
438        let bloom_len = 4usize
439            .checked_add(
440                num_blocks
441                    .checked_mul(32)
442                    .ok_or(ColumnBlockFrameError::BadDirectory)?,
443            )
444            .ok_or(ColumnBlockFrameError::BadDirectory)?;
445        let end = cur
446            .checked_add(bloom_len)
447            .ok_or(ColumnBlockFrameError::BadDirectory)?;
448        if end > bytes.len() {
449            return Err(ColumnBlockFrameError::BadDirectory);
450        }
451        blooms.push(&bytes[cur..end]);
452        cur = end;
453    }
454    Ok(ColumnBlockGranuleBloom {
455        granule_size,
456        blooms,
457    })
458}
459
#[cfg(test)]
mod tests {
    //! Round-trip and rejection tests for the column-block frame and the
    //! granule index / bloom blobs.

    use super::*;

    #[test]
    fn frame_round_trips_header_directory_and_slices() {
        let stream = b"stream";
        let granule = b"granule";
        let bloom = b"\x01\0\0\0................................";
        let encoded = encode_column_block_frame(
            42,
            7,
            10,
            1,
            99,
            &[ColumnBlockPart {
                column_id: 3,
                logical_type: 2,
                codec_tag: 4,
                stream,
                granule_index: granule,
                granule_bloom: bloom,
            }],
        );

        assert_eq!(
            peek_column_block_version(&encoded),
            Some(COLUMN_BLOCK_VERSION_V1)
        );
        let decoded = decode_column_block_frame(&encoded).unwrap();
        assert_eq!(decoded.chunk_id, 42);
        assert_eq!(decoded.columns[0].column_id, 3);
        assert_eq!(decoded.columns[0].stream, stream);
        assert_eq!(decoded.columns[0].granule_index, Some(&granule[..]));
        assert_eq!(decoded.columns[0].granule_bloom, Some(&bloom[..]));
    }

    /// Several columns where each one omits a different optional region: the
    /// decoder must report the omitted ones as `None` and keep offsets aligned.
    #[test]
    fn frame_round_trips_multiple_columns_with_absent_regions() {
        let mut bloom = vec![0xAAu8; 36];
        bloom[..4].copy_from_slice(&1u32.to_le_bytes());
        let parts = [
            ColumnBlockPart {
                column_id: 1,
                logical_type: 0,
                codec_tag: 0,
                stream: b"ab",
                granule_index: b"g1",
                granule_bloom: &[],
            },
            ColumnBlockPart {
                column_id: 2,
                logical_type: 5,
                codec_tag: 6,
                stream: b"cde",
                granule_index: &[],
                granule_bloom: &bloom,
            },
        ];
        let encoded = encode_column_block_frame(9, 8, 7, 6, 5, &parts);
        let decoded = decode_column_block_frame(&encoded).unwrap();

        assert_eq!(
            (
                decoded.chunk_id,
                decoded.schema_ref,
                decoded.row_count,
                decoded.min_ts_ns,
                decoded.max_ts_ns
            ),
            (9, 8, 7, 6, 5)
        );
        assert_eq!(decoded.columns.len(), 2);

        let (a, b) = (&decoded.columns[0], &decoded.columns[1]);
        assert_eq!(a.stream, b"ab");
        assert_eq!(a.granule_index, Some(&b"g1"[..]));
        assert_eq!(a.granule_bloom, None);
        assert_eq!((b.column_id, b.logical_type, b.codec_tag), (2, 5, 6));
        assert_eq!(b.stream, b"cde");
        assert_eq!(b.granule_index, None);
        assert_eq!(b.granule_bloom, Some(&bloom[..]));
    }

    #[test]
    fn frame_rejects_checksum_mismatch() {
        let mut encoded = encode_column_block_frame(1, 0, 0, 0, 0, &[]);
        encoded[COLUMN_BLOCK_HEADER_LEN - 1] ^= 0xFF;
        assert!(matches!(
            decode_column_block_frame(&encoded),
            Err(ColumnBlockFrameError::ChecksumMismatch { .. })
        ));
    }

    #[test]
    fn frame_rejects_truncated_input() {
        let short = [0u8; COLUMN_BLOCK_HEADER_LEN + COLUMN_BLOCK_FOOTER_LEN - 1];
        assert_eq!(
            decode_column_block_frame(&short),
            Err(ColumnBlockFrameError::Truncated)
        );
        assert_eq!(peek_column_block_version(&short), None);
    }

    #[test]
    fn frame_rejects_bad_leading_magic() {
        let mut encoded = encode_column_block_frame(1, 0, 0, 0, 0, &[]);
        encoded[0] ^= 0xFF;
        assert!(matches!(
            decode_column_block_frame(&encoded),
            Err(ColumnBlockFrameError::BadMagic(_))
        ));
        assert_eq!(peek_column_block_version(&encoded), None);
    }

    #[test]
    fn frame_rejects_bad_tail_magic() {
        let mut encoded = encode_column_block_frame(1, 0, 0, 0, 0, &[]);
        let last = encoded.len() - 1;
        encoded[last] ^= 0xFF;
        assert!(matches!(
            decode_column_block_frame(&encoded),
            Err(ColumnBlockFrameError::BadTailMagic(_))
        ));
    }

    #[test]
    fn frame_rejects_unsupported_version() {
        let mut encoded = encode_column_block_frame(1, 0, 0, 0, 0, &[]);
        let bumped = COLUMN_BLOCK_VERSION_V1.wrapping_add(1);
        encoded[4..6].copy_from_slice(&bumped.to_le_bytes());
        assert_eq!(
            decode_column_block_frame(&encoded),
            Err(ColumnBlockFrameError::UnsupportedVersion(bumped))
        );
    }

    /// The header claims one column but the (re-checksummed) footer still
    /// describes an empty directory.
    #[test]
    fn frame_rejects_directory_that_disagrees_with_column_count() {
        let mut encoded = encode_column_block_frame(1, 0, 0, 0, 0, &[]);
        encoded[32..36].copy_from_slice(&1u32.to_le_bytes());
        let footer_start = encoded.len() - COLUMN_BLOCK_FOOTER_LEN;
        let crc = column_block_crc32(&encoded[..footer_start]);
        encoded[footer_start + 16..footer_start + 20].copy_from_slice(&crc.to_le_bytes());
        assert_eq!(
            decode_column_block_frame(&encoded),
            Err(ColumnBlockFrameError::BadDirectory)
        );
    }

    #[test]
    fn granule_index_blob_round_trips() {
        let index = ColumnBlockGranuleIndex {
            granule_size: 128,
            value_width: 2,
            granules: vec![ColumnBlockGranuleStats {
                min: vec![1, 2],
                max: vec![3, 4],
            }],
        };
        let encoded = encode_column_block_granule_index_blob(&index);
        assert_eq!(
            decode_column_block_granule_index_blob(&encoded).unwrap(),
            index
        );
    }

    #[test]
    fn granule_index_blob_rejects_short_or_zero_width_input() {
        assert_eq!(
            decode_column_block_granule_index_blob(&[0u8; 11]),
            Err(ColumnBlockFrameError::BadDirectory)
        );

        let zero_width = encode_column_block_granule_index_blob(&ColumnBlockGranuleIndex {
            granule_size: 8,
            value_width: 0,
            granules: vec![],
        });
        assert_eq!(
            decode_column_block_granule_index_blob(&zero_width),
            Err(ColumnBlockFrameError::BadDirectory)
        );

        let mut short = encode_column_block_granule_index_blob(&ColumnBlockGranuleIndex {
            granule_size: 8,
            value_width: 2,
            granules: vec![ColumnBlockGranuleStats {
                min: vec![1, 2],
                max: vec![3, 4],
            }],
        });
        short.pop();
        assert_eq!(
            decode_column_block_granule_index_blob(&short),
            Err(ColumnBlockFrameError::BadDirectory)
        );
    }

    #[test]
    fn granule_bloom_blob_slices_each_bloom_payload() {
        let mut one = vec![0u8; 36];
        one[0] = 1;
        let encoded = encode_column_block_granule_bloom_blob(64, &[&one[..]]);
        let decoded = decode_column_block_granule_bloom_blob(&encoded).unwrap();
        assert_eq!(decoded.granule_size, 64);
        assert_eq!(decoded.blooms, vec![&one[..]]);
    }

    #[test]
    fn granule_bloom_blob_rejects_bloom_overrunning_input() {
        let mut one = vec![0u8; 36];
        one[0] = 2; // claims two 32-byte blocks but carries only one
        let encoded = encode_column_block_granule_bloom_blob(64, &[&one[..]]);
        assert_eq!(
            decode_column_block_granule_bloom_blob(&encoded),
            Err(ColumnBlockFrameError::BadDirectory)
        );
    }
}