1use std::convert::TryInto;
3
4use crc32fast::Hasher;
5use serde::{Deserialize, Serialize};
6
7use crate::columnar::error::{ColumnarError, Result};
8
/// Logical value type of a column, used to check a `Column` before encoding
/// and to select the output variant when decoding.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LogicalType {
    /// 64-bit signed integers; matches `Column::Int64`.
    Int64,
    /// 32-bit floats; matches `Column::Float32`.
    Float32,
    /// 64-bit floats; matches `Column::Float64`.
    Float64,
    /// Booleans; matches `Column::Bool`.
    Bool,
    /// Variable-length byte strings; matches `Column::Binary`.
    Binary,
    /// Fixed-length byte strings; the payload is the byte length of every
    /// value and must equal `Column::Fixed::len`.
    Fixed(u16),
}
25
/// Value encoding applied by `encode_column` before optional compression.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Encoding {
    /// Value count followed by the raw values.
    Plain,
    /// De-duplicated entries followed by `u32` indices; accepted for
    /// `Binary` and `Fixed` columns only.
    Dictionary,
    /// Run-length encoding; accepted for `Int64`, `Float32`, `Float64` and
    /// `Bool` columns.
    Rle,
    /// One bit per value, least-significant bit first; accepted for `Bool`
    /// columns only.
    Bitpack,
}
38
/// Compression applied to the encoded payload.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Compression {
    /// The encoded payload is stored unchanged.
    None,
    /// LZ4 block compression with a 4-byte little-endian uncompressed-length
    /// prefix. Only usable when the `compression-lz4` feature is enabled;
    /// otherwise encoding and decoding return an error.
    Lz4,
}
47
/// In-memory column values, one variant per supported value type.
#[derive(Debug, Clone, PartialEq)]
pub enum Column {
    /// 64-bit signed integers.
    Int64(Vec<i64>),
    /// 32-bit floats.
    Float32(Vec<f32>),
    /// 64-bit floats.
    Float64(Vec<f64>),
    /// Booleans.
    Bool(Vec<bool>),
    /// Variable-length byte strings.
    Binary(Vec<Vec<u8>>),
    /// Fixed-length byte strings.
    Fixed {
        /// Byte length each entry of `values` is expected to have; plain
        /// encoding rejects columns where an entry differs.
        len: usize,
        /// The values, each `len` bytes long.
        values: Vec<Vec<u8>>,
    },
}
69
70pub fn encode_column(
74 column: &Column,
75 encoding: Encoding,
76 compression: Compression,
77 checksum: bool,
78 logical_type: LogicalType,
79) -> Result<Vec<u8>> {
80 validate_logical(column, logical_type)?;
81 let mut payload = match encoding {
82 Encoding::Plain => encode_plain(column)?,
83 Encoding::Dictionary => encode_dictionary(column)?,
84 Encoding::Rle => encode_rle(column)?,
85 Encoding::Bitpack => encode_bitpack(column)?,
86 };
87
88 if let Compression::Lz4 = compression {
89 #[cfg(feature = "compression-lz4")]
90 {
91 let orig_len: u32 =
92 payload
93 .len()
94 .try_into()
95 .map_err(|_| ColumnarError::CorruptedSegment {
96 reason: "payload too large for lz4".into(),
97 })?;
98 let compressed = lz4::block::compress(&payload, None, false).map_err(|e| {
99 ColumnarError::CorruptedSegment {
100 reason: e.to_string(),
101 }
102 })?;
103 let mut buf = Vec::with_capacity(4 + compressed.len());
104 buf.extend_from_slice(&orig_len.to_le_bytes());
105 buf.extend_from_slice(&compressed);
106 payload = buf;
107 }
108 #[cfg(not(feature = "compression-lz4"))]
109 {
110 return Err(ColumnarError::CorruptedSegment {
111 reason: "lz4 compression is disabled (feature compression-lz4)".into(),
112 });
113 }
114 }
115
116 if checksum {
117 let mut hasher = Hasher::new();
118 hasher.update(&payload);
119 let crc = hasher.finalize();
120 payload.extend_from_slice(&crc.to_le_bytes());
121 }
122
123 Ok(payload)
124}
125
126pub fn decode_column(
128 bytes: &[u8],
129 logical_type: LogicalType,
130 encoding: Encoding,
131 compression: Compression,
132 checksum: bool,
133) -> Result<Column> {
134 let data = if checksum {
135 if bytes.len() < 4 {
136 return Err(ColumnarError::CorruptedSegment {
137 reason: "checksum missing".into(),
138 });
139 }
140 let (content, crc_bytes) = bytes.split_at(bytes.len() - 4);
141 let expected = u32::from_le_bytes(crc_bytes.try_into().unwrap());
142 let mut hasher = Hasher::new();
143 hasher.update(content);
144 let computed = hasher.finalize();
145 if expected != computed {
146 return Err(ColumnarError::ChecksumMismatch);
147 }
148 content
149 } else {
150 bytes
151 };
152
153 let decompressed = match compression {
154 Compression::None => data.to_vec(),
155 Compression::Lz4 => {
156 #[cfg(feature = "compression-lz4")]
157 {
158 if data.len() < 4 {
159 return Err(ColumnarError::CorruptedSegment {
160 reason: "lz4 header too short".into(),
161 });
162 }
163 let orig_len = u32::from_le_bytes(data[0..4].try_into().unwrap()) as i32;
164 lz4::block::decompress(&data[4..], Some(orig_len)).map_err(|e| {
165 ColumnarError::CorruptedSegment {
166 reason: e.to_string(),
167 }
168 })?
169 }
170 #[cfg(not(feature = "compression-lz4"))]
171 {
172 return Err(ColumnarError::CorruptedSegment {
173 reason: "lz4 compression is disabled (feature compression-lz4)".into(),
174 });
175 }
176 }
177 };
178
179 match encoding {
180 Encoding::Plain => decode_plain(&decompressed, logical_type),
181 Encoding::Dictionary => decode_dictionary(&decompressed, logical_type),
182 Encoding::Rle => decode_rle(&decompressed, logical_type),
183 Encoding::Bitpack => decode_bitpack(&decompressed, logical_type),
184 }
185}
186
187fn validate_logical(column: &Column, logical: LogicalType) -> Result<()> {
188 match (column, logical) {
189 (Column::Int64(_), LogicalType::Int64)
190 | (Column::Float32(_), LogicalType::Float32)
191 | (Column::Float64(_), LogicalType::Float64)
192 | (Column::Bool(_), LogicalType::Bool)
193 | (Column::Binary(_), LogicalType::Binary) => Ok(()),
194 (Column::Fixed { len, .. }, LogicalType::Fixed(flen)) if *len == flen as usize => Ok(()),
195 (_, LogicalType::Fixed(_)) => Err(ColumnarError::CorruptedSegment {
196 reason: "fixed length mismatch".into(),
197 }),
198 _ => Err(ColumnarError::CorruptedSegment {
199 reason: "logical type mismatch".into(),
200 }),
201 }
202}
203
204fn encode_plain(column: &Column) -> Result<Vec<u8>> {
205 match column {
206 Column::Int64(values) => {
207 let mut buf = Vec::with_capacity(4 + values.len() * 8);
208 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
209 for v in values {
210 buf.extend_from_slice(&v.to_le_bytes());
211 }
212 Ok(buf)
213 }
214 Column::Float32(values) => {
215 let mut buf = Vec::with_capacity(4 + values.len() * 4);
216 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
217 for v in values {
218 buf.extend_from_slice(&v.to_le_bytes());
219 }
220 Ok(buf)
221 }
222 Column::Float64(values) => {
223 let mut buf = Vec::with_capacity(4 + values.len() * 8);
224 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
225 for v in values {
226 buf.extend_from_slice(&v.to_le_bytes());
227 }
228 Ok(buf)
229 }
230 Column::Bool(values) => {
231 let mut buf = Vec::with_capacity(4 + values.len());
232 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
233 for v in values {
234 buf.push(*v as u8);
235 }
236 Ok(buf)
237 }
238 Column::Binary(values) => encode_varlen(values),
239 Column::Fixed { len, values } => {
240 for v in values {
241 if v.len() != *len {
242 return Err(ColumnarError::CorruptedSegment {
243 reason: "fixed value length mismatch".into(),
244 });
245 }
246 }
247 let mut buf = Vec::with_capacity(6 + values.len() * *len);
248 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
249 buf.extend_from_slice(&(*len as u16).to_le_bytes());
250 for v in values {
251 buf.extend_from_slice(v);
252 }
253 Ok(buf)
254 }
255 }
256}
257
258fn encode_varlen(values: &[Vec<u8>]) -> Result<Vec<u8>> {
259 let mut buf = Vec::new();
260 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
261 for v in values {
262 let len: u32 = v
263 .len()
264 .try_into()
265 .map_err(|_| ColumnarError::CorruptedSegment {
266 reason: "value too long".into(),
267 })?;
268 buf.extend_from_slice(&len.to_le_bytes());
269 buf.extend_from_slice(v);
270 }
271 Ok(buf)
272}
273
274fn decode_plain(bytes: &[u8], logical: LogicalType) -> Result<Column> {
275 if bytes.len() < 4 {
276 return Err(ColumnarError::CorruptedSegment {
277 reason: "plain header too short".into(),
278 });
279 }
280 let count = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
281 let mut pos = 4;
282 match logical {
283 LogicalType::Int64 => {
284 if bytes.len() < pos + count * 8 {
285 return Err(ColumnarError::CorruptedSegment {
286 reason: "plain int64 truncated".into(),
287 });
288 }
289 let mut out = Vec::with_capacity(count);
290 for _ in 0..count {
291 let v = i64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap());
292 out.push(v);
293 pos += 8;
294 }
295 Ok(Column::Int64(out))
296 }
297 LogicalType::Float32 => {
298 if bytes.len() < pos + count * 4 {
299 return Err(ColumnarError::CorruptedSegment {
300 reason: "plain float32 truncated".into(),
301 });
302 }
303 let mut out = Vec::with_capacity(count);
304 for _ in 0..count {
305 let v = f32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap());
306 out.push(v);
307 pos += 4;
308 }
309 Ok(Column::Float32(out))
310 }
311 LogicalType::Float64 => {
312 if bytes.len() < pos + count * 8 {
313 return Err(ColumnarError::CorruptedSegment {
314 reason: "plain float64 truncated".into(),
315 });
316 }
317 let mut out = Vec::with_capacity(count);
318 for _ in 0..count {
319 let v = f64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap());
320 out.push(v);
321 pos += 8;
322 }
323 Ok(Column::Float64(out))
324 }
325 LogicalType::Bool => {
326 if bytes.len() < pos + count {
327 return Err(ColumnarError::CorruptedSegment {
328 reason: "plain bool truncated".into(),
329 });
330 }
331 let mut out = Vec::with_capacity(count);
332 for _ in 0..count {
333 out.push(bytes[pos] != 0);
334 pos += 1;
335 }
336 Ok(Column::Bool(out))
337 }
338 LogicalType::Binary => decode_varlen(&bytes[4..], count).map(Column::Binary),
339 LogicalType::Fixed(len) => {
340 if bytes.len() < pos + 2 {
341 return Err(ColumnarError::CorruptedSegment {
342 reason: "fixed header truncated".into(),
343 });
344 }
345 let stored_len = u16::from_le_bytes(bytes[pos..pos + 2].try_into().unwrap()) as usize;
346 pos += 2;
347 if stored_len as u16 != len {
348 return Err(ColumnarError::CorruptedSegment {
349 reason: "fixed length mismatch".into(),
350 });
351 }
352 let expected = pos + count * stored_len;
353 if bytes.len() < expected {
354 return Err(ColumnarError::CorruptedSegment {
355 reason: "fixed values truncated".into(),
356 });
357 }
358 let mut values = Vec::with_capacity(count);
359 for _ in 0..count {
360 let end = pos + stored_len;
361 values.push(bytes[pos..end].to_vec());
362 pos = end;
363 }
364 Ok(Column::Fixed {
365 len: stored_len,
366 values,
367 })
368 }
369 }
370}
371
372fn decode_varlen(bytes: &[u8], count: usize) -> Result<Vec<Vec<u8>>> {
373 let mut pos = 0;
374 let mut values = Vec::with_capacity(count);
375 for _ in 0..count {
376 if pos + 4 > bytes.len() {
377 return Err(ColumnarError::CorruptedSegment {
378 reason: "varlen length truncated".into(),
379 });
380 }
381 let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
382 pos += 4;
383 if pos + len > bytes.len() {
384 return Err(ColumnarError::CorruptedSegment {
385 reason: "varlen value truncated".into(),
386 });
387 }
388 values.push(bytes[pos..pos + len].to_vec());
389 pos += len;
390 }
391 Ok(values)
392}
393
394fn encode_dictionary(column: &Column) -> Result<Vec<u8>> {
395 let values = match column {
396 Column::Binary(v) => v,
397 Column::Fixed { values, .. } => values,
398 _ => {
399 return Err(ColumnarError::CorruptedSegment {
400 reason: "dictionary encoding requires binary data".into(),
401 })
402 }
403 };
404
405 let mut dict: Vec<Vec<u8>> = Vec::new();
406 let mut indices = Vec::with_capacity(values.len());
407 for v in values {
408 if let Some((idx, _)) = dict.iter().enumerate().find(|(_, existing)| *existing == v) {
409 indices.push(idx as u32);
410 } else {
411 let idx = dict.len() as u32;
412 dict.push(v.clone());
413 indices.push(idx);
414 }
415 }
416
417 let mut buf = Vec::new();
418 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
419 buf.extend_from_slice(&(dict.len() as u32).to_le_bytes());
420 for entry in &dict {
421 let len: u32 = entry
422 .len()
423 .try_into()
424 .map_err(|_| ColumnarError::CorruptedSegment {
425 reason: "dict entry too long".into(),
426 })?;
427 buf.extend_from_slice(&len.to_le_bytes());
428 buf.extend_from_slice(entry);
429 }
430 for idx in indices {
431 buf.extend_from_slice(&idx.to_le_bytes());
432 }
433 Ok(buf)
434}
435
436fn decode_dictionary(bytes: &[u8], logical: LogicalType) -> Result<Column> {
437 if bytes.len() < 8 {
438 return Err(ColumnarError::CorruptedSegment {
439 reason: "dictionary header too short".into(),
440 });
441 }
442 let count = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
443 let dict_count = u32::from_le_bytes(bytes[4..8].try_into().unwrap()) as usize;
444
445 let mut pos = 8;
446 let mut dict = Vec::with_capacity(dict_count);
447 for _ in 0..dict_count {
448 if pos + 4 > bytes.len() {
449 return Err(ColumnarError::CorruptedSegment {
450 reason: "dict length truncated".into(),
451 });
452 }
453 let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
454 pos += 4;
455 if pos + len > bytes.len() {
456 return Err(ColumnarError::CorruptedSegment {
457 reason: "dict entry truncated".into(),
458 });
459 }
460 dict.push(bytes[pos..pos + len].to_vec());
461 pos += len;
462 }
463
464 let expected_idx_bytes =
465 count
466 .checked_mul(4)
467 .ok_or_else(|| ColumnarError::CorruptedSegment {
468 reason: "index overflow".into(),
469 })?;
470 if pos + expected_idx_bytes > bytes.len() {
471 return Err(ColumnarError::CorruptedSegment {
472 reason: "dictionary indices truncated".into(),
473 });
474 }
475
476 let mut values = Vec::with_capacity(count);
477 for _ in 0..count {
478 let idx = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
479 pos += 4;
480 let entry = dict
481 .get(idx)
482 .ok_or_else(|| ColumnarError::CorruptedSegment {
483 reason: "dictionary index out of bounds".into(),
484 })?;
485 values.push(entry.clone());
486 }
487
488 match logical {
489 LogicalType::Binary => Ok(Column::Binary(values)),
490 LogicalType::Fixed(len) => {
491 for v in &values {
492 if v.len() != len as usize {
493 return Err(ColumnarError::CorruptedSegment {
494 reason: "fixed length mismatch".into(),
495 });
496 }
497 }
498 Ok(Column::Fixed {
499 len: len as usize,
500 values,
501 })
502 }
503 _ => Err(ColumnarError::CorruptedSegment {
504 reason: "dictionary logical mismatch".into(),
505 }),
506 }
507}
508
509fn encode_rle(column: &Column) -> Result<Vec<u8>> {
510 match column {
511 Column::Int64(values) => {
512 encode_rle_nums(values.iter().map(|v| v.to_le_bytes().to_vec()), 8)
513 }
514 Column::Float32(values) => {
515 encode_rle_nums(values.iter().map(|v| v.to_le_bytes().to_vec()), 4)
516 }
517 Column::Float64(values) => {
518 encode_rle_nums(values.iter().map(|v| v.to_le_bytes().to_vec()), 8)
519 }
520 Column::Bool(values) => {
521 let mut runs = Vec::new();
522 let mut iter = values.iter().copied();
523 if let Some(mut current) = iter.next() {
524 let mut len = 1u32;
525 for v in iter {
526 if v == current && len < u32::MAX {
527 len += 1;
528 } else {
529 runs.push((current as u8, len));
530 current = v;
531 len = 1;
532 }
533 }
534 runs.push((current as u8, len));
535 }
536 let mut buf = Vec::new();
537 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
538 buf.extend_from_slice(&(runs.len() as u32).to_le_bytes());
539 for (val, len) in runs {
540 buf.push(val);
541 buf.extend_from_slice(&len.to_le_bytes());
542 }
543 Ok(buf)
544 }
545 _ => Err(ColumnarError::CorruptedSegment {
546 reason: "rle only supports numeric/bool".into(),
547 }),
548 }
549}
550
551fn encode_rle_nums<I>(iter: I, width: usize) -> Result<Vec<u8>>
552where
553 I: Iterator<Item = Vec<u8>>,
554{
555 let mut runs: Vec<(Vec<u8>, u32)> = Vec::new();
556 let mut it = iter.peekable();
557 if let Some(first) = it.next() {
558 let mut current = first;
559 let mut len = 1u32;
560 for v in it {
561 if v == current && len < u32::MAX {
562 len += 1;
563 } else {
564 runs.push((current, len));
565 current = v;
566 len = 1;
567 }
568 }
569 runs.push((current, len));
570 }
571
572 let mut buf = Vec::new();
573 let total: u32 = runs.iter().map(|(_, l)| *l).sum();
574 buf.extend_from_slice(&total.to_le_bytes());
575 buf.extend_from_slice(&(runs.len() as u32).to_le_bytes());
576 for (val, len) in runs {
577 if val.len() != width {
578 return Err(ColumnarError::CorruptedSegment {
579 reason: "rle width mismatch".into(),
580 });
581 }
582 buf.extend_from_slice(&val);
583 buf.extend_from_slice(&len.to_le_bytes());
584 }
585 Ok(buf)
586}
587
588fn decode_rle(bytes: &[u8], logical: LogicalType) -> Result<Column> {
589 if bytes.len() < 8 {
590 return Err(ColumnarError::CorruptedSegment {
591 reason: "rle header too short".into(),
592 });
593 }
594 let total = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
595 let run_count = u32::from_le_bytes(bytes[4..8].try_into().unwrap()) as usize;
596 let mut pos = 8;
597
598 match logical {
599 LogicalType::Int64 | LogicalType::Float64 | LogicalType::Float32 => {
600 let width = if matches!(logical, LogicalType::Float32) {
601 4
602 } else {
603 8
604 };
605 let mut out: Vec<Vec<u8>> = Vec::with_capacity(run_count);
606 let mut lengths = Vec::with_capacity(run_count);
607 for _ in 0..run_count {
608 if pos + width + 4 > bytes.len() {
609 return Err(ColumnarError::CorruptedSegment {
610 reason: "rle numeric truncated".into(),
611 });
612 }
613 out.push(bytes[pos..pos + width].to_vec());
614 pos += width;
615 lengths.push(u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize);
616 pos += 4;
617 }
618 let mut values = Vec::with_capacity(total);
619 for (val_bytes, len) in out.into_iter().zip(lengths) {
620 for _ in 0..len {
621 let v = match logical {
622 LogicalType::Int64 => {
623 let val_arr: [u8; 8] = val_bytes.as_slice().try_into().unwrap();
624 ColumnValue::I64(i64::from_le_bytes(val_arr))
625 }
626 LogicalType::Float64 => {
627 let val_arr: [u8; 8] = val_bytes.as_slice().try_into().unwrap();
628 ColumnValue::F64(f64::from_le_bytes(val_arr))
629 }
630 LogicalType::Float32 => {
631 let val_arr: [u8; 4] = val_bytes.as_slice().try_into().unwrap();
632 ColumnValue::F32(f32::from_le_bytes(val_arr))
633 }
634 _ => unreachable!(),
635 };
636 values.push(v);
637 }
638 }
639 match logical {
640 LogicalType::Int64 => Ok(Column::Int64(
641 values
642 .into_iter()
643 .map(|v| match v {
644 ColumnValue::I64(x) => x,
645 _ => unreachable!(),
646 })
647 .collect(),
648 )),
649 LogicalType::Float32 => Ok(Column::Float32(
650 values
651 .into_iter()
652 .map(|v| match v {
653 ColumnValue::F32(x) => x,
654 _ => unreachable!(),
655 })
656 .collect(),
657 )),
658 LogicalType::Float64 => Ok(Column::Float64(
659 values
660 .into_iter()
661 .map(|v| match v {
662 ColumnValue::F64(x) => x,
663 _ => unreachable!(),
664 })
665 .collect(),
666 )),
667 _ => unreachable!(),
668 }
669 }
670 LogicalType::Bool => {
671 let mut runs = Vec::with_capacity(run_count);
672 for _ in 0..run_count {
673 if pos + 5 > bytes.len() {
674 return Err(ColumnarError::CorruptedSegment {
675 reason: "rle bool truncated".into(),
676 });
677 }
678 let val = bytes[pos] != 0;
679 pos += 1;
680 let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
681 pos += 4;
682 runs.push((val, len));
683 }
684 let mut out = Vec::with_capacity(total);
685 for (val, len) in runs {
686 out.extend(std::iter::repeat_n(val, len));
687 }
688 Ok(Column::Bool(out))
689 }
690 _ => Err(ColumnarError::CorruptedSegment {
691 reason: "rle logical mismatch".into(),
692 }),
693 }
694}
695
696enum ColumnValue {
697 I64(i64),
698 F32(f32),
699 F64(f64),
700}
701
702fn encode_bitpack(column: &Column) -> Result<Vec<u8>> {
703 let values = match column {
704 Column::Bool(v) => v,
705 _ => {
706 return Err(ColumnarError::CorruptedSegment {
707 reason: "bitpack supports bool only".into(),
708 })
709 }
710 };
711 let count = values.len();
712 let mut buf = Vec::with_capacity(4 + count.div_ceil(8));
713 buf.extend_from_slice(&(count as u32).to_le_bytes());
714 let mut current = 0u8;
715 let mut bit = 0;
716 for v in values {
717 if *v {
718 current |= 1 << bit;
719 }
720 bit += 1;
721 if bit == 8 {
722 buf.push(current);
723 current = 0;
724 bit = 0;
725 }
726 }
727 if bit > 0 {
728 buf.push(current);
729 }
730 Ok(buf)
731}
732
733fn decode_bitpack(bytes: &[u8], logical: LogicalType) -> Result<Column> {
734 if logical != LogicalType::Bool {
735 return Err(ColumnarError::CorruptedSegment {
736 reason: "bitpack logical mismatch".into(),
737 });
738 }
739 if bytes.len() < 4 {
740 return Err(ColumnarError::CorruptedSegment {
741 reason: "bitpack header too short".into(),
742 });
743 }
744 let count = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
745 let needed = 4 + count.div_ceil(8);
746 if bytes.len() < needed {
747 return Err(ColumnarError::CorruptedSegment {
748 reason: "bitpack data truncated".into(),
749 });
750 }
751 let mut out = Vec::with_capacity(count);
752 for i in 0..count {
753 let byte = bytes[4 + (i / 8)];
754 let bit = i % 8;
755 out.push(byte & (1 << bit) != 0);
756 }
757 Ok(Column::Bool(out))
758}
759
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;

    /// Encodes `col` and decodes the result with the same settings,
    /// panicking if either step fails.
    fn roundtrip(
        col: &Column,
        logical: LogicalType,
        encoding: Encoding,
        compression: Compression,
        checksum: bool,
    ) -> Column {
        let bytes = encode_column(col, encoding, compression, checksum, logical).unwrap();
        decode_column(&bytes, logical, encoding, compression, checksum).unwrap()
    }

    #[test]
    fn plain_int64_roundtrip() {
        let col = Column::Int64(vec![1, -2, 3]);
        let decoded = roundtrip(
            &col,
            LogicalType::Int64,
            Encoding::Plain,
            Compression::None,
            true,
        );
        assert_eq!(col, decoded);
    }

    #[cfg(feature = "compression-lz4")]
    #[test]
    fn dictionary_binary_roundtrip_lz4() {
        let col = Column::Binary(vec![b"aa".to_vec(), b"bb".to_vec(), b"aa".to_vec()]);
        let decoded = roundtrip(
            &col,
            LogicalType::Binary,
            Encoding::Dictionary,
            Compression::Lz4,
            true,
        );
        assert_eq!(col, decoded);
    }

    #[test]
    fn rle_bool_roundtrip() {
        let col = Column::Bool(vec![true, true, true, false, false, true]);
        let decoded = roundtrip(
            &col,
            LogicalType::Bool,
            Encoding::Rle,
            Compression::None,
            false,
        );
        assert_eq!(col, decoded);
    }

    #[test]
    fn bitpack_bool_roundtrip() {
        let col = Column::Bool(vec![
            true, false, true, true, false, false, false, true, true,
        ]);
        let decoded = roundtrip(
            &col,
            LogicalType::Bool,
            Encoding::Bitpack,
            Compression::None,
            true,
        );
        assert_eq!(col, decoded);
    }

    #[test]
    fn checksum_mismatch_detected() {
        let col = Column::Int64(vec![42]);
        let mut encoded = encode_column(
            &col,
            Encoding::Plain,
            Compression::None,
            true,
            LogicalType::Int64,
        )
        .unwrap();

        // Corrupt one payload byte so the stored CRC no longer matches.
        encoded[5] ^= 0xFF;

        let outcome = decode_column(
            &encoded,
            LogicalType::Int64,
            Encoding::Plain,
            Compression::None,
            true,
        );
        assert!(matches!(
            outcome.unwrap_err(),
            ColumnarError::ChecksumMismatch
        ));
    }
}