lance_encoding/encodings/physical/
block.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Encodings based on traditional block compression schemes
5//!
6//! Traditional compressors take in a buffer and return a smaller buffer.  All encoding
7//! description is shoved into the compressed buffer and the entire buffer is needed to
8//! decompress any of the data.
9//!
10//! These encodings are not transparent, which limits our ability to use them.  In addition
11//! they are often quite expensive in CPU terms.
12//!
13//! However, they are effective and useful for some cases.  For example, when working with large
14//! variable length values (e.g. source code files) they can be very effective.
15//!
//! The module introduces the [`BufferCompressor`] trait which describes the interface for a
17//! traditional block compressor.  It is implemented for the most common compression schemes
18//! (zstd, lz4, etc).
19//!
20//! There is not yet a mini-block variant of this compressor (but could easily be one) and the
21//! full zip variant works by applying compression on a per-value basis (which allows it to be
22//! transparent).
23
24use arrow_buffer::ArrowNativeType;
25use lance_core::{Error, Result};
26use snafu::location;
27
28use std::str::FromStr;
29
30use crate::compression::{BlockCompressor, BlockDecompressor};
31use crate::encodings::physical::binary::{BinaryBlockDecompressor, VariableEncoder};
32use crate::format::{
33    pb21::{self, CompressiveEncoding},
34    ProtobufUtils21,
35};
36use crate::{
37    buffer::LanceBuffer,
38    compression::VariablePerValueDecompressor,
39    data::{BlockInfo, DataBlock, VariableWidthBlock},
40    encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock},
41};
42
43#[derive(Debug, Clone, Copy, PartialEq)]
44pub struct CompressionConfig {
45    pub(crate) scheme: CompressionScheme,
46    pub(crate) level: Option<i32>,
47}
48
49impl CompressionConfig {
50    pub(crate) fn new(scheme: CompressionScheme, level: Option<i32>) -> Self {
51        Self { scheme, level }
52    }
53}
54
55impl Default for CompressionConfig {
56    fn default() -> Self {
57        Self {
58            scheme: CompressionScheme::Lz4,
59            level: Some(0),
60        }
61    }
62}
63
/// The compression schemes that can be named in a [`CompressionConfig`].
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so the scheme can be used as a
/// key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompressionScheme {
    /// No compression; the general buffer compressor for this scheme copies bytes unchanged.
    None,
    /// FSST.  It has its own compression path and is rejected by the general buffer
    /// compressor factory in this file.
    Fsst,
    /// Zstandard (only usable when the `zstd` feature is enabled).
    Zstd,
    /// LZ4 (only usable when the `lz4` feature is enabled).
    Lz4,
}
71
72impl TryFrom<CompressionScheme> for pb21::CompressionScheme {
73    type Error = Error;
74
75    fn try_from(scheme: CompressionScheme) -> Result<Self> {
76        match scheme {
77            CompressionScheme::Lz4 => Ok(Self::CompressionAlgorithmLz4),
78            CompressionScheme::Zstd => Ok(Self::CompressionAlgorithmZstd),
79            _ => Err(Error::invalid_input(
80                format!("Unsupported compression scheme: {:?}", scheme),
81                location!(),
82            )),
83        }
84    }
85}
86
87impl TryFrom<pb21::CompressionScheme> for CompressionScheme {
88    type Error = Error;
89
90    fn try_from(scheme: pb21::CompressionScheme) -> Result<Self> {
91        match scheme {
92            pb21::CompressionScheme::CompressionAlgorithmLz4 => Ok(Self::Lz4),
93            pb21::CompressionScheme::CompressionAlgorithmZstd => Ok(Self::Zstd),
94            _ => Err(Error::invalid_input(
95                format!("Unsupported compression scheme: {:?}", scheme),
96                location!(),
97            )),
98        }
99    }
100}
101
102impl std::fmt::Display for CompressionScheme {
103    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
104        let scheme_str = match self {
105            Self::Fsst => "fsst",
106            Self::Zstd => "zstd",
107            Self::None => "none",
108            Self::Lz4 => "lz4",
109        };
110        write!(f, "{}", scheme_str)
111    }
112}
113
114impl FromStr for CompressionScheme {
115    type Err = Error;
116
117    fn from_str(s: &str) -> Result<Self> {
118        match s {
119            "none" => Ok(Self::None),
120            "fsst" => Ok(Self::Fsst),
121            "zstd" => Ok(Self::Zstd),
122            "lz4" => Ok(Self::Lz4),
123            _ => Err(Error::invalid_input(
124                format!("Unknown compression scheme: {}", s),
125                location!(),
126            )),
127        }
128    }
129}
130
131pub trait BufferCompressor: std::fmt::Debug + Send + Sync {
132    fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
133    fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
134    fn config(&self) -> CompressionConfig;
135}
136
#[cfg(feature = "zstd")]
mod zstd {
    use std::io::{Cursor, Write};

    use super::*;

    use ::zstd::bulk::decompress_to_buffer;
    use ::zstd::stream::copy_decode;

    /// Zstandard-backed [`BufferCompressor`].
    ///
    /// `compress` emits an 8-byte little-endian uncompressed-length prefix followed by a
    /// zstd frame.  `decompress` accepts that layout as well as a bare zstd frame without
    /// the prefix (the "raw stream format").
    #[derive(Debug, Default)]
    pub struct ZstdBufferCompressor {
        compression_level: i32,
    }

    impl ZstdBufferCompressor {
        /// Creates a compressor that encodes with the given zstd level.
        pub fn new(compression_level: i32) -> Self {
            Self { compression_level }
        }

        /// Heuristically decides whether `input_buf` is a bare zstd frame (`true`) or the
        /// length-prefixed layout written by `compress` (`false`).
        ///
        /// Frame layout reference: <https://datatracker.ietf.org/doc/html/rfc8878>
        fn is_raw_stream_format(&self, input_buf: &[u8]) -> bool {
            if input_buf.len() < 8 {
                return true; // can't be length prefixed format if less than 8 bytes
            }
            // read the first 4 bytes as the magic number
            let mut magic_buf = [0u8; 4];
            magic_buf.copy_from_slice(&input_buf[..4]);

            // see RFC 8878, section 3.1.1. Zstandard Frames, which defines the magic number
            const ZSTD_MAGIC_NUMBER: u32 = 0xFD2FB528;
            if u32::from_le_bytes(magic_buf) != ZSTD_MAGIC_NUMBER {
                // doesn't start with the magic number, so it can't be the raw stream format
                return false;
            }

            // The buffer starts like a zstd frame.  Per RFC 8878 the reserved bit (Bit
            // Number 3, i.e. the 4th bit) of the frame header descriptor MUST be 0; see
            // section 3.1.1.1.1 'Frame_Header_Descriptor' and section 3.1.1.1.1.4
            // 'Reserved Bit'.
            const FHD_BYTE_INDEX: usize = 4;
            const FHD_RESERVED_BIT_MASK: u8 = 0b0000_1000;

            // A set reserved bit cannot belong to a valid zstd frame, so the buffer must be
            // the length-prefixed format whose length coincidentally starts with the magic
            // number.  A cleared bit is consistent with a valid frame.
            (input_buf[FHD_BYTE_INDEX] & FHD_RESERVED_BIT_MASK) == 0
        }

        /// Decodes the length-prefixed layout, appending the payload to `output_buf`.
        ///
        /// The caller must pass at least 8 bytes; `decompress` guarantees this because
        /// `is_raw_stream_format` claims every shorter buffer as raw stream format.
        fn decompress_length_prefixed_zstd(
            &self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
        ) -> Result<()> {
            const LENGTH_PREFIX_SIZE: usize = 8;
            let mut len_buf = [0u8; LENGTH_PREFIX_SIZE];
            len_buf.copy_from_slice(&input_buf[..LENGTH_PREFIX_SIZE]);

            // Reject lengths that cannot be addressed on this target rather than silently
            // truncating them with an `as` cast.
            let declared_len = u64::from_le_bytes(len_buf);
            let uncompressed_len = usize::try_from(declared_len).map_err(|_| Error::Internal {
                message: format!(
                    "Zstd uncompressed length {} does not fit in usize",
                    declared_len
                ),
                location: location!(),
            })?;

            let start = output_buf.len();
            output_buf.resize(start + uncompressed_len, 0);

            let compressed_data = &input_buf[LENGTH_PREFIX_SIZE..];
            let written = decompress_to_buffer(compressed_data, &mut output_buf[start..])?;
            // Keep only the bytes that were actually produced so an over-declared length
            // prefix cannot leave zero padding behind.
            output_buf.truncate(start + written);
            Ok(())
        }
    }

    impl BufferCompressor for ZstdBufferCompressor {
        /// Appends the uncompressed length (u64, little-endian) and then the zstd frame for
        /// `input_buf` to `output_buf`.
        fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            output_buf.write_all(&(input_buf.len() as u64).to_le_bytes())?;
            let mut encoder = ::zstd::stream::Encoder::new(output_buf, self.compression_level)?;

            encoder.write_all(input_buf)?;
            encoder.finish()?;
            Ok(())
        }

        /// Appends the decompressed bytes to `output_buf`.  An empty input yields no output.
        fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            if input_buf.is_empty() {
                return Ok(());
            }

            if self.is_raw_stream_format(input_buf) {
                copy_decode(Cursor::new(input_buf), output_buf)?;
            } else {
                self.decompress_length_prefixed_zstd(input_buf, output_buf)?;
            }

            Ok(())
        }

        fn config(&self) -> CompressionConfig {
            CompressionConfig {
                scheme: CompressionScheme::Zstd,
                level: Some(self.compression_level),
            }
        }
    }
}
248
#[cfg(feature = "lz4")]
mod lz4 {
    use super::*;

    /// LZ4 block-format [`BufferCompressor`].
    ///
    /// Compressed buffers start with the uncompressed size as a 4-byte little-endian
    /// integer (the `prepend_size` option of the lz4 block API).
    #[derive(Debug, Default)]
    pub struct Lz4BufferCompressor {}

    impl BufferCompressor for Lz4BufferCompressor {
        /// Appends the size-prefixed LZ4 block for `input_buf` to `output_buf`.
        fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            // Existing content of the output buffer must be preserved.
            let base = output_buf.len();

            // Worst-case compressed size, plus 4 bytes for the size header.
            let bound = ::lz4::block::compress_bound(input_buf.len())?;
            output_buf.resize(base + bound + 4, 0);

            let scratch = &mut output_buf[base..];
            let written = match ::lz4::block::compress_to_buffer(input_buf, None, true, scratch) {
                Ok(written) => written,
                Err(err) => {
                    return Err(Error::Internal {
                        message: format!("LZ4 compression error: {}", err),
                        location: location!(),
                    })
                }
            };

            // Drop the unused tail of the worst-case reservation.
            output_buf.truncate(base + written);
            Ok(())
        }

        /// Appends the decompressed bytes to `output_buf`.
        ///
        /// Fails if `input_buf` is shorter than the 4-byte size header or if LZ4 rejects
        /// the data.
        fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            // With prepend_size the first 4 bytes hold the uncompressed size, which tells
            // us exactly how much room to reserve.
            if input_buf.len() < 4 {
                return Err(Error::Internal {
                    message: "LZ4 compressed data too short".to_string(),
                    location: location!(),
                });
            }

            let mut size_header = [0u8; 4];
            size_header.copy_from_slice(&input_buf[..4]);
            let uncompressed_size = u32::from_le_bytes(size_header) as usize;

            // Existing content of the output buffer must be preserved.
            let base = output_buf.len();
            output_buf.resize(base + uncompressed_size, 0);

            // Decode straight into the freshly reserved tail.
            let target = &mut output_buf[base..];
            let produced = match ::lz4::block::decompress_to_buffer(input_buf, None, target) {
                Ok(produced) => produced,
                Err(err) => {
                    return Err(Error::Internal {
                        message: format!("LZ4 decompression error: {}", err),
                        location: location!(),
                    })
                }
            };

            // Should equal `uncompressed_size`; truncating keeps the buffer exact either way.
            output_buf.truncate(base + produced);

            Ok(())
        }

        fn config(&self) -> CompressionConfig {
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            }
        }
    }
}
325
326#[derive(Debug, Default)]
327pub struct NoopBufferCompressor {}
328
329impl BufferCompressor for NoopBufferCompressor {
330    fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
331        output_buf.extend_from_slice(input_buf);
332        Ok(())
333    }
334
335    fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
336        output_buf.extend_from_slice(input_buf);
337        Ok(())
338    }
339
340    fn config(&self) -> CompressionConfig {
341        CompressionConfig {
342            scheme: CompressionScheme::None,
343            level: None,
344        }
345    }
346}
347
348pub struct GeneralBufferCompressor {}
349
350impl GeneralBufferCompressor {
351    pub fn get_compressor(
352        compression_config: CompressionConfig,
353    ) -> Result<Box<dyn BufferCompressor>> {
354        match compression_config.scheme {
355            // FSST has its own compression path and isn't implemented as a generic buffer compressor
356            CompressionScheme::Fsst => Err(Error::InvalidInput {
357                source: "fsst is not usable as a general buffer compressor".into(),
358                location: location!(),
359            }),
360            CompressionScheme::Zstd => {
361                #[cfg(feature = "zstd")]
362                {
363                    Ok(Box::new(zstd::ZstdBufferCompressor::new(
364                        compression_config.level.unwrap_or(0),
365                    )))
366                }
367                #[cfg(not(feature = "zstd"))]
368                {
369                    Err(Error::InvalidInput {
370                        source: "package was not built with zstd support".into(),
371                        location: location!(),
372                    })
373                }
374            }
375            CompressionScheme::Lz4 => {
376                #[cfg(feature = "lz4")]
377                {
378                    Ok(Box::new(lz4::Lz4BufferCompressor::default()))
379                }
380                #[cfg(not(feature = "lz4"))]
381                {
382                    Err(Error::InvalidInput {
383                        source: "package was not built with lz4 support".into(),
384                        location: location!(),
385                    })
386                }
387            }
388            CompressionScheme::None => Ok(Box::new(NoopBufferCompressor {})),
389        }
390    }
391}
392
393/// A block decompressor that first applies general-purpose compression (LZ4/Zstd)
394/// before delegating to an inner block decompressor.
395#[derive(Debug)]
396pub struct GeneralBlockDecompressor {
397    inner: Box<dyn BlockDecompressor>,
398    compressor: Box<dyn BufferCompressor>,
399}
400
401impl GeneralBlockDecompressor {
402    pub fn try_new(
403        inner: Box<dyn BlockDecompressor>,
404        compression: CompressionConfig,
405    ) -> Result<Self> {
406        let compressor = GeneralBufferCompressor::get_compressor(compression)?;
407        Ok(Self { inner, compressor })
408    }
409}
410
411impl BlockDecompressor for GeneralBlockDecompressor {
412    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
413        let mut decompressed = Vec::new();
414        self.compressor.decompress(&data, &mut decompressed)?;
415        self.inner
416            .decompress(LanceBuffer::from(decompressed), num_values)
417    }
418}
419
420// An encoder which uses generic compression, such as zstd/lz4 to encode buffers
421#[derive(Debug)]
422pub struct CompressedBufferEncoder {
423    pub(crate) compressor: Box<dyn BufferCompressor>,
424}
425
426impl Default for CompressedBufferEncoder {
427    fn default() -> Self {
428        // Pick zstd if available, otherwise lz4, otherwise none
429        #[cfg(feature = "zstd")]
430        let (scheme, level) = (CompressionScheme::Zstd, Some(0));
431        #[cfg(all(feature = "lz4", not(feature = "zstd")))]
432        let (scheme, level) = (CompressionScheme::Lz4, None);
433        #[cfg(not(any(feature = "zstd", feature = "lz4")))]
434        let (scheme, level) = (CompressionScheme::None, None);
435
436        let compressor =
437            GeneralBufferCompressor::get_compressor(CompressionConfig { scheme, level }).unwrap();
438        Self { compressor }
439    }
440}
441
442impl CompressedBufferEncoder {
443    pub fn try_new(compression_config: CompressionConfig) -> Result<Self> {
444        let compressor = GeneralBufferCompressor::get_compressor(compression_config)?;
445        Ok(Self { compressor })
446    }
447
448    pub fn from_scheme(scheme: pb21::CompressionScheme) -> Result<Self> {
449        let scheme = CompressionScheme::try_from(scheme)?;
450        Ok(Self {
451            compressor: GeneralBufferCompressor::get_compressor(CompressionConfig {
452                scheme,
453                level: Some(0),
454            })?,
455        })
456    }
457}
458
459impl CompressedBufferEncoder {
460    pub fn per_value_compress<T: ArrowNativeType>(
461        &self,
462        data: &[u8],
463        offsets: &[T],
464        compressed: &mut Vec<u8>,
465    ) -> Result<LanceBuffer> {
466        let mut new_offsets: Vec<T> = Vec::with_capacity(offsets.len());
467        new_offsets.push(T::from_usize(0).unwrap());
468
469        for off in offsets.windows(2) {
470            let start = off[0].as_usize();
471            let end = off[1].as_usize();
472            self.compressor.compress(&data[start..end], compressed)?;
473            new_offsets.push(T::from_usize(compressed.len()).unwrap());
474        }
475
476        Ok(LanceBuffer::reinterpret_vec(new_offsets))
477    }
478
479    pub fn per_value_decompress<T: ArrowNativeType>(
480        &self,
481        data: &[u8],
482        offsets: &[T],
483        decompressed: &mut Vec<u8>,
484    ) -> Result<LanceBuffer> {
485        let mut new_offsets: Vec<T> = Vec::with_capacity(offsets.len());
486        new_offsets.push(T::from_usize(0).unwrap());
487
488        for off in offsets.windows(2) {
489            let start = off[0].as_usize();
490            let end = off[1].as_usize();
491            self.compressor
492                .decompress(&data[start..end], decompressed)?;
493            new_offsets.push(T::from_usize(decompressed.len()).unwrap());
494        }
495
496        Ok(LanceBuffer::reinterpret_vec(new_offsets))
497    }
498}
499
500impl PerValueCompressor for CompressedBufferEncoder {
501    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
502        let data_type = data.name();
503        let data = data.as_variable_width().ok_or(Error::Internal {
504            message: format!(
505                "Attempt to use CompressedBufferEncoder on data of type {}",
506                data_type
507            ),
508            location: location!(),
509        })?;
510
511        let data_bytes = &data.data;
512        let mut compressed = Vec::with_capacity(data_bytes.len());
513
514        let new_offsets = match data.bits_per_offset {
515            32 => self.per_value_compress::<u32>(
516                data_bytes,
517                &data.offsets.borrow_to_typed_slice::<u32>(),
518                &mut compressed,
519            )?,
520            64 => self.per_value_compress::<u64>(
521                data_bytes,
522                &data.offsets.borrow_to_typed_slice::<u64>(),
523                &mut compressed,
524            )?,
525            _ => unreachable!(),
526        };
527
528        let compressed = PerValueDataBlock::Variable(VariableWidthBlock {
529            bits_per_offset: data.bits_per_offset,
530            data: LanceBuffer::from(compressed),
531            offsets: new_offsets,
532            num_values: data.num_values,
533            block_info: BlockInfo::new(),
534        });
535
536        // TODO: Support setting the level
537        // TODO: Support underlying compression of data (e.g. defer to binary encoding for offset bitpacking)
538        let encoding = ProtobufUtils21::wrapped(
539            self.compressor.config(),
540            ProtobufUtils21::variable(
541                ProtobufUtils21::flat(data.bits_per_offset as u64, None),
542                None,
543            ),
544        )?;
545
546        Ok((compressed, encoding))
547    }
548}
549
550impl VariablePerValueDecompressor for CompressedBufferEncoder {
551    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
552        let data_bytes = &data.data;
553        let mut decompressed = Vec::with_capacity(data_bytes.len() * 2);
554
555        let new_offsets = match data.bits_per_offset {
556            32 => self.per_value_decompress(
557                data_bytes,
558                &data.offsets.borrow_to_typed_slice::<u32>(),
559                &mut decompressed,
560            )?,
561            64 => self.per_value_decompress(
562                data_bytes,
563                &data.offsets.borrow_to_typed_slice::<u64>(),
564                &mut decompressed,
565            )?,
566            _ => unreachable!(),
567        };
568        Ok(DataBlock::VariableWidth(VariableWidthBlock {
569            bits_per_offset: data.bits_per_offset,
570            data: LanceBuffer::from(decompressed),
571            offsets: new_offsets,
572            num_values: data.num_values,
573            block_info: BlockInfo::new(),
574        }))
575    }
576}
577
578impl BlockCompressor for CompressedBufferEncoder {
579    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
580        let encoded = match data {
581            DataBlock::FixedWidth(fixed_width) => fixed_width.data,
582            DataBlock::VariableWidth(variable_width) => {
583                // Wrap VariableEncoder to handle the encoding
584                let encoder = VariableEncoder::default();
585                BlockCompressor::compress(&encoder, DataBlock::VariableWidth(variable_width))?
586            }
587            _ => {
588                return Err(Error::InvalidInput {
589                    source: "Unsupported data block type".into(),
590                    location: location!(),
591                })
592            }
593        };
594
595        let mut compressed = Vec::new();
596        self.compressor.compress(&encoded, &mut compressed)?;
597        Ok(LanceBuffer::from(compressed))
598    }
599}
600
601impl BlockDecompressor for CompressedBufferEncoder {
602    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
603        let mut decompressed = Vec::new();
604        self.compressor.decompress(&data, &mut decompressed)?;
605
606        // Delegate to BinaryBlockDecompressor which handles the inline metadata
607        let inner_decoder = BinaryBlockDecompressor::default();
608        inner_decoder.decompress(LanceBuffer::from(decompressed), num_values)
609    }
610}
611
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn test_compression_scheme_from_str() {
        assert_eq!(
            CompressionScheme::from_str("none").unwrap(),
            CompressionScheme::None
        );
        assert_eq!(
            CompressionScheme::from_str("fsst").unwrap(),
            CompressionScheme::Fsst
        );
        assert_eq!(
            CompressionScheme::from_str("zstd").unwrap(),
            CompressionScheme::Zstd
        );
        assert_eq!(
            CompressionScheme::from_str("lz4").unwrap(),
            CompressionScheme::Lz4
        );
    }

    #[test]
    fn test_compression_scheme_from_str_invalid() {
        assert!(CompressionScheme::from_str("invalid").is_err());
    }

    #[test]
    fn test_compression_scheme_display_round_trip() {
        for scheme in [
            CompressionScheme::None,
            CompressionScheme::Fsst,
            CompressionScheme::Zstd,
            CompressionScheme::Lz4,
        ] {
            assert_eq!(
                CompressionScheme::from_str(&scheme.to_string()).unwrap(),
                scheme
            );
        }
    }

    #[cfg(feature = "zstd")]
    mod zstd {
        use std::io::Write;

        use super::*;

        // Imported here (not in the parent module) so the test build still compiles when
        // the `zstd` feature is disabled.
        use crate::encodings::physical::block::zstd::ZstdBufferCompressor;

        #[test]
        fn test_compress_zstd_with_length_prefixed() {
            let compressor = ZstdBufferCompressor::new(0);
            let input_data = b"Hello, world!";
            let mut compressed_data = Vec::new();

            compressor
                .compress(input_data, &mut compressed_data)
                .unwrap();
            let mut decompressed_data = Vec::new();
            compressor
                .decompress(&compressed_data, &mut decompressed_data)
                .unwrap();
            assert_eq!(input_data, decompressed_data.as_slice());
        }

        #[test]
        fn test_zstd_compress_decompress_multiple_times() {
            let compressor = ZstdBufferCompressor::new(0);
            let (input_data_1, input_data_2) = (b"Hello ", b"World");
            let mut compressed_data = Vec::new();

            compressor
                .compress(input_data_1, &mut compressed_data)
                .unwrap();
            let compressed_length_1 = compressed_data.len();

            compressor
                .compress(input_data_2, &mut compressed_data)
                .unwrap();

            let mut decompressed_data = Vec::new();
            compressor
                .decompress(
                    &compressed_data[..compressed_length_1],
                    &mut decompressed_data,
                )
                .unwrap();

            compressor
                .decompress(
                    &compressed_data[compressed_length_1..],
                    &mut decompressed_data,
                )
                .unwrap();

            // the output should contain both input_data_1 and input_data_2
            assert_eq!(
                decompressed_data.len(),
                input_data_1.len() + input_data_2.len()
            );
            assert_eq!(
                &decompressed_data[..input_data_1.len()],
                input_data_1,
                "First part of decompressed data should match input_1"
            );
            assert_eq!(
                &decompressed_data[input_data_1.len()..],
                input_data_2,
                "Second part of decompressed data should match input_2"
            );
        }

        #[test]
        fn test_compress_zstd_raw_stream_format_and_decompress_with_length_prefixed() {
            let compressor = ZstdBufferCompressor::new(0);
            let input_data = b"Hello, world!";
            let mut compressed_data = Vec::new();

            // compress using raw stream format
            let mut encoder = ::zstd::Encoder::new(&mut compressed_data, 0).unwrap();
            encoder.write_all(input_data).unwrap();
            encoder.finish().expect("failed to encode data with zstd");

            // decompress with the compressor that normally writes the length prefixed format
            let mut decompressed_data = Vec::new();
            compressor
                .decompress(&compressed_data, &mut decompressed_data)
                .unwrap();
            assert_eq!(input_data, decompressed_data.as_slice());
        }
    }

    #[cfg(feature = "lz4")]
    mod lz4 {
        use std::{collections::HashMap, sync::Arc};

        use arrow_schema::{DataType, Field};
        use lance_datagen::array::{binary_prefix_plus_counter, utf8_prefix_plus_counter};

        use super::*;

        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use crate::{
            constants::{
                COMPRESSION_META_KEY, DICT_DIVISOR_META_KEY, STRUCTURAL_ENCODING_FULLZIP,
                STRUCTURAL_ENCODING_META_KEY,
            },
            encodings::physical::block::lz4::Lz4BufferCompressor,
            testing::{check_round_trip_encoding_generated, FnArrayGeneratorProvider, TestCases},
            version::LanceFileVersion,
        };

        #[test]
        fn test_lz4_compress_decompress() {
            let compressor = Lz4BufferCompressor::default();
            let input_data = b"Hello, world!";
            let mut compressed_data = Vec::new();

            compressor
                .compress(input_data, &mut compressed_data)
                .unwrap();
            let mut decompressed_data = Vec::new();
            compressor
                .decompress(&compressed_data, &mut decompressed_data)
                .unwrap();
            assert_eq!(input_data, decompressed_data.as_slice());
        }

        #[test_log::test(tokio::test)]
        async fn test_lz4_compress_round_trip() {
            for data_type in &[
                DataType::Utf8,
                DataType::LargeUtf8,
                DataType::Binary,
                DataType::LargeBinary,
            ] {
                let field = Field::new("", data_type.clone(), false);
                let mut field_meta = HashMap::new();
                field_meta.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
                // Some bad cardinality estimation causes us to use dictionary encoding currently
                // which causes the expected encoding check to fail.
                field_meta.insert(DICT_DIVISOR_META_KEY.to_string(), "100000".to_string());
                field_meta.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.0001".to_string());
                // Also disable size-based dictionary encoding
                field_meta.insert(
                    STRUCTURAL_ENCODING_META_KEY.to_string(),
                    STRUCTURAL_ENCODING_FULLZIP.to_string(),
                );
                let field = field.with_metadata(field_meta);
                let test_cases = TestCases::basic()
                    // Need to use large pages as small pages might be too small to compress
                    .with_page_sizes(vec![1024 * 1024])
                    // The field asks for lz4 above, so lz4 is the encoding we must observe
                    .with_expected_encoding("lz4")
                    .with_min_file_version(LanceFileVersion::V2_1);

                // Can't use the default random provider because random data isn't compressible
                // and we will fallback to uncompressed encoding
                let datagen = Box::new(FnArrayGeneratorProvider::new(move || match data_type {
                    DataType::Utf8 => utf8_prefix_plus_counter("compressme", false),
                    DataType::Binary => {
                        binary_prefix_plus_counter(Arc::from(b"compressme".to_owned()), false)
                    }
                    DataType::LargeUtf8 => utf8_prefix_plus_counter("compressme", true),
                    DataType::LargeBinary => {
                        binary_prefix_plus_counter(Arc::from(b"compressme".to_owned()), true)
                    }
                    _ => panic!("Unsupported data type: {:?}", data_type),
                }));

                check_round_trip_encoding_generated(field, datagen, test_cases).await;
            }
        }
    }
}