// lance_encoding/encodings/physical/binary.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Basic encodings for variable width data
//!
//! These are not compression but represent the "leaf" encodings for variable length data
//! where we simply match the data with the rules of the structural encoding.
//!
//! These encodings are transparent since we aren't actually doing any compression.  No information
//! is needed in the encoding description.
use arrow_array::OffsetSizeTrait;
use byteorder::{ByteOrder, LittleEndian};
use core::panic;
use snafu::location;

use lance_core::utils::bit::pad_bytes_to;
use lance_core::{Error, Result};

use crate::buffer::LanceBuffer;
use crate::compression::{
    BlockCompressor, BlockDecompressor, MiniBlockDecompressor, VariablePerValueDecompressor,
};
use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};
use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
use crate::encodings::logical::primitive::miniblock::{
    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_VALUES,
};
use crate::format::pb21::compressive_encoding::Compression;
use crate::format::pb21::CompressiveEncoding;
use crate::format::{pb21, ProtobufUtils21};
33
/// Mini-block encoder for variable-width (binary / string) data.
///
/// Splits the values into chunks whose serialized size (offsets + bytes)
/// targets `minichunk_size`; see `chunk_data` for the per-chunk layout.
#[derive(Debug)]
pub struct BinaryMiniBlockEncoder {
    // Approximate upper bound, in bytes, for each produced mini-chunk.
    minichunk_size: i64,
}
38
39impl Default for BinaryMiniBlockEncoder {
40    fn default() -> Self {
41        Self {
42            minichunk_size: *AIM_MINICHUNK_SIZE,
43        }
44    }
45}
46
/// Default target size (in bytes) for each binary mini-block chunk.
const DEFAULT_AIM_MINICHUNK_SIZE: i64 = 4 * 1024;

/// Target mini-chunk size, overridable via the
/// `LANCE_BINARY_MINIBLOCK_CHUNK_SIZE` environment variable.
///
/// Values that are unset, fail to parse, or are not strictly positive fall
/// back to [`DEFAULT_AIM_MINICHUNK_SIZE`].  (A zero or negative size would
/// defeat the size comparisons in `search_next_offset_idx`, degenerating the
/// chunking into one value per chunk.)
pub static AIM_MINICHUNK_SIZE: std::sync::LazyLock<i64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_BINARY_MINIBLOCK_CHUNK_SIZE")
        .ok()
        .and_then(|raw| raw.parse::<i64>().ok())
        // Reject non-positive overrides; the original accepted them blindly.
        .filter(|&size| size > 0)
        .unwrap_or(DEFAULT_AIM_MINICHUNK_SIZE)
});
55
56// Make it to support both u32 and u64
/// Splits variable-width values into mini-block chunks.
///
/// Generic over the offset type so it supports both 32-bit (`i32`) and 64-bit
/// (`i64`) offset arrays.
///
/// Each chunk is laid out as `[rebased offsets][value bytes][padding]`, where
/// the offsets are rebased to point inside the chunk itself and the chunk is
/// padded up to `alignment` bytes.
///
/// Returns one output buffer containing all chunks back-to-back, plus the
/// [`MiniBlockChunk`] metadata describing each chunk.
fn chunk_offsets<N: OffsetSizeTrait>(
    offsets: &[N],
    data: &[u8],
    alignment: usize,
    minichunk_size: i64,
) -> (Vec<LanceBuffer>, Vec<MiniBlockChunk>) {
    #[derive(Debug)]
    struct ChunkInfo {
        // Index (into `offsets`) of this chunk's first offset.
        chunk_start_offset_in_orig_idx: usize,
        // Index (into `offsets`) of this chunk's last offset (exclusive end).
        chunk_last_offset_in_orig_idx: usize,
        // the bytes in every chunk start at `chunk.bytes_start_offset`
        bytes_start_offset: usize,
        // every chunk is padded to `alignment` bytes.
        // we need to interpret every chunk as &[u32] so it must be padded at least to 4 bytes;
        // this field could be eliminated by using `num_bytes` in `MiniBlockChunk` to compute
        // the `output_total_bytes`.
        padded_chunk_size: usize,
    }

    let byte_width: usize = N::get_byte_width();
    let mut chunks_info = vec![];
    let mut chunks = vec![];
    let mut last_offset_in_orig_idx = 0;
    loop {
        // Greedily grow this chunk (doubling the value count) until adding
        // more values would exceed `minichunk_size`.
        let this_last_offset_in_orig_idx =
            search_next_offset_idx(offsets, last_offset_in_orig_idx, minichunk_size);

        let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx;
        let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx];
        // Serialized size: (num_values + 1) offsets followed by the raw bytes.
        let this_chunk_size =
            (num_values_in_this_chunk + 1) * byte_width + chunk_bytes.to_usize().unwrap();

        let padded_chunk_size = this_chunk_size.next_multiple_of(alignment);
        debug_assert!(padded_chunk_size > 0);

        let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * byte_width;
        chunks_info.push(ChunkInfo {
            chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
            chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
            bytes_start_offset: this_chunk_bytes_start_offset,
            padded_chunk_size,
        });
        chunks.push(MiniBlockChunk {
            // 0 marks the final chunk (which may hold a non-power-of-two
            // count); every other chunk holds exactly 2^log_num_values values
            // by construction, since `search_next_offset_idx` doubles counts.
            log_num_values: if this_last_offset_in_orig_idx == offsets.len() - 1 {
                0
            } else {
                num_values_in_this_chunk.trailing_zeros() as u8
            },
            buffer_sizes: vec![padded_chunk_size as u32],
        });
        if this_last_offset_in_orig_idx == offsets.len() - 1 {
            break;
        }
        last_offset_in_orig_idx = this_last_offset_in_orig_idx;
    }

    let output_total_bytes = chunks_info
        .iter()
        .map(|chunk_info| chunk_info.padded_chunk_size)
        .sum::<usize>();

    let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

    for chunk in chunks_info {
        // Rebase the offsets so they point at byte positions *within* this
        // chunk: the first value's bytes start right after the offsets section.
        let this_chunk_offsets: Vec<N> = offsets
            [chunk.chunk_start_offset_in_orig_idx..=chunk.chunk_last_offset_in_orig_idx]
            .iter()
            .map(|offset| {
                *offset - offsets[chunk.chunk_start_offset_in_orig_idx]
                    + N::from_usize(chunk.bytes_start_offset).unwrap()
            })
            .collect();

        let this_chunk_offsets = LanceBuffer::reinterpret_vec(this_chunk_offsets);
        output.extend_from_slice(&this_chunk_offsets);

        let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        output.extend_from_slice(&data[start_in_orig..end_in_orig]);

        // pad this chunk to make it align to the desired byte boundary
        // (the pad value is arbitrary filler — it is never read back)
        const PAD_BYTE: u8 = 72;
        let pad_len = pad_bytes_to(output.len(), alignment);

        // Compare with usize literal to avoid type mismatch with N
        if pad_len > 0_usize {
            output.extend(std::iter::repeat_n(PAD_BYTE, pad_len));
        }
    }
    (vec![LanceBuffer::reinterpret_vec(output)], chunks)
}
152
// search for the next offset index at which to cut the values into a chunk.
// this function incrementally peeks at the number of values in a chunk,
// each time multiplying the candidate count by 2 (so every non-final chunk
// ends up holding a power-of-two number of values).
// It returns the offset_idx in `offsets` that ends this chunk.
fn search_next_offset_idx<N: OffsetSizeTrait>(
    offsets: &[N],
    last_offset_idx: usize,
    minichunk_size: i64,
) -> usize {
    // `num_values` is the last count known to fit; `new_num_values` is the
    // next candidate being probed.
    let mut num_values = 1;
    let mut new_num_values = num_values * 2;
    loop {
        if last_offset_idx + new_num_values >= offsets.len() {
            // Not enough values remain to probe the doubled count; decide
            // whether the entire tail fits in one chunk.
            let existing_bytes = offsets[offsets.len() - 1] - offsets[last_offset_idx];
            // existing bytes plus the new offset size
            let new_size = existing_bytes
                + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap();
            if new_size.to_i64().unwrap() <= minichunk_size {
                // case 1: can fit the rest of all data into a miniblock
                return offsets.len() - 1;
            } else {
                // case 2: can only fit the last tried `num_values` into a miniblock
                return last_offset_idx + num_values;
            }
        }
        let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx];
        let new_size =
            existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap();
        if new_size.to_i64().unwrap() <= minichunk_size {
            if new_num_values * 2 > MAX_MINIBLOCK_VALUES as usize {
                // hit the max number of values limit
                break;
            }
            num_values = new_num_values;
            new_num_values *= 2;
        } else {
            break;
        }
    }
    last_offset_idx + num_values
}
194
impl BinaryMiniBlockEncoder {
    /// Creates an encoder with the given target chunk size; `None` uses the
    /// globally configured [`AIM_MINICHUNK_SIZE`].
    pub fn new(minichunk_size: Option<i64>) -> Self {
        Self {
            minichunk_size: minichunk_size.unwrap_or(*AIM_MINICHUNK_SIZE),
        }
    }

    // put binary data into chunks, every chunk is less than or equal to `minichunk_size`.
    // In each chunk, offsets come first followed by the binary bytes; each chunk is padded to the
    // offset width (4 bytes for 32-bit offsets, 8 bytes for 64-bit offsets).
    // the offsets in the chunk point at byte positions within this chunk.
    fn chunk_data(&self, data: VariableWidthBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        // TODO: Support compression of offsets
        // TODO: Support general compression of data
        match data.bits_per_offset {
            32 => {
                let offsets = data.offsets.borrow_to_typed_slice::<i32>();
                let (buffers, chunks) =
                    chunk_offsets(offsets.as_ref(), &data.data, 4, self.minichunk_size);
                (
                    MiniBlockCompressed {
                        data: buffers,
                        chunks,
                        num_values: data.num_values,
                    },
                    // Offsets are described as uncompressed ("flat") 32-bit values.
                    ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None),
                )
            }
            64 => {
                let offsets = data.offsets.borrow_to_typed_slice::<i64>();
                let (buffers, chunks) =
                    chunk_offsets(offsets.as_ref(), &data.data, 8, self.minichunk_size);
                (
                    MiniBlockCompressed {
                        data: buffers,
                        chunks,
                        num_values: data.num_values,
                    },
                    // Offsets are described as uncompressed ("flat") 64-bit values.
                    ProtobufUtils21::variable(ProtobufUtils21::flat(64, None), None),
                )
            }
            _ => panic!("Unsupported bits_per_offset={}", data.bits_per_offset),
        }
    }
}
239
240impl MiniBlockCompressor for BinaryMiniBlockEncoder {
241    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
242        match data {
243            DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
244            _ => Err(Error::InvalidInput {
245                source: format!(
246                    "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
247                    data.name()
248                )
249                .into(),
250                location: location!(),
251            }),
252        }
253    }
254}
255
/// Decompressor for binary mini-block chunks written by [`BinaryMiniBlockEncoder`].
#[derive(Debug)]
pub struct BinaryMiniBlockDecompressor {
    // Width of each stored offset: must be 32 or 64 bits.
    bits_per_offset: u8,
}
260
261impl BinaryMiniBlockDecompressor {
262    pub fn new(bits_per_offset: u8) -> Self {
263        assert!(bits_per_offset == 32 || bits_per_offset == 64);
264        Self { bits_per_offset }
265    }
266
267    pub fn from_variable(variable: &pb21::Variable) -> Self {
268        if let Compression::Flat(flat) = variable
269            .offsets
270            .as_ref()
271            .unwrap()
272            .compression
273            .as_ref()
274            .unwrap()
275        {
276            Self {
277                bits_per_offset: flat.bits_per_value as u8,
278            }
279        } else {
280            panic!("Unsupported offsets compression: {:?}", variable.offsets);
281        }
282    }
283}
284
impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
    // decompress a MiniBlock of binary data; `num_values` must be less than or equal
    // to the number of values this MiniBlock has.  BinaryMiniBlock doesn't store its
    // own value count, so the assertion cannot be done here and the caller of
    // `decompress` must ensure `num_values` <= number of values in the chunk.
    //
    // The chunk layout (see `chunk_offsets`) is `[offsets][bytes][padding]`,
    // with offsets pointing at byte positions within the chunk itself; the
    // returned block rebases them to start at zero.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        assert_eq!(data.len(), 1);
        let data = data.into_iter().next().unwrap();

        if self.bits_per_offset == 64 {
            // offset and at least one value (two 8-byte offsets minimum)
            assert!(data.len() >= 16);

            let offsets_buffer = data.borrow_to_typed_slice::<u64>();
            let offsets = offsets_buffer.as_ref();

            // Rebase offsets so the first value starts at byte 0.
            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u64>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                // offsets[0] is the position of the first byte inside the
                // chunk; copy only the bytes covered by `num_values`.
                data: LanceBuffer::from(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 64,
                num_values,
                block_info: BlockInfo::new(),
            }))
        } else {
            // offset and at least one value (two 4-byte offsets minimum)
            assert!(data.len() >= 8);

            let offsets_buffer = data.borrow_to_typed_slice::<u32>();
            let offsets = offsets_buffer.as_ref();

            // Rebase offsets so the first value starts at byte 0.
            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u32>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                data: LanceBuffer::from(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 32,
                num_values,
                block_info: BlockInfo::new(),
            }))
        }
    }
}
339
/// Most basic encoding for variable-width data which does no compression at all.
/// The serialized memory layout looks like below:
///
/// | bits_per_offset           | bytes_start_offset        | offsets data | bytes data |
/// | ------------------------- | ------------------------- | ------------ | ---------- |
/// | <bits_per_offset>/8 bytes | <bits_per_offset>/8 bytes | offsets_len  | data_len   |
///
/// It's used in VariableEncoder and BinaryBlockDecompressor
#[derive(Debug, Default)]
pub struct VariableEncoder {}
351
352impl BlockCompressor for VariableEncoder {
353    fn compress(&self, mut data: DataBlock) -> Result<LanceBuffer> {
354        match data {
355            DataBlock::VariableWidth(ref mut variable_width_data) => {
356                match variable_width_data.bits_per_offset {
357                    32 => {
358                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
359                        let offsets = offsets.as_ref();
360                        // The first 4 bytes store the bits per offset, the next 4 bytes store the start
361                        // offset of the bytes data, then offsets data, then bytes data.
362                        let bytes_start_offset = 4 + 4 + std::mem::size_of_val(offsets) as u32;
363
364                        let output_total_bytes =
365                            bytes_start_offset as usize + variable_width_data.data.len();
366                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
367
368                        // Store bit_per_offset info
369                        output.extend_from_slice(&(32_u32).to_le_bytes());
370
371                        // store `bytes_start_offset` in the next 4 bytes of output buffer
372                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
373
374                        // store offsets
375                        output.extend_from_slice(&variable_width_data.offsets);
376
377                        // store bytes
378                        output.extend_from_slice(&variable_width_data.data);
379                        Ok(LanceBuffer::from(output))
380                    }
381                    64 => {
382                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u64>();
383                        let offsets = offsets.as_ref();
384                        // The first 8 bytes store the bits per offset, the next 8 bytes store the start
385                        // offset of the bytes data, then offsets data, then bytes data.
386                        let bytes_start_offset = 8 + 8 + std::mem::size_of_val(offsets) as u64;
387
388                        let output_total_bytes =
389                            bytes_start_offset as usize + variable_width_data.data.len();
390                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
391
392                        // Store bit_per_offset info
393                        output.extend_from_slice(&(64_u64).to_le_bytes());
394
395                        // store `bytes_start_offset` in the next 8 bytes of output buffer
396                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
397
398                        // store offsets
399                        output.extend_from_slice(&variable_width_data.offsets);
400
401                        // store bytes
402                        output.extend_from_slice(&variable_width_data.data);
403                        Ok(LanceBuffer::from(output))
404                    }
405                    _ => {
406                        panic!("BinaryBlockEncoder does not work with {} bits per offset VariableWidth DataBlock.",
407                variable_width_data.bits_per_offset);
408                    }
409                }
410            }
411            _ => {
412                panic!("BinaryBlockEncoder can only work with Variable Width DataBlock.");
413            }
414        }
415    }
416}
417
418impl PerValueCompressor for VariableEncoder {
419    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
420        let DataBlock::VariableWidth(variable) = data else {
421            panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
422        };
423
424        let encoding = ProtobufUtils21::variable(
425            ProtobufUtils21::flat(variable.bits_per_offset as u64, None),
426            None,
427        );
428        Ok((PerValueDataBlock::Variable(variable), encoding))
429    }
430}
431
/// Per-value (full-zip) decoder counterpart of [`VariableEncoder`].
#[derive(Debug, Default)]
pub struct VariableDecoder {}
434
impl VariablePerValueDecompressor for VariableDecoder {
    /// Identity decompression: the block is already in decoded variable-width form.
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
        Ok(DataBlock::VariableWidth(data))
    }
}
440
/// Block decompressor for buffers produced by [`VariableEncoder`]'s
/// [`BlockCompressor`] implementation.
#[derive(Debug, Default)]
pub struct BinaryBlockDecompressor {}
443
impl BlockDecompressor for BinaryBlockDecompressor {
    /// Decodes a buffer written by [`VariableEncoder`], handling both the
    /// legacy and standard header layouts.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        // In older (not quite stable) versions we stored the bits per offset as a single byte and then the num_values
        // as four bytes.  However, this led to alignment problems and was wasteful since we already store the num_values
        // in higher layers.
        //
        // In the standard scheme we use 4 bytes for the bits per offset and 4 bytes for the bytes_start_offset and we
        // rely on the passed in num_values to be correct.

        // This isn't perfect but it's probably good enough and the best I think we can do.  The bits per offset will
        // never be more than 255 and it's little endian so the last 3 bytes will always be 0.  These will be the least
        // significant 3 bytes of the number of values in the old scheme.  It's pretty unlikely these are all 0 (that would
        // mean there are at least 16M values in a single page) so we'll use this to determine if the old scheme is used.
        //
        // NOTE(review): this indexing assumes `data` is at least 4 bytes long;
        // a valid header guarantees that, but a truncated buffer would panic.
        let is_old_scheme = data[1] != 0 || data[2] != 0 || data[3] != 0;

        let (bits_per_offset, bytes_start_offset, offset_start) = if is_old_scheme {
            // Old scheme: 1-byte width, then num_values, then bytes_start_offset.
            let bits_per_offset = data[0];
            match bits_per_offset {
                32 => {
                    debug_assert_eq!(LittleEndian::read_u32(&data[1..5]), num_values as u32);
                    let bytes_start_offset = LittleEndian::read_u32(&data[5..9]);
                    (bits_per_offset, bytes_start_offset as u64, 9)
                }
                64 => {
                    debug_assert_eq!(LittleEndian::read_u64(&data[1..9]), num_values);
                    let bytes_start_offset = LittleEndian::read_u64(&data[9..17]);
                    (bits_per_offset, bytes_start_offset, 17)
                }
                _ => {
                    return Err(Error::InvalidInput {
                        source: format!("Unsupported bits_per_offset={}", bits_per_offset).into(),
                        location: location!(),
                    });
                }
            }
        } else {
            // Standard scheme: width-sized header fields (see VariableEncoder).
            let bits_per_offset = LittleEndian::read_u32(&data[0..4]) as u8;
            match bits_per_offset {
                32 => {
                    let bytes_start_offset = LittleEndian::read_u32(&data[4..8]);
                    (bits_per_offset, bytes_start_offset as u64, 8)
                }
                64 => {
                    let bytes_start_offset = LittleEndian::read_u64(&data[8..16]);
                    (bits_per_offset, bytes_start_offset, 16)
                }
                _ => {
                    return Err(Error::InvalidInput {
                        source: format!("Unsupported bits_per_offset={}", bits_per_offset).into(),
                        location: location!(),
                    });
                }
            }
        };

        // the next `bytes_start_offset - offset_start` bytes store the offsets.
        let offsets =
            data.slice_with_length(offset_start, bytes_start_offset as usize - offset_start);

        // the rest are the binary bytes.
        let data = data.slice_with_length(
            bytes_start_offset as usize,
            data.len() - bytes_start_offset as usize,
        );

        Ok(DataBlock::VariableWidth(VariableWidthBlock {
            data,
            offsets,
            bits_per_offset,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}
520
521#[cfg(test)]
522pub mod tests {
523    use arrow_array::{
524        builder::{LargeStringBuilder, StringBuilder},
525        ArrayRef, StringArray,
526    };
527    use arrow_schema::{DataType, Field};
528
529    use crate::{
530        constants::{
531            COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
532            STRUCTURAL_ENCODING_MINIBLOCK,
533        },
534        testing::check_specific_random,
535    };
536    use rstest::rstest;
537    use std::{collections::HashMap, sync::Arc, vec};
538
539    use crate::{
540        testing::{
541            check_basic_random, check_round_trip_encoding_of_data, FnArrayGeneratorProvider,
542            TestCases,
543        },
544        version::LanceFileVersion,
545    };
546
547    #[test_log::test(tokio::test)]
548    async fn test_utf8_binary() {
549        let field = Field::new("", DataType::Utf8, false);
550        check_specific_random(
551            field,
552            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
553        )
554        .await;
555    }
556
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        // Round-trip random string/binary data under both structural encodings.
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let field = Field::new("", data_type, false).with_metadata(field_metadata);
        check_basic_random(field).await;
    }
573
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_fsst(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Binary, DataType::Utf8)] data_type: DataType,
    ) {
        // Round-trip nullable string/binary data with FSST compression enabled
        // via field metadata, under both structural encodings.
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        // TODO (https://github.com/lance-format/lance/issues/4783)
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
        check_specific_random(field, test_cases).await;
    }
592
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_fsst_large_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::LargeBinary, DataType::LargeUtf8)] data_type: DataType,
    ) {
        // Same as test_binary_fsst but exercising 64-bit offsets (Large types).
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        check_specific_random(
            field,
            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }
613
614    #[test_log::test(tokio::test)]
615    async fn test_large_binary() {
616        let field = Field::new("", DataType::LargeBinary, true);
617        check_basic_random(field).await;
618    }
619
620    #[test_log::test(tokio::test)]
621    async fn test_large_utf8() {
622        let field = Field::new("", DataType::LargeUtf8, true);
623        check_basic_random(field).await;
624    }
625
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_small_strings(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        use crate::testing::check_basic_generated;

        // Short, highly similar strings ("user_0", "user_1", ...) stress the
        // chunking logic with many tiny values per chunk.
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
        check_basic_generated(
            field,
            Box::new(FnArrayGeneratorProvider::new(move || {
                lance_datagen::array::utf8_prefix_plus_counter("user_", /*is_large=*/ false)
            })),
        )
        .await;
    }
648
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        // A tiny hand-built array with interleaved nulls, read back via
        // several ranges and a take-by-indices.
        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4]);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            field_metadata,
        )
        .await;
    }
677
678    #[test_log::test(tokio::test)]
679    async fn test_sliced_utf8() {
680        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
681        let string_array = string_array.slice(1, 3);
682
683        let test_cases = TestCases::default()
684            .with_range(0..1)
685            .with_range(0..2)
686            .with_range(1..2);
687        check_round_trip_encoding_of_data(
688            vec![Arc::new(string_array)],
689            &test_cases,
690            HashMap::new(),
691        )
692        .await;
693    }
694
    #[test_log::test(tokio::test)]
    async fn test_bigger_than_max_page_size() {
        // A single value larger than the page size must still encode correctly.
        // Create an array with one single 32MiB string
        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
        let string_array = StringArray::from(vec![
            Some(big_string),
            Some("abc".to_string()),
            None,
            None,
            Some("xyz".to_string()),
        ]);

        // Drop the max page size to 1MiB
        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;

        // This is a regression testing the case where a page with X rows is split into Y parts
        // where the number of parts is not evenly divisible by the number of rows.  In this
        // case we are splitting 90 rows into 4 parts.
        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &TestCases::default(),
            HashMap::new(),
        )
        .await;
    }
730
    #[test_log::test(tokio::test)]
    async fn test_empty_strings() {
        // Scenario 1: Some strings are empty

        let values = [Some("abc"), Some(""), None];
        // Test empty string at beginning, middle, and end positions
        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
            let mut string_builder = StringBuilder::new();
            for idx in order {
                string_builder.append_option(values[idx]);
            }
            let string_array = Arc::new(string_builder.finish());
            let test_cases = TestCases::default()
                .with_indices(vec![1])
                .with_indices(vec![0])
                .with_indices(vec![2])
                .with_indices(vec![0, 1]);
            check_round_trip_encoding_of_data(
                vec![string_array.clone()],
                &test_cases,
                HashMap::new(),
            )
            .await;
            // Repeat with batch size 1 to exercise single-row reads.
            let test_cases = test_cases.with_batch_size(1);
            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
                .await;
        }

        // Scenario 2: All strings are empty

        // When encoding an array of empty strings there are no bytes to encode
        // which is strange and we want to ensure we handle it
        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }
771
772    #[test_log::test(tokio::test)]
773    #[ignore] // This test is quite slow in debug mode
774    async fn test_jumbo_string() {
775        // This is an overflow test.  We have a list of lists where each list
776        // has 1Mi items.  We encode 5000 of these lists and so we have over 4Gi in the
777        // offsets range
778        let mut string_builder = LargeStringBuilder::new();
779        // a 1 MiB string
780        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
781        for _ in 0..5000 {
782            string_builder.append_option(Some(&giant_string));
783        }
784        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
785        let arrs = vec![giant_array];
786
787        // // We can't validate because our validation relies on concatenating all input arrays
788        let test_cases = TestCases::default().without_validation();
789        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
790    }
791
792    #[rstest]
793    #[test_log::test(tokio::test)]
794    async fn test_binary_dictionary_encoding(
795        #[values(true, false)] with_nulls: bool,
796        #[values(100, 500, 35000)] dict_size: u32,
797    ) {
798        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
799        let strings = (0..dict_size)
800            .map(|i| i.to_string())
801            .collect::<Vec<String>>();
802
803        let repeated_strings: Vec<_> = strings
804            .iter()
805            .cycle()
806            .take(70000)
807            .enumerate()
808            .map(|(i, s)| {
809                if with_nulls && i % 7 == 0 {
810                    None
811                } else {
812                    Some(s.clone())
813                }
814            })
815            .collect();
816        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
817        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
818    }
819
820    #[test_log::test(tokio::test)]
821    async fn test_binary_encoding_verification() {
822        use lance_datagen::{ByteCount, RowCount};
823
824        let test_cases = TestCases::default()
825            .with_expected_encoding("variable")
826            .with_min_file_version(LanceFileVersion::V2_1);
827
828        // Test both automatic selection and explicit configuration
829        // 1. Test automatic binary encoding selection (small strings that won't trigger FSST)
830        let arr_small = lance_datagen::gen_batch()
831            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(10), false))
832            .into_batch_rows(RowCount::from(1000))
833            .unwrap()
834            .column(0)
835            .clone();
836        check_round_trip_encoding_of_data(vec![arr_small], &test_cases, HashMap::new()).await;
837
838        // 2. Test explicit "none" compression to force binary encoding
839        let metadata_explicit =
840            HashMap::from([("lance-encoding:compression".to_string(), "none".to_string())]);
841        let arr_large = lance_datagen::gen_batch()
842            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(50), false))
843            .into_batch_rows(RowCount::from(2000))
844            .unwrap()
845            .column(0)
846            .clone();
847        check_round_trip_encoding_of_data(vec![arr_large], &test_cases, metadata_explicit).await;
848    }
849
850    #[test]
851    fn test_binary_miniblock_with_misaligned_buffer() {
852        use super::BinaryMiniBlockDecompressor;
853        use crate::buffer::LanceBuffer;
854        use crate::compression::MiniBlockDecompressor;
855        use crate::data::DataBlock;
856
857        // Test case 1: u32 offsets
858        {
859            let decompressor = BinaryMiniBlockDecompressor {
860                bits_per_offset: 32,
861            };
862
863            // Create test data with u32 offsets
864            // BinaryMiniBlock format: all offsets followed by all string data
865            // Need to ensure total size is divisible by 4 for u32
866            let mut test_data = Vec::new();
867
868            // Offsets section (3 offsets for 2 values + 1 end offset)
869            test_data.extend_from_slice(&12u32.to_le_bytes()); // offset to start of strings (after offsets)
870            test_data.extend_from_slice(&15u32.to_le_bytes()); // offset to second string
871            test_data.extend_from_slice(&20u32.to_le_bytes()); // offset to end
872
873            // String data section
874            test_data.extend_from_slice(b"ABCXYZ"); // 6 bytes of string data
875            test_data.extend_from_slice(&[0, 0]); // 2 bytes padding to make total 20 bytes (divisible by 4)
876
877            // Create a misaligned buffer by adding padding and slicing
878            let mut padded = Vec::with_capacity(test_data.len() + 1);
879            padded.push(0xFF); // Padding byte to misalign
880            padded.extend_from_slice(&test_data);
881
882            let bytes = bytes::Bytes::from(padded);
883            let misaligned = bytes.slice(1..); // Skip first byte to create misalignment
884
885            // Create LanceBuffer with bytes_per_value=1 to bypass alignment check
886            let buffer = LanceBuffer::from_bytes(misaligned, 1);
887
888            // Verify the buffer is actually misaligned
889            let ptr = buffer.as_ref().as_ptr();
890            assert_ne!(
891                ptr.align_offset(4),
892                0,
893                "Test setup: buffer should be misaligned for u32"
894            );
895
896            // Decompress with misaligned buffer - should work with borrow_to_typed_slice
897            let result = decompressor.decompress(vec![buffer], 2);
898            assert!(
899                result.is_ok(),
900                "Decompression should succeed with misaligned buffer"
901            );
902
903            // Verify the data is correct
904            if let Ok(DataBlock::VariableWidth(block)) = result {
905                assert_eq!(block.num_values, 2);
906                // Data should be the strings (including padding from the original buffer)
907                assert_eq!(&block.data.as_ref()[..6], b"ABCXYZ");
908            } else {
909                panic!("Expected VariableWidth block");
910            }
911        }
912
913        // Test case 2: u64 offsets
914        {
915            let decompressor = BinaryMiniBlockDecompressor {
916                bits_per_offset: 64,
917            };
918
919            // Create test data with u64 offsets
920            let mut test_data = Vec::new();
921
922            // Offsets section (3 offsets for 2 values + 1 end offset)
923            test_data.extend_from_slice(&24u64.to_le_bytes()); // offset to start of strings (after offsets)
924            test_data.extend_from_slice(&29u64.to_le_bytes()); // offset to second string
925            test_data.extend_from_slice(&40u64.to_le_bytes()); // offset to end (divisible by 8)
926
927            // String data section
928            test_data.extend_from_slice(b"HelloWorld"); // 10 bytes of string data
929            test_data.extend_from_slice(&[0, 0, 0, 0, 0, 0]); // 6 bytes padding to make total 40 bytes (divisible by 8)
930
931            // Create misaligned buffer
932            let mut padded = Vec::with_capacity(test_data.len() + 3);
933            padded.extend_from_slice(&[0xFF, 0xFF, 0xFF]); // 3 bytes padding for misalignment
934            padded.extend_from_slice(&test_data);
935
936            let bytes = bytes::Bytes::from(padded);
937            let misaligned = bytes.slice(3..); // Skip 3 bytes
938
939            let buffer = LanceBuffer::from_bytes(misaligned, 1);
940
941            // Verify misalignment for u64
942            let ptr = buffer.as_ref().as_ptr();
943            assert_ne!(
944                ptr.align_offset(8),
945                0,
946                "Test setup: buffer should be misaligned for u64"
947            );
948
949            // Decompress should succeed
950            let result = decompressor.decompress(vec![buffer], 2);
951            assert!(
952                result.is_ok(),
953                "Decompression should succeed with misaligned u64 buffer"
954            );
955
956            if let Ok(DataBlock::VariableWidth(block)) = result {
957                assert_eq!(block.num_values, 2);
958                // Data should be the strings (including padding from the original buffer)
959                assert_eq!(&block.data.as_ref()[..10], b"HelloWorld");
960            } else {
961                panic!("Expected VariableWidth block");
962            }
963        }
964    }
965}