1use std::fs::File;
3use std::io::{Read, Write};
4use std::path::Path;
5
6use crate::columnar::encoding::{
7 decode_column, encode_column, Column, Compression, Encoding, LogicalType,
8};
9use crate::columnar::error::{ColumnarError, Result};
10use crc32fast::Hasher;
11
/// Magic bytes written at the very start of a segment file; the reader rejects
/// any file whose first four bytes differ.
const MAGIC: &[u8] = b"ALXC";
/// Segment format version written after the magic; the reader rejects any other value.
const VERSION: u16 = 1;
/// Upper bound (16 MiB) on a single chunk's encoded payload; the chunk iterator
/// treats a larger declared length as corruption instead of allocating for it.
const MAX_CHUNK_BYTES: usize = 16 * 1024 * 1024;

/// Per-segment settings stored in the file header and recovered when a segment is opened.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SegmentMeta {
    /// Logical value type of the column, stored as a single header byte.
    pub logical_type: LogicalType,
    /// Encoding passed to `encode_column` / `decode_column` for every chunk.
    pub encoding: Encoding,
    /// Compression passed to `encode_column` / `decode_column` for every chunk.
    pub compression: Compression,
    /// Maximum number of rows per chunk: the writer cuts the column into chunks
    /// of this size and the reader rejects chunks that declare more rows.
    pub chunk_rows: usize,
    /// When `true`, each chunk payload is followed by its CRC32 (little-endian).
    pub chunk_checksum: bool,
}
30
31pub fn write_segment(path: &Path, column: &Column, meta: &SegmentMeta) -> Result<()> {
33 match meta.logical_type {
35 LogicalType::Float32 => {
36 return Err(ColumnarError::InvalidFormat(
37 "segment v1 does not support Float32".into(),
38 ))
39 }
40 LogicalType::Fixed(len) if len >= 251 => {
41 return Err(ColumnarError::InvalidFormat(
42 "segment v1 fixed length must be <= 250".into(),
43 ))
44 }
45 _ => {}
46 }
47
48 let mut file = File::create(path)?;
49
50 file.write_all(MAGIC)?;
52 file.write_all(&VERSION.to_le_bytes())?;
53 let logical_byte = logical_to_byte(meta.logical_type)?;
54 file.write_all(&[
55 logical_byte,
56 encoding_to_byte(meta.encoding),
57 compression_to_byte(meta.compression),
58 ])?;
59 file.write_all(&(meta.chunk_rows as u32).to_le_bytes())?;
60 file.write_all(&[meta.chunk_checksum as u8])?;
61
62 let total_rows = column_len(column);
63 file.write_all(&(total_rows as u32).to_le_bytes())?;
64
65 let mut start = 0;
67 while start < total_rows {
68 let end = usize::min(start + meta.chunk_rows, total_rows);
69 let chunk = slice_column(column, start, end - start)?;
70 let encoded = encode_column(
71 &chunk,
72 meta.encoding,
73 meta.compression,
74 false,
75 meta.logical_type,
76 )?;
77 let mut checksum_bytes = [0u8; 4];
78 if meta.chunk_checksum {
79 let mut hasher = Hasher::new();
80 hasher.update(&encoded);
81 checksum_bytes = hasher.finalize().to_le_bytes();
82 }
83
84 file.write_all(&((end - start) as u32).to_le_bytes())?;
85 file.write_all(&(encoded.len() as u32).to_le_bytes())?;
86 file.write_all(&encoded)?;
87 if meta.chunk_checksum {
88 file.write_all(&checksum_bytes)?;
89 }
90 start = end;
91 }
92 Ok(())
93}
94
/// Reader over a segment file whose header has already been parsed and validated.
pub struct SegmentReader {
    /// Settings decoded from the file header.
    meta: SegmentMeta,
    /// Open file handle, positioned at the next unread chunk record.
    file: File,
    /// Rows not yet yielded; initialised from the header's total row count.
    remaining_rows: usize,
}
101
102impl SegmentReader {
103 pub fn open(path: &Path) -> Result<Self> {
105 let mut file = File::open(path)?;
106 let mut magic = [0u8; 4];
107 file.read_exact(&mut magic)?;
108 if magic != MAGIC {
109 return Err(ColumnarError::InvalidFormat("invalid segment magic".into()));
110 }
111 let mut version_bytes = [0u8; 2];
112 file.read_exact(&mut version_bytes)?;
113 let version = u16::from_le_bytes(version_bytes);
114 if version != VERSION {
115 return Err(ColumnarError::InvalidFormat(
116 "unsupported segment version".into(),
117 ));
118 }
119
120 let mut kind = [0u8; 3];
121 file.read_exact(&mut kind)?;
122 let logical_type = byte_to_logical(kind[0])?;
123 let encoding = byte_to_encoding(kind[1])?;
124 let compression = byte_to_compression(kind[2])?;
125
126 let mut chunk_rows_bytes = [0u8; 4];
127 file.read_exact(&mut chunk_rows_bytes)?;
128 let chunk_rows = u32::from_le_bytes(chunk_rows_bytes) as usize;
129
130 let mut checksum_flag = [0u8; 1];
131 file.read_exact(&mut checksum_flag)?;
132 let chunk_checksum = checksum_flag[0] != 0;
133
134 let mut total_rows_bytes = [0u8; 4];
135 file.read_exact(&mut total_rows_bytes)?;
136 let remaining_rows = u32::from_le_bytes(total_rows_bytes) as usize;
137
138 let meta = SegmentMeta {
139 logical_type,
140 encoding,
141 compression,
142 chunk_rows,
143 chunk_checksum,
144 };
145
146 Ok(Self {
147 meta,
148 file,
149 remaining_rows,
150 })
151 }
152
153 pub fn iter(&mut self) -> ChunkIter<'_> {
155 ChunkIter { reader: self }
156 }
157}
158
/// Iterator over the chunks of a segment, yielding one decoded `Column` per chunk.
pub struct ChunkIter<'a> {
    /// Reader whose file position and remaining-row count this iterator advances.
    reader: &'a mut SegmentReader,
}
163
164impl<'a> Iterator for ChunkIter<'a> {
165 type Item = Result<Column>;
166
167 fn next(&mut self) -> Option<Self::Item> {
168 if self.reader.remaining_rows == 0 {
169 return None;
170 }
171
172 let mut row_bytes = [0u8; 4];
173 if let Err(e) = self.reader.file.read_exact(&mut row_bytes) {
174 return Some(Err(ColumnarError::InvalidFormat(format!(
175 "chunk rows read failed: {e}"
176 ))));
177 }
178 let rows = u32::from_le_bytes(row_bytes) as usize;
179 if rows == 0 || rows > self.reader.meta.chunk_rows {
180 return Some(Err(ColumnarError::CorruptedSegment {
181 reason: "chunk rows exceed declared limit".into(),
182 }));
183 }
184
185 let mut len_bytes = [0u8; 4];
186 if let Err(e) = self.reader.file.read_exact(&mut len_bytes) {
187 return Some(Err(ColumnarError::InvalidFormat(format!(
188 "chunk length read failed: {e}"
189 ))));
190 }
191 let len = u32::from_le_bytes(len_bytes) as usize;
192 if len > MAX_CHUNK_BYTES {
193 return Some(Err(ColumnarError::CorruptedSegment {
194 reason: "chunk encoded size exceeds limit".into(),
195 }));
196 }
197
198 let mut buf = vec![0u8; len];
199 if let Err(e) = self.reader.file.read_exact(&mut buf) {
200 return Some(Err(ColumnarError::InvalidFormat(format!(
201 "chunk payload read failed: {e}"
202 ))));
203 }
204
205 if self.reader.meta.chunk_checksum {
206 let mut crc_bytes = [0u8; 4];
207 if let Err(e) = self.reader.file.read_exact(&mut crc_bytes) {
208 return Some(Err(ColumnarError::InvalidFormat(format!(
209 "chunk checksum read failed: {e}"
210 ))));
211 }
212 let expected = u32::from_le_bytes(crc_bytes);
213 let mut hasher = Hasher::new();
214 hasher.update(&buf);
215 let computed = hasher.finalize();
216 if expected != computed {
217 return Some(Err(ColumnarError::CorruptedSegment {
218 reason: "checksum mismatch".into(),
219 }));
220 }
221 }
222
223 self.reader.remaining_rows = self.reader.remaining_rows.saturating_sub(rows);
224 Some(decode_column(
225 &buf,
226 self.reader.meta.logical_type,
227 self.reader.meta.encoding,
228 self.reader.meta.compression,
229 false,
230 ))
231 }
232}
233
234fn logical_to_byte(logical: LogicalType) -> Result<u8> {
235 match logical {
236 LogicalType::Int64 => Ok(0),
237 LogicalType::Float64 => Ok(1),
238 LogicalType::Bool => Ok(2),
239 LogicalType::Binary => Ok(3),
240 LogicalType::Float32 => Err(ColumnarError::InvalidFormat(
241 "segment v1 does not support Float32".into(),
242 )),
243 LogicalType::Fixed(len) => {
244 if len >= 251 {
247 Ok(255)
248 } else {
249 Ok(4 + (len as u8))
250 }
251 }
252 }
253}
254
255fn byte_to_logical(byte: u8) -> Result<LogicalType> {
256 match byte {
257 0 => Ok(LogicalType::Int64),
258 1 => Ok(LogicalType::Float64),
259 2 => Ok(LogicalType::Bool),
260 3 => Ok(LogicalType::Binary),
261 255 => Ok(LogicalType::Fixed(251)),
262 b if b >= 4 => Ok(LogicalType::Fixed((b - 4) as u16)),
263 _ => Err(ColumnarError::InvalidFormat("unknown logical type".into())),
264 }
265}
266
267fn encoding_to_byte(enc: Encoding) -> u8 {
268 match enc {
269 Encoding::Plain => 0,
270 Encoding::Dictionary => 1,
271 Encoding::Rle => 2,
272 Encoding::Bitpack => 3,
273 }
274}
275
276fn byte_to_encoding(byte: u8) -> Result<Encoding> {
277 match byte {
278 0 => Ok(Encoding::Plain),
279 1 => Ok(Encoding::Dictionary),
280 2 => Ok(Encoding::Rle),
281 3 => Ok(Encoding::Bitpack),
282 _ => Err(ColumnarError::InvalidFormat("unknown encoding".into())),
283 }
284}
285
286fn compression_to_byte(comp: Compression) -> u8 {
287 match comp {
288 Compression::None => 0,
289 Compression::Lz4 => 1,
290 }
291}
292
293fn byte_to_compression(byte: u8) -> Result<Compression> {
294 match byte {
295 0 => Ok(Compression::None),
296 1 => Ok(Compression::Lz4),
297 _ => Err(ColumnarError::InvalidFormat("unknown compression".into())),
298 }
299}
300
301fn column_len(column: &Column) -> usize {
302 match column {
303 Column::Int64(v) => v.len(),
304 Column::Float32(v) => v.len(),
305 Column::Float64(v) => v.len(),
306 Column::Bool(v) => v.len(),
307 Column::Binary(v) => v.len(),
308 Column::Fixed { values, .. } => values.len(),
309 }
310}
311
312fn slice_column(column: &Column, start: usize, len: usize) -> Result<Column> {
313 match column {
314 Column::Int64(v) => Ok(Column::Int64(v[start..start + len].to_vec())),
315 Column::Float32(v) => Ok(Column::Float32(v[start..start + len].to_vec())),
316 Column::Float64(v) => Ok(Column::Float64(v[start..start + len].to_vec())),
317 Column::Bool(v) => Ok(Column::Bool(v[start..start + len].to_vec())),
318 Column::Binary(v) => Ok(Column::Binary(v[start..start + len].to_vec())),
319 Column::Fixed {
320 len: fixed_len,
321 values,
322 } => Ok(Column::Fixed {
323 len: *fixed_len,
324 values: values[start..start + len].to_vec(),
325 }),
326 }
327}
328
// Tests that write real segment files into a temporary directory and read
// them back; not compiled for wasm32.
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Five rows with `chunk_rows = 2` span three chunks (2 + 2 + 1); reading
    /// them back must reproduce the original values in order.
    #[test]
    fn segment_roundtrip_plain_int64() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seg.alx");
        let meta = SegmentMeta {
            logical_type: LogicalType::Int64,
            encoding: Encoding::Plain,
            compression: Compression::None,
            chunk_rows: 2,
            chunk_checksum: true,
        };
        let col = Column::Int64(vec![1, 2, 3, 4, 5]);
        write_segment(&path, &col, &meta).unwrap();

        let mut reader = SegmentReader::open(&path).unwrap();
        let mut out = Vec::new();
        for chunk in reader.iter() {
            match chunk.unwrap() {
                Column::Int64(vals) => out.extend(vals),
                _ => panic!("expected int64"),
            }
        }
        assert_eq!(out, vec![1, 2, 3, 4, 5]);
    }

    /// Flipping bits in the first chunk's bytes must surface as `CorruptedSegment`.
    #[test]
    fn checksum_failure_detected() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seg_bad.alx");
        let meta = SegmentMeta {
            logical_type: LogicalType::Int64,
            encoding: Encoding::Plain,
            compression: Compression::None,
            chunk_rows: 3,
            chunk_checksum: true,
        };
        let col = Column::Int64(vec![10, 20, 30]);
        write_segment(&path, &col, &meta).unwrap();

        let mut bytes = std::fs::read(&path).unwrap();
        // Header: magic(4) + version(2) + type bytes(3) + chunk_rows(4)
        // + checksum flag(1) + total rows(4).
        let header_len = 4 + 2 + 3 + 4 + 1 + 4;
        // The chunk record starts with rows(4), then the encoded length(4).
        let payload_len =
            u32::from_le_bytes(bytes[header_len + 4..header_len + 8].try_into().unwrap()) as usize;
        let payload_start = header_len + 8;
        if payload_len > 0 {
            // Corrupt the first payload byte.
            bytes[payload_start] ^= 0xAA;
        } else {
            // Empty payload: the byte at this offset is the first checksum byte.
            bytes[payload_start + payload_len] ^= 0xAA;
        }
        std::fs::write(&path, &bytes).unwrap();

        let mut reader = SegmentReader::open(&path).unwrap();
        let err = reader.iter().next().unwrap().unwrap_err();
        assert!(matches!(err, ColumnarError::CorruptedSegment { .. }));
    }

    /// A chunk that declares more rows than the header's `chunk_rows` must be rejected.
    #[test]
    fn chunk_rows_over_limit_is_rejected() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seg_over.alx");
        let meta = SegmentMeta {
            logical_type: LogicalType::Int64,
            encoding: Encoding::Plain,
            compression: Compression::None,
            chunk_rows: 2,
            chunk_checksum: false,
        };
        let col = Column::Int64(vec![1, 2, 3, 4]);
        write_segment(&path, &col, &meta).unwrap();

        let mut bytes = std::fs::read(&path).unwrap();
        // Same header size as above; the first chunk's row count follows it.
        let header_len = 4 + 2 + 3 + 4 + 1 + 4;
        let bad_rows: u32 = 10;
        bytes[header_len..header_len + 4].copy_from_slice(&bad_rows.to_le_bytes());
        std::fs::write(&path, &bytes).unwrap();

        let mut reader = SegmentReader::open(&path).unwrap();
        let err = reader.iter().next().unwrap().unwrap_err();
        assert!(matches!(err, ColumnarError::CorruptedSegment { .. }));
    }

    /// Pins the tag mapping at the top of the `Fixed` range, including the
    /// special tag 255.
    #[test]
    fn logical_type_byte_mapping_legacy_safe() {
        // Largest length with its own tag: 4 + 250 = 254.
        assert_eq!(logical_to_byte(LogicalType::Fixed(250)).unwrap(), 254);
        assert_eq!(byte_to_logical(254).unwrap(), LogicalType::Fixed(250));

        // Length 251 maps to tag 255, and tag 255 maps back to Fixed(251).
        assert_eq!(logical_to_byte(LogicalType::Fixed(251)).unwrap(), 255);
        assert_eq!(byte_to_logical(255).unwrap(), LogicalType::Fixed(251));
    }

    /// `write_segment` must refuse `Float32` columns.
    #[test]
    fn float32_is_rejected_in_v1_segment() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seg_f32.alx");
        let meta = SegmentMeta {
            logical_type: LogicalType::Float32,
            encoding: Encoding::Plain,
            compression: Compression::None,
            chunk_rows: 2,
            chunk_checksum: false,
        };
        let col = Column::Float32(vec![1.0, 2.0]);
        let err = write_segment(&path, &col, &meta).unwrap_err();
        assert!(matches!(err, ColumnarError::InvalidFormat(_)));
    }

    /// `write_segment` must refuse fixed-length columns wider than 250 bytes.
    #[test]
    fn fixed_length_over_250_is_rejected_in_v1_segment() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seg_fixed_over.alx");
        let meta = SegmentMeta {
            logical_type: LogicalType::Fixed(300),
            encoding: Encoding::Plain,
            compression: Compression::None,
            chunk_rows: 2,
            chunk_checksum: false,
        };
        let col = Column::Fixed {
            len: 300,
            values: vec![vec![0u8; 300]],
        };
        let err = write_segment(&path, &col, &meta).unwrap_err();
        assert!(matches!(err, ColumnarError::InvalidFormat(_)));
    }

    /// Dictionary-encoded, LZ4-compressed binary values must round-trip.
    /// Only built when the `compression-lz4` feature is enabled.
    #[cfg(feature = "compression-lz4")]
    #[test]
    fn lz4_dictionary_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("seg_dict.alx");
        let meta = SegmentMeta {
            logical_type: LogicalType::Binary,
            encoding: Encoding::Dictionary,
            compression: Compression::Lz4,
            chunk_rows: 4,
            chunk_checksum: false,
        };
        let col = Column::Binary(vec![
            b"aa".to_vec(),
            b"bb".to_vec(),
            b"aa".to_vec(),
            b"cc".to_vec(),
        ]);
        write_segment(&path, &col, &meta).unwrap();

        let mut reader = SegmentReader::open(&path).unwrap();
        let mut out = Vec::new();
        for chunk in reader.iter() {
            match chunk.unwrap() {
                Column::Binary(vals) => out.extend(vals),
                _ => panic!("expected binary"),
            }
        }
        assert_eq!(
            out,
            vec![
                b"aa".to_vec(),
                b"bb".to_vec(),
                b"aa".to_vec(),
                b"cc".to_vec()
            ]
        );
    }
}