1use arrow::{
2 array::{
3 ArrayDataBuilder, Decimal128Array, Decimal256Array, GenericByteArray, OffsetBufferBuilder,
4 },
5 buffer::{Buffer, OffsetBuffer},
6 datatypes::ByteArrayType,
7};
8use bytes;
9use fsst::{Compressor, Decompressor, Symbol};
10use std::io::Result;
11use std::io::{Error, ErrorKind};
12use std::ops::Range;
13use std::sync::Arc;
14
15use crate::liquid_array::fix_len_byte_array::ArrowFixedLenByteArrayType;
16use crate::liquid_array::{LiquidByteViewArray, SqueezeIoHandler};
17
mod sealed {
    /// Marker trait used to seal `FsstBacking` in the parent module: only
    /// types in this crate can name `sealed::Sealed`, so downstream code
    /// cannot add new implementations of the trait.
    pub trait Sealed {}
}
21
#[derive(Clone)]
pub(crate) struct RawFsstBuffer {
    // FSST-compressed bytes for all values, concatenated back-to-back.
    values: Buffer,
    // Total byte length of the values *before* compression; used to presize
    // decompression buffers.
    uncompressed_bytes: usize,
}
29
30impl std::fmt::Debug for RawFsstBuffer {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 f.debug_struct("RawFsstBuffer")
33 .field("values_len", &self.values.len())
34 .field("uncompressed_bytes", &self.uncompressed_bytes)
35 .finish()
36 }
37}
38
impl RawFsstBuffer {
    /// Wrap an already-compressed buffer together with its pre-compression size.
    pub(crate) fn from_parts(values: Buffer, uncompressed_bytes: usize) -> Self {
        Self {
            values,
            uncompressed_bytes,
        }
    }

    /// Compress every `Some` item with `compressor`, concatenating the results.
    ///
    /// Returns the compressed buffer plus one offset per input slot boundary
    /// (`offsets.len() == item_count + 1`). A `None` item contributes zero
    /// bytes, so its offset equals the previous one. `compress_buffer` is a
    /// caller-provided scratch buffer reused across items to avoid per-item
    /// allocation.
    pub(crate) fn from_byte_slices<I, T>(
        iter: I,
        compressor: Arc<Compressor>,
        compress_buffer: &mut Vec<u8>,
    ) -> (Self, Vec<u32>)
    where
        I: Iterator<Item = Option<T>>,
        T: AsRef<[u8]>,
    {
        let mut values_buffer = Vec::new();
        let mut offsets = Vec::new();
        let mut uncompressed_len = 0;

        offsets.push(0u32);
        for item in iter {
            if let Some(bytes) = item {
                let bytes = bytes.as_ref();
                uncompressed_len += bytes.len();

                compress_buffer.clear();
                compress_buffer.reserve(bytes.len().saturating_mul(2));
                // SAFETY: the scratch buffer was cleared and given 2x the
                // input length of capacity. NOTE(review): this assumes 2x
                // satisfies `compress_into`'s capacity contract (FSST worst
                // case is one escape byte per input byte) — confirm against
                // the fsst crate documentation.
                unsafe {
                    compressor.compress_into(bytes, compress_buffer);
                }

                values_buffer.extend_from_slice(compress_buffer);
            }
            offsets.push(values_buffer.len() as u32);
        }

        values_buffer.shrink_to_fit();
        let values_buffer = Buffer::from(values_buffer);
        let raw_buffer = Self::from_parts(values_buffer, uncompressed_len);

        (raw_buffer, offsets)
    }

    /// Decompress all values delimited by `offsets` into one contiguous byte
    /// buffer plus arrow `i32` offsets.
    pub(crate) fn to_uncompressed(
        &self,
        decompressor: &Decompressor<'_>,
        offsets: &[u32],
    ) -> (Buffer, OffsetBuffer<i32>) {
        // Capacity assumes `offsets` covers each compressed value exactly
        // once, so the total output is bounded by `uncompressed_bytes`; the
        // +8 gives the decompressor slack for whole-word writes.
        let mut value_buffer: Vec<u8> = Vec::with_capacity(self.uncompressed_bytes + 8);
        let num_values = offsets.len().saturating_sub(1);
        let mut out_offsets: OffsetBufferBuilder<i32> = OffsetBufferBuilder::new(num_values);

        for i in 0..num_values {
            let start_offset = offsets[i];
            let end_offset = offsets[i + 1];

            if start_offset != end_offset {
                let compressed_slice = self.get_compressed_slice(start_offset, end_offset);
                let decompressed_len = decompressor
                    .decompress_into(compressed_slice, value_buffer.spare_capacity_mut());

                let new_len = value_buffer.len() + decompressed_len;
                debug_assert!(new_len <= value_buffer.capacity());
                // SAFETY: `decompress_into` initialized `decompressed_len`
                // bytes of the spare capacity; the debug_assert above checks
                // we stayed within the reserved capacity.
                unsafe {
                    value_buffer.set_len(new_len);
                }
                out_offsets.push_length(decompressed_len);
            } else {
                // Empty slot: a null or zero-length value.
                out_offsets.push_length(0);
            }
        }

        let buffer = Buffer::from(value_buffer);
        (buffer, out_offsets.finish())
    }

    /// Borrow the compressed bytes of one value, `[start_offset, end_offset)`.
    /// Bounds are only checked via `debug_assert`; callers must pass offsets
    /// produced alongside this buffer.
    pub(crate) fn get_compressed_slice(&self, start_offset: u32, end_offset: u32) -> &[u8] {
        let start = start_offset as usize;
        let end = end_offset as usize;
        debug_assert!(end <= self.values.len(), "Offset out of bounds");
        debug_assert!(start <= end, "Invalid offset range");
        &self.values.as_slice()[start..end]
    }

    /// Length in bytes of the compressed values buffer.
    pub(crate) fn values_len(&self) -> usize {
        self.values.len()
    }

    /// Approximate memory footprint: compressed bytes plus the struct itself.
    pub(crate) fn get_memory_size(&self) -> usize {
        self.values.len() + std::mem::size_of::<Self>()
    }

    /// Serialize as: uncompressed_bytes (u64 LE) | values_len (u32 LE) | values.
    pub(crate) fn to_bytes(&self) -> Vec<u8> {
        let mut buffer = Vec::with_capacity(self.values.len() + 12);
        buffer.extend_from_slice(&(self.uncompressed_bytes as u64).to_le_bytes());
        buffer.extend_from_slice(&(self.values.len() as u32).to_le_bytes());
        buffer.extend_from_slice(self.values.as_slice());
        buffer
    }

    /// Total byte length of the values before compression.
    pub(crate) fn uncompressed_bytes(&self) -> usize {
        self.uncompressed_bytes
    }

    /// Inverse of [`Self::to_bytes`]. Panics (slice index) if `bytes` is
    /// shorter than the header plus the payload length it claims.
    pub(crate) fn from_bytes(bytes: bytes::Bytes) -> Self {
        let uncompressed_bytes = u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as usize;
        let values_len = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as usize;
        // Zero-copy: the Buffer shares the underlying `bytes` allocation.
        let values = bytes.slice(12..12 + values_len);
        let values = Buffer::from(values);
        Self::from_parts(values, uncompressed_bytes)
    }
}
158
/// 8-byte inline key: the first 7 bytes of a suffix plus a saturating length.
///
/// A `len` of 255 is a sentinel meaning "255 bytes or longer"; any smaller
/// value is the exact suffix length.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub(crate) struct PrefixKey {
    prefix7: [u8; 7],
    len: u8,
}

impl PrefixKey {
    /// Number of prefix bytes stored inline.
    pub(crate) const fn prefix_len() -> usize {
        7
    }

    /// Build a key from a suffix: copy up to 7 leading bytes (zero-padded)
    /// and record the suffix length, saturating at the 255 sentinel.
    pub(crate) fn new(suffix_bytes: &[u8]) -> Self {
        let mut prefix7 = [0u8; 7];
        let take = suffix_bytes.len().min(Self::prefix_len());
        prefix7[..take].copy_from_slice(&suffix_bytes[..take]);
        Self {
            prefix7,
            len: suffix_bytes.len().min(255) as u8,
        }
    }

    /// Reassemble a key from its stored fields (deserialization path).
    pub(crate) fn from_parts(prefix7: [u8; 7], len: u8) -> Self {
        Self { prefix7, len }
    }

    /// The 7 inline prefix bytes (zero-padded for short suffixes).
    #[inline]
    pub(crate) fn prefix7(&self) -> &[u8; 7] {
        &self.prefix7
    }

    /// Raw length byte, including the 255 sentinel.
    #[inline]
    pub(crate) fn len_byte(&self) -> u8 {
        self.len
    }

    /// Exact suffix length, or `None` when it hit the 255 sentinel.
    #[cfg(test)]
    pub(crate) fn known_suffix_len(&self) -> Option<usize> {
        (self.len != 255).then_some(self.len as usize)
    }
}

// Compile-time guarantee that the key stays exactly 8 bytes.
const _: () = assert!(std::mem::size_of::<PrefixKey>() == 8);
217
/// Linear model for a run of offsets: `offset[i] ≈ slope * i + intercept`,
/// with per-entry residuals stored at `offset_bytes` bytes each (1, 2, or 4).
#[derive(Debug, Clone, Copy)]
struct CompactOffsetHeader {
    slope: i32,
    intercept: i32,
    offset_bytes: u8,
}
224
/// Residuals against the fitted offset line, stored at the narrowest signed
/// width (1, 2, or 4 bytes) that can represent the whole residual range.
#[derive(Debug, Clone)]
enum OffsetResiduals {
    One(Arc<[i8]>),
    Two(Arc<[i16]>),
    Four(Arc<[i32]>),
}

impl OffsetResiduals {
    /// Number of residuals (one per original offset).
    fn len(&self) -> usize {
        match self {
            Self::One(v) => v.len(),
            Self::Two(v) => v.len(),
            Self::Four(v) => v.len(),
        }
    }

    /// Storage width in bytes per residual (test-only introspection).
    #[cfg(test)]
    fn bytes_per(&self) -> usize {
        match self {
            Self::One(_) => 1,
            Self::Two(_) => 2,
            Self::Four(_) => 4,
        }
    }

    /// Residual at `index`, widened to `i32`. Panics if out of bounds.
    fn get_i32(&self, index: usize) -> i32 {
        match self {
            Self::One(v) => i32::from(v[index]),
            Self::Two(v) => i32::from(v[index]),
            Self::Four(v) => v[index],
        }
    }
}
258
/// Byte offsets stored as a fitted line (`header`) plus narrow per-entry
/// residuals; decodes back to the exact original `u32` offsets.
#[derive(Debug, Clone)]
pub(crate) struct CompactOffsets {
    header: CompactOffsetHeader,
    residuals: OffsetResiduals,
}
265
/// Least-squares fit of `offset[i] ≈ slope * i + intercept`, returning the
/// rounded integer coefficients.
///
/// All sums are computed in `f64`: the previous integer formulas for Σx and
/// Σx² (`n*(n-1)/2` and `n*(n-1)*(2n-1)/6` in `usize`) overflow once `n`
/// exceeds roughly 2 million entries — wrapping silently in release builds
/// and panicking in debug builds.
fn fit_line(offsets: &[u32]) -> (i32, i32) {
    let n = offsets.len();
    if n <= 1 {
        // Degenerate fit: a flat line through the only point (or zero).
        return (0, offsets.first().copied().unwrap_or(0) as i32);
    }

    let n_f64 = n as f64;

    // Σx for x = 0..n-1, computed in f64 to avoid integer overflow.
    let sum_x = n_f64 * (n_f64 - 1.0) / 2.0;
    // Σx² for x = 0..n-1 (Faulhaber's formula), also in f64.
    let sum_x_sq = n_f64 * (n_f64 - 1.0) * (2.0 * n_f64 - 1.0) / 6.0;

    let sum_y: f64 = offsets.iter().map(|&o| o as f64).sum();

    let sum_xy: f64 = offsets
        .iter()
        .enumerate()
        .map(|(i, &o)| i as f64 * o as f64)
        .sum();

    // n >= 2 makes the denominator strictly positive.
    let slope = (n_f64 * sum_xy - sum_x * sum_y) / (n_f64 * sum_x_sq - sum_x * sum_x);
    let intercept = (sum_y - slope * sum_x) / n_f64;

    (slope.round() as i32, intercept.round() as i32)
}
297
298impl CompactOffsets {
299 pub(crate) fn from_offsets(offsets: &[u32]) -> Self {
300 if offsets.is_empty() {
301 return Self {
302 header: CompactOffsetHeader {
303 slope: 0,
304 intercept: 0,
305 offset_bytes: 1,
306 },
307 residuals: OffsetResiduals::One(Arc::new([])),
308 };
309 }
310
311 let (slope, intercept) = fit_line(offsets);
312
313 let mut offset_residuals: Vec<i32> = Vec::with_capacity(offsets.len());
314 let mut min_residual = i32::MAX;
315 let mut max_residual = i32::MIN;
316 for (index, &offset) in offsets.iter().enumerate() {
317 let predicted = slope * index as i32 + intercept;
318 let residual = offset as i32 - predicted;
319 offset_residuals.push(residual);
320 min_residual = min_residual.min(residual);
321 max_residual = max_residual.max(residual);
322 }
323
324 let offset_bytes = if min_residual >= i8::MIN as i32 && max_residual <= i8::MAX as i32 {
325 1
326 } else if min_residual >= i16::MIN as i32 && max_residual <= i16::MAX as i32 {
327 2
328 } else {
329 4
330 };
331
332 let residuals = match offset_bytes {
333 1 => OffsetResiduals::One(
334 offset_residuals
335 .iter()
336 .map(|&r| r as i8)
337 .collect::<Vec<_>>()
338 .into(),
339 ),
340 2 => OffsetResiduals::Two(
341 offset_residuals
342 .iter()
343 .map(|&r| r as i16)
344 .collect::<Vec<_>>()
345 .into(),
346 ),
347 4 => OffsetResiduals::Four(offset_residuals.into()),
348 _ => unreachable!("offset_bytes must be 1, 2, or 4"),
349 };
350
351 Self {
352 header: CompactOffsetHeader {
353 slope,
354 intercept,
355 offset_bytes,
356 },
357 residuals,
358 }
359 }
360
361 pub(crate) fn len(&self) -> usize {
362 self.residuals.len()
363 }
364
365 pub(crate) fn get_offset(&self, index: usize) -> u32 {
366 let predicted = self.header.slope * index as i32 + self.header.intercept;
367 (predicted + self.residuals.get_i32(index)) as u32
368 }
369
370 pub(crate) fn offsets(&self) -> Vec<u32> {
371 (0..self.len()).map(|i| self.get_offset(i)).collect()
372 }
373
374 pub(crate) fn memory_usage(&self) -> usize {
375 let header_size = std::mem::size_of::<CompactOffsetHeader>();
376 let residuals_size = match &self.residuals {
377 OffsetResiduals::One(values) => values.len() * std::mem::size_of::<i8>(),
378 OffsetResiduals::Two(values) => values.len() * std::mem::size_of::<i16>(),
379 OffsetResiduals::Four(values) => values.len() * std::mem::size_of::<i32>(),
380 };
381 header_size + residuals_size
382 }
383}
384
385pub(crate) fn empty_compact_offsets() -> CompactOffsets {
386 CompactOffsets::from_offsets(&[])
387}
388
389const SYMBOL_SIZE_BYTES: usize = std::mem::size_of::<Symbol>();
390
391pub(crate) fn train_compressor<'a, I>(iter: I) -> Compressor
392where
393 I: Iterator<Item = &'a [u8]>,
394{
395 let strings: Vec<&[u8]> = iter.collect();
396 fsst::Compressor::train(&strings)
397}
398
#[derive(Clone)]
pub struct FsstArray {
    // Shared FSST symbol table used to (de)compress `raw`.
    compressor: Arc<Compressor>,
    // Concatenated compressed value bytes.
    raw: Arc<RawFsstBuffer>,
    // Per-value byte offsets into `raw`, stored as a fitted line + residuals.
    compact_offsets: CompactOffsets,
}
407
408impl std::fmt::Debug for FsstArray {
409 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410 f.debug_struct("FsstBuffer")
411 .field("raw", &self.raw)
412 .field("compact_offsets", &"<CompactOffsets>")
413 .field("compressor", &"<Compressor>")
414 .finish()
415 }
416}
417
impl FsstArray {
    /// Assemble an array from its three parts; no validation is performed.
    pub(crate) fn new(
        raw: Arc<RawFsstBuffer>,
        compact_offsets: CompactOffsets,
        compressor: Arc<Compressor>,
    ) -> Self {
        Self {
            compressor,
            raw,
            compact_offsets,
        }
    }

    /// Like [`Self::new`], but compacts plain `u32` byte offsets first.
    pub(crate) fn from_byte_offsets(
        raw: Arc<RawFsstBuffer>,
        byte_offsets: &[u32],
        compressor: Arc<Compressor>,
    ) -> Self {
        Self::new(raw, CompactOffsets::from_offsets(byte_offsets), compressor)
    }

    /// Serialize only the raw FSST buffer (header + compressed bytes).
    pub(crate) fn raw_to_bytes(&self) -> Vec<u8> {
        self.raw.to_bytes()
    }

    /// Append the serialized compact offsets (header + residuals) to `out`.
    pub(crate) fn write_compact_offsets(&self, out: &mut Vec<u8>) {
        self.compact_offsets.write_residuals(out)
    }

    /// Train an FSST symbol table on the given sample values.
    pub fn train_compressor<'a>(input: impl Iterator<Item = &'a [u8]>) -> Compressor {
        train_compressor(input)
    }

    /// Compress an arrow byte array (string/binary) with an existing
    /// compressor; nulls become zero-length entries.
    pub fn from_byte_array_with_compressor<T: ByteArrayType>(
        input: &GenericByteArray<T>,
        compressor: Arc<Compressor>,
    ) -> Self {
        let iter = input.iter();
        // Large scratch buffer since byte-array values can be arbitrarily long.
        let mut compress_buffer = Vec::with_capacity(2 * 1024 * 1024);
        let (raw, offsets) =
            RawFsstBuffer::from_byte_slices(iter, compressor.clone(), &mut compress_buffer);
        Self::from_byte_offsets(Arc::new(raw), &offsets, compressor)
    }

    /// Compress a `Decimal128Array` by treating each value as its 16 LE bytes.
    pub fn from_decimal128_array_with_compressor(
        array: &Decimal128Array,
        compressor: Arc<Compressor>,
    ) -> Self {
        let iter = array.iter().map(|v| v.map(|v| v.to_le_bytes()));
        // Fixed 16-byte inputs only need a small scratch buffer.
        let mut compress_buffer = Vec::with_capacity(64);
        let (raw, offsets) =
            RawFsstBuffer::from_byte_slices(iter, compressor.clone(), &mut compress_buffer);
        Self::from_byte_offsets(Arc::new(raw), &offsets, compressor)
    }

    /// Compress a `Decimal256Array` by treating each value as its 32 LE bytes.
    pub fn from_decimal256_array_with_compressor(
        array: &Decimal256Array,
        compressor: Arc<Compressor>,
    ) -> Self {
        let iter = array.iter().map(|v| v.map(|v| v.to_le_bytes()));
        // Fixed 32-byte inputs only need a small scratch buffer.
        let mut compress_buffer = Vec::with_capacity(128);
        let (raw, offsets) =
            RawFsstBuffer::from_byte_slices(iter, compressor.clone(), &mut compress_buffer);
        Self::from_byte_offsets(Arc::new(raw), &offsets, compressor)
    }

    /// Total byte length of the values before compression
    /// (delegates to the [`FsstBacking`] impl).
    pub fn uncompressed_bytes(&self) -> usize {
        <Self as FsstBacking>::uncompressed_bytes(self)
    }

    /// In-memory footprint (delegates to the [`FsstBacking`] impl).
    pub fn get_array_memory_size(&self) -> usize {
        <Self as FsstBacking>::get_array_memory_size(self)
    }

    /// Number of values. Offsets store `len + 1` entries, hence the -1.
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        self.compact_offsets.len().saturating_sub(1)
    }

    /// Decompressor borrowing this array's symbol table.
    pub fn decompressor(&self) -> Decompressor<'_> {
        self.compressor.decompressor()
    }

    /// Borrow the FSST compressor.
    pub fn compressor(&self) -> &Compressor {
        &self.compressor
    }

    /// Shared handle to the FSST compressor.
    pub fn compressor_arc(&self) -> Arc<Compressor> {
        self.compressor.clone()
    }

    /// Serialize as: raw buffer bytes | compact offsets bytes.
    /// Inverse of [`Self::from_bytes`].
    pub fn to_bytes(&self, out: &mut Vec<u8>) {
        out.extend_from_slice(&self.raw.to_bytes());
        self.compact_offsets.write_residuals(out);
    }

    /// Deserialize the layout written by [`Self::to_bytes`]. Panics on a
    /// truncated buffer; the trailing-offset check is debug-only.
    pub fn from_bytes(bytes: bytes::Bytes, compressor: Arc<Compressor>) -> Self {
        if bytes.len() < 12 {
            panic!("Input buffer too small for RawFsstBuffer header");
        }

        // Bytes 8..12 of the raw-buffer header hold the compressed length.
        let raw_values_len = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as usize;
        let raw_len = 12 + raw_values_len;
        if raw_len > bytes.len() {
            panic!("RawFsstBuffer extends beyond input buffer");
        }

        let raw = RawFsstBuffer::from_bytes(bytes.slice(0..raw_len));
        let compact = decode_compact_offsets(&bytes[raw_len..]);

        if compact.len() > 0 {
            // Sanity check: the final offset must mark the end of the buffer.
            let last = compact.get_offset(compact.len().saturating_sub(1)) as usize;
            debug_assert_eq!(
                last,
                raw.values_len(),
                "offsets must end at raw values length"
            );
        }

        Self::new(Arc::new(raw), compact, compressor)
    }

    /// Decompress into an arrow byte array with no null buffer.
    pub fn to_arrow_byte_array<T: ByteArrayType<Offset = i32>>(&self) -> GenericByteArray<T> {
        let (value_buffer, offsets) = self.to_uncompressed();
        // SAFETY: offsets and values come from `to_uncompressed`, which
        // builds them together so the offsets delimit valid value ranges.
        // NOTE(review): for Utf8 this also assumes the decompressed bytes
        // are valid UTF-8, i.e. the original input was — confirm at callers.
        unsafe { GenericByteArray::<T>::new_unchecked(offsets, value_buffer, None) }
    }

    /// Decompress all values, each expected to be exactly `value_width`
    /// bytes, into one contiguous buffer (no offsets needed).
    fn decompress_as_fixed_size_binary(&self, value_width: usize) -> Vec<u8> {
        let decompressor = self.compressor.decompressor();
        let mut value_buffer: Vec<u8> = Vec::with_capacity(self.len() * value_width + 8);

        for i in 0..self.len() {
            let compressed = self.get_compressed_slice(i);
            // Guarantee spare capacity for the worst-case decompressed size.
            let required = decompressor.max_decompression_capacity(compressed) + 8;
            value_buffer.reserve(required);
            let len = decompressor.decompress_into(compressed, value_buffer.spare_capacity_mut());
            debug_assert!(len == value_width);
            let new_len = value_buffer.len() + len;
            // SAFETY: `decompress_into` initialized `len` bytes of the spare
            // capacity reserved above.
            unsafe {
                value_buffer.set_len(new_len);
            }
        }
        value_buffer
    }

    /// Shared decimal decode path: decompress fixed-width values into a
    /// Buffer sized by the target type's width.
    fn to_decimal_array_inner(&self, data_type: &ArrowFixedLenByteArrayType) -> Buffer {
        let value_width = data_type.value_width();
        Buffer::from(self.decompress_as_fixed_size_binary(value_width))
    }

    /// Decompress back into a `Decimal128Array` of the given decimal type.
    pub fn to_decimal128_array(&self, data_type: &ArrowFixedLenByteArrayType) -> Decimal128Array {
        let value_buffer = self.to_decimal_array_inner(data_type);
        let array_builder = ArrayDataBuilder::new(data_type.into())
            .len(self.len())
            .add_buffer(value_buffer);
        // SAFETY: buffer length is len * value_width by construction in
        // `decompress_as_fixed_size_binary` (checked by its debug_assert).
        let array_data = unsafe { array_builder.build_unchecked() };
        Decimal128Array::from(array_data)
    }

    /// Decompress back into a `Decimal256Array` of the given decimal type.
    pub fn to_decimal256_array(&self, data_type: &ArrowFixedLenByteArrayType) -> Decimal256Array {
        let value_buffer = self.to_decimal_array_inner(data_type);
        let array_builder = ArrayDataBuilder::new(data_type.into())
            .len(self.len())
            .add_buffer(value_buffer);
        // SAFETY: same invariant as `to_decimal128_array` above.
        let array_data = unsafe { array_builder.build_unchecked() };
        Decimal256Array::from(array_data)
    }

    // Test-only introspection helpers below.

    #[cfg(test)]
    pub(crate) fn offsets_len(&self) -> usize {
        self.compact_offsets.len()
    }

    #[cfg(test)]
    pub(crate) fn offset_bytes(&self) -> u8 {
        self.compact_offsets.header.offset_bytes
    }

    #[cfg(test)]
    pub(crate) fn offsets(&self) -> Vec<u32> {
        self.compact_offsets.offsets()
    }
}
/// Storage backing for FSST-compressed data. Sealed: only `FsstArray`
/// (in-memory) and `DiskBuffer` (on-disk) implement it.
pub trait FsstBacking: std::fmt::Debug + Clone + sealed::Sealed {
    /// Total number of bytes the values occupy before compression.
    fn uncompressed_bytes(&self) -> usize;

    /// In-memory footprint of this backing (0 for the on-disk backing).
    fn get_array_memory_size(&self) -> usize;
}

impl sealed::Sealed for FsstArray {}
impl sealed::Sealed for DiskBuffer {}
627
628impl FsstArray {
629 pub(crate) fn to_uncompressed(&self) -> (Buffer, OffsetBuffer<i32>) {
630 let offsets = self.compact_offsets.offsets();
631 self.raw
632 .to_uncompressed(&self.compressor.decompressor(), &offsets)
633 }
634
635 pub(crate) fn get_compressed_slice(&self, dict_index: usize) -> &[u8] {
636 let start_offset = self.compact_offsets.get_offset(dict_index);
637 let end_offset = self.compact_offsets.get_offset(dict_index + 1);
638 self.raw.get_compressed_slice(start_offset, end_offset)
639 }
640
641 pub fn to_uncompressed_selected(&self, selected: &[usize]) -> (Buffer, OffsetBuffer<i32>) {
643 let decompressor = self.compressor.decompressor();
644 let mut value_buffer: Vec<u8> = Vec::with_capacity(self.uncompressed_bytes() + 8);
645 let mut out_offsets: OffsetBufferBuilder<i32> = OffsetBufferBuilder::new(selected.len());
646
647 for &dict_index in selected {
648 let start_offset = self.compact_offsets.get_offset(dict_index);
649 let end_offset = self.compact_offsets.get_offset(dict_index + 1);
650
651 let compressed_value = self.raw.get_compressed_slice(start_offset, end_offset);
652 let decompressed_len =
653 decompressor.decompress_into(compressed_value, value_buffer.spare_capacity_mut());
654 let new_len = value_buffer.len() + decompressed_len;
655 debug_assert!(new_len <= value_buffer.capacity());
656 unsafe {
657 value_buffer.set_len(new_len);
658 }
659 out_offsets.push_length(decompressed_len);
660 }
661
662 (Buffer::from(value_buffer), out_offsets.finish())
663 }
664}
665
666impl FsstBacking for FsstArray {
667 fn uncompressed_bytes(&self) -> usize {
668 self.raw.uncompressed_bytes()
669 }
670
671 fn get_array_memory_size(&self) -> usize {
672 self.raw.get_memory_size()
673 + self.compact_offsets.memory_usage()
674 + std::mem::size_of::<Self>()
675 }
676}
677
#[derive(Clone)]
pub struct DiskBuffer {
    // Pre-compression byte count, kept so size queries work without I/O.
    uncompressed_bytes: usize,
    // Handle used to read the serialized array back from storage.
    io: Arc<dyn SqueezeIoHandler>,
    // Location of the serialized bytes within the backing storage.
    disk_range: Range<u64>,
    // Symbol table needed to decode the bytes once re-read.
    compressor: Arc<Compressor>,
}
686
687impl std::fmt::Debug for DiskBuffer {
688 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
689 f.debug_struct("DiskBuffer")
690 .field("uncompressed_bytes", &self.uncompressed_bytes)
691 .field("disk_range", &self.disk_range)
692 .field("io", &self.io)
693 .field("compressor", &"<Compressor>")
694 .finish()
695 }
696}
697
698impl DiskBuffer {
699 pub(crate) fn new(
700 uncompressed_bytes: usize,
701 io: Arc<dyn SqueezeIoHandler>,
702 disk_range: Range<u64>,
703 compressor: Arc<Compressor>,
704 ) -> Self {
705 Self {
706 uncompressed_bytes,
707 io,
708 disk_range,
709 compressor,
710 }
711 }
712
713 pub(crate) fn squeeze_io(&self) -> &Arc<dyn SqueezeIoHandler> {
714 &self.io
715 }
716
717 pub(crate) fn disk_range(&self) -> Range<u64> {
718 self.disk_range.clone()
719 }
720
721 pub(crate) fn compressor_arc(&self) -> Arc<Compressor> {
722 self.compressor.clone()
723 }
724}
725
726impl DiskBuffer {
727 pub(crate) async fn to_uncompressed(&self) -> (Buffer, OffsetBuffer<i32>) {
728 let bytes = self.io.read(Some(self.disk_range.clone())).await.unwrap();
729 let byte_view =
730 LiquidByteViewArray::<FsstArray>::from_bytes(bytes, self.compressor.clone());
731 byte_view.fsst_buffer.to_uncompressed()
732 }
733
734 pub(crate) async fn to_uncompressed_selected(
735 &self,
736 selected: &[usize],
737 ) -> (Buffer, OffsetBuffer<i32>) {
738 let bytes = self.io.read(Some(self.disk_range.clone())).await.unwrap();
739 let byte_view =
740 LiquidByteViewArray::<FsstArray>::from_bytes(bytes, self.compressor.clone());
741 let total_count = byte_view.prefix_keys.len();
742 self.io
743 .tracing_decompress_count(selected.len(), total_count);
744 byte_view.fsst_buffer.to_uncompressed_selected(selected)
745 }
746}
747
748impl FsstBacking for DiskBuffer {
749 fn uncompressed_bytes(&self) -> usize {
750 self.uncompressed_bytes
751 }
752
753 fn get_array_memory_size(&self) -> usize {
754 0
755 }
756}
757
758impl CompactOffsets {
759 fn write_residuals(&self, out: &mut Vec<u8>) {
760 out.extend_from_slice(&self.header.slope.to_le_bytes());
761 out.extend_from_slice(&self.header.intercept.to_le_bytes());
762 out.push(self.header.offset_bytes);
763
764 match &self.residuals {
765 OffsetResiduals::One(residuals) => {
766 out.extend(residuals.iter().map(|r| *r as u8));
767 }
768 OffsetResiduals::Two(residuals) => {
769 for r in residuals.iter() {
770 out.extend_from_slice(&r.to_le_bytes());
771 }
772 }
773 OffsetResiduals::Four(residuals) => {
774 for r in residuals.iter() {
775 out.extend_from_slice(&r.to_le_bytes());
776 }
777 }
778 }
779 }
780}
781
782pub(crate) fn decode_compact_offsets(bytes: &[u8]) -> CompactOffsets {
783 if bytes.len() < 9 {
784 panic!("CompactOffsets requires at least 9 bytes for header");
785 }
786
787 let slope = i32::from_le_bytes(bytes[0..4].try_into().unwrap());
788 let intercept = i32::from_le_bytes(bytes[4..8].try_into().unwrap());
789 let offset_bytes = bytes[8] as usize;
790 if !matches!(offset_bytes, 1 | 2 | 4) {
791 panic!("Invalid offset_bytes value: {}", offset_bytes);
792 }
793
794 let header = CompactOffsetHeader {
795 slope,
796 intercept,
797 offset_bytes: offset_bytes as u8,
798 };
799
800 let payload = &bytes[9..];
801 if !payload.len().is_multiple_of(offset_bytes) {
802 panic!("Invalid payload size for CompactOffsets");
803 }
804 let count = payload.len() / offset_bytes;
805
806 match offset_bytes {
807 1 => {
808 let residuals: Arc<[i8]> = payload.iter().map(|b| *b as i8).collect::<Vec<_>>().into();
809 CompactOffsets {
810 header,
811 residuals: OffsetResiduals::One(residuals),
812 }
813 }
814 2 => {
815 let mut residuals = Vec::with_capacity(count);
816 for i in 0..count {
817 let base = i * 2;
818 residuals.push(i16::from_le_bytes(
819 payload[base..base + 2].try_into().unwrap(),
820 ));
821 }
822 CompactOffsets {
823 header,
824 residuals: OffsetResiduals::Two(residuals.into()),
825 }
826 }
827 4 => {
828 let mut residuals = Vec::with_capacity(count);
829 for i in 0..count {
830 let base = i * 4;
831 residuals.push(i32::from_le_bytes(
832 payload[base..base + 4].try_into().unwrap(),
833 ));
834 }
835 CompactOffsets {
836 header,
837 residuals: OffsetResiduals::Four(residuals.into()),
838 }
839 }
840 _ => unreachable!("validated offset_bytes"),
841 }
842}
843
844pub fn save_symbol_table(compressor: Arc<Compressor>, buffer: &mut Vec<u8>) -> Result<()> {
851 let symbols = compressor.symbol_table();
852 let symbols_lengths = compressor.symbol_lengths();
853
854 if symbols.len() != symbols_lengths.len() {
855 return Err(Error::new(
856 ErrorKind::InvalidInput,
857 "Symbol table and symbol lengths have different lengths",
858 ));
859 }
860
861 if symbols.len() > u8::MAX as usize {
862 return Err(Error::new(
863 ErrorKind::InvalidInput,
864 "Symbol table too large",
865 ));
866 }
867
868 buffer.push(symbols.len() as u8);
869
870 for &len in symbols_lengths.iter() {
871 buffer.push(len);
872 }
873
874 for sym in symbols.iter() {
875 buffer.extend_from_slice(&sym.to_u64().to_le_bytes());
876 }
877
878 Ok(())
879}
880
881pub fn load_symbol_table(data: bytes::Bytes) -> Result<Compressor> {
883 if data.is_empty() {
884 return Err(Error::new(ErrorKind::InvalidInput, "Empty symbol table"));
885 }
886
887 let symbol_count = data[0] as usize;
888 let lengths_start = 1;
889 let lengths_end = lengths_start + symbol_count;
890 if lengths_end > data.len() {
891 return Err(Error::new(
892 ErrorKind::InvalidInput,
893 "Buffer too small for symbol lengths",
894 ));
895 }
896
897 let lengths = &data[lengths_start..lengths_end];
898 let symbols_start = lengths_end;
899 let symbols_end = symbols_start + symbol_count * SYMBOL_SIZE_BYTES;
900 if symbols_end > data.len() {
901 return Err(Error::new(
902 ErrorKind::InvalidInput,
903 "Buffer too small for symbols",
904 ));
905 }
906
907 let mut symbols = Vec::with_capacity(symbol_count);
908 for i in 0..symbol_count {
909 let base = symbols_start + i * SYMBOL_SIZE_BYTES;
910 let bytes: [u8; SYMBOL_SIZE_BYTES] =
911 data[base..base + SYMBOL_SIZE_BYTES].try_into().unwrap();
912 symbols.push(Symbol::from_slice(&bytes));
913 }
914
915 Ok(fsst::Compressor::rebuild_from(symbols, lengths))
916}
917
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Array, Decimal128Builder, StringBuilder},
        datatypes::DataType,
    };

    /// Compact an offset slice, then verify both the in-memory view and a
    /// full serialize/deserialize round trip reproduce every offset.
    fn assert_round_trip(offsets: &[u32], test_name: &str) {
        let compact = CompactOffsets::from_offsets(offsets);

        assert_eq!(
            offsets.len(),
            compact.len(),
            "Length mismatch in {}",
            test_name
        );
        for (i, expected) in offsets.iter().enumerate() {
            assert_eq!(
                compact.get_offset(i),
                *expected,
                "Offset mismatch at index {} in {}",
                i,
                test_name
            );
        }

        let mut encoded = Vec::new();
        compact.write_residuals(&mut encoded);
        let decoded = decode_compact_offsets(&encoded);

        assert_eq!(
            offsets.len(),
            decoded.len(),
            "Reconstructed length mismatch in {}",
            test_name
        );
        for (i, expected) in offsets.iter().enumerate() {
            assert_eq!(*expected, decoded.get_offset(i));
        }
    }

    /// True when decoding panics (used to probe malformed inputs).
    fn decode_panics(bytes: Vec<u8>) -> bool {
        std::panic::catch_unwind(move || decode_compact_offsets(&bytes)).is_err()
    }

    #[test]
    fn test_compact_offset_view_round_trip() {
        let cases: &[(&[u32], &str)] = &[
            (&[100, 105, 110, 115], "small offsets"),
            (&[1000, 2000, 3000, 3500], "medium offsets"),
            (&[100000, 200000, 300000, 310000], "large offsets"),
            (&[1000, 1010, 1020, 1030, 1040, 1050], "mixed scenarios"),
            (&[0], "empty values"),
            (&[42, 50], "single offset"),
        ];
        for (offsets, name) in cases {
            assert_round_trip(offsets, name);
        }
    }

    #[test]
    fn test_compact_offset_view_memory_efficiency() {
        let offsets = vec![1000u32, 1010, 1020, 1030, 1040];
        let plain_size = offsets.len() * std::mem::size_of::<u32>();

        let compact = CompactOffsets::from_offsets(&offsets);

        assert!(
            compact.memory_usage() <= plain_size,
            "Compact representation should not be larger"
        );
    }

    #[test]
    fn test_compact_offset_view_struct_methods() {
        // PrefixKey accessors, including the 255 "unknown length" sentinel.
        let key = PrefixKey::from_parts([1, 2, 3, 4, 5, 6, 7], 15);
        assert_eq!(key.prefix7(), &[1, 2, 3, 4, 5, 6, 7]);
        assert_eq!(key.len_byte(), 15);
        assert_eq!(key.known_suffix_len(), Some(15));

        let unknown = PrefixKey::from_parts([7, 6, 5, 4, 3, 2, 1], 255);
        assert_eq!(unknown.len_byte(), 255);
        assert_eq!(unknown.known_suffix_len(), None);

        // Residual variants report their widths and widen to i32 correctly.
        let r1 = OffsetResiduals::One(vec![-42i8, 7].into());
        assert_eq!(r1.bytes_per(), 1);
        assert_eq!(r1.get_i32(0), -42);

        let r2 = OffsetResiduals::Two(vec![12345i16].into());
        assert_eq!(r2.bytes_per(), 2);
        assert_eq!(r2.get_i32(0), 12345);

        let r4 = OffsetResiduals::Four(vec![-1000000i32].into());
        assert_eq!(r4.bytes_per(), 4);
        assert_eq!(r4.get_i32(0), -1000000);

        assert_eq!(PrefixKey::prefix_len(), 7);
    }

    #[test]
    fn test_compact_offset_view_group_from_bytes_errors() {
        // Fewer than the 9 header bytes.
        assert!(
            decode_panics(vec![1, 2, 3]),
            "Should panic with insufficient bytes"
        );

        // Header present but width 3 is not a legal residual size.
        let mut invalid_header = vec![0u8; 9];
        invalid_header[8] = 3;
        assert!(
            decode_panics(invalid_header),
            "Should panic with invalid offset_bytes"
        );

        // Width 2 with a 1-byte payload.
        let mut misaligned_two_bytes = vec![0u8; 9 + 1];
        misaligned_two_bytes[8] = 2;
        assert!(
            decode_panics(misaligned_two_bytes),
            "Should panic with misaligned TwoBytes residuals"
        );

        // Width 4 with a 2-byte payload.
        let mut misaligned_four_bytes = vec![0u8; 9 + 2];
        misaligned_four_bytes[8] = 4;
        assert!(
            decode_panics(misaligned_four_bytes),
            "Should panic with misaligned FourBytes residuals"
        );
    }

    #[test]
    fn test_compact_offset_view_group_from_bytes_valid() {
        let offsets = [100u32, 101, 105];
        let original = CompactOffsets::from_offsets(&offsets);

        let mut encoded = Vec::new();
        original.write_residuals(&mut encoded);
        let decoded = decode_compact_offsets(&encoded);

        assert_eq!(offsets.len(), decoded.len());
        for (i, o) in offsets.iter().enumerate() {
            assert_eq!(*o, decoded.get_offset(i));
        }
    }

    #[test]
    fn test_fsst_buffer_bytes_roundtrip() {
        let mut builder = StringBuilder::new();
        for i in 0..1000 {
            builder.append_value(format!("test string value {i}"));
        }
        let input = builder.finish();

        let compressor = Arc::new(FsstArray::train_compressor(
            input.iter().flat_map(|s| s.map(|s| s.as_bytes())),
        ));
        let original_fsst = FsstArray::from_byte_array_with_compressor(&input, compressor.clone());

        let mut serialized = Vec::new();
        original_fsst.to_bytes(&mut serialized);

        let deserialized = FsstArray::from_bytes(bytes::Bytes::from(serialized), compressor);

        // Both arrays must decompress to identical strings.
        let before = original_fsst.to_arrow_byte_array::<arrow::datatypes::Utf8Type>();
        let after = deserialized.to_arrow_byte_array::<arrow::datatypes::Utf8Type>();
        assert_eq!(before.len(), after.len());
        for (lhs, rhs) in before.iter().zip(after.iter()) {
            assert_eq!(lhs, rhs);
        }
    }

    #[test]
    fn test_decimal_compression_smoke() {
        let mut builder = Decimal128Builder::new().with_data_type(DataType::Decimal128(10, 2));
        for i in 0..4096 {
            builder.append_value(i128::from_le_bytes([(i % 16) as u8; 16]));
        }
        let decimals = builder.finish();
        let plain_size = decimals.get_array_memory_size();

        let samples: Vec<_> = decimals
            .iter()
            .filter_map(|v| v.map(|v| v.to_le_bytes()))
            .collect();
        let compressor = Arc::new(FsstArray::train_compressor(
            samples.iter().map(|b| b.as_slice()),
        ));

        let fsst = FsstArray::from_decimal128_array_with_compressor(&decimals, compressor);
        assert!(fsst.get_array_memory_size() < plain_size);
    }

    #[test]
    fn test_save_and_load_symbol_table_roundtrip() {
        let mut builder = StringBuilder::new();
        for i in 0..1000 {
            builder.append_value(format!("hello world {i}"));
        }
        let input = builder.finish();

        let trained = Arc::new(FsstArray::train_compressor(
            input.iter().flat_map(|s| s.map(|s| s.as_bytes())),
        ));

        let mut serialized = Vec::new();
        save_symbol_table(trained.clone(), &mut serialized).unwrap();
        let reloaded = load_symbol_table(bytes::Bytes::from(serialized)).unwrap();

        // Compressing with the reloaded table must decode to the same values.
        let a = FsstArray::from_byte_array_with_compressor(&input, trained)
            .to_arrow_byte_array::<arrow::datatypes::Utf8Type>();
        let b = FsstArray::from_byte_array_with_compressor(&input, Arc::new(reloaded))
            .to_arrow_byte_array::<arrow::datatypes::Utf8Type>();
        assert_eq!(a, b);
    }
}