1use arrow_buffer::ArrowNativeType;
25use lance_core::{Error, Result};
26use snafu::location;
27
28use std::str::FromStr;
29
30use crate::compression::{BlockCompressor, BlockDecompressor};
31use crate::encodings::physical::binary::{BinaryBlockDecompressor, VariableEncoder};
32use crate::format::{
33 pb21::{self, CompressiveEncoding},
34 ProtobufUtils21,
35};
36use crate::{
37 buffer::LanceBuffer,
38 compression::VariablePerValueDecompressor,
39 data::{BlockInfo, DataBlock, VariableWidthBlock},
40 encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock},
41};
42
/// A general-purpose compression choice: which scheme to use plus an optional level.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CompressionConfig {
    /// The compression algorithm to apply.
    pub(crate) scheme: CompressionScheme,
    /// Optional algorithm-specific level (used by zstd); schemes without a level ignore it.
    pub(crate) level: Option<i32>,
}

impl CompressionConfig {
    /// Pairs a `scheme` with an optional `level`.
    pub(crate) fn new(scheme: CompressionScheme, level: Option<i32>) -> Self {
        Self { level, scheme }
    }
}

impl Default for CompressionConfig {
    /// LZ4 with level `Some(0)`.
    fn default() -> Self {
        Self::new(CompressionScheme::Lz4, Some(0))
    }
}

/// The compression algorithms known to this module.
///
/// Not every scheme can act as a general buffer compressor; FSST, for example, is rejected
/// when building a general compressor.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CompressionScheme {
    /// Store bytes unchanged.
    None,
    /// FSST string compression.
    Fsst,
    /// Zstandard.
    Zstd,
    /// LZ4 block compression.
    Lz4,
}
71
72impl TryFrom<CompressionScheme> for pb21::CompressionScheme {
73 type Error = Error;
74
75 fn try_from(scheme: CompressionScheme) -> Result<Self> {
76 match scheme {
77 CompressionScheme::Lz4 => Ok(Self::CompressionAlgorithmLz4),
78 CompressionScheme::Zstd => Ok(Self::CompressionAlgorithmZstd),
79 _ => Err(Error::invalid_input(
80 format!("Unsupported compression scheme: {:?}", scheme),
81 location!(),
82 )),
83 }
84 }
85}
86
87impl TryFrom<pb21::CompressionScheme> for CompressionScheme {
88 type Error = Error;
89
90 fn try_from(scheme: pb21::CompressionScheme) -> Result<Self> {
91 match scheme {
92 pb21::CompressionScheme::CompressionAlgorithmLz4 => Ok(Self::Lz4),
93 pb21::CompressionScheme::CompressionAlgorithmZstd => Ok(Self::Zstd),
94 _ => Err(Error::invalid_input(
95 format!("Unsupported compression scheme: {:?}", scheme),
96 location!(),
97 )),
98 }
99 }
100}
101
102impl std::fmt::Display for CompressionScheme {
103 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
104 let scheme_str = match self {
105 Self::Fsst => "fsst",
106 Self::Zstd => "zstd",
107 Self::None => "none",
108 Self::Lz4 => "lz4",
109 };
110 write!(f, "{}", scheme_str)
111 }
112}
113
114impl FromStr for CompressionScheme {
115 type Err = Error;
116
117 fn from_str(s: &str) -> Result<Self> {
118 match s {
119 "none" => Ok(Self::None),
120 "fsst" => Ok(Self::Fsst),
121 "zstd" => Ok(Self::Zstd),
122 "lz4" => Ok(Self::Lz4),
123 _ => Err(Error::invalid_input(
124 format!("Unknown compression scheme: {}", s),
125 location!(),
126 )),
127 }
128 }
129}
130
/// A codec that compresses and decompresses whole byte buffers.
///
/// The implementations in this module append their output to `output_buf` instead of
/// replacing its contents, which lets callers pack several values into one buffer.
pub trait BufferCompressor: std::fmt::Debug + Send + Sync {
    /// Compresses `input_buf` and appends the encoded bytes to `output_buf`.
    fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
    /// Decodes `input_buf` (as produced by `compress`) and appends the original bytes to
    /// `output_buf`.
    fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
    /// Reports the scheme (and level, if any) this compressor is configured with.
    fn config(&self) -> CompressionConfig;
}
136
#[cfg(feature = "zstd")]
mod zstd {
    use std::io::{Cursor, Write};

    use super::*;

    use ::zstd::bulk::decompress_to_buffer;
    use ::zstd::stream::copy_decode;

    /// Size of the little-endian `u64` uncompressed-length header written by `compress`.
    const LENGTH_PREFIX_SIZE: usize = 8;

    /// [`BufferCompressor`] backed by zstd.
    ///
    /// `compress` emits an 8-byte little-endian uncompressed length followed by a zstd frame.
    /// `decompress` accepts that layout and also bare zstd frames without the prefix.
    #[derive(Debug, Default)]
    pub struct ZstdBufferCompressor {
        compression_level: i32,
    }

    impl ZstdBufferCompressor {
        pub fn new(compression_level: i32) -> Self {
            Self { compression_level }
        }

        /// Heuristically decides whether `input_buf` is a bare zstd frame rather than a
        /// length-prefixed one.
        ///
        /// A buffer shorter than the length prefix cannot be length-prefixed, so it is treated
        /// as raw. Otherwise it is raw only if it starts with the zstd magic number and the
        /// frame-header descriptor's Reserved bit (bit 3, which must be zero in a valid frame)
        /// is clear.
        ///
        /// NOTE: a length-prefixed buffer whose prefix happens to begin with the magic bytes
        /// would be misclassified. This is unavoidable without a format change.
        fn is_raw_stream_format(&self, input_buf: &[u8]) -> bool {
            const ZSTD_MAGIC_NUMBER: u32 = 0xFD2FB528;
            const FHD_BYTE_INDEX: usize = 4;
            // RFC 8878 3.1.1.1.1: bit 4 is the Unused bit; bit 3 is the Reserved bit that a
            // conforming decoder requires to be zero.
            const FHD_RESERVED_BIT_MASK: u8 = 0b0000_1000;

            if input_buf.len() < LENGTH_PREFIX_SIZE {
                return true;
            }
            let magic =
                u32::from_le_bytes([input_buf[0], input_buf[1], input_buf[2], input_buf[3]]);
            magic == ZSTD_MAGIC_NUMBER
                && (input_buf[FHD_BYTE_INDEX] & FHD_RESERVED_BIT_MASK) == 0
        }

        /// Decodes `[u64 LE uncompressed length][zstd frame]` and appends the result.
        ///
        /// The output is sized from the prefix and then trimmed to the number of bytes
        /// actually decoded. On error, `output_buf` may hold scratch bytes; the caller
        /// (`decompress`) rolls them back.
        fn decompress_length_prefixed_zstd(
            &self,
            input_buf: &[u8],
            output_buf: &mut Vec<u8>,
        ) -> Result<()> {
            if input_buf.len() < LENGTH_PREFIX_SIZE {
                return Err(Error::Internal {
                    message: "zstd length-prefixed data too short".to_string(),
                    location: location!(),
                });
            }
            let (len_bytes, compressed_data) = input_buf.split_at(LENGTH_PREFIX_SIZE);
            let mut len_buf = [0u8; LENGTH_PREFIX_SIZE];
            len_buf.copy_from_slice(len_bytes);
            let uncompressed_len = u64::from_le_bytes(len_buf);

            let start = output_buf.len();
            // Guard against a corrupt prefix overflowing usize arithmetic.
            let end = usize::try_from(uncompressed_len)
                .ok()
                .and_then(|len| start.checked_add(len))
                .ok_or_else(|| Error::Internal {
                    message: format!(
                        "zstd uncompressed length {} is too large",
                        uncompressed_len
                    ),
                    location: location!(),
                })?;
            output_buf.resize(end, 0);

            let written = decompress_to_buffer(compressed_data, &mut output_buf[start..])?;
            // Drop any zero-fill left over if the frame held fewer bytes than the prefix claimed.
            output_buf.truncate(start + written);
            Ok(())
        }
    }

    impl BufferCompressor for ZstdBufferCompressor {
        /// Appends the 8-byte length prefix followed by a zstd frame of `input_buf`.
        fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            output_buf.write_all(&(input_buf.len() as u64).to_le_bytes())?;
            let mut encoder = ::zstd::stream::Encoder::new(output_buf, self.compression_level)?;
            encoder.write_all(input_buf)?;
            encoder.finish()?;
            Ok(())
        }

        /// Decodes either a length-prefixed or a bare zstd frame and appends the bytes.
        ///
        /// Empty input produces no output. On failure, `output_buf` is restored to its
        /// original length so no partial or zero-filled bytes leak to the caller.
        fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            if input_buf.is_empty() {
                return Ok(());
            }

            let start = output_buf.len();
            let result = if self.is_raw_stream_format(input_buf) {
                copy_decode(Cursor::new(input_buf), &mut *output_buf).map_err(Error::from)
            } else {
                self.decompress_length_prefixed_zstd(input_buf, output_buf)
            };
            if result.is_err() {
                output_buf.truncate(start);
            }
            result
        }

        fn config(&self) -> CompressionConfig {
            CompressionConfig {
                scheme: CompressionScheme::Zstd,
                level: Some(self.compression_level),
            }
        }
    }
}
248
#[cfg(feature = "lz4")]
mod lz4 {
    use super::*;

    /// Size of the little-endian `u32` uncompressed-size header stored in front of each
    /// LZ4 block (requested via `prepend_size = true` and read back in `decompress`).
    const SIZE_PREFIX_LEN: usize = 4;

    /// [`BufferCompressor`] backed by LZ4 block compression with a 4-byte size prefix.
    #[derive(Debug, Default)]
    pub struct Lz4BufferCompressor {}

    impl BufferCompressor for Lz4BufferCompressor {
        /// Appends `[u32 LE size][lz4 block]` for `input_buf` to `output_buf`.
        ///
        /// On failure, `output_buf` is restored to its original length.
        fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            let start_pos = output_buf.len();

            // Reserve the worst-case block size plus the size header as scratch space.
            let max_size = ::lz4::block::compress_bound(input_buf.len())?;
            output_buf.resize(start_pos + max_size + SIZE_PREFIX_LEN, 0);

            match ::lz4::block::compress_to_buffer(
                input_buf,
                None,
                true,
                &mut output_buf[start_pos..],
            ) {
                Ok(compressed_size) => {
                    output_buf.truncate(start_pos + compressed_size);
                    Ok(())
                }
                Err(err) => {
                    output_buf.truncate(start_pos);
                    Err(Error::Internal {
                        message: format!("LZ4 compression error: {}", err),
                        location: location!(),
                    })
                }
            }
        }

        /// Decodes a buffer produced by `compress` and appends the original bytes.
        ///
        /// Errors if the input is shorter than the size header or the block is invalid. On
        /// failure, `output_buf` is restored to its original length.
        fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
            if input_buf.len() < SIZE_PREFIX_LEN {
                return Err(Error::Internal {
                    message: "LZ4 compressed data too short".to_string(),
                    location: location!(),
                });
            }

            let uncompressed_size =
                u32::from_le_bytes([input_buf[0], input_buf[1], input_buf[2], input_buf[3]])
                    as usize;

            let start_pos = output_buf.len();
            output_buf.resize(start_pos + uncompressed_size, 0);

            // `None` tells lz4 to read the size from the header it prepended.
            match ::lz4::block::decompress_to_buffer(input_buf, None, &mut output_buf[start_pos..])
            {
                Ok(decompressed_size) => {
                    output_buf.truncate(start_pos + decompressed_size);
                    Ok(())
                }
                Err(err) => {
                    output_buf.truncate(start_pos);
                    Err(Error::Internal {
                        message: format!("LZ4 decompression error: {}", err),
                        location: location!(),
                    })
                }
            }
        }

        fn config(&self) -> CompressionConfig {
            CompressionConfig {
                scheme: CompressionScheme::Lz4,
                level: None,
            }
        }
    }
}
325
326#[derive(Debug, Default)]
327pub struct NoopBufferCompressor {}
328
329impl BufferCompressor for NoopBufferCompressor {
330 fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
331 output_buf.extend_from_slice(input_buf);
332 Ok(())
333 }
334
335 fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
336 output_buf.extend_from_slice(input_buf);
337 Ok(())
338 }
339
340 fn config(&self) -> CompressionConfig {
341 CompressionConfig {
342 scheme: CompressionScheme::None,
343 level: None,
344 }
345 }
346}
347
348pub struct GeneralBufferCompressor {}
349
350impl GeneralBufferCompressor {
351 pub fn get_compressor(
352 compression_config: CompressionConfig,
353 ) -> Result<Box<dyn BufferCompressor>> {
354 match compression_config.scheme {
355 CompressionScheme::Fsst => Err(Error::InvalidInput {
357 source: "fsst is not usable as a general buffer compressor".into(),
358 location: location!(),
359 }),
360 CompressionScheme::Zstd => {
361 #[cfg(feature = "zstd")]
362 {
363 Ok(Box::new(zstd::ZstdBufferCompressor::new(
364 compression_config.level.unwrap_or(0),
365 )))
366 }
367 #[cfg(not(feature = "zstd"))]
368 {
369 Err(Error::InvalidInput {
370 source: "package was not built with zstd support".into(),
371 location: location!(),
372 })
373 }
374 }
375 CompressionScheme::Lz4 => {
376 #[cfg(feature = "lz4")]
377 {
378 Ok(Box::new(lz4::Lz4BufferCompressor::default()))
379 }
380 #[cfg(not(feature = "lz4"))]
381 {
382 Err(Error::InvalidInput {
383 source: "package was not built with lz4 support".into(),
384 location: location!(),
385 })
386 }
387 }
388 CompressionScheme::None => Ok(Box::new(NoopBufferCompressor {})),
389 }
390 }
391}
392
393#[derive(Debug)]
396pub struct GeneralBlockDecompressor {
397 inner: Box<dyn BlockDecompressor>,
398 compressor: Box<dyn BufferCompressor>,
399}
400
401impl GeneralBlockDecompressor {
402 pub fn try_new(
403 inner: Box<dyn BlockDecompressor>,
404 compression: CompressionConfig,
405 ) -> Result<Self> {
406 let compressor = GeneralBufferCompressor::get_compressor(compression)?;
407 Ok(Self { inner, compressor })
408 }
409}
410
411impl BlockDecompressor for GeneralBlockDecompressor {
412 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
413 let mut decompressed = Vec::new();
414 self.compressor.decompress(&data, &mut decompressed)?;
415 self.inner
416 .decompress(LanceBuffer::from(decompressed), num_values)
417 }
418}
419
420#[derive(Debug)]
422pub struct CompressedBufferEncoder {
423 pub(crate) compressor: Box<dyn BufferCompressor>,
424}
425
426impl Default for CompressedBufferEncoder {
427 fn default() -> Self {
428 #[cfg(feature = "zstd")]
430 let (scheme, level) = (CompressionScheme::Zstd, Some(0));
431 #[cfg(all(feature = "lz4", not(feature = "zstd")))]
432 let (scheme, level) = (CompressionScheme::Lz4, None);
433 #[cfg(not(any(feature = "zstd", feature = "lz4")))]
434 let (scheme, level) = (CompressionScheme::None, None);
435
436 let compressor =
437 GeneralBufferCompressor::get_compressor(CompressionConfig { scheme, level }).unwrap();
438 Self { compressor }
439 }
440}
441
442impl CompressedBufferEncoder {
443 pub fn try_new(compression_config: CompressionConfig) -> Result<Self> {
444 let compressor = GeneralBufferCompressor::get_compressor(compression_config)?;
445 Ok(Self { compressor })
446 }
447
448 pub fn from_scheme(scheme: pb21::CompressionScheme) -> Result<Self> {
449 let scheme = CompressionScheme::try_from(scheme)?;
450 Ok(Self {
451 compressor: GeneralBufferCompressor::get_compressor(CompressionConfig {
452 scheme,
453 level: Some(0),
454 })?,
455 })
456 }
457}
458
459impl CompressedBufferEncoder {
460 pub fn per_value_compress<T: ArrowNativeType>(
461 &self,
462 data: &[u8],
463 offsets: &[T],
464 compressed: &mut Vec<u8>,
465 ) -> Result<LanceBuffer> {
466 let mut new_offsets: Vec<T> = Vec::with_capacity(offsets.len());
467 new_offsets.push(T::from_usize(0).unwrap());
468
469 for off in offsets.windows(2) {
470 let start = off[0].as_usize();
471 let end = off[1].as_usize();
472 self.compressor.compress(&data[start..end], compressed)?;
473 new_offsets.push(T::from_usize(compressed.len()).unwrap());
474 }
475
476 Ok(LanceBuffer::reinterpret_vec(new_offsets))
477 }
478
479 pub fn per_value_decompress<T: ArrowNativeType>(
480 &self,
481 data: &[u8],
482 offsets: &[T],
483 decompressed: &mut Vec<u8>,
484 ) -> Result<LanceBuffer> {
485 let mut new_offsets: Vec<T> = Vec::with_capacity(offsets.len());
486 new_offsets.push(T::from_usize(0).unwrap());
487
488 for off in offsets.windows(2) {
489 let start = off[0].as_usize();
490 let end = off[1].as_usize();
491 self.compressor
492 .decompress(&data[start..end], decompressed)?;
493 new_offsets.push(T::from_usize(decompressed.len()).unwrap());
494 }
495
496 Ok(LanceBuffer::reinterpret_vec(new_offsets))
497 }
498}
499
500impl PerValueCompressor for CompressedBufferEncoder {
501 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
502 let data_type = data.name();
503 let data = data.as_variable_width().ok_or(Error::Internal {
504 message: format!(
505 "Attempt to use CompressedBufferEncoder on data of type {}",
506 data_type
507 ),
508 location: location!(),
509 })?;
510
511 let data_bytes = &data.data;
512 let mut compressed = Vec::with_capacity(data_bytes.len());
513
514 let new_offsets = match data.bits_per_offset {
515 32 => self.per_value_compress::<u32>(
516 data_bytes,
517 &data.offsets.borrow_to_typed_slice::<u32>(),
518 &mut compressed,
519 )?,
520 64 => self.per_value_compress::<u64>(
521 data_bytes,
522 &data.offsets.borrow_to_typed_slice::<u64>(),
523 &mut compressed,
524 )?,
525 _ => unreachable!(),
526 };
527
528 let compressed = PerValueDataBlock::Variable(VariableWidthBlock {
529 bits_per_offset: data.bits_per_offset,
530 data: LanceBuffer::from(compressed),
531 offsets: new_offsets,
532 num_values: data.num_values,
533 block_info: BlockInfo::new(),
534 });
535
536 let encoding = ProtobufUtils21::wrapped(
539 self.compressor.config(),
540 ProtobufUtils21::variable(
541 ProtobufUtils21::flat(data.bits_per_offset as u64, None),
542 None,
543 ),
544 )?;
545
546 Ok((compressed, encoding))
547 }
548}
549
550impl VariablePerValueDecompressor for CompressedBufferEncoder {
551 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
552 let data_bytes = &data.data;
553 let mut decompressed = Vec::with_capacity(data_bytes.len() * 2);
554
555 let new_offsets = match data.bits_per_offset {
556 32 => self.per_value_decompress(
557 data_bytes,
558 &data.offsets.borrow_to_typed_slice::<u32>(),
559 &mut decompressed,
560 )?,
561 64 => self.per_value_decompress(
562 data_bytes,
563 &data.offsets.borrow_to_typed_slice::<u64>(),
564 &mut decompressed,
565 )?,
566 _ => unreachable!(),
567 };
568 Ok(DataBlock::VariableWidth(VariableWidthBlock {
569 bits_per_offset: data.bits_per_offset,
570 data: LanceBuffer::from(decompressed),
571 offsets: new_offsets,
572 num_values: data.num_values,
573 block_info: BlockInfo::new(),
574 }))
575 }
576}
577
578impl BlockCompressor for CompressedBufferEncoder {
579 fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
580 let encoded = match data {
581 DataBlock::FixedWidth(fixed_width) => fixed_width.data,
582 DataBlock::VariableWidth(variable_width) => {
583 let encoder = VariableEncoder::default();
585 BlockCompressor::compress(&encoder, DataBlock::VariableWidth(variable_width))?
586 }
587 _ => {
588 return Err(Error::InvalidInput {
589 source: "Unsupported data block type".into(),
590 location: location!(),
591 })
592 }
593 };
594
595 let mut compressed = Vec::new();
596 self.compressor.compress(&encoded, &mut compressed)?;
597 Ok(LanceBuffer::from(compressed))
598 }
599}
600
601impl BlockDecompressor for CompressedBufferEncoder {
602 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
603 let mut decompressed = Vec::new();
604 self.compressor.decompress(&data, &mut decompressed)?;
605
606 let inner_decoder = BinaryBlockDecompressor::default();
608 inner_decoder.decompress(LanceBuffer::from(decompressed), num_values)
609 }
610}
611
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn test_compression_scheme_from_str() {
        assert_eq!(
            CompressionScheme::from_str("none").unwrap(),
            CompressionScheme::None
        );
        assert_eq!(
            CompressionScheme::from_str("fsst").unwrap(),
            CompressionScheme::Fsst
        );
        assert_eq!(
            CompressionScheme::from_str("zstd").unwrap(),
            CompressionScheme::Zstd
        );
        assert_eq!(
            CompressionScheme::from_str("lz4").unwrap(),
            CompressionScheme::Lz4
        );
    }

    #[test]
    fn test_compression_scheme_display_round_trips_through_from_str() {
        for scheme in [
            CompressionScheme::None,
            CompressionScheme::Fsst,
            CompressionScheme::Zstd,
            CompressionScheme::Lz4,
        ] {
            let parsed = CompressionScheme::from_str(&scheme.to_string()).unwrap();
            assert_eq!(parsed, scheme);
        }
    }

    #[test]
    fn test_compression_scheme_from_str_invalid() {
        assert!(CompressionScheme::from_str("invalid").is_err());
    }

    #[test]
    fn test_noop_compressor_round_trip() {
        let compressor = NoopBufferCompressor::default();
        let mut compressed = Vec::new();
        compressor.compress(b"abc", &mut compressed).unwrap();
        let mut decompressed = Vec::new();
        compressor
            .decompress(&compressed, &mut decompressed)
            .unwrap();
        assert_eq!(decompressed, b"abc");
        assert_eq!(compressor.config().scheme, CompressionScheme::None);
    }

    #[cfg(feature = "zstd")]
    mod zstd {
        use std::io::Write;

        use super::*;

        // Imported inside the feature-gated module (not at the top of `tests`) so the test
        // module still compiles when the `zstd` feature is disabled.
        use crate::encodings::physical::block::zstd::ZstdBufferCompressor;

        #[test]
        fn test_compress_zstd_with_length_prefixed() {
            let compressor = ZstdBufferCompressor::new(0);
            let input_data = b"Hello, world!";
            let mut compressed_data = Vec::new();

            compressor
                .compress(input_data, &mut compressed_data)
                .unwrap();
            let mut decompressed_data = Vec::new();
            compressor
                .decompress(&compressed_data, &mut decompressed_data)
                .unwrap();
            assert_eq!(input_data, decompressed_data.as_slice());
        }

        #[test]
        fn test_zstd_compress_decompress_multiple_times() {
            let compressor = ZstdBufferCompressor::new(0);
            let (input_data_1, input_data_2) = (b"Hello ", b"World");
            let mut compressed_data = Vec::new();

            compressor
                .compress(input_data_1, &mut compressed_data)
                .unwrap();
            let compressed_length_1 = compressed_data.len();

            compressor
                .compress(input_data_2, &mut compressed_data)
                .unwrap();

            let mut decompressed_data = Vec::new();
            compressor
                .decompress(
                    &compressed_data[..compressed_length_1],
                    &mut decompressed_data,
                )
                .unwrap();

            compressor
                .decompress(
                    &compressed_data[compressed_length_1..],
                    &mut decompressed_data,
                )
                .unwrap();

            // Both decompressions append to the same output buffer.
            assert_eq!(
                decompressed_data.len(),
                input_data_1.len() + input_data_2.len()
            );
            assert_eq!(
                &decompressed_data[..input_data_1.len()],
                input_data_1,
                "First part of decompressed data should match input_1"
            );
            assert_eq!(
                &decompressed_data[input_data_1.len()..],
                input_data_2,
                "Second part of decompressed data should match input_2"
            );
        }

        #[test]
        fn test_compress_zstd_raw_stream_format_and_decompress_with_length_prefixed() {
            let compressor = ZstdBufferCompressor::new(0);
            let input_data = b"Hello, world!";
            let mut compressed_data = Vec::new();

            // Produce a bare zstd frame (no length prefix) directly with the zstd crate.
            let mut encoder = ::zstd::Encoder::new(&mut compressed_data, 0).unwrap();
            encoder.write_all(input_data).unwrap();
            encoder.finish().expect("failed to encode data with zstd");

            // The compressor must detect the raw format and still decode it.
            let mut decompressed_data = Vec::new();
            compressor
                .decompress(&compressed_data, &mut decompressed_data)
                .unwrap();
            assert_eq!(input_data, decompressed_data.as_slice());
        }
    }

    #[cfg(feature = "lz4")]
    mod lz4 {
        use std::{collections::HashMap, sync::Arc};

        use arrow_schema::{DataType, Field};
        use lance_datagen::array::{binary_prefix_plus_counter, utf8_prefix_plus_counter};

        use super::*;

        use crate::{
            constants::{
                COMPRESSION_META_KEY, DICT_DIVISOR_META_KEY, DICT_SIZE_RATIO_META_KEY,
                STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
            },
            encodings::physical::block::lz4::Lz4BufferCompressor,
            testing::{check_round_trip_encoding_generated, FnArrayGeneratorProvider, TestCases},
            version::LanceFileVersion,
        };

        #[test]
        fn test_lz4_compress_decompress() {
            let compressor = Lz4BufferCompressor::default();
            let input_data = b"Hello, world!";
            let mut compressed_data = Vec::new();

            compressor
                .compress(input_data, &mut compressed_data)
                .unwrap();
            let mut decompressed_data = Vec::new();
            compressor
                .decompress(&compressed_data, &mut decompressed_data)
                .unwrap();
            assert_eq!(input_data, decompressed_data.as_slice());
        }

        #[test_log::test(tokio::test)]
        async fn test_lz4_compress_round_trip() {
            for data_type in &[
                DataType::Utf8,
                DataType::LargeUtf8,
                DataType::Binary,
                DataType::LargeBinary,
            ] {
                let field = Field::new("", data_type.clone(), false);
                let mut field_meta = HashMap::new();
                field_meta.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
                // Presumably a large divisor and a tiny ratio keep the field from being
                // dictionary-encoded, so the lz4 path is exercised. TODO confirm.
                field_meta.insert(DICT_DIVISOR_META_KEY.to_string(), "100000".to_string());
                field_meta.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.0001".to_string());
                // Force full-zip structural encoding so values are compressed per value.
                field_meta.insert(
                    STRUCTURAL_ENCODING_META_KEY.to_string(),
                    STRUCTURAL_ENCODING_FULLZIP.to_string(),
                );
                let field = field.with_metadata(field_meta);
                let test_cases = TestCases::basic()
                    .with_page_sizes(vec![1024 * 1024])
                    // This test configures lz4, so lz4 (not zstd) is the expected encoding.
                    .with_expected_encoding("lz4")
                    .with_min_file_version(LanceFileVersion::V2_1);

                let datagen = Box::new(FnArrayGeneratorProvider::new(move || match data_type {
                    DataType::Utf8 => utf8_prefix_plus_counter("compressme", false),
                    DataType::Binary => {
                        binary_prefix_plus_counter(Arc::from(b"compressme".to_owned()), false)
                    }
                    DataType::LargeUtf8 => utf8_prefix_plus_counter("compressme", true),
                    DataType::LargeBinary => {
                        binary_prefix_plus_counter(Arc::from(b"compressme".to_owned()), true)
                    }
                    _ => panic!("Unsupported data type: {:?}", data_type),
                }));

                check_round_trip_encoding_generated(field, datagen, test_cases).await;
            }
        }
    }
}