lance_encoding/encodings/physical/binary.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Basic encodings for variable-width data
//!
//! These are not compression schemes; they are the "leaf" encodings for variable-length data,
//! where we simply lay the data out according to the rules of the structural encoding.
//!
//! These encodings are transparent since we aren't actually doing any compression.  No information
//! is needed in the encoding description.

use arrow_array::OffsetSizeTrait;
use bytemuck::{cast_slice, try_cast_slice};
use byteorder::{ByteOrder, LittleEndian};
use core::panic;
use snafu::location;

use crate::compression::{
    BlockCompressor, BlockDecompressor, MiniBlockDecompressor, VariablePerValueDecompressor,
};

use crate::buffer::LanceBuffer;
use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};
use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
use crate::encodings::logical::primitive::miniblock::{
    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
};
use crate::format::{pb, ProtobufUtils};

use lance_core::utils::bit::pad_bytes_to;
use lance_core::{Error, Result};

#[derive(Debug, Default)]
pub struct BinaryMiniBlockEncoder {}

const AIM_MINICHUNK_SIZE: i64 = 4 * 1024;

// Generic over the offset type so it supports both 32-bit and 64-bit offsets.
fn chunk_offsets<N: OffsetSizeTrait>(
    offsets: &[N],
    data: &[u8],
    alignment: usize,
) -> (Vec<LanceBuffer>, Vec<MiniBlockChunk>) {
    #[derive(Debug)]
    struct ChunkInfo {
        chunk_start_offset_in_orig_idx: usize,
        chunk_last_offset_in_orig_idx: usize,
        // the bytes in every chunk start at `chunk.bytes_start_offset`
        bytes_start_offset: usize,
        // every chunk is padded to the requested alignment (4 or 8 bytes).
        // we need to interpret every chunk as &[u32] (or &[u64]) so it must be padded to at
        // least the offset width.  This field could be eliminated by using `num_bytes` in
        // `MiniBlockChunk` to compute the `output_total_bytes`.
        padded_chunk_size: usize,
    }

    let byte_width: usize = N::get_byte_width();
    let mut chunks_info = vec![];
    let mut chunks = vec![];
    let mut last_offset_in_orig_idx = 0;
    loop {
        let this_last_offset_in_orig_idx = search_next_offset_idx(offsets, last_offset_in_orig_idx);

        let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx;
        let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx];
        let this_chunk_size =
            (num_values_in_this_chunk + 1) * byte_width + chunk_bytes.to_usize().unwrap();

        let padded_chunk_size = this_chunk_size.next_multiple_of(alignment);

        let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * byte_width;
        chunks_info.push(ChunkInfo {
            chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
            chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
            bytes_start_offset: this_chunk_bytes_start_offset,
            padded_chunk_size,
        });
        chunks.push(MiniBlockChunk {
            log_num_values: if this_last_offset_in_orig_idx == offsets.len() - 1 {
                0
            } else {
                num_values_in_this_chunk.trailing_zeros() as u8
            },
            buffer_sizes: vec![padded_chunk_size as u16],
        });
        if this_last_offset_in_orig_idx == offsets.len() - 1 {
            break;
        }
        last_offset_in_orig_idx = this_last_offset_in_orig_idx;
    }

    let output_total_bytes = chunks_info
        .iter()
        .map(|chunk_info| chunk_info.padded_chunk_size)
        .sum::<usize>();

    let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

    for chunk in chunks_info {
        let this_chunk_offsets: Vec<N> = offsets
            [chunk.chunk_start_offset_in_orig_idx..=chunk.chunk_last_offset_in_orig_idx]
            .iter()
            .map(|offset| {
                *offset - offsets[chunk.chunk_start_offset_in_orig_idx]
                    + N::from_usize(chunk.bytes_start_offset).unwrap()
            })
            .collect();

        let this_chunk_offsets = LanceBuffer::reinterpret_vec(this_chunk_offsets);
        output.extend_from_slice(&this_chunk_offsets);

        let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        output.extend_from_slice(&data[start_in_orig..end_in_orig]);

        // pad this chunk so it is aligned to the desired byte boundary.
        const PAD_BYTE: u8 = 72;
        let pad_len = pad_bytes_to(output.len(), alignment);

        // Compare with usize literal to avoid type mismatch with N
        if pad_len > 0_usize {
            output.extend(std::iter::repeat_n(PAD_BYTE, pad_len));
        }
    }
    (vec![LanceBuffer::reinterpret_vec(output)], chunks)
}
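
// For illustration (a sketch of the layout produced above, not normative): chunking the two
// values "ab" and "cde" with 32-bit offsets and 4-byte alignment yields a single chunk whose
// rebased offsets are [12, 14, 17] (3 offsets * 4 bytes = 12, so the bytes start at 12),
// followed by the bytes "abcde" and 3 bytes of padding, for a padded chunk size of 20.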

// Searches for the next offset index at which to cut the values into a chunk.
// This function incrementally grows the number of values in a chunk, doubling the count
// each time.  It returns the offset index in `offsets` that ends this chunk.
fn search_next_offset_idx<N: OffsetSizeTrait>(offsets: &[N], last_offset_idx: usize) -> usize {
    let mut num_values = 1;
    let mut new_num_values = num_values * 2;
    loop {
        if last_offset_idx + new_num_values >= offsets.len() {
            let existing_bytes = offsets[offsets.len() - 1] - offsets[last_offset_idx];
            // existing bytes plus the size of the new offsets
            let new_size = existing_bytes
                + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap();
            if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
                // case 1: can fit the rest of all data into a miniblock
                return offsets.len() - 1;
            } else {
                // case 2: can only fit the last tried `num_values` into a miniblock
                return last_offset_idx + num_values;
            }
        }
        let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx];
        let new_size =
            existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap();
        if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
            num_values = new_num_values;
            new_num_values *= 2;
        } else {
            break;
        }
    }
    last_offset_idx + new_num_values
}

impl BinaryMiniBlockEncoder {
    // Put binary data into chunks, where each chunk aims to be at most `AIM_MINICHUNK_SIZE` bytes.
    // In each chunk the offsets come first, followed by the binary bytes; each chunk is padded
    // to the offset width (4 or 8 bytes).  The offsets in a chunk point to byte positions
    // within that chunk.
    fn chunk_data(
        &self,
        mut data: VariableWidthBlock,
    ) -> (MiniBlockCompressed, crate::format::pb::ArrayEncoding) {
        match data.bits_per_offset {
            32 => {
                let offsets = data.offsets.borrow_to_typed_slice::<i32>();
                let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 4);
                (
                    MiniBlockCompressed {
                        data: buffers,
                        chunks,
                        num_values: data.num_values,
                    },
                    ProtobufUtils::variable(32),
                )
            }
            64 => {
                let offsets = data.offsets.borrow_to_typed_slice::<i64>();
                let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 8);
                (
                    MiniBlockCompressed {
                        data: buffers,
                        chunks,
                        num_values: data.num_values,
                    },
                    ProtobufUtils::variable(64),
                )
            }
            _ => panic!("Unsupported bits_per_offset={}", data.bits_per_offset),
        }
    }
}

impl MiniBlockCompressor for BinaryMiniBlockEncoder {
    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
        match data {
            DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
                    data.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}

#[derive(Debug)]
pub struct BinaryMiniBlockDecompressor {
    bits_per_offset: u8,
}

impl BinaryMiniBlockDecompressor {
    pub fn new(bits_per_offset: u8) -> Self {
        assert!(bits_per_offset == 32 || bits_per_offset == 64);
        Self { bits_per_offset }
    }

    pub fn from_variable(variable: &pb::Variable) -> Self {
        Self {
            bits_per_offset: variable.bits_per_offset as u8,
        }
    }
}

impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
    // Decompress a MiniBlock of binary data.  `num_values` must be less than or equal to the
    // number of values this MiniBlock has.  A BinaryMiniBlock doesn't store the number of
    // values it has, so the assertion can not be done here and the caller of `decompress`
    // must ensure `num_values` <= number of values in the chunk.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        assert_eq!(data.len(), 1);
        let data = data.into_iter().next().unwrap();

        if self.bits_per_offset == 64 {
            // offset and at least one value
            assert!(data.len() >= 16);

            let offsets: &[u64] = try_cast_slice(&data)
                .expect("casting buffer failed during BinaryMiniBlock decompression");

            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u64>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                data: LanceBuffer::Owned(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 64,
                num_values,
                block_info: BlockInfo::new(),
            }))
        } else {
            // offset and at least one value
            assert!(data.len() >= 8);

            let offsets: &[u32] = try_cast_slice(&data)
                .expect("casting buffer failed during BinaryMiniBlock decompression");

            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u32>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                data: LanceBuffer::Owned(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 32,
                num_values,
                block_info: BlockInfo::new(),
            }))
        }
    }
}

/// The most basic encoding for variable-width data; it does no compression at all.
/// The compressed buffer memory layout is as follows:
/// ----------------------------------------------------------------------------------------
/// | bits_per_offset | number of values  | bytes_start_offset | offsets data | bytes data
/// ----------------------------------------------------------------------------------------
/// |       1 byte    |<bits_per_offset>/8|<bits_per_offset>/8 |  offsets_len | data_len
/// ----------------------------------------------------------------------------------------
/// It's used by VariableEncoder and BinaryBlockDecompressor.
///
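/// For example (an illustrative sketch of this layout): encoding the two values "abc" and
/// "de" with 32-bit offsets `[0, 3, 5]` produces a 26-byte buffer of
/// `[32u8]` `[2u32 LE]` `[21u32 LE]` `[0u32, 3u32, 5u32 LE]` `b"abcde"`,
/// where 21 = 1 + 4 + 4 + 3 * 4 is the bytes_start_offset.
///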
#[derive(Debug, Default)]
pub struct VariableEncoder {}

impl BlockCompressor for VariableEncoder {
    fn compress(&self, mut data: DataBlock) -> Result<LanceBuffer> {
        match data {
            DataBlock::VariableWidth(ref mut variable_width_data) => {
                let num_values: u64 = variable_width_data.num_values;
                match variable_width_data.bits_per_offset {
                    32 => {
                        let num_values: u32 = num_values.try_into().expect(
                            "The maximum number of values BinaryBlockEncoder can work with is u32::MAX",
                        );

                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
                        let offsets = offsets.as_ref();
                        // the first byte stores bits_per_offset, the next 4 bytes store the
                        // number of values, then 4 bytes for bytes_start_offset,
                        // then offsets data, then bytes data.
                        let bytes_start_offset = 1 + 4 + 4 + std::mem::size_of_val(offsets) as u32;

                        let output_total_bytes =
                            bytes_start_offset as usize + variable_width_data.data.len();
                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

                        // store bits_per_offset
                        output.push(32_u8);

                        // store `num_values` in the next 4 bytes of the output buffer
                        output.extend_from_slice(&(num_values).to_le_bytes());

                        // store `bytes_start_offset` in the next 4 bytes of the output buffer
                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());

                        // store offsets
                        output.extend_from_slice(cast_slice(offsets));

                        // store bytes
                        output.extend_from_slice(&variable_width_data.data);
                        Ok(LanceBuffer::Owned(output))
                    }
                    64 => {
                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u64>();
                        let offsets = offsets.as_ref();
                        // the first byte stores bits_per_offset, the next 8 bytes store the
                        // number of values, then 8 bytes for bytes_start_offset,
                        // then offsets data, then bytes data.
                        let bytes_start_offset = 1 + 8 + 8 + std::mem::size_of_val(offsets) as u64;

                        let output_total_bytes =
                            bytes_start_offset as usize + variable_width_data.data.len();
                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

                        // store bits_per_offset
                        output.push(64_u8);

                        // store `num_values` in the next 8 bytes of the output buffer
                        output.extend_from_slice(&(num_values).to_le_bytes());

                        // store `bytes_start_offset` in the next 8 bytes of the output buffer
                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());

                        // store offsets
                        output.extend_from_slice(cast_slice(offsets));

                        // store bytes
                        output.extend_from_slice(&variable_width_data.data);
                        Ok(LanceBuffer::Owned(output))
                    }
                    _ => {
                        panic!(
                            "BinaryBlockEncoder does not work with {} bits per offset VariableWidth DataBlock.",
                            variable_width_data.bits_per_offset
                        );
                    }
                }
            }
            _ => {
                panic!("BinaryBlockEncoder can only work with Variable Width DataBlock.");
            }
        }
    }
}

impl PerValueCompressor for VariableEncoder {
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
        let DataBlock::VariableWidth(variable) = data else {
            panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
        };

        let encoding = ProtobufUtils::variable(variable.bits_per_offset);
        Ok((PerValueDataBlock::Variable(variable), encoding))
    }
}

#[derive(Debug, Default)]
pub struct VariableDecoder {}

impl VariablePerValueDecompressor for VariableDecoder {
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
        Ok(DataBlock::VariableWidth(data))
    }
}

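/// Decodes the buffer produced by [`VariableEncoder`]'s block compression path.
///
/// Continuing the illustrative example above: for that 26-byte buffer, byte 0 holds
/// bits_per_offset (32), bytes 1..5 hold num_values (2), bytes 5..9 hold
/// bytes_start_offset (21), bytes 9..21 hold the offsets `[0, 3, 5]`, and
/// bytes 21..26 hold `b"abcde"`.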
#[derive(Debug, Default)]
pub struct BinaryBlockDecompressor {}

impl BlockDecompressor for BinaryBlockDecompressor {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        // the first byte stores bits_per_offset
        let bits_per_offset = data[0];
        match bits_per_offset {
            32 => {
                // the next 4 bytes of the compressed buffer store the num_values this block has.
                let stored_num_values = LittleEndian::read_u32(&data[1..5]);
                debug_assert_eq!(num_values, stored_num_values as u64);

                // the next 4 bytes of the compressed buffer store the bytes_start_offset.
                let bytes_start_offset = LittleEndian::read_u32(&data[5..9]);

                // the next `bytes_start_offset - 9` bytes store the offsets.
                let offsets = data.slice_with_length(9, bytes_start_offset as usize - 9);

                // the rest are the binary bytes.
                let data = data.slice_with_length(
                    bytes_start_offset as usize,
                    data.len() - bytes_start_offset as usize,
                );

                Ok(DataBlock::VariableWidth(VariableWidthBlock {
                    data,
                    offsets,
                    bits_per_offset: 32,
                    num_values,
                    block_info: BlockInfo::new(),
                }))
            }
            64 => {
                // the next 8 bytes of the compressed buffer store the num_values this block has.
                let stored_num_values = LittleEndian::read_u64(&data[1..9]);
                debug_assert_eq!(num_values, stored_num_values);

                // the next 8 bytes of the compressed buffer store the bytes_start_offset.
                let bytes_start_offset = LittleEndian::read_u64(&data[9..17]);

                // the next `bytes_start_offset - 17` bytes store the offsets.
                let offsets = data.slice_with_length(17, bytes_start_offset as usize - 17);

                // the rest are the binary bytes.
                let data = data.slice_with_length(
                    bytes_start_offset as usize,
                    data.len() - bytes_start_offset as usize,
                );

                Ok(DataBlock::VariableWidth(VariableWidthBlock {
                    data,
                    offsets,
                    bits_per_offset: 64,
                    num_values,
                    block_info: BlockInfo::new(),
                }))
            }
            _ => panic!("Unsupported bits_per_offset={}", bits_per_offset),
        }
    }
}

#[cfg(test)]
pub mod tests {
    use arrow_array::{
        builder::{LargeStringBuilder, StringBuilder},
        ArrayRef, StringArray,
    };
    use arrow_schema::{DataType, Field};

    use lance_core::datatypes::{
        COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
        STRUCTURAL_ENCODING_MINIBLOCK,
    };
    use rstest::rstest;
    use std::{collections::HashMap, sync::Arc, vec};

    use crate::{
        testing::{
            check_round_trip_encoding_generated, check_round_trip_encoding_of_data,
            check_round_trip_encoding_random, FnArrayGeneratorProvider, TestCases,
        },
        version::LanceFileVersion,
    };

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_utf8_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let field = Field::new("", DataType::Utf8, false);
        check_round_trip_encoding_random(field, version).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        use lance_core::datatypes::STRUCTURAL_ENCODING_META_KEY;

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let field = Field::new("", data_type, false).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, version).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_fsst(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Binary, DataType::Utf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_1).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_large_binary_fsst(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::LargeBinary, DataType::LargeUtf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_1).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_large_binary_fsst_with_dict(
        #[values(DataType::LargeBinary, DataType::LargeUtf8)] data_type: DataType,
    ) {
        use lance_core::datatypes::DICT_DIVISOR_META_KEY;

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );
        // Force it to use dictionary encoding
        field_metadata.insert(DICT_DIVISOR_META_KEY.to_string(), "1".into());
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_1).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_large_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let field = Field::new("", DataType::LargeBinary, true);
        check_round_trip_encoding_random(field, version).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_utf8() {
        let field = Field::new("", DataType::LargeUtf8, true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_small_strings(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
        check_round_trip_encoding_generated(
            field,
            Box::new(FnArrayGeneratorProvider::new(move || {
                lance_datagen::array::utf8_prefix_plus_counter("user_", /*is_large=*/ false)
            })),
            LanceFileVersion::V2_1,
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        use lance_core::datatypes::STRUCTURAL_ENCODING_META_KEY;

        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4])
            .with_file_version(version);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            field_metadata,
        )
        .await;
    }

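    // A companion sketch to the test above (added for illustration): the same simple round
    // trip, but through the 64-bit offset path by casting to the Large* types.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_large_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::LargeUtf8, DataType::LargeBinary)] data_type: DataType,
    ) {
        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_indices(vec![0, 1, 3, 4])
            .with_file_version(LanceFileVersion::V2_1);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            field_metadata,
        )
        .await;
    }
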
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_sliced_utf8(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
        let string_array = string_array.slice(1, 3);

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_range(0..2)
            .with_range(1..2)
            .with_file_version(version);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_bigger_than_max_page_size() {
        // Create an array with one single 32MiB string
        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
        let string_array = StringArray::from(vec![
            Some(big_string),
            Some("abc".to_string()),
            None,
            None,
            Some("xyz".to_string()),
        ]);

        // Drop the max page size to 1MiB
        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;

        // This is a regression test for the case where a page with X rows is split into Y parts
        // where the number of rows is not evenly divisible by the number of parts.  In this
        // case we are splitting 90 rows into 4 parts.
        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &TestCases::default(),
            HashMap::new(),
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_empty_strings(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        // Scenario 1: Some strings are empty

        let values = [Some("abc"), Some(""), None];
        // Test an empty string at the beginning, middle, and end
        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
            let mut string_builder = StringBuilder::new();
            for idx in order {
                string_builder.append_option(values[idx]);
            }
            let string_array = Arc::new(string_builder.finish());
            let test_cases = TestCases::default()
                .with_indices(vec![1])
                .with_indices(vec![0])
                .with_indices(vec![2])
                .with_indices(vec![0, 1])
                .with_file_version(version);
            check_round_trip_encoding_of_data(
                vec![string_array.clone()],
                &test_cases,
                HashMap::new(),
            )
            .await;
            let test_cases = test_cases.with_batch_size(1);
            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
                .await;
        }

        // Scenario 2: All strings are empty

        // When encoding an array of empty strings there are no bytes to encode,
        // which is strange, and we want to ensure we handle it
        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }
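
    // An illustrative addition (not part of the original suite): many short values so the
    // miniblock path has to cut the data into multiple ~4KiB chunks, exercised through the
    // same round-trip helper used by the tests above.
    #[test_log::test(tokio::test)]
    async fn test_many_small_strings() {
        let strings = (0..20_000)
            .map(|i| Some(format!("val-{}", i)))
            .collect::<Vec<_>>();
        let string_array = Arc::new(StringArray::from(strings)) as ArrayRef;

        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }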

    #[test_log::test(tokio::test)]
    #[ignore] // This test is quite slow in debug mode
    async fn test_jumbo_string() {
        // This is an overflow test.  We have an array of 1MiB strings and we encode 5000 of
        // them, so the offsets range exceeds 4GiB (more than 32-bit offsets can address).
        let mut string_builder = LargeStringBuilder::new();
        // a 1 MiB string
        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
        for _ in 0..5000 {
            string_builder.append_option(Some(&giant_string));
        }
        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
        let arrs = vec![giant_array];

        // We can't validate because our validation relies on concatenating all input arrays
        let test_cases = TestCases::default().without_validation();
        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_dictionary_encoding(
        #[values(true, false)] with_nulls: bool,
        #[values(100, 500, 35000)] dict_size: u32,
    ) {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
        let strings = (0..dict_size)
            .map(|i| i.to_string())
            .collect::<Vec<String>>();

        let repeated_strings: Vec<_> = strings
            .iter()
            .cycle()
            .take(70000)
            .enumerate()
            .map(|(i, s)| {
                if with_nulls && i % 7 == 0 {
                    None
                } else {
                    Some(s.to_string())
                }
            })
            .collect();
        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }
}