lance_encoding/encodings/physical/
binary.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Basic encodings for variable width data
5//!
6//! These are not compression but represent the "leaf" encodings for variable length data
7//! where we simply match the data with the rules of the structural encoding.
8//!
9//! These encodings are transparent since we aren't actually doing any compression.  No information
10//! is needed in the encoding description.
11
12use arrow_array::OffsetSizeTrait;
13use bytemuck::cast_slice;
14use byteorder::{ByteOrder, LittleEndian};
15use core::panic;
16use snafu::location;
17
18use crate::compression::{
19    BlockCompressor, BlockDecompressor, MiniBlockDecompressor, VariablePerValueDecompressor,
20};
21
22use crate::buffer::LanceBuffer;
23use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};
24use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
25use crate::encodings::logical::primitive::miniblock::{
26    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
27};
28use crate::format::pb21::compressive_encoding::Compression;
29use crate::format::pb21::CompressiveEncoding;
30use crate::format::{pb21, ProtobufUtils21};
31
32use lance_core::utils::bit::pad_bytes_to;
33use lance_core::{Error, Result};
34
/// Target size, in bytes, that mini-block chunking aims for when deciding how many values
/// go into one chunk (offsets header plus value bytes).
const AIM_MINICHUNK_SIZE: i64 = 1 << 12;

/// Mini-block compressor that stores variable-width values verbatim: every chunk carries its
/// own chunk-relative offsets followed by the raw value bytes.
#[derive(Debug, Default)]
pub struct BinaryMiniBlockEncoder {}
39
40// Make it to support both u32 and u64
41fn chunk_offsets<N: OffsetSizeTrait>(
42    offsets: &[N],
43    data: &[u8],
44    alignment: usize,
45) -> (Vec<LanceBuffer>, Vec<MiniBlockChunk>) {
46    #[derive(Debug)]
47    struct ChunkInfo {
48        chunk_start_offset_in_orig_idx: usize,
49        chunk_last_offset_in_orig_idx: usize,
50        // the bytes in every chunk starts at `chunk.bytes_start_offset`
51        bytes_start_offset: usize,
52        // every chunk is padded to 8 bytes.
53        // we need to interpret every chunk as &[u32] so we need it to padded at least to 4 bytes,
54        // this field can actually be eliminated and I can use `num_bytes` in `MiniBlockChunk` to compute
55        // the `output_total_bytes`.
56        padded_chunk_size: usize,
57    }
58
59    let byte_width: usize = N::get_byte_width();
60    let mut chunks_info = vec![];
61    let mut chunks = vec![];
62    let mut last_offset_in_orig_idx = 0;
63    loop {
64        let this_last_offset_in_orig_idx = search_next_offset_idx(offsets, last_offset_in_orig_idx);
65
66        let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx;
67        let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx];
68        let this_chunk_size =
69            (num_values_in_this_chunk + 1) * byte_width + chunk_bytes.to_usize().unwrap();
70
71        let padded_chunk_size = this_chunk_size.next_multiple_of(alignment);
72        debug_assert!(padded_chunk_size > 0);
73
74        let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * byte_width;
75        chunks_info.push(ChunkInfo {
76            chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
77            chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
78            bytes_start_offset: this_chunk_bytes_start_offset,
79            padded_chunk_size,
80        });
81        chunks.push(MiniBlockChunk {
82            log_num_values: if this_last_offset_in_orig_idx == offsets.len() - 1 {
83                0
84            } else {
85                num_values_in_this_chunk.trailing_zeros() as u8
86            },
87            buffer_sizes: vec![padded_chunk_size as u16],
88        });
89        if this_last_offset_in_orig_idx == offsets.len() - 1 {
90            break;
91        }
92        last_offset_in_orig_idx = this_last_offset_in_orig_idx;
93    }
94
95    let output_total_bytes = chunks_info
96        .iter()
97        .map(|chunk_info| chunk_info.padded_chunk_size)
98        .sum::<usize>();
99
100    let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
101
102    for chunk in chunks_info {
103        let this_chunk_offsets: Vec<N> = offsets
104            [chunk.chunk_start_offset_in_orig_idx..=chunk.chunk_last_offset_in_orig_idx]
105            .iter()
106            .map(|offset| {
107                *offset - offsets[chunk.chunk_start_offset_in_orig_idx]
108                    + N::from_usize(chunk.bytes_start_offset).unwrap()
109            })
110            .collect();
111
112        let this_chunk_offsets = LanceBuffer::reinterpret_vec(this_chunk_offsets);
113        output.extend_from_slice(&this_chunk_offsets);
114
115        let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx]
116            .to_usize()
117            .unwrap();
118        let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx]
119            .to_usize()
120            .unwrap();
121        output.extend_from_slice(&data[start_in_orig..end_in_orig]);
122
123        // pad this chunk to make it align to desired bytes.
124        const PAD_BYTE: u8 = 72;
125        let pad_len = pad_bytes_to(output.len(), alignment);
126
127        // Compare with usize literal to avoid type mismatch with N
128        if pad_len > 0_usize {
129            output.extend(std::iter::repeat_n(PAD_BYTE, pad_len));
130        }
131    }
132    (vec![LanceBuffer::reinterpret_vec(output)], chunks)
133}
134
135// search for the next offset index to cut the values into a chunk.
136// this function incrementally peek the number of values in a chunk,
137// each time multiplies the number of values by 2.
138// It returns the offset_idx in `offsets` that belongs to this chunk.
139fn search_next_offset_idx<N: OffsetSizeTrait>(offsets: &[N], last_offset_idx: usize) -> usize {
140    let mut num_values = 1;
141    let mut new_num_values = num_values * 2;
142    loop {
143        if last_offset_idx + new_num_values >= offsets.len() {
144            let existing_bytes = offsets[offsets.len() - 1] - offsets[last_offset_idx];
145            // existing bytes plus the new offset size
146            let new_size = existing_bytes
147                + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap();
148            if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
149                // case 1: can fit the rest of all data into a miniblock
150                return offsets.len() - 1;
151            } else {
152                // case 2: can only fit the last tried `num_values` into a miniblock
153                return last_offset_idx + num_values;
154            }
155        }
156        let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx];
157        let new_size =
158            existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap();
159        if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
160            num_values = new_num_values;
161            new_num_values *= 2;
162        } else {
163            break;
164        }
165    }
166    last_offset_idx + new_num_values
167}
168
169impl BinaryMiniBlockEncoder {
170    // put binary data into chunks, every chunk is less than or equal to `AIM_MINICHUNK_SIZE`.
171    // In each chunk, offsets are put first then followed by binary bytes data, each chunk is padded to 8 bytes.
172    // the offsets in the chunk points to the bytes offset in this chunk.
173    fn chunk_data(&self, data: VariableWidthBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
174        // TODO: Support compression of offsets
175        // TODO: Support general compression of data
176        match data.bits_per_offset {
177            32 => {
178                let offsets = data.offsets.borrow_to_typed_slice::<i32>();
179                let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 4);
180                (
181                    MiniBlockCompressed {
182                        data: buffers,
183                        chunks,
184                        num_values: data.num_values,
185                    },
186                    ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None),
187                )
188            }
189            64 => {
190                let offsets = data.offsets.borrow_to_typed_slice::<i64>();
191                let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 8);
192                (
193                    MiniBlockCompressed {
194                        data: buffers,
195                        chunks,
196                        num_values: data.num_values,
197                    },
198                    ProtobufUtils21::variable(ProtobufUtils21::flat(64, None), None),
199                )
200            }
201            _ => panic!("Unsupported bits_per_offset={}", data.bits_per_offset),
202        }
203    }
204}
205
206impl MiniBlockCompressor for BinaryMiniBlockEncoder {
207    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
208        match data {
209            DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
210            _ => Err(Error::InvalidInput {
211                source: format!(
212                    "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
213                    data.name()
214                )
215                .into(),
216                location: location!(),
217            }),
218        }
219    }
220}
221
222#[derive(Debug)]
223pub struct BinaryMiniBlockDecompressor {
224    bits_per_offset: u8,
225}
226
227impl BinaryMiniBlockDecompressor {
228    pub fn new(bits_per_offset: u8) -> Self {
229        assert!(bits_per_offset == 32 || bits_per_offset == 64);
230        Self { bits_per_offset }
231    }
232
233    pub fn from_variable(variable: &pb21::Variable) -> Self {
234        if let Compression::Flat(flat) = variable
235            .offsets
236            .as_ref()
237            .unwrap()
238            .compression
239            .as_ref()
240            .unwrap()
241        {
242            Self {
243                bits_per_offset: flat.bits_per_value as u8,
244            }
245        } else {
246            panic!("Unsupported offsets compression: {:?}", variable.offsets);
247        }
248    }
249}
250
251impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
252    // decompress a MiniBlock of binary data, the num_values must be less than or equal
253    // to the number of values this MiniBlock has, BinaryMiniBlock doesn't store `the number of values`
254    // it has so assertion can not be done here and the caller of `decompress` must ensure
255    // `num_values` <= number of values in the chunk.
256    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
257        assert_eq!(data.len(), 1);
258        let data = data.into_iter().next().unwrap();
259
260        if self.bits_per_offset == 64 {
261            // offset and at least one value
262            assert!(data.len() >= 16);
263
264            let offsets_buffer = data.borrow_to_typed_slice::<u64>();
265            let offsets = offsets_buffer.as_ref();
266
267            let result_offsets = offsets[0..(num_values + 1) as usize]
268                .iter()
269                .map(|offset| offset - offsets[0])
270                .collect::<Vec<u64>>();
271
272            Ok(DataBlock::VariableWidth(VariableWidthBlock {
273                data: LanceBuffer::from(
274                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
275                ),
276                offsets: LanceBuffer::reinterpret_vec(result_offsets),
277                bits_per_offset: 64,
278                num_values,
279                block_info: BlockInfo::new(),
280            }))
281        } else {
282            // offset and at least one value
283            assert!(data.len() >= 8);
284
285            let offsets_buffer = data.borrow_to_typed_slice::<u32>();
286            let offsets = offsets_buffer.as_ref();
287
288            let result_offsets = offsets[0..(num_values + 1) as usize]
289                .iter()
290                .map(|offset| offset - offsets[0])
291                .collect::<Vec<u32>>();
292
293            Ok(DataBlock::VariableWidth(VariableWidthBlock {
294                data: LanceBuffer::from(
295                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
296                ),
297                offsets: LanceBuffer::reinterpret_vec(result_offsets),
298                bits_per_offset: 32,
299                num_values,
300                block_info: BlockInfo::new(),
301            }))
302        }
303    }
304}
305
/// Most basic encoding for variable-width data; it does no compression at all.
///
/// As a `BlockCompressor`, it serializes a whole `VariableWidth` block into one buffer laid
/// out as below, where `W = bits_per_offset / 8` (4 or 8 bytes):
///
/// ```text
/// ----------------------------------------------------------------------------------------
/// | bits_per_offset | number of values | bytes_start_offset | offsets data | bytes data  |
/// ----------------------------------------------------------------------------------------
/// |     1 byte      |     W bytes      |      W bytes       | offsets_len  | data_len    |
/// ----------------------------------------------------------------------------------------
/// ```
///
/// The two header integers are little-endian. The offsets and value bytes are copied verbatim
/// from the block. `bytes_start_offset` is the position of the first value byte, i.e.
/// `1 + 2 * W + offsets_len`. Such buffers are read back by `BinaryBlockDecompressor`.
///
/// As a `PerValueCompressor`, it passes the block through unchanged and only records a flat
/// offsets encoding.
#[derive(Debug, Default)]
pub struct VariableEncoder {}
317
318impl BlockCompressor for VariableEncoder {
319    fn compress(&self, mut data: DataBlock) -> Result<LanceBuffer> {
320        match data {
321            DataBlock::VariableWidth(ref mut variable_width_data) => {
322                let num_values: u64 = variable_width_data.num_values;
323                match variable_width_data.bits_per_offset {
324                    32 => {
325                        let num_values: u32 = num_values
326            .try_into()
327            .expect("The Maximum number of values BinaryBlockEncoder can work with is u32::MAX");
328
329                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
330                        let offsets = offsets.as_ref();
331                        // the first bit stores the bits_per_offset, the next 4 bytes store the number of values,
332                        // then 4 bytes for bytes_start_offset,
333                        // then offsets data, then bytes data.
334                        let bytes_start_offset = 1 + 4 + 4 + std::mem::size_of_val(offsets) as u32;
335
336                        let output_total_bytes =
337                            bytes_start_offset as usize + variable_width_data.data.len();
338                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
339
340                        // Store bit_per_offset info
341                        output.push(32_u8);
342
343                        // store `num_values` in the first 4 bytes of output buffer
344                        output.extend_from_slice(&(num_values).to_le_bytes());
345
346                        // store `bytes_start_offset` in the next 4 bytes of output buffer
347                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
348
349                        // store offsets
350                        output.extend_from_slice(cast_slice(offsets));
351
352                        // store bytes
353                        output.extend_from_slice(&variable_width_data.data);
354                        Ok(LanceBuffer::from(output))
355                    }
356                    64 => {
357                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u64>();
358                        let offsets = offsets.as_ref();
359                        // the first bit stores the bits_per_offset, the next 8 bytes store the number of values,
360                        // then 8 bytes for bytes_start_offset,
361                        // then offsets data, then bytes data.
362
363                        let bytes_start_offset = 1 + 8 + 8 + std::mem::size_of_val(offsets) as u64;
364
365                        let output_total_bytes =
366                            bytes_start_offset as usize + variable_width_data.data.len();
367                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
368
369                        // Store bit_per_offset info
370                        output.push(64_u8);
371
372                        // store `num_values` in the first 8 bytes of output buffer
373                        output.extend_from_slice(&(num_values).to_le_bytes());
374
375                        // store `bytes_start_offset` in the next 8 bytes of output buffer
376                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
377
378                        // store offsets
379                        output.extend_from_slice(cast_slice(offsets));
380
381                        // store bytes
382                        output.extend_from_slice(&variable_width_data.data);
383                        Ok(LanceBuffer::from(output))
384                    }
385                    _ => {
386                        panic!("BinaryBlockEncoder does not work with {} bits per offset VariableWidth DataBlock.",
387                variable_width_data.bits_per_offset);
388                    }
389                }
390            }
391            _ => {
392                panic!("BinaryBlockEncoder can only work with Variable Width DataBlock.");
393            }
394        }
395    }
396}
397
398impl PerValueCompressor for VariableEncoder {
399    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
400        let DataBlock::VariableWidth(variable) = data else {
401            panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
402        };
403
404        let encoding = ProtobufUtils21::variable(
405            ProtobufUtils21::flat(variable.bits_per_offset as u64, None),
406            None,
407        );
408        Ok((PerValueDataBlock::Variable(variable), encoding))
409    }
410}
411
412#[derive(Debug, Default)]
413pub struct VariableDecoder {}
414
415impl VariablePerValueDecompressor for VariableDecoder {
416    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
417        Ok(DataBlock::VariableWidth(data))
418    }
419}
420
421#[derive(Debug, Default)]
422pub struct BinaryBlockDecompressor {}
423
424impl BlockDecompressor for BinaryBlockDecompressor {
425    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
426        // The first byte contains bytes_per_offset info
427        let bits_per_offset = data[0];
428        match bits_per_offset {
429            32 => {
430                // the next 4 bytes in the BinaryBlock compressed buffer stores the num_values this block has.
431                let stored_num_values = LittleEndian::read_u32(&data[1..5]);
432                debug_assert_eq!(num_values, stored_num_values as u64);
433
434                // the next 4 bytes in the BinaryBlock compressed buffer stores the bytes_start_offset.
435                let bytes_start_offset = LittleEndian::read_u32(&data[5..9]);
436
437                // the next `bytes_start_offset - 9` stores the offsets.
438                let offsets = data.slice_with_length(9, bytes_start_offset as usize - 9);
439
440                // the rest are the binary bytes.
441                let data = data.slice_with_length(
442                    bytes_start_offset as usize,
443                    data.len() - bytes_start_offset as usize,
444                );
445
446                Ok(DataBlock::VariableWidth(VariableWidthBlock {
447                    data,
448                    offsets,
449                    bits_per_offset: 32,
450                    num_values,
451                    block_info: BlockInfo::new(),
452                }))
453            }
454            64 => {
455                // the next 8 bytes in the BinaryBlock compressed buffer stores the num_values this block has.
456                let stored_num_values = LittleEndian::read_u64(&data[1..9]);
457                debug_assert_eq!(num_values, stored_num_values);
458
459                // the next 8 bytes in the BinaryBlock compressed buffer stores the bytes_start_offset.
460                let bytes_start_offset = LittleEndian::read_u64(&data[9..17]);
461
462                // the next `bytes_start_offset - 17` stores the offsets.
463                let offsets = data.slice_with_length(17, bytes_start_offset as usize - 17);
464
465                // the rest are the binary bytes.
466                let data = data.slice_with_length(
467                    bytes_start_offset as usize,
468                    data.len() - bytes_start_offset as usize,
469                );
470
471                Ok(DataBlock::VariableWidth(VariableWidthBlock {
472                    data,
473                    offsets,
474                    bits_per_offset: 64,
475                    num_values,
476                    block_info: BlockInfo::new(),
477                }))
478            }
479            _ => panic!("Unsupported bits_per_offset={}", bits_per_offset),
480        }
481    }
482}
483
484#[cfg(test)]
485pub mod tests {
486    use arrow_array::{
487        builder::{LargeStringBuilder, StringBuilder},
488        ArrayRef, StringArray,
489    };
490    use arrow_schema::{DataType, Field};
491
492    use crate::{
493        constants::{
494            COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
495            STRUCTURAL_ENCODING_MINIBLOCK,
496        },
497        testing::check_specific_random,
498    };
499    use rstest::rstest;
500    use std::{collections::HashMap, sync::Arc, vec};
501
502    use crate::{
503        testing::{
504            check_basic_random, check_round_trip_encoding_of_data, FnArrayGeneratorProvider,
505            TestCases,
506        },
507        version::LanceFileVersion,
508    };
509
510    #[test_log::test(tokio::test)]
511    async fn test_utf8_binary() {
512        let field = Field::new("", DataType::Utf8, false);
513        check_specific_random(
514            field,
515            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
516        )
517        .await;
518    }
519
520    #[rstest]
521    #[test_log::test(tokio::test)]
522    async fn test_binary(
523        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
524        structural_encoding: &str,
525        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
526    ) {
527        let mut field_metadata = HashMap::new();
528        field_metadata.insert(
529            STRUCTURAL_ENCODING_META_KEY.to_string(),
530            structural_encoding.into(),
531        );
532
533        let field = Field::new("", data_type, false).with_metadata(field_metadata);
534        check_basic_random(field).await;
535    }
536
537    #[rstest]
538    #[test_log::test(tokio::test)]
539    async fn test_binary_fsst(
540        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
541        structural_encoding: &str,
542        #[values(DataType::Binary, DataType::Utf8)] data_type: DataType,
543    ) {
544        let mut field_metadata = HashMap::new();
545        field_metadata.insert(
546            STRUCTURAL_ENCODING_META_KEY.to_string(),
547            structural_encoding.into(),
548        );
549        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
550        let field = Field::new("", data_type, true).with_metadata(field_metadata);
551        // TODO (https://github.com/lancedb/lance/issues/4783)
552        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
553        check_specific_random(field, test_cases).await;
554    }
555
556    #[rstest]
557    #[test_log::test(tokio::test)]
558    async fn test_fsst_large_binary(
559        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
560        structural_encoding: &str,
561        #[values(DataType::LargeBinary, DataType::LargeUtf8)] data_type: DataType,
562    ) {
563        let mut field_metadata = HashMap::new();
564        field_metadata.insert(
565            STRUCTURAL_ENCODING_META_KEY.to_string(),
566            structural_encoding.into(),
567        );
568        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
569        let field = Field::new("", data_type, true).with_metadata(field_metadata);
570        check_specific_random(
571            field,
572            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
573        )
574        .await;
575    }
576
577    #[test_log::test(tokio::test)]
578    async fn test_large_binary() {
579        let field = Field::new("", DataType::LargeBinary, true);
580        check_basic_random(field).await;
581    }
582
583    #[test_log::test(tokio::test)]
584    async fn test_large_utf8() {
585        let field = Field::new("", DataType::LargeUtf8, true);
586        check_basic_random(field).await;
587    }
588
589    #[rstest]
590    #[test_log::test(tokio::test)]
591    async fn test_small_strings(
592        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
593        structural_encoding: &str,
594    ) {
595        use crate::testing::check_basic_generated;
596
597        let mut field_metadata = HashMap::new();
598        field_metadata.insert(
599            STRUCTURAL_ENCODING_META_KEY.to_string(),
600            structural_encoding.into(),
601        );
602        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
603        check_basic_generated(
604            field,
605            Box::new(FnArrayGeneratorProvider::new(move || {
606                lance_datagen::array::utf8_prefix_plus_counter("user_", /*is_large=*/ false)
607            })),
608        )
609        .await;
610    }
611
612    #[rstest]
613    #[test_log::test(tokio::test)]
614    async fn test_simple_binary(
615        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
616        structural_encoding: &str,
617        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
618    ) {
619        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
620        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();
621
622        let mut field_metadata = HashMap::new();
623        field_metadata.insert(
624            STRUCTURAL_ENCODING_META_KEY.to_string(),
625            structural_encoding.into(),
626        );
627
628        let test_cases = TestCases::default()
629            .with_range(0..2)
630            .with_range(0..3)
631            .with_range(1..3)
632            .with_indices(vec![0, 1, 3, 4]);
633        check_round_trip_encoding_of_data(
634            vec![Arc::new(string_array)],
635            &test_cases,
636            field_metadata,
637        )
638        .await;
639    }
640
641    #[test_log::test(tokio::test)]
642    async fn test_sliced_utf8() {
643        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
644        let string_array = string_array.slice(1, 3);
645
646        let test_cases = TestCases::default()
647            .with_range(0..1)
648            .with_range(0..2)
649            .with_range(1..2);
650        check_round_trip_encoding_of_data(
651            vec![Arc::new(string_array)],
652            &test_cases,
653            HashMap::new(),
654        )
655        .await;
656    }
657
658    #[test_log::test(tokio::test)]
659    async fn test_bigger_than_max_page_size() {
660        // Create an array with one single 32MiB string
661        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
662        let string_array = StringArray::from(vec![
663            Some(big_string),
664            Some("abc".to_string()),
665            None,
666            None,
667            Some("xyz".to_string()),
668        ]);
669
670        // Drop the max page size to 1MiB
671        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);
672
673        check_round_trip_encoding_of_data(
674            vec![Arc::new(string_array)],
675            &test_cases,
676            HashMap::new(),
677        )
678        .await;
679
680        // This is a regression testing the case where a page with X rows is split into Y parts
681        // where the number of parts is not evenly divisible by the number of rows.  In this
682        // case we are splitting 90 rows into 4 parts.
683        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
684        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));
685
686        check_round_trip_encoding_of_data(
687            vec![Arc::new(string_array)],
688            &TestCases::default(),
689            HashMap::new(),
690        )
691        .await;
692    }
693
694    #[test_log::test(tokio::test)]
695    async fn test_empty_strings() {
696        // Scenario 1: Some strings are empty
697
698        let values = [Some("abc"), Some(""), None];
699        // Test empty list at beginning, middle, and end
700        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
701            let mut string_builder = StringBuilder::new();
702            for idx in order {
703                string_builder.append_option(values[idx]);
704            }
705            let string_array = Arc::new(string_builder.finish());
706            let test_cases = TestCases::default()
707                .with_indices(vec![1])
708                .with_indices(vec![0])
709                .with_indices(vec![2])
710                .with_indices(vec![0, 1]);
711            check_round_trip_encoding_of_data(
712                vec![string_array.clone()],
713                &test_cases,
714                HashMap::new(),
715            )
716            .await;
717            let test_cases = test_cases.with_batch_size(1);
718            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
719                .await;
720        }
721
722        // Scenario 2: All strings are empty
723
724        // When encoding an array of empty strings there are no bytes to encode
725        // which is strange and we want to ensure we handle it
726        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));
727
728        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
729        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
730            .await;
731        let test_cases = test_cases.with_batch_size(1);
732        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
733    }
734
735    #[test_log::test(tokio::test)]
736    #[ignore] // This test is quite slow in debug mode
737    async fn test_jumbo_string() {
738        // This is an overflow test.  We have a list of lists where each list
739        // has 1Mi items.  We encode 5000 of these lists and so we have over 4Gi in the
740        // offsets range
741        let mut string_builder = LargeStringBuilder::new();
742        // a 1 MiB string
743        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
744        for _ in 0..5000 {
745            string_builder.append_option(Some(&giant_string));
746        }
747        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
748        let arrs = vec![giant_array];
749
750        // // We can't validate because our validation relies on concatenating all input arrays
751        let test_cases = TestCases::default().without_validation();
752        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
753    }
754
755    #[rstest]
756    #[test_log::test(tokio::test)]
757    async fn test_binary_dictionary_encoding(
758        #[values(true, false)] with_nulls: bool,
759        #[values(100, 500, 35000)] dict_size: u32,
760    ) {
761        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
762        let strings = (0..dict_size)
763            .map(|i| i.to_string())
764            .collect::<Vec<String>>();
765
766        let repeated_strings: Vec<_> = strings
767            .iter()
768            .cycle()
769            .take(70000)
770            .enumerate()
771            .map(|(i, s)| {
772                if with_nulls && i % 7 == 0 {
773                    None
774                } else {
775                    Some(s.clone())
776                }
777            })
778            .collect();
779        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
780        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
781    }
782
783    #[test_log::test(tokio::test)]
784    async fn test_binary_encoding_verification() {
785        use lance_datagen::{ByteCount, RowCount};
786
787        let test_cases = TestCases::default()
788            .with_expected_encoding("variable")
789            .with_min_file_version(LanceFileVersion::V2_1);
790
791        // Test both automatic selection and explicit configuration
792        // 1. Test automatic binary encoding selection (small strings that won't trigger FSST)
793        let arr_small = lance_datagen::gen_batch()
794            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(10), false))
795            .into_batch_rows(RowCount::from(1000))
796            .unwrap()
797            .column(0)
798            .clone();
799        check_round_trip_encoding_of_data(vec![arr_small], &test_cases, HashMap::new()).await;
800
801        // 2. Test explicit "none" compression to force binary encoding
802        let metadata_explicit =
803            HashMap::from([("lance-encoding:compression".to_string(), "none".to_string())]);
804        let arr_large = lance_datagen::gen_batch()
805            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(50), false))
806            .into_batch_rows(RowCount::from(2000))
807            .unwrap()
808            .column(0)
809            .clone();
810        check_round_trip_encoding_of_data(vec![arr_large], &test_cases, metadata_explicit).await;
811    }
812
813    #[test]
814    fn test_binary_miniblock_with_misaligned_buffer() {
815        use super::BinaryMiniBlockDecompressor;
816        use crate::buffer::LanceBuffer;
817        use crate::compression::MiniBlockDecompressor;
818        use crate::data::DataBlock;
819
820        // Test case 1: u32 offsets
821        {
822            let decompressor = BinaryMiniBlockDecompressor {
823                bits_per_offset: 32,
824            };
825
826            // Create test data with u32 offsets
827            // BinaryMiniBlock format: all offsets followed by all string data
828            // Need to ensure total size is divisible by 4 for u32
829            let mut test_data = Vec::new();
830
831            // Offsets section (3 offsets for 2 values + 1 end offset)
832            test_data.extend_from_slice(&12u32.to_le_bytes()); // offset to start of strings (after offsets)
833            test_data.extend_from_slice(&15u32.to_le_bytes()); // offset to second string
834            test_data.extend_from_slice(&20u32.to_le_bytes()); // offset to end
835
836            // String data section
837            test_data.extend_from_slice(b"ABCXYZ"); // 6 bytes of string data
838            test_data.extend_from_slice(&[0, 0]); // 2 bytes padding to make total 20 bytes (divisible by 4)
839
840            // Create a misaligned buffer by adding padding and slicing
841            let mut padded = Vec::with_capacity(test_data.len() + 1);
842            padded.push(0xFF); // Padding byte to misalign
843            padded.extend_from_slice(&test_data);
844
845            let bytes = bytes::Bytes::from(padded);
846            let misaligned = bytes.slice(1..); // Skip first byte to create misalignment
847
848            // Create LanceBuffer with bytes_per_value=1 to bypass alignment check
849            let buffer = LanceBuffer::from_bytes(misaligned, 1);
850
851            // Verify the buffer is actually misaligned
852            let ptr = buffer.as_ref().as_ptr();
853            assert_ne!(
854                ptr.align_offset(4),
855                0,
856                "Test setup: buffer should be misaligned for u32"
857            );
858
859            // Decompress with misaligned buffer - should work with borrow_to_typed_slice
860            let result = decompressor.decompress(vec![buffer], 2);
861            assert!(
862                result.is_ok(),
863                "Decompression should succeed with misaligned buffer"
864            );
865
866            // Verify the data is correct
867            if let Ok(DataBlock::VariableWidth(block)) = result {
868                assert_eq!(block.num_values, 2);
869                // Data should be the strings (including padding from the original buffer)
870                assert_eq!(&block.data.as_ref()[..6], b"ABCXYZ");
871            } else {
872                panic!("Expected VariableWidth block");
873            }
874        }
875
876        // Test case 2: u64 offsets
877        {
878            let decompressor = BinaryMiniBlockDecompressor {
879                bits_per_offset: 64,
880            };
881
882            // Create test data with u64 offsets
883            let mut test_data = Vec::new();
884
885            // Offsets section (3 offsets for 2 values + 1 end offset)
886            test_data.extend_from_slice(&24u64.to_le_bytes()); // offset to start of strings (after offsets)
887            test_data.extend_from_slice(&29u64.to_le_bytes()); // offset to second string
888            test_data.extend_from_slice(&40u64.to_le_bytes()); // offset to end (divisible by 8)
889
890            // String data section
891            test_data.extend_from_slice(b"HelloWorld"); // 10 bytes of string data
892            test_data.extend_from_slice(&[0, 0, 0, 0, 0, 0]); // 6 bytes padding to make total 40 bytes (divisible by 8)
893
894            // Create misaligned buffer
895            let mut padded = Vec::with_capacity(test_data.len() + 3);
896            padded.extend_from_slice(&[0xFF, 0xFF, 0xFF]); // 3 bytes padding for misalignment
897            padded.extend_from_slice(&test_data);
898
899            let bytes = bytes::Bytes::from(padded);
900            let misaligned = bytes.slice(3..); // Skip 3 bytes
901
902            let buffer = LanceBuffer::from_bytes(misaligned, 1);
903
904            // Verify misalignment for u64
905            let ptr = buffer.as_ref().as_ptr();
906            assert_ne!(
907                ptr.align_offset(8),
908                0,
909                "Test setup: buffer should be misaligned for u64"
910            );
911
912            // Decompress should succeed
913            let result = decompressor.decompress(vec![buffer], 2);
914            assert!(
915                result.is_ok(),
916                "Decompression should succeed with misaligned u64 buffer"
917            );
918
919            if let Ok(DataBlock::VariableWidth(block)) = result {
920                assert_eq!(block.num_values, 2);
921                // Data should be the strings (including padding from the original buffer)
922                assert_eq!(&block.data.as_ref()[..10], b"HelloWorld");
923            } else {
924                panic!("Expected VariableWidth block");
925            }
926        }
927    }
928}