Skip to main content

lance_encoding/encodings/physical/
binary.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Basic encodings for variable width data
5//!
6//! These are not compression but represent the "leaf" encodings for variable length data
7//! where we simply match the data with the rules of the structural encoding.
8//!
9//! These encodings are transparent since we aren't actually doing any compression.  No information
10//! is needed in the encoding description.
11
12use arrow_array::OffsetSizeTrait;
13use byteorder::{ByteOrder, LittleEndian};
14use core::panic;
15
16use crate::compression::{
17    BlockCompressor, BlockDecompressor, MiniBlockDecompressor, VariablePerValueDecompressor,
18};
19
20use crate::buffer::LanceBuffer;
21use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};
22use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
23use crate::encodings::logical::primitive::miniblock::{
24    MAX_MINIBLOCK_VALUES, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
25};
26use crate::format::pb21::CompressiveEncoding;
27use crate::format::pb21::compressive_encoding::Compression;
28use crate::format::{ProtobufUtils21, pb21};
29
30use lance_core::utils::bit::pad_bytes_to;
31use lance_core::{Error, Result};
32
33#[derive(Debug)]
34pub struct BinaryMiniBlockEncoder {
35    minichunk_size: i64,
36}
37
38impl Default for BinaryMiniBlockEncoder {
39    fn default() -> Self {
40        Self {
41            minichunk_size: *AIM_MINICHUNK_SIZE,
42        }
43    }
44}
45
46const DEFAULT_AIM_MINICHUNK_SIZE: i64 = 4 * 1024;
47
48pub static AIM_MINICHUNK_SIZE: std::sync::LazyLock<i64> = std::sync::LazyLock::new(|| {
49    std::env::var("LANCE_BINARY_MINIBLOCK_CHUNK_SIZE")
50        .unwrap_or_else(|_| DEFAULT_AIM_MINICHUNK_SIZE.to_string())
51        .parse::<i64>()
52        .unwrap_or(DEFAULT_AIM_MINICHUNK_SIZE)
53});
54
55// Make it to support both u32 and u64
56fn chunk_offsets<N: OffsetSizeTrait>(
57    offsets: &[N],
58    data: &[u8],
59    alignment: usize,
60    minichunk_size: i64,
61) -> (Vec<LanceBuffer>, Vec<MiniBlockChunk>) {
62    #[derive(Debug)]
63    struct ChunkInfo {
64        chunk_start_offset_in_orig_idx: usize,
65        chunk_last_offset_in_orig_idx: usize,
66        // the bytes in every chunk starts at `chunk.bytes_start_offset`
67        bytes_start_offset: usize,
68        // every chunk is padded to 8 bytes.
69        // we need to interpret every chunk as &[u32] so we need it to padded at least to 4 bytes,
70        // this field can actually be eliminated and I can use `num_bytes` in `MiniBlockChunk` to compute
71        // the `output_total_bytes`.
72        padded_chunk_size: usize,
73    }
74
75    let byte_width: usize = N::get_byte_width();
76    let mut chunks_info = vec![];
77    let mut chunks = vec![];
78    let mut last_offset_in_orig_idx = 0;
79    loop {
80        let this_last_offset_in_orig_idx =
81            search_next_offset_idx(offsets, last_offset_in_orig_idx, minichunk_size);
82
83        let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx;
84        let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx];
85        let this_chunk_size =
86            (num_values_in_this_chunk + 1) * byte_width + chunk_bytes.to_usize().unwrap();
87
88        let padded_chunk_size = this_chunk_size.next_multiple_of(alignment);
89        debug_assert!(padded_chunk_size > 0);
90
91        let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * byte_width;
92        chunks_info.push(ChunkInfo {
93            chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
94            chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
95            bytes_start_offset: this_chunk_bytes_start_offset,
96            padded_chunk_size,
97        });
98        chunks.push(MiniBlockChunk {
99            log_num_values: if this_last_offset_in_orig_idx == offsets.len() - 1 {
100                0
101            } else {
102                num_values_in_this_chunk.trailing_zeros() as u8
103            },
104            buffer_sizes: vec![padded_chunk_size as u32],
105        });
106        if this_last_offset_in_orig_idx == offsets.len() - 1 {
107            break;
108        }
109        last_offset_in_orig_idx = this_last_offset_in_orig_idx;
110    }
111
112    let output_total_bytes = chunks_info
113        .iter()
114        .map(|chunk_info| chunk_info.padded_chunk_size)
115        .sum::<usize>();
116
117    let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
118
119    for chunk in chunks_info {
120        let this_chunk_offsets: Vec<N> = offsets
121            [chunk.chunk_start_offset_in_orig_idx..=chunk.chunk_last_offset_in_orig_idx]
122            .iter()
123            .map(|offset| {
124                *offset - offsets[chunk.chunk_start_offset_in_orig_idx]
125                    + N::from_usize(chunk.bytes_start_offset).unwrap()
126            })
127            .collect();
128
129        let this_chunk_offsets = LanceBuffer::reinterpret_vec(this_chunk_offsets);
130        output.extend_from_slice(&this_chunk_offsets);
131
132        let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx]
133            .to_usize()
134            .unwrap();
135        let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx]
136            .to_usize()
137            .unwrap();
138        output.extend_from_slice(&data[start_in_orig..end_in_orig]);
139
140        // pad this chunk to make it align to desired bytes.
141        const PAD_BYTE: u8 = 72;
142        let pad_len = pad_bytes_to(output.len(), alignment);
143
144        // Compare with usize literal to avoid type mismatch with N
145        if pad_len > 0_usize {
146            output.extend(std::iter::repeat_n(PAD_BYTE, pad_len));
147        }
148    }
149    (vec![LanceBuffer::reinterpret_vec(output)], chunks)
150}
151
152// search for the next offset index to cut the values into a chunk.
153// this function incrementally peek the number of values in a chunk,
154// each time multiplies the number of values by 2.
155// It returns the offset_idx in `offsets` that belongs to this chunk.
156fn search_next_offset_idx<N: OffsetSizeTrait>(
157    offsets: &[N],
158    last_offset_idx: usize,
159    minichunk_size: i64,
160) -> usize {
161    // MiniBlockChunk uses `log_num_values == 0` as a sentinel for the final chunk. This means we
162    // must avoid creating 1-value chunks except for the final chunk, even if the configured
163    // `minichunk_size` is too small to fit more than one value.
164    let remaining_values = offsets.len().saturating_sub(last_offset_idx + 1);
165    if remaining_values <= 1 {
166        return offsets.len() - 1;
167    }
168
169    let mut num_values = 2;
170    let mut new_num_values = num_values * 2;
171    loop {
172        if last_offset_idx + new_num_values >= offsets.len() {
173            let existing_bytes = offsets[offsets.len() - 1] - offsets[last_offset_idx];
174            // existing bytes plus the new offset size
175            let new_size = existing_bytes
176                + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap();
177            if new_size.to_i64().unwrap() <= minichunk_size {
178                // case 1: can fit the rest of all data into a miniblock
179                return offsets.len() - 1;
180            } else {
181                // case 2: can only fit the last tried `num_values` into a miniblock
182                return last_offset_idx + num_values;
183            }
184        }
185        let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx];
186        let new_size =
187            existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap();
188        if new_size.to_i64().unwrap() <= minichunk_size {
189            if new_num_values * 2 > MAX_MINIBLOCK_VALUES as usize {
190                // hit the max number of values limit
191                break;
192            }
193            num_values = new_num_values;
194            new_num_values *= 2;
195        } else {
196            break;
197        }
198    }
199    last_offset_idx + num_values
200}
201
202impl BinaryMiniBlockEncoder {
203    pub fn new(minichunk_size: Option<i64>) -> Self {
204        Self {
205            minichunk_size: minichunk_size.unwrap_or(*AIM_MINICHUNK_SIZE),
206        }
207    }
208
209    // put binary data into chunks, every chunk is less than or equal to `minichunk_size`.
210    // In each chunk, offsets are put first then followed by binary bytes data, each chunk is padded to 8 bytes.
211    // the offsets in the chunk points to the bytes offset in this chunk.
212    fn chunk_data(&self, data: VariableWidthBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
213        // TODO: Support compression of offsets
214        // TODO: Support general compression of data
215        match data.bits_per_offset {
216            32 => {
217                let offsets = data.offsets.borrow_to_typed_slice::<i32>();
218                let (buffers, chunks) =
219                    chunk_offsets(offsets.as_ref(), &data.data, 4, self.minichunk_size);
220                (
221                    MiniBlockCompressed {
222                        data: buffers,
223                        chunks,
224                        num_values: data.num_values,
225                    },
226                    ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None),
227                )
228            }
229            64 => {
230                let offsets = data.offsets.borrow_to_typed_slice::<i64>();
231                let (buffers, chunks) =
232                    chunk_offsets(offsets.as_ref(), &data.data, 8, self.minichunk_size);
233                (
234                    MiniBlockCompressed {
235                        data: buffers,
236                        chunks,
237                        num_values: data.num_values,
238                    },
239                    ProtobufUtils21::variable(ProtobufUtils21::flat(64, None), None),
240                )
241            }
242            _ => panic!("Unsupported bits_per_offset={}", data.bits_per_offset),
243        }
244    }
245}
246
247impl MiniBlockCompressor for BinaryMiniBlockEncoder {
248    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
249        match data {
250            DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
251            _ => Err(Error::invalid_input_source(
252                format!(
253                    "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
254                    data.name()
255                )
256                .into(),
257            )),
258        }
259    }
260}
261
262#[derive(Debug)]
263pub struct BinaryMiniBlockDecompressor {
264    bits_per_offset: u8,
265}
266
267impl BinaryMiniBlockDecompressor {
268    pub fn new(bits_per_offset: u8) -> Self {
269        assert!(bits_per_offset == 32 || bits_per_offset == 64);
270        Self { bits_per_offset }
271    }
272
273    pub fn from_variable(variable: &pb21::Variable) -> Self {
274        if let Compression::Flat(flat) = variable
275            .offsets
276            .as_ref()
277            .unwrap()
278            .compression
279            .as_ref()
280            .unwrap()
281        {
282            Self {
283                bits_per_offset: flat.bits_per_value as u8,
284            }
285        } else {
286            panic!("Unsupported offsets compression: {:?}", variable.offsets);
287        }
288    }
289}
290
291impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
292    // decompress a MiniBlock of binary data, the num_values must be less than or equal
293    // to the number of values this MiniBlock has, BinaryMiniBlock doesn't store `the number of values`
294    // it has so assertion can not be done here and the caller of `decompress` must ensure
295    // `num_values` <= number of values in the chunk.
296    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
297        assert_eq!(data.len(), 1);
298        let data = data.into_iter().next().unwrap();
299
300        if self.bits_per_offset == 64 {
301            // offset and at least one value
302            assert!(data.len() >= 16);
303
304            let offsets_buffer = data.borrow_to_typed_slice::<u64>();
305            let offsets = offsets_buffer.as_ref();
306
307            let result_offsets = offsets[0..(num_values + 1) as usize]
308                .iter()
309                .map(|offset| offset - offsets[0])
310                .collect::<Vec<u64>>();
311
312            Ok(DataBlock::VariableWidth(VariableWidthBlock {
313                data: LanceBuffer::from(
314                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
315                ),
316                offsets: LanceBuffer::reinterpret_vec(result_offsets),
317                bits_per_offset: 64,
318                num_values,
319                block_info: BlockInfo::new(),
320            }))
321        } else {
322            // offset and at least one value
323            assert!(data.len() >= 8);
324
325            let offsets_buffer = data.borrow_to_typed_slice::<u32>();
326            let offsets = offsets_buffer.as_ref();
327
328            let result_offsets = offsets[0..(num_values + 1) as usize]
329                .iter()
330                .map(|offset| offset - offsets[0])
331                .collect::<Vec<u32>>();
332
333            Ok(DataBlock::VariableWidth(VariableWidthBlock {
334                data: LanceBuffer::from(
335                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
336                ),
337                offsets: LanceBuffer::reinterpret_vec(result_offsets),
338                bits_per_offset: 32,
339                num_values,
340                block_info: BlockInfo::new(),
341            }))
342        }
343    }
344}
345
346/// Most basic encoding for variable-width data which does no compression at all
347/// The DataBlock memory layout looks like below:
348///
349/// | bits_per_offset           | bytes_start_offset        | offsets data | bytes data |
350/// | ------------------------- | ------------------------- | ------------ | ---------- |
351/// | <bits_per_offset>/8 bytes | <bits_per_offset>/8 bytes | offsets_len  | data_len   |
352///
353/// It's used in VariableEncoder and BinaryBlockDecompressor
354///
355#[derive(Debug, Default)]
356pub struct VariableEncoder {}
357
358impl BlockCompressor for VariableEncoder {
359    fn compress(&self, mut data: DataBlock) -> Result<LanceBuffer> {
360        match data {
361            DataBlock::VariableWidth(ref mut variable_width_data) => {
362                match variable_width_data.bits_per_offset {
363                    32 => {
364                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
365                        let offsets = offsets.as_ref();
366                        // The first 4 bytes store the bits per offset, the next 4 bytes store the start
367                        // offset of the bytes data, then offsets data, then bytes data.
368                        let bytes_start_offset = 4 + 4 + std::mem::size_of_val(offsets) as u32;
369
370                        let output_total_bytes =
371                            bytes_start_offset as usize + variable_width_data.data.len();
372                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
373
374                        // Store bit_per_offset info
375                        output.extend_from_slice(&(32_u32).to_le_bytes());
376
377                        // store `bytes_start_offset` in the next 4 bytes of output buffer
378                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
379
380                        // store offsets
381                        output.extend_from_slice(&variable_width_data.offsets);
382
383                        // store bytes
384                        output.extend_from_slice(&variable_width_data.data);
385                        Ok(LanceBuffer::from(output))
386                    }
387                    64 => {
388                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u64>();
389                        let offsets = offsets.as_ref();
390                        // The first 8 bytes store the bits per offset, the next 8 bytes store the start
391                        // offset of the bytes data, then offsets data, then bytes data.
392                        let bytes_start_offset = 8 + 8 + std::mem::size_of_val(offsets) as u64;
393
394                        let output_total_bytes =
395                            bytes_start_offset as usize + variable_width_data.data.len();
396                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
397
398                        // Store bit_per_offset info
399                        output.extend_from_slice(&(64_u64).to_le_bytes());
400
401                        // store `bytes_start_offset` in the next 8 bytes of output buffer
402                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
403
404                        // store offsets
405                        output.extend_from_slice(&variable_width_data.offsets);
406
407                        // store bytes
408                        output.extend_from_slice(&variable_width_data.data);
409                        Ok(LanceBuffer::from(output))
410                    }
411                    _ => {
412                        panic!(
413                            "BinaryBlockEncoder does not work with {} bits per offset VariableWidth DataBlock.",
414                            variable_width_data.bits_per_offset
415                        );
416                    }
417                }
418            }
419            _ => {
420                panic!("BinaryBlockEncoder can only work with Variable Width DataBlock.");
421            }
422        }
423    }
424}
425
426impl PerValueCompressor for VariableEncoder {
427    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
428        let DataBlock::VariableWidth(variable) = data else {
429            panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
430        };
431
432        let encoding = ProtobufUtils21::variable(
433            ProtobufUtils21::flat(variable.bits_per_offset as u64, None),
434            None,
435        );
436        Ok((PerValueDataBlock::Variable(variable), encoding))
437    }
438}
439
440#[derive(Debug, Default)]
441pub struct VariableDecoder {}
442
443impl VariablePerValueDecompressor for VariableDecoder {
444    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
445        Ok(DataBlock::VariableWidth(data))
446    }
447}
448
449#[derive(Debug, Default)]
450pub struct BinaryBlockDecompressor {}
451
452impl BlockDecompressor for BinaryBlockDecompressor {
453    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
454        // In older (not quite stable) versions we stored the bits per offset as a single byte and then the num_values
455        // as four bytes.  However, this led to alignment problems and was wasteful since we already store the num_values
456        // in higher layers.
457        //
458        // In the standard scheme we use 4 bytes for the bits per offset and 4 bytes for the bytes_start_offset and we
459        // rely on the passed in num_values to be correct.
460
461        // This isn't perfect but it's probably good enough and the best I think we can do.  The bits per offset will
462        // never be more than 255 and it's little endian so the last 3 bytes will always be 0.  These will be the least
463        // significant 3 bytes of the number of values in the old scheme.  It's pretty unlikely these are all 0 (that would
464        // mean there are at least 16M values in a single page) so we'll use this to determine if the old scheme is used.
465        let is_old_scheme = data[1] != 0 || data[2] != 0 || data[3] != 0;
466
467        let (bits_per_offset, bytes_start_offset, offset_start) = if is_old_scheme {
468            // Old scheme
469            let bits_per_offset = data[0];
470            match bits_per_offset {
471                32 => {
472                    debug_assert_eq!(LittleEndian::read_u32(&data[1..5]), num_values as u32);
473                    let bytes_start_offset = LittleEndian::read_u32(&data[5..9]);
474                    (bits_per_offset, bytes_start_offset as u64, 9)
475                }
476                64 => {
477                    debug_assert_eq!(LittleEndian::read_u64(&data[1..9]), num_values);
478                    let bytes_start_offset = LittleEndian::read_u64(&data[9..17]);
479                    (bits_per_offset, bytes_start_offset, 17)
480                }
481                _ => {
482                    return Err(Error::invalid_input_source(
483                        format!("Unsupported bits_per_offset={}", bits_per_offset).into(),
484                    ));
485                }
486            }
487        } else {
488            // Standard scheme
489            let bits_per_offset = LittleEndian::read_u32(&data[0..4]) as u8;
490            match bits_per_offset {
491                32 => {
492                    let bytes_start_offset = LittleEndian::read_u32(&data[4..8]);
493                    (bits_per_offset, bytes_start_offset as u64, 8)
494                }
495                64 => {
496                    let bytes_start_offset = LittleEndian::read_u64(&data[8..16]);
497                    (bits_per_offset, bytes_start_offset, 16)
498                }
499                _ => {
500                    return Err(Error::invalid_input_source(
501                        format!("Unsupported bits_per_offset={}", bits_per_offset).into(),
502                    ));
503                }
504            }
505        };
506
507        // the next `bytes_start_offset - offset_start` stores the offsets.
508        let offsets =
509            data.slice_with_length(offset_start, bytes_start_offset as usize - offset_start);
510
511        // the rest are the binary bytes.
512        let data = data.slice_with_length(
513            bytes_start_offset as usize,
514            data.len() - bytes_start_offset as usize,
515        );
516
517        Ok(DataBlock::VariableWidth(VariableWidthBlock {
518            data,
519            offsets,
520            bits_per_offset,
521            num_values,
522            block_info: BlockInfo::new(),
523        }))
524    }
525}
526
527#[cfg(test)]
528pub mod tests {
529    use arrow_array::{
530        ArrayRef, StringArray,
531        builder::{LargeStringBuilder, StringBuilder},
532    };
533    use arrow_schema::{DataType, Field};
534
535    use crate::{
536        constants::{
537            COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
538            STRUCTURAL_ENCODING_MINIBLOCK,
539        },
540        testing::check_specific_random,
541    };
542    use rstest::rstest;
543    use std::{collections::HashMap, sync::Arc, vec};
544
545    use crate::{
546        testing::{
547            FnArrayGeneratorProvider, TestCases, check_basic_random,
548            check_round_trip_encoding_of_data,
549        },
550        version::LanceFileVersion,
551    };
552
553    #[test_log::test(tokio::test)]
554    async fn test_utf8_binary() {
555        let field = Field::new("", DataType::Utf8, false);
556        check_specific_random(
557            field,
558            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
559        )
560        .await;
561    }
562
563    #[rstest]
564    #[test_log::test(tokio::test)]
565    async fn test_binary(
566        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
567        structural_encoding: &str,
568        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
569    ) {
570        let mut field_metadata = HashMap::new();
571        field_metadata.insert(
572            STRUCTURAL_ENCODING_META_KEY.to_string(),
573            structural_encoding.into(),
574        );
575
576        let field = Field::new("", data_type, false).with_metadata(field_metadata);
577        check_basic_random(field).await;
578    }
579
580    #[rstest]
581    #[test_log::test(tokio::test)]
582    async fn test_binary_fsst(
583        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
584        structural_encoding: &str,
585        #[values(DataType::Binary, DataType::Utf8)] data_type: DataType,
586    ) {
587        let mut field_metadata = HashMap::new();
588        field_metadata.insert(
589            STRUCTURAL_ENCODING_META_KEY.to_string(),
590            structural_encoding.into(),
591        );
592        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
593        let field = Field::new("", data_type, true).with_metadata(field_metadata);
594        // TODO (https://github.com/lance-format/lance/issues/4783)
595        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
596        check_specific_random(field, test_cases).await;
597    }
598
599    #[rstest]
600    #[test_log::test(tokio::test)]
601    async fn test_fsst_large_binary(
602        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
603        structural_encoding: &str,
604        #[values(DataType::LargeBinary, DataType::LargeUtf8)] data_type: DataType,
605    ) {
606        let mut field_metadata = HashMap::new();
607        field_metadata.insert(
608            STRUCTURAL_ENCODING_META_KEY.to_string(),
609            structural_encoding.into(),
610        );
611        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
612        let field = Field::new("", data_type, true).with_metadata(field_metadata);
613        check_specific_random(
614            field,
615            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
616        )
617        .await;
618    }
619
620    #[test_log::test(tokio::test)]
621    async fn test_large_binary() {
622        let field = Field::new("", DataType::LargeBinary, true);
623        check_basic_random(field).await;
624    }
625
626    #[test_log::test(tokio::test)]
627    async fn test_large_utf8() {
628        let field = Field::new("", DataType::LargeUtf8, true);
629        check_basic_random(field).await;
630    }
631
632    #[rstest]
633    #[test_log::test(tokio::test)]
634    async fn test_small_strings(
635        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
636        structural_encoding: &str,
637    ) {
638        use crate::testing::check_basic_generated;
639
640        let mut field_metadata = HashMap::new();
641        field_metadata.insert(
642            STRUCTURAL_ENCODING_META_KEY.to_string(),
643            structural_encoding.into(),
644        );
645        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
646        check_basic_generated(
647            field,
648            Box::new(FnArrayGeneratorProvider::new(move || {
649                lance_datagen::array::utf8_prefix_plus_counter("user_", /*is_large=*/ false)
650            })),
651        )
652        .await;
653    }
654
655    #[rstest]
656    #[test_log::test(tokio::test)]
657    async fn test_simple_binary(
658        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
659        structural_encoding: &str,
660        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
661    ) {
662        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
663        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();
664
665        let mut field_metadata = HashMap::new();
666        field_metadata.insert(
667            STRUCTURAL_ENCODING_META_KEY.to_string(),
668            structural_encoding.into(),
669        );
670
671        let test_cases = TestCases::default()
672            .with_range(0..2)
673            .with_range(0..3)
674            .with_range(1..3)
675            .with_indices(vec![0, 1, 3, 4]);
676        check_round_trip_encoding_of_data(
677            vec![Arc::new(string_array)],
678            &test_cases,
679            field_metadata,
680        )
681        .await;
682    }
683
684    #[test_log::test(tokio::test)]
685    async fn test_sliced_utf8() {
686        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
687        let string_array = string_array.slice(1, 3);
688
689        let test_cases = TestCases::default()
690            .with_range(0..1)
691            .with_range(0..2)
692            .with_range(1..2);
693        check_round_trip_encoding_of_data(
694            vec![Arc::new(string_array)],
695            &test_cases,
696            HashMap::new(),
697        )
698        .await;
699    }
700
701    #[test_log::test(tokio::test)]
702    async fn test_bigger_than_max_page_size() {
703        // Create an array with one single 32MiB string
704        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
705        let string_array = StringArray::from(vec![
706            Some(big_string),
707            Some("abc".to_string()),
708            None,
709            None,
710            Some("xyz".to_string()),
711        ]);
712
713        // Drop the max page size to 1MiB
714        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);
715
716        check_round_trip_encoding_of_data(
717            vec![Arc::new(string_array)],
718            &test_cases,
719            HashMap::new(),
720        )
721        .await;
722
723        // This is a regression testing the case where a page with X rows is split into Y parts
724        // where the number of parts is not evenly divisible by the number of rows.  In this
725        // case we are splitting 90 rows into 4 parts.
726        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
727        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));
728
729        check_round_trip_encoding_of_data(
730            vec![Arc::new(string_array)],
731            &TestCases::default(),
732            HashMap::new(),
733        )
734        .await;
735    }
736
737    #[test_log::test(tokio::test)]
738    async fn test_empty_strings() {
739        // Scenario 1: Some strings are empty
740
741        let values = [Some("abc"), Some(""), None];
742        // Test empty list at beginning, middle, and end
743        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
744            let mut string_builder = StringBuilder::new();
745            for idx in order {
746                string_builder.append_option(values[idx]);
747            }
748            let string_array = Arc::new(string_builder.finish());
749            let test_cases = TestCases::default()
750                .with_indices(vec![1])
751                .with_indices(vec![0])
752                .with_indices(vec![2])
753                .with_indices(vec![0, 1]);
754            check_round_trip_encoding_of_data(
755                vec![string_array.clone()],
756                &test_cases,
757                HashMap::new(),
758            )
759            .await;
760            let test_cases = test_cases.with_batch_size(1);
761            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
762                .await;
763        }
764
765        // Scenario 2: All strings are empty
766
767        // When encoding an array of empty strings there are no bytes to encode
768        // which is strange and we want to ensure we handle it
769        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));
770
771        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
772        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
773            .await;
774        let test_cases = test_cases.with_batch_size(1);
775        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
776    }
777
778    #[test_log::test(tokio::test)]
779    #[ignore] // This test is quite slow in debug mode
780    async fn test_jumbo_string() {
781        // This is an overflow test.  We have a list of lists where each list
782        // has 1Mi items.  We encode 5000 of these lists and so we have over 4Gi in the
783        // offsets range
784        let mut string_builder = LargeStringBuilder::new();
785        // a 1 MiB string
786        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
787        for _ in 0..5000 {
788            string_builder.append_option(Some(&giant_string));
789        }
790        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
791        let arrs = vec![giant_array];
792
793        // // We can't validate because our validation relies on concatenating all input arrays
794        let test_cases = TestCases::default().without_validation();
795        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
796    }
797
798    #[rstest]
799    #[test_log::test(tokio::test)]
800    async fn test_binary_dictionary_encoding(
801        #[values(true, false)] with_nulls: bool,
802        #[values(100, 500, 35000)] dict_size: u32,
803    ) {
804        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
805        let strings = (0..dict_size)
806            .map(|i| i.to_string())
807            .collect::<Vec<String>>();
808
809        let repeated_strings: Vec<_> = strings
810            .iter()
811            .cycle()
812            .take(70000)
813            .enumerate()
814            .map(|(i, s)| {
815                if with_nulls && i % 7 == 0 {
816                    None
817                } else {
818                    Some(s.clone())
819                }
820            })
821            .collect();
822        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
823        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
824    }
825
826    #[test_log::test(tokio::test)]
827    async fn test_binary_encoding_verification() {
828        use lance_datagen::{ByteCount, RowCount};
829
830        let test_cases = TestCases::default()
831            .with_expected_encoding("variable")
832            .with_min_file_version(LanceFileVersion::V2_1);
833
834        // Test both automatic selection and explicit configuration
835        // 1. Test automatic binary encoding selection (small strings that won't trigger FSST)
836        let arr_small = lance_datagen::gen_batch()
837            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(10), false))
838            .into_batch_rows(RowCount::from(1000))
839            .unwrap()
840            .column(0)
841            .clone();
842        check_round_trip_encoding_of_data(vec![arr_small], &test_cases, HashMap::new()).await;
843
844        // 2. Test explicit "none" compression to force binary encoding
845        let metadata_explicit =
846            HashMap::from([("lance-encoding:compression".to_string(), "none".to_string())]);
847        let arr_large = lance_datagen::gen_batch()
848            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(50), false))
849            .into_batch_rows(RowCount::from(2000))
850            .unwrap()
851            .column(0)
852            .clone();
853        check_round_trip_encoding_of_data(vec![arr_large], &test_cases, metadata_explicit).await;
854    }
855
856    #[test]
857    fn test_binary_miniblock_with_misaligned_buffer() {
858        use super::BinaryMiniBlockDecompressor;
859        use crate::buffer::LanceBuffer;
860        use crate::compression::MiniBlockDecompressor;
861        use crate::data::DataBlock;
862
863        // Test case 1: u32 offsets
864        {
865            let decompressor = BinaryMiniBlockDecompressor {
866                bits_per_offset: 32,
867            };
868
869            // Create test data with u32 offsets
870            // BinaryMiniBlock format: all offsets followed by all string data
871            // Need to ensure total size is divisible by 4 for u32
872            let mut test_data = Vec::new();
873
874            // Offsets section (3 offsets for 2 values + 1 end offset)
875            test_data.extend_from_slice(&12u32.to_le_bytes()); // offset to start of strings (after offsets)
876            test_data.extend_from_slice(&15u32.to_le_bytes()); // offset to second string
877            test_data.extend_from_slice(&20u32.to_le_bytes()); // offset to end
878
879            // String data section
880            test_data.extend_from_slice(b"ABCXYZ"); // 6 bytes of string data
881            test_data.extend_from_slice(&[0, 0]); // 2 bytes padding to make total 20 bytes (divisible by 4)
882
883            // Create a misaligned buffer by adding padding and slicing
884            let mut padded = Vec::with_capacity(test_data.len() + 1);
885            padded.push(0xFF); // Padding byte to misalign
886            padded.extend_from_slice(&test_data);
887
888            let bytes = bytes::Bytes::from(padded);
889            let misaligned = bytes.slice(1..); // Skip first byte to create misalignment
890
891            // Create LanceBuffer with bytes_per_value=1 to bypass alignment check
892            let buffer = LanceBuffer::from_bytes(misaligned, 1);
893
894            // Verify the buffer is actually misaligned
895            let ptr = buffer.as_ref().as_ptr();
896            assert_ne!(
897                ptr.align_offset(4),
898                0,
899                "Test setup: buffer should be misaligned for u32"
900            );
901
902            // Decompress with misaligned buffer - should work with borrow_to_typed_slice
903            let result = decompressor.decompress(vec![buffer], 2);
904            assert!(
905                result.is_ok(),
906                "Decompression should succeed with misaligned buffer"
907            );
908
909            // Verify the data is correct
910            if let Ok(DataBlock::VariableWidth(block)) = result {
911                assert_eq!(block.num_values, 2);
912                // Data should be the strings (including padding from the original buffer)
913                assert_eq!(&block.data.as_ref()[..6], b"ABCXYZ");
914            } else {
915                panic!("Expected VariableWidth block");
916            }
917        }
918
919        // Test case 2: u64 offsets
920        {
921            let decompressor = BinaryMiniBlockDecompressor {
922                bits_per_offset: 64,
923            };
924
925            // Create test data with u64 offsets
926            let mut test_data = Vec::new();
927
928            // Offsets section (3 offsets for 2 values + 1 end offset)
929            test_data.extend_from_slice(&24u64.to_le_bytes()); // offset to start of strings (after offsets)
930            test_data.extend_from_slice(&29u64.to_le_bytes()); // offset to second string
931            test_data.extend_from_slice(&40u64.to_le_bytes()); // offset to end (divisible by 8)
932
933            // String data section
934            test_data.extend_from_slice(b"HelloWorld"); // 10 bytes of string data
935            test_data.extend_from_slice(&[0, 0, 0, 0, 0, 0]); // 6 bytes padding to make total 40 bytes (divisible by 8)
936
937            // Create misaligned buffer
938            let mut padded = Vec::with_capacity(test_data.len() + 3);
939            padded.extend_from_slice(&[0xFF, 0xFF, 0xFF]); // 3 bytes padding for misalignment
940            padded.extend_from_slice(&test_data);
941
942            let bytes = bytes::Bytes::from(padded);
943            let misaligned = bytes.slice(3..); // Skip 3 bytes
944
945            let buffer = LanceBuffer::from_bytes(misaligned, 1);
946
947            // Verify misalignment for u64
948            let ptr = buffer.as_ref().as_ptr();
949            assert_ne!(
950                ptr.align_offset(8),
951                0,
952                "Test setup: buffer should be misaligned for u64"
953            );
954
955            // Decompress should succeed
956            let result = decompressor.decompress(vec![buffer], 2);
957            assert!(
958                result.is_ok(),
959                "Decompression should succeed with misaligned u64 buffer"
960            );
961
962            if let Ok(DataBlock::VariableWidth(block)) = result {
963                assert_eq!(block.num_values, 2);
964                // Data should be the strings (including padding from the original buffer)
965                assert_eq!(&block.data.as_ref()[..10], b"HelloWorld");
966            } else {
967                panic!("Expected VariableWidth block");
968            }
969        }
970    }
971}