lance_encoding/encodings/physical/
rle.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! # RLE (Run-Length Encoding) Miniblock Format
5//!
6//! RLE compression for Lance miniblock format, optimized for data with repeated values.
7//!
8//! ## Encoding Format
9//!
10//! RLE uses a dual-buffer format to store compressed data:
11//!
//! - **Values Buffer**: Stores one value per run (the repeated value) in its original data type
13//! - **Lengths Buffer**: Stores the repeat count for each value as u8
14//!
15//! ### Example
16//!
17//! Input data: `[1, 1, 1, 2, 2, 3, 3, 3, 3]`
18//!
19//! Encoded as:
20//! - Values buffer: `[1, 2, 3]` (3 × 4 bytes for i32)
21//! - Lengths buffer: `[3, 2, 4]` (3 × 1 byte for u8)
22//!
23//! ### Long Run Handling
24//!
25//! When a run exceeds 255 values, it is split into multiple runs of 255
26//! followed by a final run with the remainder. For example, a run of 1000
27//! identical values becomes 4 runs: [255, 255, 255, 235].
28//!
29//! ## Supported Types
30//!
31//! RLE supports all fixed-width primitive types:
32//! - 8-bit: u8, i8
33//! - 16-bit: u16, i16
34//! - 32-bit: u32, i32, f32
35//! - 64-bit: u64, i64, f64
36//!
37//! ## Compression Strategy
38//!
39//! RLE is automatically selected when:
40//! - The run count (number of value transitions) < 50% of total values
41//! - This indicates sufficient repetition for RLE to be effective
42//!
43//! ## Chunk Handling
44//!
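//! - Maximum chunk size: 4096 values (miniblock constraint); the encoder currently caps chunks
//!   at 2048 values as a workaround for https://github.com/lancedb/lance/issues/4429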
46//! - All chunks share two global buffers (values and lengths)
47//! - Each chunk's buffer_sizes indicate its portion of the global buffers
48//! - Non-last chunks always contain power-of-2 values
49//! - Byte limits are enforced dynamically during encoding
50
51use arrow_buffer::ArrowNativeType;
52use log::trace;
53use snafu::location;
54
55use crate::buffer::LanceBuffer;
56use crate::compression::MiniBlockDecompressor;
57use crate::data::DataBlock;
58use crate::data::{BlockInfo, FixedWidthDataBlock};
59use crate::encodings::logical::primitive::miniblock::{
60    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
61};
62use crate::format::pb21::CompressiveEncoding;
63use crate::format::ProtobufUtils21;
64
65use lance_core::{Error, Result};
66
/// RLE encoder for miniblock format.
///
/// This is a field-less unit struct, so every instance is interchangeable; it can be created
/// through [`RleMiniBlockEncoder::new`] or the derived `Default` implementation.
#[derive(Debug, Default)]
pub struct RleMiniBlockEncoder;
70
71impl RleMiniBlockEncoder {
72    pub fn new() -> Self {
73        Self
74    }
75
76    fn encode_data(
77        &self,
78        data: &LanceBuffer,
79        num_values: u64,
80        bits_per_value: u64,
81    ) -> Result<(Vec<LanceBuffer>, Vec<MiniBlockChunk>)> {
82        if num_values == 0 {
83            return Ok((Vec::new(), Vec::new()));
84        }
85
86        let bytes_per_value = (bits_per_value / 8) as usize;
87
88        // Pre-allocate global buffers with estimated capacity
89        // Assume average compression ratio of ~10:1 (10 values per run)
90        let estimated_runs = num_values as usize / 10;
91        let mut all_values = Vec::with_capacity(estimated_runs * bytes_per_value);
92        let mut all_lengths = Vec::with_capacity(estimated_runs);
93        let mut chunks = Vec::new();
94
95        let mut offset = 0usize;
96        let mut values_remaining = num_values as usize;
97
98        while values_remaining > 0 {
99            let values_start = all_values.len();
100            let lengths_start = all_lengths.len();
101
102            let (_num_runs, values_processed, is_last_chunk) = match bits_per_value {
103                8 => self.encode_chunk_rolling::<u8>(
104                    data,
105                    offset,
106                    values_remaining,
107                    &mut all_values,
108                    &mut all_lengths,
109                ),
110                16 => self.encode_chunk_rolling::<u16>(
111                    data,
112                    offset,
113                    values_remaining,
114                    &mut all_values,
115                    &mut all_lengths,
116                ),
117                32 => self.encode_chunk_rolling::<u32>(
118                    data,
119                    offset,
120                    values_remaining,
121                    &mut all_values,
122                    &mut all_lengths,
123                ),
124                64 => self.encode_chunk_rolling::<u64>(
125                    data,
126                    offset,
127                    values_remaining,
128                    &mut all_values,
129                    &mut all_lengths,
130                ),
131                _ => unreachable!("RLE encoding bits_per_value must be 8, 16, 32 or 64"),
132            };
133
134            if values_processed == 0 {
135                break;
136            }
137
138            let log_num_values = if is_last_chunk {
139                0
140            } else {
141                assert!(
142                    values_processed.is_power_of_two(),
143                    "Non-last chunk must have power-of-2 values"
144                );
145                values_processed.ilog2() as u8
146            };
147
148            let values_size = all_values.len() - values_start;
149            let lengths_size = all_lengths.len() - lengths_start;
150
151            let chunk = MiniBlockChunk {
152                buffer_sizes: vec![values_size as u16, lengths_size as u16],
153                log_num_values,
154            };
155
156            chunks.push(chunk);
157
158            offset += values_processed;
159            values_remaining -= values_processed;
160        }
161
162        // Return exactly two buffers: values and lengths
163        Ok((
164            vec![
165                LanceBuffer::from(all_values),
166                LanceBuffer::from(all_lengths),
167            ],
168            chunks,
169        ))
170    }
171
172    /// Encodes a chunk of data using RLE compression with dynamic boundary detection.
173    ///
174    /// This function processes values sequentially, detecting runs (sequences of identical values)
175    /// and encoding them as (value, length) pairs. It dynamically determines whether this chunk
176    /// should be the last chunk based on how many values were processed.
177    ///
178    /// # Key Features:
179    /// - Tracks byte usage to ensure we don't exceed MAX_MINIBLOCK_BYTES
180    /// - Maintains power-of-2 checkpoints for non-last chunks
181    /// - Splits long runs (>255) into multiple entries
182    /// - Dynamically determines if this is the last chunk
183    ///
184    /// # Returns:
185    /// - num_runs: Number of runs encoded
186    /// - values_processed: Number of input values processed
187    /// - is_last_chunk: Whether this chunk processed all remaining values
188    fn encode_chunk_rolling<T>(
189        &self,
190        data: &LanceBuffer,
191        offset: usize,
192        values_remaining: usize,
193        all_values: &mut Vec<u8>,
194        all_lengths: &mut Vec<u8>,
195    ) -> (usize, usize, bool)
196    where
197        T: bytemuck::Pod + PartialEq + Copy + std::fmt::Debug + ArrowNativeType,
198    {
199        let type_size = std::mem::size_of::<T>();
200
201        let chunk_start = offset * type_size;
202        // FIXME(xuanwo): we don't allow 4096 values as a workaround for https://github.com/lancedb/lance/issues/4429
203        // Since while rep/def takes 4B, 4Ki values will lead to the
204        // generated chunk buffer too large.MAX_MINIBLOCK_VALUES
205        //
206        // let max_by_count =  as usize;
207        let max_by_count = 2048usize;
208        let max_values = values_remaining.min(max_by_count);
209        let chunk_end = chunk_start + max_values * type_size;
210
211        if chunk_start >= data.len() {
212            return (0, 0, false);
213        }
214
215        let chunk_len = chunk_end.min(data.len()) - chunk_start;
216        let chunk_buffer = data.slice_with_length(chunk_start, chunk_len);
217        let typed_data_ref = chunk_buffer.borrow_to_typed_slice::<T>();
218        let typed_data: &[T] = typed_data_ref.as_ref();
219
220        if typed_data.is_empty() {
221            return (0, 0, false);
222        }
223
224        // Record starting positions for this chunk
225        let values_start = all_values.len();
226
227        let mut current_value = typed_data[0];
228        let mut current_length = 1u64;
229        let mut bytes_used = 0usize;
230        let mut total_values_encoded = 0usize; // Track total encoded values
231
232        // Power-of-2 checkpoints for ensuring non-last chunks have valid sizes
233        // For smaller data types like u8, we can use larger initial checkpoints
234        // since they take less space per value
235        let checkpoints = match type_size {
236            1 => vec![256, 512, 1024, 2048, 4096], // u8 can start from 256
237            2 => vec![128, 256, 512, 1024, 2048, 4096], // u16 can start from 128
238            _ => vec![64, 128, 256, 512, 1024, 2048, 4096], // u32/u64: no difference
239        };
240        let valid_checkpoints: Vec<usize> = checkpoints
241            .into_iter()
242            .filter(|&p| p <= values_remaining)
243            .collect();
244        let mut checkpoint_idx = 0;
245
246        // Save state at checkpoints so we can roll back if needed
247        let mut last_checkpoint_state = None;
248
249        for &value in typed_data[1..].iter() {
250            if value == current_value {
251                current_length += 1;
252            } else {
253                // Calculate space needed (may need multiple u8s if run > 255)
254                let run_chunks = current_length.div_ceil(255) as usize;
255                let bytes_needed = run_chunks * (type_size + 1);
256
257                // Stop if adding this run would exceed byte limit
258                if bytes_used + bytes_needed > MAX_MINIBLOCK_BYTES as usize {
259                    if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
260                        // Roll back to last power-of-2 checkpoint
261                        all_values.truncate(val_pos);
262                        all_lengths.truncate(len_pos);
263                        let num_runs = (val_pos - values_start) / type_size;
264                        return (num_runs, checkpoint_values, false);
265                    }
266                    break;
267                }
268
269                bytes_used += self.add_run(&current_value, current_length, all_values, all_lengths);
270                total_values_encoded += current_length as usize;
271                current_value = value;
272                current_length = 1;
273            }
274
275            // Check if we reached a power-of-2 checkpoint
276            if checkpoint_idx < valid_checkpoints.len()
277                && total_values_encoded >= valid_checkpoints[checkpoint_idx]
278            {
279                last_checkpoint_state = Some((
280                    all_values.len(),
281                    all_lengths.len(),
282                    bytes_used,
283                    valid_checkpoints[checkpoint_idx],
284                ));
285                checkpoint_idx += 1;
286            }
287        }
288
289        // After the loop, we always have a pending run that needs to be added
290        // unless we've exceeded the byte limit
291        if current_length > 0 {
292            let run_chunks = current_length.div_ceil(255) as usize;
293            let bytes_needed = run_chunks * (type_size + 1);
294
295            if bytes_used + bytes_needed <= MAX_MINIBLOCK_BYTES as usize {
296                let _ = self.add_run(&current_value, current_length, all_values, all_lengths);
297                total_values_encoded += current_length as usize;
298            }
299        }
300
301        // Determine if we've processed all remaining values
302        let is_last_chunk = total_values_encoded == values_remaining;
303
304        // Non-last chunks must have power-of-2 values for miniblock format
305        if !is_last_chunk {
306            if total_values_encoded.is_power_of_two() {
307                // Already at power-of-2 boundary
308            } else if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
309                // Roll back to last valid checkpoint
310                all_values.truncate(val_pos);
311                all_lengths.truncate(len_pos);
312                let num_runs = (val_pos - values_start) / type_size;
313                return (num_runs, checkpoint_values, false);
314            } else {
315                // No valid checkpoint, can't create a valid chunk
316                return (0, 0, false);
317            }
318        }
319
320        let num_runs = (all_values.len() - values_start) / type_size;
321        (num_runs, total_values_encoded, is_last_chunk)
322    }
323
324    fn add_run<T>(
325        &self,
326        value: &T,
327        length: u64,
328        all_values: &mut Vec<u8>,
329        all_lengths: &mut Vec<u8>,
330    ) -> usize
331    where
332        T: bytemuck::Pod,
333    {
334        let value_bytes = bytemuck::bytes_of(value);
335        let type_size = std::mem::size_of::<T>();
336        let num_full_chunks = (length / 255) as usize;
337        let remainder = (length % 255) as u8;
338
339        let total_chunks = num_full_chunks + if remainder > 0 { 1 } else { 0 };
340        all_values.reserve(total_chunks * type_size);
341        all_lengths.reserve(total_chunks);
342
343        for _ in 0..num_full_chunks {
344            all_values.extend_from_slice(value_bytes);
345            all_lengths.push(255);
346        }
347
348        if remainder > 0 {
349            all_values.extend_from_slice(value_bytes);
350            all_lengths.push(remainder);
351        }
352
353        total_chunks * (type_size + 1)
354    }
355}
356
357impl MiniBlockCompressor for RleMiniBlockEncoder {
358    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
359        match data {
360            DataBlock::FixedWidth(fixed_width) => {
361                let num_values = fixed_width.num_values;
362                let bits_per_value = fixed_width.bits_per_value;
363
364                let (all_buffers, chunks) =
365                    self.encode_data(&fixed_width.data, num_values, bits_per_value)?;
366
367                let compressed = MiniBlockCompressed {
368                    data: all_buffers,
369                    chunks,
370                    num_values,
371                };
372
373                let encoding = ProtobufUtils21::rle(
374                    ProtobufUtils21::flat(bits_per_value, None),
375                    ProtobufUtils21::flat(/*bits_per_value=*/ 8, None),
376                );
377
378                Ok((compressed, encoding))
379            }
380            _ => Err(Error::InvalidInput {
381                location: location!(),
382                source: "RLE encoding only supports FixedWidth data blocks".into(),
383            }),
384        }
385    }
386}
387
/// RLE decompressor for miniblock format.
///
/// Expands (value, run length) pairs back into a flat fixed-width buffer.
#[derive(Debug)]
pub struct RleMiniBlockDecompressor {
    // Width in bits of every decoded value; decoding dispatches on 8, 16, 32 and 64.
    bits_per_value: u64,
}
393
394impl RleMiniBlockDecompressor {
    /// Creates a decompressor for values that are `bits_per_value` bits wide.
    ///
    /// The width is not validated here; it is only inspected when data is decoded.
    pub fn new(bits_per_value: u64) -> Self {
        Self { bits_per_value }
    }
398
399    fn decode_data(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
400        if num_values == 0 {
401            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
402                bits_per_value: self.bits_per_value,
403                data: LanceBuffer::from(vec![]),
404                num_values: 0,
405                block_info: BlockInfo::default(),
406            }));
407        }
408
409        assert_eq!(
410            data.len(),
411            2,
412            "RLE decompressor expects exactly 2 buffers, got {}",
413            data.len()
414        );
415
416        let values_buffer = &data[0];
417        let lengths_buffer = &data[1];
418
419        let decoded_data = match self.bits_per_value {
420            8 => self.decode_generic::<u8>(values_buffer, lengths_buffer, num_values)?,
421            16 => self.decode_generic::<u16>(values_buffer, lengths_buffer, num_values)?,
422            32 => self.decode_generic::<u32>(values_buffer, lengths_buffer, num_values)?,
423            64 => self.decode_generic::<u64>(values_buffer, lengths_buffer, num_values)?,
424            _ => unreachable!("RLE decoding bits_per_value must be 8, 16, 32, 64, or 128"),
425        };
426
427        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
428            bits_per_value: self.bits_per_value,
429            data: LanceBuffer::from(decoded_data),
430            num_values,
431            block_info: BlockInfo::default(),
432        }))
433    }
434
435    fn decode_generic<T>(
436        &self,
437        values_buffer: &LanceBuffer,
438        lengths_buffer: &LanceBuffer,
439        num_values: u64,
440    ) -> Result<Vec<u8>>
441    where
442        T: bytemuck::Pod + Copy + std::fmt::Debug + ArrowNativeType,
443    {
444        let type_size = std::mem::size_of::<T>();
445
446        if values_buffer.is_empty() || lengths_buffer.is_empty() {
447            if num_values == 0 {
448                return Ok(Vec::new());
449            } else {
450                return Err(Error::InvalidInput {
451                    location: location!(),
452                    source: format!("Empty buffers but expected {} values", num_values).into(),
453                });
454            }
455        }
456
457        if values_buffer.len() % type_size != 0 || lengths_buffer.is_empty() {
458            return Err(Error::InvalidInput {
459                location: location!(),
460                source: format!(
461                    "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes",
462                    std::any::type_name::<T>(),
463                    values_buffer.len(),
464                    type_size,
465                    lengths_buffer.len()
466                )
467                .into(),
468            });
469        }
470
471        let num_runs = values_buffer.len() / type_size;
472        let num_length_entries = lengths_buffer.len();
473        assert_eq!(
474            num_runs, num_length_entries,
475            "Inconsistent RLE buffers: {} runs but {} length entries",
476            num_runs, num_length_entries
477        );
478
479        let values_ref = values_buffer.borrow_to_typed_slice::<T>();
480        let values: &[T] = values_ref.as_ref();
481        let lengths: &[u8] = lengths_buffer.as_ref();
482
483        let expected_byte_count = num_values as usize * type_size;
484        let mut decoded = Vec::with_capacity(expected_byte_count);
485
486        for (value, &length) in values.iter().zip(lengths.iter()) {
487            let run_length = length as usize;
488            let bytes_to_write = run_length * type_size;
489            let bytes_of_value = bytemuck::bytes_of(value);
490
491            if decoded.len() + bytes_to_write > expected_byte_count {
492                let remaining_bytes = expected_byte_count - decoded.len();
493                let remaining_values = remaining_bytes / type_size;
494
495                for _ in 0..remaining_values {
496                    decoded.extend_from_slice(bytes_of_value);
497                }
498                break;
499            }
500
501            for _ in 0..run_length {
502                decoded.extend_from_slice(bytes_of_value);
503            }
504        }
505
506        if decoded.len() != expected_byte_count {
507            return Err(Error::InvalidInput {
508                location: location!(),
509                source: format!(
510                    "RLE decoding produced {} bytes, expected {}",
511                    decoded.len(),
512                    expected_byte_count
513                )
514                .into(),
515            });
516        }
517
518        trace!(
519            "RLE decoded {} {} values",
520            num_values,
521            std::any::type_name::<T>()
522        );
523        Ok(decoded)
524    }
525}
526
impl MiniBlockDecompressor for RleMiniBlockDecompressor {
    /// Decompresses one RLE chunk by delegating to `decode_data`.
    ///
    /// `data` is the chunk's `[values, lengths]` buffer pair and `num_values` is the number of
    /// values the chunk must expand to.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        self.decode_data(data, num_values)
    }
}
532
533#[cfg(test)]
534mod tests {
535    use super::*;
536    use crate::data::DataBlock;
537    use crate::encodings::logical::primitive::miniblock::MAX_MINIBLOCK_VALUES;
538    use arrow_array::Int32Array;
539
540    // ========== Core Functionality Tests ==========
541
542    #[test]
543    fn test_basic_rle_encoding() {
544        let encoder = RleMiniBlockEncoder::new();
545
546        // Test basic RLE pattern: [1, 1, 1, 2, 2, 3, 3, 3, 3]
547        let array = Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]);
548        let data_block = DataBlock::from_array(array);
549
550        let (compressed, _) = encoder.compress(data_block).unwrap();
551
552        assert_eq!(compressed.num_values, 9);
553        assert_eq!(compressed.chunks.len(), 1);
554
555        // Verify compression happened (3 runs instead of 9 values)
556        let values_buffer = &compressed.data[0];
557        let lengths_buffer = &compressed.data[1];
558        assert_eq!(values_buffer.len(), 12); // 3 i32 values
559        assert_eq!(lengths_buffer.len(), 3); // 3 u8 lengths
560    }
561
562    #[test]
563    fn test_long_run_splitting() {
564        let encoder = RleMiniBlockEncoder::new();
565
566        // Create a run longer than 255 to test splitting
567        let mut data = vec![42i32; 1000]; // Will be split into 255+255+255+235
568        data.extend(&[100i32; 300]); // Will be split into 255+45
569
570        let array = Int32Array::from(data);
571        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
572
573        // Should have 6 runs total (4 for first value, 2 for second)
574        let lengths_buffer = &compressed.data[1];
575        assert_eq!(lengths_buffer.len(), 6);
576    }
577
578    // ========== Round-trip Tests for Different Types ==========
579
580    #[test]
581    fn test_round_trip_all_types() {
582        // Test u8
583        test_round_trip_helper(vec![42u8, 42, 42, 100, 100, 255, 255, 255, 255], 8);
584
585        // Test u16
586        test_round_trip_helper(vec![1000u16, 1000, 2000, 2000, 2000, 3000], 16);
587
588        // Test i32
589        test_round_trip_helper(vec![100i32, 100, 100, -200, -200, 300, 300, 300, 300], 32);
590
591        // Test u64
592        test_round_trip_helper(vec![1_000_000_000u64; 5], 64);
593    }
594
595    fn test_round_trip_helper<T>(data: Vec<T>, bits_per_value: u64)
596    where
597        T: bytemuck::Pod + PartialEq + std::fmt::Debug,
598    {
599        let encoder = RleMiniBlockEncoder::new();
600        let bytes: Vec<u8> = data
601            .iter()
602            .flat_map(|v| bytemuck::bytes_of(v))
603            .copied()
604            .collect();
605
606        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
607            bits_per_value,
608            data: LanceBuffer::from(bytes),
609            num_values: data.len() as u64,
610            block_info: BlockInfo::default(),
611        });
612
613        let (compressed, _) = encoder.compress(block).unwrap();
614        let decompressor = RleMiniBlockDecompressor::new(bits_per_value);
615        let decompressed = decompressor
616            .decompress(compressed.data, compressed.num_values)
617            .unwrap();
618
619        match decompressed {
620            DataBlock::FixedWidth(ref block) => {
621                // Verify the decompressed data length matches expected
622                assert_eq!(block.data.len(), data.len() * std::mem::size_of::<T>());
623            }
624            _ => panic!("Expected FixedWidth block"),
625        }
626    }
627
628    // ========== Chunk Boundary Tests ==========
629
630    #[test]
631    fn test_power_of_two_chunking() {
632        let encoder = RleMiniBlockEncoder::new();
633
634        // Create data that will require multiple chunks
635        let test_sizes = vec![1000, 2500, 5000, 10000];
636
637        for size in test_sizes {
638            let data: Vec<i32> = (0..size)
639                .map(|i| i / 50) // Create runs of 50
640                .collect();
641
642            let array = Int32Array::from(data);
643            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
644
645            // Verify all non-last chunks have power-of-2 values
646            for (i, chunk) in compressed.chunks.iter().enumerate() {
647                if i < compressed.chunks.len() - 1 {
648                    assert!(chunk.log_num_values > 0);
649                    let chunk_values = 1u64 << chunk.log_num_values;
650                    assert!(chunk_values.is_power_of_two());
651                    assert!(chunk_values <= MAX_MINIBLOCK_VALUES);
652                } else {
653                    assert_eq!(chunk.log_num_values, 0);
654                }
655            }
656        }
657    }
658
659    // ========== Error Handling Tests ==========
660
661    #[test]
662    #[should_panic(expected = "RLE decompressor expects exactly 2 buffers")]
663    fn test_invalid_buffer_count() {
664        let decompressor = RleMiniBlockDecompressor::new(32);
665        let _ = decompressor.decompress(vec![LanceBuffer::from(vec![1, 2, 3, 4])], 10);
666    }
667
668    #[test]
669    #[should_panic(expected = "Inconsistent RLE buffers")]
670    fn test_buffer_consistency() {
671        let decompressor = RleMiniBlockDecompressor::new(32);
672        let values = LanceBuffer::from(vec![1, 0, 0, 0]); // 1 i32 value
673        let lengths = LanceBuffer::from(vec![5, 10]); // 2 lengths - mismatch!
674        let _ = decompressor.decompress(vec![values, lengths], 15);
675    }
676
677    #[test]
678    fn test_empty_data_handling() {
679        let encoder = RleMiniBlockEncoder::new();
680
681        // Test empty block
682        let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock {
683            bits_per_value: 32,
684            data: LanceBuffer::from(vec![]),
685            num_values: 0,
686            block_info: BlockInfo::default(),
687        });
688
689        let (compressed, _) = encoder.compress(empty_block).unwrap();
690        assert_eq!(compressed.num_values, 0);
691        assert!(compressed.data.is_empty());
692
693        // Test decompression of empty data
694        let decompressor = RleMiniBlockDecompressor::new(32);
695        let decompressed = decompressor.decompress(vec![], 0).unwrap();
696
697        match decompressed {
698            DataBlock::FixedWidth(ref block) => {
699                assert_eq!(block.num_values, 0);
700                assert_eq!(block.data.len(), 0);
701            }
702            _ => panic!("Expected FixedWidth block"),
703        }
704    }
705
706    // ========== Integration Test ==========
707
708    #[test]
709    fn test_multi_chunk_round_trip() {
710        let encoder = RleMiniBlockEncoder::new();
711
712        // Create data that spans multiple chunks with mixed patterns
713        let mut data = Vec::new();
714
715        // High compression section
716        data.extend(vec![999i32; 2000]);
717        // Low compression section
718        data.extend(0..1000);
719        // Another high compression section
720        data.extend(vec![777i32; 2000]);
721
722        let array = Int32Array::from(data.clone());
723        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
724
725        // Manually decompress all chunks
726        let mut reconstructed = Vec::new();
727        let mut values_offset = 0usize;
728        let mut lengths_offset = 0usize;
729        let mut values_processed = 0u64;
730
731        // We now have exactly 2 global buffers
732        assert_eq!(compressed.data.len(), 2);
733        let global_values = &compressed.data[0];
734        let global_lengths = &compressed.data[1];
735
736        for chunk in &compressed.chunks {
737            let chunk_values = if chunk.log_num_values > 0 {
738                1u64 << chunk.log_num_values
739            } else {
740                compressed.num_values - values_processed
741            };
742
743            // Extract chunk buffers from global buffers using buffer_sizes
744            let values_size = chunk.buffer_sizes[0] as usize;
745            let lengths_size = chunk.buffer_sizes[1] as usize;
746
747            let chunk_values_buffer = global_values.slice_with_length(values_offset, values_size);
748            let chunk_lengths_buffer =
749                global_lengths.slice_with_length(lengths_offset, lengths_size);
750
751            let decompressor = RleMiniBlockDecompressor::new(32);
752            let chunk_data = decompressor
753                .decompress(
754                    vec![chunk_values_buffer, chunk_lengths_buffer],
755                    chunk_values,
756                )
757                .unwrap();
758
759            values_offset += values_size;
760            lengths_offset += lengths_size;
761            values_processed += chunk_values;
762
763            match chunk_data {
764                DataBlock::FixedWidth(ref block) => {
765                    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
766                    reconstructed.extend_from_slice(values);
767                }
768                _ => panic!("Expected FixedWidth block"),
769            }
770        }
771
772        assert_eq!(reconstructed, data);
773    }
774
775    #[test]
776    fn test_1024_boundary_conditions() {
777        // Comprehensive test for various boundary conditions at 1024 values
778        // This consolidates multiple bug tests that were previously separate
779        let encoder = RleMiniBlockEncoder::new();
780        let decompressor = RleMiniBlockDecompressor::new(32);
781
782        let test_cases = [
783            ("runs_of_2", {
784                let mut data = Vec::new();
785                for i in 0..512 {
786                    data.push(i);
787                    data.push(i);
788                }
789                data
790            }),
791            ("single_run_1024", vec![42i32; 1024]),
792            ("alternating_values", {
793                let mut data = Vec::new();
794                for i in 0..1024 {
795                    data.push(i % 2);
796                }
797                data
798            }),
799            ("run_boundary_255s", {
800                let mut data = Vec::new();
801                data.extend(vec![1i32; 255]);
802                data.extend(vec![2i32; 255]);
803                data.extend(vec![3i32; 255]);
804                data.extend(vec![4i32; 255]);
805                data.extend(vec![5i32; 4]);
806                data
807            }),
808            ("unique_values_1024", (0..1024).collect::<Vec<_>>()),
809            ("unique_plus_duplicate", {
810                // 1023 unique values followed by one duplicate (regression test)
811                let mut data = Vec::new();
812                for i in 0..1023 {
813                    data.push(i);
814                }
815                data.push(1022i32); // Last value same as second-to-last
816                data
817            }),
818            ("bug_4092_pattern", {
819                // Test exact scenario that produces 4092 bytes instead of 4096
820                let mut data = Vec::new();
821                for i in 0..1022 {
822                    data.push(i);
823                }
824                data.push(999999i32);
825                data.push(999999i32);
826                data
827            }),
828        ];
829
830        for (test_name, data) in test_cases.iter() {
831            assert_eq!(data.len(), 1024, "Test case {} has wrong length", test_name);
832
833            // Compress the data
834            let array = Int32Array::from(data.clone());
835            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
836
837            // Decompress and verify
838            match decompressor.decompress(compressed.data, compressed.num_values) {
839                Ok(decompressed) => match decompressed {
840                    DataBlock::FixedWidth(ref block) => {
841                        let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
842                        assert_eq!(
843                            values.len(),
844                            1024,
845                            "Test case {} got {} values, expected 1024",
846                            test_name,
847                            values.len()
848                        );
849                        assert_eq!(
850                            block.data.len(),
851                            4096,
852                            "Test case {} got {} bytes, expected 4096",
853                            test_name,
854                            block.data.len()
855                        );
856                        assert_eq!(values, &data[..], "Test case {} data mismatch", test_name);
857                    }
858                    _ => panic!("Test case {} expected FixedWidth block", test_name),
859                },
860                Err(e) => {
861                    if e.to_string().contains("4092") {
862                        panic!("Test case {} found bug 4092: {}", test_name, e);
863                    }
864                    panic!("Test case {} failed with error: {}", test_name, e);
865                }
866            }
867        }
868    }
869
870    #[test]
871    fn test_low_repetition_50pct_bug() {
872        // Test case that reproduces the 4092 bytes bug with low repetition (50%)
873        // This simulates the 1M benchmark case
874        let encoder = RleMiniBlockEncoder::new();
875
876        // Create 1M values with low repetition (50% chance of change)
877        let num_values = 1_048_576; // 1M values
878        let mut data = Vec::with_capacity(num_values);
879        let mut value = 0i32;
880        let mut rng = 12345u64; // Simple deterministic RNG
881
882        for _ in 0..num_values {
883            data.push(value);
884            // Simple LCG for deterministic randomness
885            rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
886            // 50% chance to increment value
887            if (rng >> 16) & 1 == 1 {
888                value += 1;
889            }
890        }
891
892        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();
893
894        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
895            bits_per_value: 32,
896            data: LanceBuffer::from(bytes),
897            num_values: num_values as u64,
898            block_info: BlockInfo::default(),
899        });
900
901        let (compressed, _) = encoder.compress(block).unwrap();
902
903        // Debug first few chunks
904        for (i, chunk) in compressed.chunks.iter().take(5).enumerate() {
905            let _chunk_values = if chunk.log_num_values > 0 {
906                1 << chunk.log_num_values
907            } else {
908                // Last chunk - calculate remaining
909                let prev_total: usize = compressed.chunks[..i]
910                    .iter()
911                    .map(|c| 1usize << c.log_num_values)
912                    .sum();
913                num_values - prev_total
914            };
915        }
916
917        // Try to decompress
918        let decompressor = RleMiniBlockDecompressor::new(32);
919        match decompressor.decompress(compressed.data, compressed.num_values) {
920            Ok(decompressed) => match decompressed {
921                DataBlock::FixedWidth(ref block) => {
922                    assert_eq!(
923                        block.data.len(),
924                        num_values * 4,
925                        "Expected {} bytes but got {}",
926                        num_values * 4,
927                        block.data.len()
928                    );
929                }
930                _ => panic!("Expected FixedWidth block"),
931            },
932            Err(e) => {
933                if e.to_string().contains("4092") {
934                    panic!("Bug reproduced! {}", e);
935                } else {
936                    panic!("Unexpected error: {}", e);
937                }
938            }
939        }
940    }
941
942    // ========== Encoding Verification Tests ==========
943
944    #[test_log::test(tokio::test)]
945    async fn test_rle_encoding_verification() {
946        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
947        use crate::version::LanceFileVersion;
948        use arrow_array::{Array, Int32Array};
949        use lance_datagen::{ArrayGenerator, RowCount};
950        use std::collections::HashMap;
951        use std::sync::Arc;
952
953        let test_cases = TestCases::default()
954            .with_expected_encoding("rle")
955            .with_min_file_version(LanceFileVersion::V2_1);
956
957        // Test both explicit metadata and automatic selection
958        // 1. Test with explicit RLE threshold metadata (also disable BSS)
959        let mut metadata_explicit = HashMap::new();
960        metadata_explicit.insert(
961            "lance-encoding:rle-threshold".to_string(),
962            "0.8".to_string(),
963        );
964        metadata_explicit.insert("lance-encoding:bss".to_string(), "off".to_string());
965
966        let mut generator = RleDataGenerator::new(vec![1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]);
967        let data_explicit = generator.generate_default(RowCount::from(10000)).unwrap();
968        check_round_trip_encoding_of_data(vec![data_explicit], &test_cases, metadata_explicit)
969            .await;
970
971        // 2. Test automatic RLE selection based on data characteristics
972        // 80% repetition should trigger RLE (> default 50% threshold)
973        // Explicitly disable BSS to ensure RLE is tested
974        let mut metadata = HashMap::new();
975        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());
976
977        let mut values = vec![42i32; 8000]; // 80% repetition
978        values.extend([1i32, 2i32, 3i32, 4i32, 5i32].repeat(400)); // 20% variety
979        let arr = Arc::new(Int32Array::from(values)) as Arc<dyn Array>;
980        check_round_trip_encoding_of_data(vec![arr], &test_cases, metadata).await;
981    }
982
983    /// Generator that produces repetitive patterns suitable for RLE
984    #[derive(Debug)]
985    struct RleDataGenerator {
986        pattern: Vec<i32>,
987        idx: usize,
988    }
989
990    impl RleDataGenerator {
991        fn new(pattern: Vec<i32>) -> Self {
992            Self { pattern, idx: 0 }
993        }
994    }
995
996    impl lance_datagen::ArrayGenerator for RleDataGenerator {
997        fn generate(
998            &mut self,
999            _length: lance_datagen::RowCount,
1000            _rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
1001        ) -> std::result::Result<std::sync::Arc<dyn arrow_array::Array>, arrow_schema::ArrowError>
1002        {
1003            use arrow_array::Int32Array;
1004            use std::sync::Arc;
1005
1006            // Generate enough repetitive data to trigger RLE
1007            let mut values = Vec::new();
1008            for _ in 0..10000 {
1009                values.push(self.pattern[self.idx]);
1010                self.idx = (self.idx + 1) % self.pattern.len();
1011            }
1012            Ok(Arc::new(Int32Array::from(values)))
1013        }
1014
1015        fn data_type(&self) -> &arrow_schema::DataType {
1016            &arrow_schema::DataType::Int32
1017        }
1018
1019        fn element_size_bytes(&self) -> Option<lance_datagen::ByteCount> {
1020            Some(lance_datagen::ByteCount::from(4))
1021        }
1022    }
1023}