alopex_core/columnar/
segment.rs

1//! Columnar segment writer/reader with chunked IO and checksum verification.
2use std::fs::File;
3use std::io::{Read, Write};
4use std::path::Path;
5
6use crate::columnar::encoding::{
7    decode_column, encode_column, Column, Compression, Encoding, LogicalType,
8};
9use crate::columnar::error::{ColumnarError, Result};
10use crc32fast::Hasher;
11
/// Four-byte signature written at the very start of every segment file.
const MAGIC: &[u8] = b"ALXC";
/// Format version stored in the header right after the magic.
const VERSION: u16 = 1;
/// Upper bound (16 MiB) on the encoded size of a single chunk, so that a
/// corrupt length field cannot trigger an unbounded allocation in the reader.
const MAX_CHUNK_BYTES: usize = 16 << 20;
15
16/// Metadata describing a segment.
17#[derive(Clone, Debug, PartialEq, Eq)]
18pub struct SegmentMeta {
19    /// Logical type of the stored column.
20    pub logical_type: LogicalType,
21    /// Encoding applied to each chunk.
22    pub encoding: Encoding,
23    /// Compression applied after encoding.
24    pub compression: Compression,
25    /// Maximum rows per chunk.
26    pub chunk_rows: usize,
27    /// Whether to append a checksum per chunk.
28    pub chunk_checksum: bool,
29}
30
31/// Writes a single-column segment to `path`, chunked by `meta.chunk_rows`.
32pub fn write_segment(path: &Path, column: &Column, meta: &SegmentMeta) -> Result<()> {
33    // v1 セグメントは Float32 と Fixed 長 251 以上をサポートしない。
34    match meta.logical_type {
35        LogicalType::Float32 => {
36            return Err(ColumnarError::InvalidFormat(
37                "segment v1 does not support Float32".into(),
38            ))
39        }
40        LogicalType::Fixed(len) if len >= 251 => {
41            return Err(ColumnarError::InvalidFormat(
42                "segment v1 fixed length must be <= 250".into(),
43            ))
44        }
45        _ => {}
46    }
47
48    let mut file = File::create(path)?;
49
50    // Header
51    file.write_all(MAGIC)?;
52    file.write_all(&VERSION.to_le_bytes())?;
53    let logical_byte = logical_to_byte(meta.logical_type)?;
54    file.write_all(&[
55        logical_byte,
56        encoding_to_byte(meta.encoding),
57        compression_to_byte(meta.compression),
58    ])?;
59    file.write_all(&(meta.chunk_rows as u32).to_le_bytes())?;
60    file.write_all(&[meta.chunk_checksum as u8])?;
61
62    let total_rows = column_len(column);
63    file.write_all(&(total_rows as u32).to_le_bytes())?;
64
65    // Chunked payload
66    let mut start = 0;
67    while start < total_rows {
68        let end = usize::min(start + meta.chunk_rows, total_rows);
69        let chunk = slice_column(column, start, end - start)?;
70        let encoded = encode_column(
71            &chunk,
72            meta.encoding,
73            meta.compression,
74            false,
75            meta.logical_type,
76        )?;
77        let mut checksum_bytes = [0u8; 4];
78        if meta.chunk_checksum {
79            let mut hasher = Hasher::new();
80            hasher.update(&encoded);
81            checksum_bytes = hasher.finalize().to_le_bytes();
82        }
83
84        file.write_all(&((end - start) as u32).to_le_bytes())?;
85        file.write_all(&(encoded.len() as u32).to_le_bytes())?;
86        file.write_all(&encoded)?;
87        if meta.chunk_checksum {
88            file.write_all(&checksum_bytes)?;
89        }
90        start = end;
91    }
92    Ok(())
93}
94
95/// Streaming reader for a columnar segment.
96pub struct SegmentReader {
97    meta: SegmentMeta,
98    file: File,
99    remaining_rows: usize,
100}
101
102impl SegmentReader {
103    /// Opens a segment file and validates the header.
104    pub fn open(path: &Path) -> Result<Self> {
105        let mut file = File::open(path)?;
106        let mut magic = [0u8; 4];
107        file.read_exact(&mut magic)?;
108        if magic != MAGIC {
109            return Err(ColumnarError::InvalidFormat("invalid segment magic".into()));
110        }
111        let mut version_bytes = [0u8; 2];
112        file.read_exact(&mut version_bytes)?;
113        let version = u16::from_le_bytes(version_bytes);
114        if version != VERSION {
115            return Err(ColumnarError::InvalidFormat(
116                "unsupported segment version".into(),
117            ));
118        }
119
120        let mut kind = [0u8; 3];
121        file.read_exact(&mut kind)?;
122        let logical_type = byte_to_logical(kind[0])?;
123        let encoding = byte_to_encoding(kind[1])?;
124        let compression = byte_to_compression(kind[2])?;
125
126        let mut chunk_rows_bytes = [0u8; 4];
127        file.read_exact(&mut chunk_rows_bytes)?;
128        let chunk_rows = u32::from_le_bytes(chunk_rows_bytes) as usize;
129
130        let mut checksum_flag = [0u8; 1];
131        file.read_exact(&mut checksum_flag)?;
132        let chunk_checksum = checksum_flag[0] != 0;
133
134        let mut total_rows_bytes = [0u8; 4];
135        file.read_exact(&mut total_rows_bytes)?;
136        let remaining_rows = u32::from_le_bytes(total_rows_bytes) as usize;
137
138        let meta = SegmentMeta {
139            logical_type,
140            encoding,
141            compression,
142            chunk_rows,
143            chunk_checksum,
144        };
145
146        Ok(Self {
147            meta,
148            file,
149            remaining_rows,
150        })
151    }
152
153    /// Returns an iterator that yields decoded column chunks.
154    pub fn iter(&mut self) -> ChunkIter<'_> {
155        ChunkIter { reader: self }
156    }
157}
158
159/// Iterator over column chunks. Buffers only one chunk at a time.
160pub struct ChunkIter<'a> {
161    reader: &'a mut SegmentReader,
162}
163
164impl<'a> Iterator for ChunkIter<'a> {
165    type Item = Result<Column>;
166
167    fn next(&mut self) -> Option<Self::Item> {
168        if self.reader.remaining_rows == 0 {
169            return None;
170        }
171
172        let mut row_bytes = [0u8; 4];
173        if let Err(e) = self.reader.file.read_exact(&mut row_bytes) {
174            return Some(Err(ColumnarError::InvalidFormat(format!(
175                "chunk rows read failed: {e}"
176            ))));
177        }
178        let rows = u32::from_le_bytes(row_bytes) as usize;
179        if rows == 0 || rows > self.reader.meta.chunk_rows {
180            return Some(Err(ColumnarError::CorruptedSegment {
181                reason: "chunk rows exceed declared limit".into(),
182            }));
183        }
184
185        let mut len_bytes = [0u8; 4];
186        if let Err(e) = self.reader.file.read_exact(&mut len_bytes) {
187            return Some(Err(ColumnarError::InvalidFormat(format!(
188                "chunk length read failed: {e}"
189            ))));
190        }
191        let len = u32::from_le_bytes(len_bytes) as usize;
192        if len > MAX_CHUNK_BYTES {
193            return Some(Err(ColumnarError::CorruptedSegment {
194                reason: "chunk encoded size exceeds limit".into(),
195            }));
196        }
197
198        let mut buf = vec![0u8; len];
199        if let Err(e) = self.reader.file.read_exact(&mut buf) {
200            return Some(Err(ColumnarError::InvalidFormat(format!(
201                "chunk payload read failed: {e}"
202            ))));
203        }
204
205        if self.reader.meta.chunk_checksum {
206            let mut crc_bytes = [0u8; 4];
207            if let Err(e) = self.reader.file.read_exact(&mut crc_bytes) {
208                return Some(Err(ColumnarError::InvalidFormat(format!(
209                    "chunk checksum read failed: {e}"
210                ))));
211            }
212            let expected = u32::from_le_bytes(crc_bytes);
213            let mut hasher = Hasher::new();
214            hasher.update(&buf);
215            let computed = hasher.finalize();
216            if expected != computed {
217                return Some(Err(ColumnarError::CorruptedSegment {
218                    reason: "checksum mismatch".into(),
219                }));
220            }
221        }
222
223        self.reader.remaining_rows = self.reader.remaining_rows.saturating_sub(rows);
224        Some(decode_column(
225            &buf,
226            self.reader.meta.logical_type,
227            self.reader.meta.encoding,
228            self.reader.meta.compression,
229            false,
230        ))
231    }
232}
233
234fn logical_to_byte(logical: LogicalType) -> Result<u8> {
235    match logical {
236        LogicalType::Int64 => Ok(0),
237        LogicalType::Float64 => Ok(1),
238        LogicalType::Bool => Ok(2),
239        LogicalType::Binary => Ok(3),
240        LogicalType::Float32 => Err(ColumnarError::InvalidFormat(
241            "segment v1 does not support Float32".into(),
242        )),
243        LogicalType::Fixed(len) => {
244            // 4..=254 are reserved for fixed lengths 0..=250. 255 is treated as
245            // a clamped legacy sentinel for >=251 to avoid panics.
246            if len >= 251 {
247                Ok(255)
248            } else {
249                Ok(4 + (len as u8))
250            }
251        }
252    }
253}
254
255fn byte_to_logical(byte: u8) -> Result<LogicalType> {
256    match byte {
257        0 => Ok(LogicalType::Int64),
258        1 => Ok(LogicalType::Float64),
259        2 => Ok(LogicalType::Bool),
260        3 => Ok(LogicalType::Binary),
261        255 => Ok(LogicalType::Fixed(251)),
262        b if b >= 4 => Ok(LogicalType::Fixed((b - 4) as u16)),
263        _ => Err(ColumnarError::InvalidFormat("unknown logical type".into())),
264    }
265}
266
267fn encoding_to_byte(enc: Encoding) -> u8 {
268    match enc {
269        Encoding::Plain => 0,
270        Encoding::Dictionary => 1,
271        Encoding::Rle => 2,
272        Encoding::Bitpack => 3,
273    }
274}
275
276fn byte_to_encoding(byte: u8) -> Result<Encoding> {
277    match byte {
278        0 => Ok(Encoding::Plain),
279        1 => Ok(Encoding::Dictionary),
280        2 => Ok(Encoding::Rle),
281        3 => Ok(Encoding::Bitpack),
282        _ => Err(ColumnarError::InvalidFormat("unknown encoding".into())),
283    }
284}
285
286fn compression_to_byte(comp: Compression) -> u8 {
287    match comp {
288        Compression::None => 0,
289        Compression::Lz4 => 1,
290    }
291}
292
293fn byte_to_compression(byte: u8) -> Result<Compression> {
294    match byte {
295        0 => Ok(Compression::None),
296        1 => Ok(Compression::Lz4),
297        _ => Err(ColumnarError::InvalidFormat("unknown compression".into())),
298    }
299}
300
301fn column_len(column: &Column) -> usize {
302    match column {
303        Column::Int64(v) => v.len(),
304        Column::Float32(v) => v.len(),
305        Column::Float64(v) => v.len(),
306        Column::Bool(v) => v.len(),
307        Column::Binary(v) => v.len(),
308        Column::Fixed { values, .. } => values.len(),
309    }
310}
311
312fn slice_column(column: &Column, start: usize, len: usize) -> Result<Column> {
313    match column {
314        Column::Int64(v) => Ok(Column::Int64(v[start..start + len].to_vec())),
315        Column::Float32(v) => Ok(Column::Float32(v[start..start + len].to_vec())),
316        Column::Float64(v) => Ok(Column::Float64(v[start..start + len].to_vec())),
317        Column::Bool(v) => Ok(Column::Bool(v[start..start + len].to_vec())),
318        Column::Binary(v) => Ok(Column::Binary(v[start..start + len].to_vec())),
319        Column::Fixed {
320            len: fixed_len,
321            values,
322        } => Ok(Column::Fixed {
323            len: *fixed_len,
324            values: values[start..start + len].to_vec(),
325        }),
326    }
327}
328
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Bytes preceding the first chunk:
    /// magic + version + kind + chunk_rows + flag + total_rows.
    const HEADER_LEN: usize = 4 + 2 + 3 + 4 + 1 + 4;

    /// Builds metadata for a plain, uncompressed segment.
    fn plain_meta(
        logical_type: LogicalType,
        chunk_rows: usize,
        chunk_checksum: bool,
    ) -> SegmentMeta {
        SegmentMeta {
            logical_type,
            encoding: Encoding::Plain,
            compression: Compression::None,
            chunk_rows,
            chunk_checksum,
        }
    }

    /// Opens the segment at `path` and returns the error yielded for its
    /// first chunk.
    fn first_chunk_error(path: &Path) -> ColumnarError {
        let mut reader = SegmentReader::open(path).unwrap();
        let err = reader.iter().next().unwrap().unwrap_err();
        err
    }

    #[test]
    fn segment_roundtrip_plain_int64() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seg.alx");
        let meta = plain_meta(LogicalType::Int64, 2, true);
        write_segment(&path, &Column::Int64(vec![1, 2, 3, 4, 5]), &meta).unwrap();

        let mut reader = SegmentReader::open(&path).unwrap();
        let out: Vec<_> = reader
            .iter()
            .flat_map(|chunk| match chunk.unwrap() {
                Column::Int64(vals) => vals,
                _ => panic!("expected int64"),
            })
            .collect();
        assert_eq!(out, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn checksum_failure_detected() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seg_bad.alx");
        let meta = plain_meta(LogicalType::Int64, 3, true);
        write_segment(&path, &Column::Int64(vec![10, 20, 30]), &meta).unwrap();

        // Corrupt a byte in the payload of the first chunk, which starts after
        // the header plus the chunk's row-count and length fields.
        let mut bytes = std::fs::read(&path).unwrap();
        let len_field = HEADER_LEN + 4..HEADER_LEN + 8;
        let payload_len = u32::from_le_bytes(bytes[len_field].try_into().unwrap()) as usize;
        let payload_start = HEADER_LEN + 8;
        let target = if payload_len > 0 {
            payload_start
        } else {
            // fallback: flip checksum byte
            payload_start + payload_len
        };
        bytes[target] ^= 0xAA;
        std::fs::write(&path, &bytes).unwrap();

        let err = first_chunk_error(&path);
        assert!(matches!(err, ColumnarError::CorruptedSegment { .. }));
    }

    #[test]
    fn chunk_rows_over_limit_is_rejected() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seg_over.alx");
        let meta = plain_meta(LogicalType::Int64, 2, false);
        write_segment(&path, &Column::Int64(vec![1, 2, 3, 4]), &meta).unwrap();

        // Bump first chunk row count to exceed declared chunk_rows.
        let mut bytes = std::fs::read(&path).unwrap();
        let bad_rows: u32 = 10;
        bytes[HEADER_LEN..HEADER_LEN + 4].copy_from_slice(&bad_rows.to_le_bytes());
        std::fs::write(&path, &bytes).unwrap();

        let err = first_chunk_error(&path);
        assert!(matches!(err, ColumnarError::CorruptedSegment { .. }));
    }

    #[test]
    fn logical_type_byte_mapping_legacy_safe() {
        // Fixed length below the boundary maps into 4..=254.
        assert_eq!(logical_to_byte(LogicalType::Fixed(250)).unwrap(), 254);
        assert_eq!(byte_to_logical(254).unwrap(), LogicalType::Fixed(250));

        // Legacy sentinel 255 is kept for fixed >=251, decoded as clamped 251.
        assert_eq!(logical_to_byte(LogicalType::Fixed(251)).unwrap(), 255);
        assert_eq!(byte_to_logical(255).unwrap(), LogicalType::Fixed(251));
    }

    #[test]
    fn float32_is_rejected_in_v1_segment() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seg_f32.alx");
        let meta = plain_meta(LogicalType::Float32, 2, false);
        let col = Column::Float32(vec![1.0, 2.0]);

        let err = write_segment(&path, &col, &meta).unwrap_err();
        assert!(matches!(err, ColumnarError::InvalidFormat(_)));
    }

    #[test]
    fn fixed_length_over_250_is_rejected_in_v1_segment() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seg_fixed_over.alx");
        let meta = plain_meta(LogicalType::Fixed(300), 2, false);
        let col = Column::Fixed {
            len: 300,
            values: vec![vec![0u8; 300]],
        };

        let err = write_segment(&path, &col, &meta).unwrap_err();
        assert!(matches!(err, ColumnarError::InvalidFormat(_)));
    }

    #[cfg(feature = "compression-lz4")]
    #[test]
    fn lz4_dictionary_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seg_dict.alx");
        let meta = SegmentMeta {
            logical_type: LogicalType::Binary,
            encoding: Encoding::Dictionary,
            compression: Compression::Lz4,
            chunk_rows: 4,
            chunk_checksum: false,
        };
        let rows = vec![
            b"aa".to_vec(),
            b"bb".to_vec(),
            b"aa".to_vec(),
            b"cc".to_vec(),
        ];
        write_segment(&path, &Column::Binary(rows), &meta).unwrap();

        let mut reader = SegmentReader::open(&path).unwrap();
        let out: Vec<_> = reader
            .iter()
            .flat_map(|chunk| match chunk.unwrap() {
                Column::Binary(vals) => vals,
                _ => panic!("expected binary"),
            })
            .collect();
        assert_eq!(
            out,
            vec![
                b"aa".to_vec(),
                b"bb".to_vec(),
                b"aa".to_vec(),
                b"cc".to_vec()
            ]
        );
    }
}