Skip to main content

lance_encoding/encodings/physical/
rle.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! # RLE (Run-Length Encoding)
5//!
6//! RLE compression for Lance, optimized for data with repeated values.
7//!
8//! ## Encoding Format
9//!
10//! RLE uses a dual-buffer format to store compressed data:
11//!
12//! - **Values Buffer**: Stores unique values in their original data type
13//! - **Lengths Buffer**: Stores the repeat count for each value as u8
14//!
15//! ### Example
16//!
17//! Input data: `[1, 1, 1, 2, 2, 3, 3, 3, 3]`
18//!
19//! Encoded as:
20//! - Values buffer: `[1, 2, 3]` (3 × 4 bytes for i32)
21//! - Lengths buffer: `[3, 2, 4]` (3 × 1 byte for u8)
22//!
23//! ### Long Run Handling
24//!
25//! When a run exceeds 255 values, it is split into multiple runs of 255
26//! followed by a final run with the remainder. For example, a run of 1000
27//! identical values becomes 4 runs: [255, 255, 255, 235].
28//!
29//! ## Supported Types
30//!
31//! RLE supports all fixed-width primitive types:
32//! - 8-bit: u8, i8
33//! - 16-bit: u16, i16
34//! - 32-bit: u32, i32, f32
35//! - 64-bit: u64, i64, f64
36//!
37//! ## Compression Strategy
38//!
39//! RLE is automatically selected when:
40//! - The run count (number of value transitions) < 50% of total values
41//! - This indicates sufficient repetition for RLE to be effective
42//!
43//! ## MiniBlock Chunk Handling
44//!
45//! When used in the miniblock path, all chunks share two global buffers (values and lengths).
46//! Each chunk's `buffer_sizes` identifies its slice within those global buffers. Non-last chunks
47//! contain a power-of-2 number of values.
48//!
49//! NOTE: The current encoder uses a 2048-value cap per chunk as a workaround for
50//! <https://github.com/lancedb/lance/issues/4429>.
51//!
52//! ## Block Format
53//!
54//! When used in the block compression path, the encoded output is a single buffer:
55//! `[8-byte header: values buffer size][values buffer][run_lengths buffer]`.
56
57use arrow_buffer::ArrowNativeType;
58use log::trace;
59
60use crate::buffer::LanceBuffer;
61use crate::compression::{BlockCompressor, BlockDecompressor, MiniBlockDecompressor};
62use crate::data::DataBlock;
63use crate::data::{BlockInfo, FixedWidthDataBlock};
64use crate::encodings::logical::primitive::miniblock::{
65    MAX_MINIBLOCK_BYTES, MAX_MINIBLOCK_VALUES, MiniBlockChunk, MiniBlockCompressed,
66    MiniBlockCompressor,
67};
68use crate::format::ProtobufUtils21;
69use crate::format::pb21::CompressiveEncoding;
70
71use lance_core::{Error, Result};
72
73/// RLE encoder for miniblock format
74#[derive(Debug, Default)]
75pub struct RleEncoder;
76
77impl RleEncoder {
78    pub fn new() -> Self {
79        Self
80    }
81
82    fn encode_data(
83        &self,
84        data: &LanceBuffer,
85        num_values: u64,
86        bits_per_value: u64,
87    ) -> Result<(Vec<LanceBuffer>, Vec<MiniBlockChunk>)> {
88        if num_values == 0 {
89            return Ok((Vec::new(), Vec::new()));
90        }
91
92        let bytes_per_value = (bits_per_value / 8) as usize;
93
94        // Pre-allocate global buffers with estimated capacity
95        // Assume average compression ratio of ~10:1 (10 values per run)
96        let estimated_runs = num_values as usize / 10;
97        let mut all_values = Vec::with_capacity(estimated_runs * bytes_per_value);
98        let mut all_lengths = Vec::with_capacity(estimated_runs);
99        let mut chunks = Vec::new();
100
101        let mut offset = 0usize;
102        let mut values_remaining = num_values as usize;
103
104        while values_remaining > 0 {
105            let values_start = all_values.len();
106            let lengths_start = all_lengths.len();
107
108            let (_num_runs, values_processed, is_last_chunk) = match bits_per_value {
109                8 => self.encode_chunk_rolling::<u8>(
110                    data,
111                    offset,
112                    values_remaining,
113                    &mut all_values,
114                    &mut all_lengths,
115                ),
116                16 => self.encode_chunk_rolling::<u16>(
117                    data,
118                    offset,
119                    values_remaining,
120                    &mut all_values,
121                    &mut all_lengths,
122                ),
123                32 => self.encode_chunk_rolling::<u32>(
124                    data,
125                    offset,
126                    values_remaining,
127                    &mut all_values,
128                    &mut all_lengths,
129                ),
130                64 => self.encode_chunk_rolling::<u64>(
131                    data,
132                    offset,
133                    values_remaining,
134                    &mut all_values,
135                    &mut all_lengths,
136                ),
137                _ => unreachable!("RLE encoding bits_per_value must be 8, 16, 32 or 64"),
138            };
139
140            if values_processed == 0 {
141                break;
142            }
143
144            let log_num_values = if is_last_chunk {
145                0
146            } else {
147                assert!(
148                    values_processed.is_power_of_two(),
149                    "Non-last chunk must have power-of-2 values"
150                );
151                values_processed.ilog2() as u8
152            };
153
154            let values_size = all_values.len() - values_start;
155            let lengths_size = all_lengths.len() - lengths_start;
156
157            let chunk = MiniBlockChunk {
158                buffer_sizes: vec![values_size as u32, lengths_size as u32],
159                log_num_values,
160            };
161
162            chunks.push(chunk);
163
164            offset += values_processed;
165            values_remaining -= values_processed;
166        }
167
168        // Return exactly two buffers: values and lengths
169        Ok((
170            vec![
171                LanceBuffer::from(all_values),
172                LanceBuffer::from(all_lengths),
173            ],
174            chunks,
175        ))
176    }
177
178    /// Encodes a chunk of data using RLE compression with dynamic boundary detection.
179    ///
180    /// This function processes values sequentially, detecting runs (sequences of identical values)
181    /// and encoding them as (value, length) pairs. It dynamically determines whether this chunk
182    /// should be the last chunk based on how many values were processed.
183    ///
184    /// # Key Features:
185    /// - Tracks byte usage to ensure we don't exceed MAX_MINIBLOCK_BYTES
186    /// - Maintains power-of-2 checkpoints for non-last chunks
187    /// - Splits long runs (>255) into multiple entries
188    /// - Dynamically determines if this is the last chunk
189    ///
190    /// # Returns:
191    /// - num_runs: Number of runs encoded
192    /// - values_processed: Number of input values processed
193    /// - is_last_chunk: Whether this chunk processed all remaining values
194    fn encode_chunk_rolling<T>(
195        &self,
196        data: &LanceBuffer,
197        offset: usize,
198        values_remaining: usize,
199        all_values: &mut Vec<u8>,
200        all_lengths: &mut Vec<u8>,
201    ) -> (usize, usize, bool)
202    where
203        T: bytemuck::Pod + PartialEq + Copy + std::fmt::Debug + ArrowNativeType,
204    {
205        let type_size = std::mem::size_of::<T>();
206
207        let chunk_start = offset * type_size;
208        let max_by_count = MAX_MINIBLOCK_VALUES as usize;
209        let max_values = values_remaining.min(max_by_count);
210        let chunk_end = chunk_start + max_values * type_size;
211
212        if chunk_start >= data.len() {
213            return (0, 0, false);
214        }
215
216        let chunk_len = chunk_end.min(data.len()) - chunk_start;
217        let chunk_buffer = data.slice_with_length(chunk_start, chunk_len);
218        let typed_data_ref = chunk_buffer.borrow_to_typed_slice::<T>();
219        let typed_data: &[T] = typed_data_ref.as_ref();
220
221        if typed_data.is_empty() {
222            return (0, 0, false);
223        }
224
225        // Record starting positions for this chunk
226        let values_start = all_values.len();
227
228        let mut current_value = typed_data[0];
229        let mut current_length = 1u64;
230        let mut bytes_used = 0usize;
231        let mut total_values_encoded = 0usize; // Track total encoded values
232
233        // Power-of-2 checkpoints for ensuring non-last chunks have valid sizes.
234        //
235        // We start from a slightly larger minimum checkpoint for smaller types since
236        // they encode more compactly and are less likely to hit MAX_MINIBLOCK_BYTES.
237        let min_checkpoint_log2 = match type_size {
238            1 => 8, // 256
239            2 => 7, // 128
240            _ => 6, // 64
241        };
242        let max_checkpoint_log2 = (values_remaining.min(MAX_MINIBLOCK_VALUES as usize))
243            .next_power_of_two()
244            .ilog2();
245        let mut checkpoint_log2 = min_checkpoint_log2;
246
247        // Save state at checkpoints so we can roll back if needed
248        let mut last_checkpoint_state = None;
249
250        for &value in typed_data[1..].iter() {
251            if value == current_value {
252                current_length += 1;
253            } else {
254                // Calculate space needed (may need multiple u8s if run > 255)
255                let run_chunks = current_length.div_ceil(255) as usize;
256                let bytes_needed = run_chunks * (type_size + 1);
257
258                // Stop if adding this run would exceed byte limit
259                if bytes_used + bytes_needed > MAX_MINIBLOCK_BYTES as usize {
260                    if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
261                        // Roll back to last power-of-2 checkpoint
262                        all_values.truncate(val_pos);
263                        all_lengths.truncate(len_pos);
264                        let num_runs = (val_pos - values_start) / type_size;
265                        return (num_runs, checkpoint_values, false);
266                    }
267                    break;
268                }
269
270                bytes_used += self.add_run(&current_value, current_length, all_values, all_lengths);
271                total_values_encoded += current_length as usize;
272                current_value = value;
273                current_length = 1;
274            }
275
276            // Check if we reached a power-of-2 checkpoint.
277            while checkpoint_log2 <= max_checkpoint_log2 {
278                let checkpoint_values = 1usize << checkpoint_log2;
279                if checkpoint_values > values_remaining || total_values_encoded < checkpoint_values
280                {
281                    break;
282                }
283                last_checkpoint_state = Some((
284                    all_values.len(),
285                    all_lengths.len(),
286                    bytes_used,
287                    checkpoint_values,
288                ));
289                checkpoint_log2 += 1;
290            }
291        }
292
293        // After the loop, we always have a pending run that needs to be added
294        // unless we've exceeded the byte limit
295        if current_length > 0 {
296            let run_chunks = current_length.div_ceil(255) as usize;
297            let bytes_needed = run_chunks * (type_size + 1);
298
299            if bytes_used + bytes_needed <= MAX_MINIBLOCK_BYTES as usize {
300                let _ = self.add_run(&current_value, current_length, all_values, all_lengths);
301                total_values_encoded += current_length as usize;
302            }
303        }
304
305        // Determine if we've processed all remaining values
306        let is_last_chunk = total_values_encoded == values_remaining;
307
308        // Non-last chunks must have power-of-2 values for miniblock format
309        if !is_last_chunk {
310            if total_values_encoded.is_power_of_two() {
311                // Already at power-of-2 boundary
312            } else if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
313                // Roll back to last valid checkpoint
314                all_values.truncate(val_pos);
315                all_lengths.truncate(len_pos);
316                let num_runs = (val_pos - values_start) / type_size;
317                return (num_runs, checkpoint_values, false);
318            } else {
319                // No valid checkpoint, can't create a valid chunk
320                return (0, 0, false);
321            }
322        }
323
324        let num_runs = (all_values.len() - values_start) / type_size;
325        (num_runs, total_values_encoded, is_last_chunk)
326    }
327
328    fn add_run<T>(
329        &self,
330        value: &T,
331        length: u64,
332        all_values: &mut Vec<u8>,
333        all_lengths: &mut Vec<u8>,
334    ) -> usize
335    where
336        T: bytemuck::Pod,
337    {
338        let value_bytes = bytemuck::bytes_of(value);
339        let type_size = std::mem::size_of::<T>();
340        let num_full_chunks = (length / 255) as usize;
341        let remainder = (length % 255) as u8;
342
343        let total_chunks = num_full_chunks + if remainder > 0 { 1 } else { 0 };
344        all_values.reserve(total_chunks * type_size);
345        all_lengths.reserve(total_chunks);
346
347        for _ in 0..num_full_chunks {
348            all_values.extend_from_slice(value_bytes);
349            all_lengths.push(255);
350        }
351
352        if remainder > 0 {
353            all_values.extend_from_slice(value_bytes);
354            all_lengths.push(remainder);
355        }
356
357        total_chunks * (type_size + 1)
358    }
359}
360
361impl MiniBlockCompressor for RleEncoder {
362    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
363        match data {
364            DataBlock::FixedWidth(fixed_width) => {
365                let num_values = fixed_width.num_values;
366                let bits_per_value = fixed_width.bits_per_value;
367
368                let (all_buffers, chunks) =
369                    self.encode_data(&fixed_width.data, num_values, bits_per_value)?;
370
371                let compressed = MiniBlockCompressed {
372                    data: all_buffers,
373                    chunks,
374                    num_values,
375                };
376
377                let encoding = ProtobufUtils21::rle(
378                    ProtobufUtils21::flat(bits_per_value, None),
379                    ProtobufUtils21::flat(/*bits_per_value=*/ 8, None),
380                );
381
382                Ok((compressed, encoding))
383            }
384            _ => Err(Error::invalid_input_source(
385                "RLE encoding only supports FixedWidth data blocks".into(),
386            )),
387        }
388    }
389}
390
391impl BlockCompressor for RleEncoder {
392    // Block format: [8-byte header: values buffer size][values buffer][run_lengths buffer]
393    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
394        match data {
395            DataBlock::FixedWidth(fixed_width) => {
396                let num_values = fixed_width.num_values;
397                let bits_per_value = fixed_width.bits_per_value;
398
399                let (all_buffers, _) =
400                    self.encode_data(&fixed_width.data, num_values, bits_per_value)?;
401
402                let values_size = all_buffers[0].len() as u64;
403
404                let mut combined = Vec::new();
405                combined.extend_from_slice(&values_size.to_le_bytes());
406                combined.extend_from_slice(&all_buffers[0]);
407                combined.extend_from_slice(&all_buffers[1]);
408                Ok(LanceBuffer::from(combined))
409            }
410            _ => Err(Error::invalid_input_source(
411                "RLE encoding only supports FixedWidth data blocks".into(),
412            )),
413        }
414    }
415}
416
417/// RLE decompressor for miniblock format
418#[derive(Debug)]
419pub struct RleDecompressor {
420    bits_per_value: u64,
421}
422
423impl RleDecompressor {
424    pub fn new(bits_per_value: u64) -> Self {
425        Self { bits_per_value }
426    }
427
428    fn decode_data(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
429        if num_values == 0 {
430            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
431                bits_per_value: self.bits_per_value,
432                data: LanceBuffer::from(vec![]),
433                num_values: 0,
434                block_info: BlockInfo::default(),
435            }));
436        }
437
438        if data.len() != 2 {
439            return Err(Error::invalid_input_source(
440                format!(
441                    "RLE decompressor expects exactly 2 buffers, got {}",
442                    data.len()
443                )
444                .into(),
445            ));
446        }
447
448        let values_buffer = &data[0];
449        let lengths_buffer = &data[1];
450
451        let decoded_data = match self.bits_per_value {
452            8 => self.decode_generic::<u8>(values_buffer, lengths_buffer, num_values)?,
453            16 => self.decode_generic::<u16>(values_buffer, lengths_buffer, num_values)?,
454            32 => self.decode_generic::<u32>(values_buffer, lengths_buffer, num_values)?,
455            64 => self.decode_generic::<u64>(values_buffer, lengths_buffer, num_values)?,
456            _ => unreachable!("RLE decoding bits_per_value must be 8, 16, 32, 64, or 128"),
457        };
458
459        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
460            bits_per_value: self.bits_per_value,
461            data: decoded_data,
462            num_values,
463            block_info: BlockInfo::default(),
464        }))
465    }
466
467    fn decode_generic<T>(
468        &self,
469        values_buffer: &LanceBuffer,
470        lengths_buffer: &LanceBuffer,
471        num_values: u64,
472    ) -> Result<LanceBuffer>
473    where
474        T: bytemuck::Pod + Copy + std::fmt::Debug + ArrowNativeType,
475    {
476        let type_size = std::mem::size_of::<T>();
477
478        if values_buffer.is_empty() || lengths_buffer.is_empty() {
479            if num_values == 0 {
480                return Ok(LanceBuffer::empty());
481            } else {
482                return Err(Error::invalid_input_source(
483                    format!("Empty buffers but expected {} values", num_values).into(),
484                ));
485            }
486        }
487
488        if !values_buffer.len().is_multiple_of(type_size) || lengths_buffer.is_empty() {
489            return Err(Error::invalid_input_source(format!(
490                "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes",
491                std::any::type_name::<T>(),
492                values_buffer.len(),
493                type_size,
494                lengths_buffer.len()
495            )
496            .into()));
497        }
498
499        let num_runs = values_buffer.len() / type_size;
500        let num_length_entries = lengths_buffer.len();
501        if num_runs != num_length_entries {
502            return Err(Error::invalid_input_source(
503                format!(
504                    "Inconsistent RLE buffers: {} runs but {} length entries",
505                    num_runs, num_length_entries
506                )
507                .into(),
508            ));
509        }
510
511        let values_ref = values_buffer.borrow_to_typed_slice::<T>();
512        let values: &[T] = values_ref.as_ref();
513        let lengths: &[u8] = lengths_buffer.as_ref();
514
515        let expected_value_count = num_values as usize;
516        let mut decoded: Vec<T> = Vec::with_capacity(expected_value_count);
517
518        for (value, &length) in values.iter().zip(lengths.iter()) {
519            if decoded.len() == expected_value_count {
520                break;
521            }
522
523            if length == 0 {
524                return Err(Error::invalid_input_source(
525                    "RLE decoding encountered a zero run length".into(),
526                ));
527            }
528
529            let remaining = expected_value_count - decoded.len();
530            let write_len = (length as usize).min(remaining);
531
532            decoded.resize(decoded.len() + write_len, *value);
533        }
534
535        if decoded.len() != expected_value_count {
536            return Err(Error::invalid_input_source(
537                format!(
538                    "RLE decoding produced {} values, expected {}",
539                    decoded.len(),
540                    expected_value_count
541                )
542                .into(),
543            ));
544        }
545
546        trace!(
547            "RLE decoded {} {} values",
548            num_values,
549            std::any::type_name::<T>()
550        );
551        Ok(LanceBuffer::reinterpret_vec(decoded))
552    }
553}
554
555impl MiniBlockDecompressor for RleDecompressor {
556    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
557        self.decode_data(data, num_values)
558    }
559}
560
561impl BlockDecompressor for RleDecompressor {
562    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
563        // fetch the values_size
564        if data.len() < 8 {
565            return Err(Error::invalid_input_source(
566                format!("Insufficient data size: {}", data.len()).into(),
567            ));
568        }
569
570        let values_size_bytes: [u8; 8] =
571            data[..8].try_into().expect("slice length already checked");
572        let values_size: u64 = u64::from_le_bytes(values_size_bytes);
573
574        // parse values
575        let values_start: usize = 8;
576        let values_size: usize = values_size.try_into().map_err(|_| {
577            Error::invalid_input_source(
578                format!("Invalid values buffer size: {}", values_size).into(),
579            )
580        })?;
581        let lengths_start = values_start
582            .checked_add(values_size)
583            .ok_or_else(|| Error::invalid_input_source("Invalid RLE values buffer size".into()))?;
584
585        if data.len() < lengths_start {
586            return Err(Error::invalid_input_source(
587                format!("Insufficient data size: {}", data.len()).into(),
588            ));
589        }
590
591        let values_buffer = data.slice_with_length(values_start, values_size);
592        let lengths_buffer = data.slice_with_length(lengths_start, data.len() - lengths_start);
593
594        self.decode_data(vec![values_buffer, lengths_buffer], num_values)
595    }
596}
597
598#[cfg(test)]
599mod tests {
600    use super::*;
601    use crate::data::DataBlock;
602    use crate::encodings::logical::primitive::miniblock::MAX_MINIBLOCK_VALUES;
603    use crate::{buffer::LanceBuffer, compression::BlockDecompressor};
604    use arrow_array::Int32Array;
605    // ========== Core Functionality Tests ==========
606
607    #[test]
608    fn test_basic_miniblock_rle_encoding() {
609        let encoder = RleEncoder::new();
610
611        // Test basic RLE pattern: [1, 1, 1, 2, 2, 3, 3, 3, 3]
612        let array = Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]);
613        let data_block = DataBlock::from_array(array);
614
615        let (compressed, _) = MiniBlockCompressor::compress(&encoder, data_block).unwrap();
616
617        assert_eq!(compressed.num_values, 9);
618        assert_eq!(compressed.chunks.len(), 1);
619
620        // Verify compression happened (3 runs instead of 9 values)
621        let values_buffer = &compressed.data[0];
622        let lengths_buffer = &compressed.data[1];
623        assert_eq!(values_buffer.len(), 12); // 3 i32 values
624        assert_eq!(lengths_buffer.len(), 3); // 3 u8 lengths
625    }
626
627    #[test]
628    fn test_long_run_splitting() {
629        let encoder = RleEncoder::new();
630
631        // Create a run longer than 255 to test splitting
632        let mut data = vec![42i32; 1000]; // Will be split into 255+255+255+235
633        data.extend(&[100i32; 300]); // Will be split into 255+45
634
635        let array = Int32Array::from(data);
636        let (compressed, _) =
637            MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();
638
639        // Should have 6 runs total (4 for first value, 2 for second)
640        let lengths_buffer = &compressed.data[1];
641        assert_eq!(lengths_buffer.len(), 6);
642    }
643
644    // ========== Round-trip Tests for Different Types ==========
645
646    #[test]
647    fn test_round_trip_all_types() {
648        // Test u8
649        test_round_trip_helper(vec![42u8, 42, 42, 100, 100, 255, 255, 255, 255], 8);
650
651        // Test u16
652        test_round_trip_helper(vec![1000u16, 1000, 2000, 2000, 2000, 3000], 16);
653
654        // Test i32
655        test_round_trip_helper(vec![100i32, 100, 100, -200, -200, 300, 300, 300, 300], 32);
656
657        // Test u64
658        test_round_trip_helper(vec![1_000_000_000u64; 5], 64);
659    }
660
661    fn test_round_trip_helper<T>(data: Vec<T>, bits_per_value: u64)
662    where
663        T: bytemuck::Pod + PartialEq + std::fmt::Debug,
664    {
665        let encoder = RleEncoder::new();
666        let bytes: Vec<u8> = data
667            .iter()
668            .flat_map(|v| bytemuck::bytes_of(v))
669            .copied()
670            .collect();
671
672        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
673            bits_per_value,
674            data: LanceBuffer::from(bytes),
675            num_values: data.len() as u64,
676            block_info: BlockInfo::default(),
677        });
678
679        let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap();
680        let decompressor = RleDecompressor::new(bits_per_value);
681        let decompressed = MiniBlockDecompressor::decompress(
682            &decompressor,
683            compressed.data,
684            compressed.num_values,
685        )
686        .unwrap();
687
688        match decompressed {
689            DataBlock::FixedWidth(ref block) => {
690                // Verify the decompressed data length matches expected
691                assert_eq!(block.data.len(), data.len() * std::mem::size_of::<T>());
692            }
693            _ => panic!("Expected FixedWidth block"),
694        }
695    }
696
697    // ========== Chunk Boundary Tests ==========
698
699    #[test]
700    fn test_power_of_two_chunking() {
701        let encoder = RleEncoder::new();
702
703        // Create data that will require multiple chunks
704        let test_sizes = vec![1000, 2500, 5000, 10000];
705
706        for size in test_sizes {
707            let data: Vec<i32> = (0..size)
708                .map(|i| i / 50) // Create runs of 50
709                .collect();
710
711            let array = Int32Array::from(data);
712            let (compressed, _) =
713                MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();
714
715            // Verify all non-last chunks have power-of-2 values
716            for (i, chunk) in compressed.chunks.iter().enumerate() {
717                if i < compressed.chunks.len() - 1 {
718                    assert!(chunk.log_num_values > 0);
719                    let chunk_values = 1u64 << chunk.log_num_values;
720                    assert!(chunk_values.is_power_of_two());
721                    assert!(chunk_values <= MAX_MINIBLOCK_VALUES);
722                } else {
723                    assert_eq!(chunk.log_num_values, 0);
724                }
725            }
726        }
727    }
728
729    // ========== Error Handling Tests ==========
730
731    #[test]
732    fn test_invalid_buffer_count() {
733        let decompressor = RleDecompressor::new(32);
734        let result = MiniBlockDecompressor::decompress(
735            &decompressor,
736            vec![LanceBuffer::from(vec![1, 2, 3, 4])],
737            10,
738        );
739        assert!(result.is_err());
740        assert!(
741            result
742                .unwrap_err()
743                .to_string()
744                .contains("expects exactly 2 buffers")
745        );
746    }
747
748    #[test]
749    fn test_buffer_consistency() {
750        let decompressor = RleDecompressor::new(32);
751        let values = LanceBuffer::from(vec![1, 0, 0, 0]); // 1 i32 value
752        let lengths = LanceBuffer::from(vec![5, 10]); // 2 lengths - mismatch!
753        let result = MiniBlockDecompressor::decompress(&decompressor, vec![values, lengths], 15);
754        assert!(result.is_err());
755        assert!(
756            result
757                .unwrap_err()
758                .to_string()
759                .contains("Inconsistent RLE buffers")
760        );
761    }
762
763    #[test]
764    fn test_empty_data_handling() {
765        let encoder = RleEncoder::new();
766
767        // Test empty block
768        let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock {
769            bits_per_value: 32,
770            data: LanceBuffer::from(vec![]),
771            num_values: 0,
772            block_info: BlockInfo::default(),
773        });
774
775        let (compressed, _) = MiniBlockCompressor::compress(&encoder, empty_block).unwrap();
776        assert_eq!(compressed.num_values, 0);
777        assert!(compressed.data.is_empty());
778
779        // Test decompression of empty data
780        let decompressor = RleDecompressor::new(32);
781        let decompressed = MiniBlockDecompressor::decompress(&decompressor, vec![], 0).unwrap();
782
783        match decompressed {
784            DataBlock::FixedWidth(ref block) => {
785                assert_eq!(block.num_values, 0);
786                assert_eq!(block.data.len(), 0);
787            }
788            _ => panic!("Expected FixedWidth block"),
789        }
790    }
791
792    // ========== Integration Test ==========
793
794    #[test]
795    fn test_multi_chunk_round_trip() {
796        let encoder = RleEncoder::new();
797
798        // Create data that spans multiple chunks with mixed patterns
799        let mut data = Vec::new();
800
801        // High compression section
802        data.extend(vec![999i32; 2000]);
803        // Low compression section
804        data.extend(0..1000);
805        // Another high compression section
806        data.extend(vec![777i32; 2000]);
807
808        let array = Int32Array::from(data.clone());
809        let (compressed, _) =
810            MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();
811
812        // Manually decompress all chunks
813        let mut reconstructed = Vec::new();
814        let mut values_offset = 0usize;
815        let mut lengths_offset = 0usize;
816        let mut values_processed = 0u64;
817
818        // We now have exactly 2 global buffers
819        assert_eq!(compressed.data.len(), 2);
820        let global_values = &compressed.data[0];
821        let global_lengths = &compressed.data[1];
822
823        for chunk in &compressed.chunks {
824            let chunk_values = if chunk.log_num_values > 0 {
825                1u64 << chunk.log_num_values
826            } else {
827                compressed.num_values - values_processed
828            };
829
830            // Extract chunk buffers from global buffers using buffer_sizes
831            let values_size = chunk.buffer_sizes[0] as usize;
832            let lengths_size = chunk.buffer_sizes[1] as usize;
833
834            let chunk_values_buffer = global_values.slice_with_length(values_offset, values_size);
835            let chunk_lengths_buffer =
836                global_lengths.slice_with_length(lengths_offset, lengths_size);
837
838            let decompressor = RleDecompressor::new(32);
839            let chunk_data = MiniBlockDecompressor::decompress(
840                &decompressor,
841                vec![chunk_values_buffer, chunk_lengths_buffer],
842                chunk_values,
843            )
844            .unwrap();
845
846            values_offset += values_size;
847            lengths_offset += lengths_size;
848            values_processed += chunk_values;
849
850            match chunk_data {
851                DataBlock::FixedWidth(ref block) => {
852                    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
853                    reconstructed.extend_from_slice(values);
854                }
855                _ => panic!("Expected FixedWidth block"),
856            }
857        }
858
859        assert_eq!(reconstructed, data);
860    }
861
862    #[test]
863    fn test_1024_boundary_conditions() {
864        // Comprehensive test for various boundary conditions at 1024 values
865        // This consolidates multiple bug tests that were previously separate
866        let encoder = RleEncoder::new();
867        let decompressor = RleDecompressor::new(32);
868
869        let test_cases = [
870            ("runs_of_2", {
871                let mut data = Vec::new();
872                for i in 0..512 {
873                    data.push(i);
874                    data.push(i);
875                }
876                data
877            }),
878            ("single_run_1024", vec![42i32; 1024]),
879            ("alternating_values", {
880                let mut data = Vec::new();
881                for i in 0..1024 {
882                    data.push(i % 2);
883                }
884                data
885            }),
886            ("run_boundary_255s", {
887                let mut data = Vec::new();
888                data.extend(vec![1i32; 255]);
889                data.extend(vec![2i32; 255]);
890                data.extend(vec![3i32; 255]);
891                data.extend(vec![4i32; 255]);
892                data.extend(vec![5i32; 4]);
893                data
894            }),
895            ("unique_values_1024", (0..1024).collect::<Vec<_>>()),
896            ("unique_plus_duplicate", {
897                // 1023 unique values followed by one duplicate (regression test)
898                let mut data = Vec::new();
899                for i in 0..1023 {
900                    data.push(i);
901                }
902                data.push(1022i32); // Last value same as second-to-last
903                data
904            }),
905            ("bug_4092_pattern", {
906                // Test exact scenario that produces 4092 bytes instead of 4096
907                let mut data = Vec::new();
908                for i in 0..1022 {
909                    data.push(i);
910                }
911                data.push(999999i32);
912                data.push(999999i32);
913                data
914            }),
915        ];
916
917        for (test_name, data) in test_cases.iter() {
918            assert_eq!(data.len(), 1024, "Test case {} has wrong length", test_name);
919
920            // Compress the data
921            let array = Int32Array::from(data.clone());
922            let (compressed, _) =
923                MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();
924
925            // Decompress and verify
926            match MiniBlockDecompressor::decompress(
927                &decompressor,
928                compressed.data,
929                compressed.num_values,
930            ) {
931                Ok(decompressed) => match decompressed {
932                    DataBlock::FixedWidth(ref block) => {
933                        let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
934                        assert_eq!(
935                            values.len(),
936                            1024,
937                            "Test case {} got {} values, expected 1024",
938                            test_name,
939                            values.len()
940                        );
941                        assert_eq!(
942                            block.data.len(),
943                            4096,
944                            "Test case {} got {} bytes, expected 4096",
945                            test_name,
946                            block.data.len()
947                        );
948                        assert_eq!(values, &data[..], "Test case {} data mismatch", test_name);
949                    }
950                    _ => panic!("Test case {} expected FixedWidth block", test_name),
951                },
952                Err(e) => {
953                    if e.to_string().contains("4092") {
954                        panic!("Test case {} found bug 4092: {}", test_name, e);
955                    }
956                    panic!("Test case {} failed with error: {}", test_name, e);
957                }
958            }
959        }
960    }
961
962    #[test]
963    fn test_low_repetition_50pct_bug() {
964        // Test case that reproduces the 4092 bytes bug with low repetition (50%)
965        // This simulates the 1M benchmark case
966        let encoder = RleEncoder::new();
967
968        // Create 1M values with low repetition (50% chance of change)
969        let num_values = 1_048_576; // 1M values
970        let mut data = Vec::with_capacity(num_values);
971        let mut value = 0i32;
972        let mut rng = 12345u64; // Simple deterministic RNG
973
974        for _ in 0..num_values {
975            data.push(value);
976            // Simple LCG for deterministic randomness
977            rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
978            // 50% chance to increment value
979            if (rng >> 16) & 1 == 1 {
980                value += 1;
981            }
982        }
983
984        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();
985
986        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
987            bits_per_value: 32,
988            data: LanceBuffer::from(bytes),
989            num_values: num_values as u64,
990            block_info: BlockInfo::default(),
991        });
992
993        let (compressed, _) = MiniBlockCompressor::compress(&encoder, block).unwrap();
994
995        // Debug first few chunks
996        for (i, chunk) in compressed.chunks.iter().take(5).enumerate() {
997            let _chunk_values = if chunk.log_num_values > 0 {
998                1 << chunk.log_num_values
999            } else {
1000                // Last chunk - calculate remaining
1001                let prev_total: usize = compressed.chunks[..i]
1002                    .iter()
1003                    .map(|c| 1usize << c.log_num_values)
1004                    .sum();
1005                num_values - prev_total
1006            };
1007        }
1008
1009        // Try to decompress
1010        let decompressor = RleDecompressor::new(32);
1011        match MiniBlockDecompressor::decompress(
1012            &decompressor,
1013            compressed.data,
1014            compressed.num_values,
1015        ) {
1016            Ok(decompressed) => match decompressed {
1017                DataBlock::FixedWidth(ref block) => {
1018                    assert_eq!(
1019                        block.data.len(),
1020                        num_values * 4,
1021                        "Expected {} bytes but got {}",
1022                        num_values * 4,
1023                        block.data.len()
1024                    );
1025                }
1026                _ => panic!("Expected FixedWidth block"),
1027            },
1028            Err(e) => {
1029                if e.to_string().contains("4092") {
1030                    panic!("Bug reproduced! {}", e);
1031                } else {
1032                    panic!("Unexpected error: {}", e);
1033                }
1034            }
1035        }
1036    }
1037
1038    // ========== Encoding Verification Tests ==========
1039
1040    #[test_log::test(tokio::test)]
1041    async fn test_rle_encoding_verification() {
1042        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
1043        use crate::version::LanceFileVersion;
1044        use arrow_array::{Array, Int32Array};
1045        use lance_datagen::{ArrayGenerator, RowCount};
1046        use std::collections::HashMap;
1047        use std::sync::Arc;
1048
1049        let test_cases = TestCases::default()
1050            .with_expected_encoding("rle")
1051            .with_min_file_version(LanceFileVersion::V2_1);
1052
1053        // Test both explicit metadata and automatic selection
1054        // 1. Test with explicit RLE threshold metadata (also disable BSS)
1055        let mut metadata_explicit = HashMap::new();
1056        metadata_explicit.insert(
1057            "lance-encoding:rle-threshold".to_string(),
1058            "0.8".to_string(),
1059        );
1060        metadata_explicit.insert("lance-encoding:bss".to_string(), "off".to_string());
1061
1062        let mut generator = RleDataGenerator::new(vec![
1063            i32::MIN,
1064            i32::MIN,
1065            i32::MIN,
1066            i32::MIN,
1067            i32::MIN + 1,
1068            i32::MIN + 1,
1069            i32::MIN + 1,
1070            i32::MIN + 1,
1071            i32::MIN + 2,
1072            i32::MIN + 2,
1073            i32::MIN + 2,
1074            i32::MIN + 2,
1075        ]);
1076        let data_explicit = generator.generate_default(RowCount::from(10000)).unwrap();
1077        check_round_trip_encoding_of_data(vec![data_explicit], &test_cases, metadata_explicit)
1078            .await;
1079
1080        // 2. Test automatic RLE selection based on data characteristics
1081        // 80% repetition should trigger RLE (> default 50% threshold).
1082        //
1083        // Use values with the high bit set so bitpacking can't shrink the values.
1084        // Explicitly disable BSS to ensure RLE is tested
1085        let mut metadata = HashMap::new();
1086        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());
1087
1088        let mut values = vec![i32::MIN; 8000]; // 80% repetition
1089        values.extend(
1090            [
1091                i32::MIN + 1,
1092                i32::MIN + 2,
1093                i32::MIN + 3,
1094                i32::MIN + 4,
1095                i32::MIN + 5,
1096            ]
1097            .repeat(400),
1098        ); // 20% variety
1099        let arr = Arc::new(Int32Array::from(values)) as Arc<dyn Array>;
1100        check_round_trip_encoding_of_data(vec![arr], &test_cases, metadata).await;
1101    }
1102
1103    /// Generator that produces repetitive patterns suitable for RLE
1104    #[derive(Debug)]
1105    struct RleDataGenerator {
1106        pattern: Vec<i32>,
1107        idx: usize,
1108    }
1109
1110    impl RleDataGenerator {
1111        fn new(pattern: Vec<i32>) -> Self {
1112            Self { pattern, idx: 0 }
1113        }
1114    }
1115
1116    impl lance_datagen::ArrayGenerator for RleDataGenerator {
1117        fn generate(
1118            &mut self,
1119            _length: lance_datagen::RowCount,
1120            _rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
1121        ) -> std::result::Result<std::sync::Arc<dyn arrow_array::Array>, arrow_schema::ArrowError>
1122        {
1123            use arrow_array::Int32Array;
1124            use std::sync::Arc;
1125
1126            // Generate enough repetitive data to trigger RLE
1127            let mut values = Vec::new();
1128            for _ in 0..10000 {
1129                values.push(self.pattern[self.idx]);
1130                self.idx = (self.idx + 1) % self.pattern.len();
1131            }
1132            Ok(Arc::new(Int32Array::from(values)))
1133        }
1134
1135        fn data_type(&self) -> &arrow_schema::DataType {
1136            &arrow_schema::DataType::Int32
1137        }
1138
1139        fn element_size_bytes(&self) -> Option<lance_datagen::ByteCount> {
1140            Some(lance_datagen::ByteCount::from(4))
1141        }
1142    }
1143
1144    // ========== Block Related tests ==========
1145    #[test]
1146    fn test_block_decompressor_rejects_overflowing_values_size() {
1147        let decompressor = RleDecompressor::new(32);
1148
1149        let mut data = Vec::new();
1150        data.extend_from_slice(&u64::MAX.to_le_bytes());
1151        let result = BlockDecompressor::decompress(&decompressor, LanceBuffer::from(data), 1);
1152        assert!(result.is_err());
1153        assert!(
1154            result
1155                .unwrap_err()
1156                .to_string()
1157                .contains("Invalid RLE values buffer size")
1158        );
1159    }
1160
1161    #[test]
1162    fn test_block_decompressor_too_small() {
1163        let decompressor = RleDecompressor::new(32);
1164        let result =
1165            BlockDecompressor::decompress(&decompressor, LanceBuffer::from(vec![1, 2, 3]), 10);
1166        assert!(result.is_err());
1167        assert!(
1168            result
1169                .unwrap_err()
1170                .to_string()
1171                .contains("Insufficient data size: 3")
1172        );
1173    }
1174
1175    #[test]
1176    fn test_block_compressor_header_format() {
1177        let encoder = RleEncoder::new();
1178
1179        let data = vec![1i32, 1, 1];
1180        let array = Int32Array::from(data);
1181        let compressed = BlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();
1182
1183        // Verify header format: first 8 bytes should be values_size as u64
1184        assert!(compressed.len() >= 8);
1185        let values_size_bytes: [u8; 8] = compressed.as_ref()[..8].try_into().unwrap();
1186        let values_size = u64::from_le_bytes(values_size_bytes);
1187
1188        // Values buffer should contain 1 i32 value (4 bytes)
1189        assert_eq!(values_size, 4);
1190
1191        // Total size should be: 8 (header) + 4 (values) + 1 (lengths)
1192        assert_eq!(compressed.len(), 13);
1193    }
1194
1195    #[test]
1196    fn test_block_compressor_round_trip() {
1197        let encoder = RleEncoder::new();
1198        let decompressor = RleDecompressor::new(32);
1199
1200        // Test basic pattern
1201        let data = vec![1i32, 1, 1, 2, 2, 3, 3, 3, 3];
1202        let array = Int32Array::from(data.clone());
1203        let data_block = DataBlock::from_array(array);
1204
1205        let compressed = BlockCompressor::compress(&encoder, data_block).unwrap();
1206        let decompressed =
1207            BlockDecompressor::decompress(&decompressor, compressed, data.len() as u64).unwrap();
1208
1209        match decompressed {
1210            DataBlock::FixedWidth(block) => {
1211                let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
1212                assert_eq!(values, &data[..]);
1213            }
1214            _ => panic!("Expected FixedWidth block"),
1215        }
1216    }
1217
1218    #[test]
1219    fn test_block_compressor_large_data() {
1220        let encoder = RleEncoder::new();
1221        let decompressor = RleDecompressor::new(32);
1222
1223        // Create data that will span multiple chunks
1224        // Each chunks can handle ~2048 values, so use 10K values
1225        let mut data = Vec::new();
1226        data.extend(vec![999i32; 3000]); // First ~2 chunks
1227        data.extend(vec![777i32; 3000]); // Next ~2 chunks
1228        data.extend(vec![555i32; 4000]); // Final ~2 chunks
1229
1230        let total_values = data.len();
1231        assert_eq!(total_values, 10000);
1232
1233        let array = Int32Array::from(data.clone());
1234        let compressed = BlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap();
1235        let decompressed =
1236            BlockDecompressor::decompress(&decompressor, compressed, total_values as u64).unwrap();
1237
1238        match decompressed {
1239            DataBlock::FixedWidth(block) => {
1240                let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
1241                assert_eq!(values.len(), total_values);
1242                assert_eq!(values, &data[..]);
1243            }
1244            _ => panic!("Expected FixedWidth block"),
1245        }
1246    }
1247}