lance_encoding/encodings/physical/
rle.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! # RLE (Run-Length Encoding) Miniblock Format
5//!
6//! RLE compression for Lance miniblock format, optimized for data with repeated values.
7//!
8//! ## Encoding Format
9//!
10//! RLE uses a dual-buffer format to store compressed data:
11//!
12//! - **Values Buffer**: Stores unique values in their original data type
13//! - **Lengths Buffer**: Stores the repeat count for each value as u8
14//!
15//! ### Example
16//!
17//! Input data: `[1, 1, 1, 2, 2, 3, 3, 3, 3]`
18//!
19//! Encoded as:
20//! - Values buffer: `[1, 2, 3]` (3 × 4 bytes for i32)
21//! - Lengths buffer: `[3, 2, 4]` (3 × 1 byte for u8)
22//!
23//! ### Long Run Handling
24//!
25//! When a run exceeds 255 values, it is split into multiple runs of 255
26//! followed by a final run with the remainder. For example, a run of 1000
27//! identical values becomes 4 runs: [255, 255, 255, 235].
28//!
29//! ## Supported Types
30//!
31//! RLE supports all fixed-width primitive types:
32//! - 8-bit: u8, i8
33//! - 16-bit: u16, i16
34//! - 32-bit: u32, i32, f32
35//! - 64-bit: u64, i64, f64
36//!
37//! ## Compression Strategy
38//!
39//! RLE is automatically selected when:
40//! - The run count (number of value transitions) < 50% of total values
41//! - This indicates sufficient repetition for RLE to be effective
42//!
43//! ## Chunk Handling
44//!
45//! - Maximum chunk size: 4096 values (miniblock constraint)
46//! - All chunks share two global buffers (values and lengths)
47//! - Each chunk's buffer_sizes indicate its portion of the global buffers
48//! - Non-last chunks always contain power-of-2 values
49//! - Byte limits are enforced dynamically during encoding
50
51use log::trace;
52use snafu::location;
53
54use crate::buffer::LanceBuffer;
55use crate::compression::MiniBlockDecompressor;
56use crate::data::DataBlock;
57use crate::data::{BlockInfo, FixedWidthDataBlock};
58use crate::encodings::logical::primitive::miniblock::{
59    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
60    MAX_MINIBLOCK_VALUES,
61};
62use crate::format::{pb, ProtobufUtils};
63
64use lance_core::{Error, Result};
65
66/// RLE encoder for miniblock format
67#[derive(Debug, Default)]
68pub struct RleMiniBlockEncoder;
69
70impl RleMiniBlockEncoder {
71    pub fn new() -> Self {
72        Self
73    }
74
75    fn encode_data(
76        &self,
77        data: &LanceBuffer,
78        num_values: u64,
79        bits_per_value: u64,
80    ) -> Result<(Vec<LanceBuffer>, Vec<MiniBlockChunk>)> {
81        if num_values == 0 {
82            return Ok((Vec::new(), Vec::new()));
83        }
84
85        let bytes_per_value = (bits_per_value / 8) as usize;
86
87        // Pre-allocate global buffers with estimated capacity
88        // Assume average compression ratio of ~10:1 (10 values per run)
89        let estimated_runs = num_values as usize / 10;
90        let mut all_values = Vec::with_capacity(estimated_runs * bytes_per_value);
91        let mut all_lengths = Vec::with_capacity(estimated_runs);
92        let mut chunks = Vec::new();
93
94        let mut offset = 0usize;
95        let mut values_remaining = num_values as usize;
96
97        while values_remaining > 0 {
98            let values_start = all_values.len();
99            let lengths_start = all_lengths.len();
100
101            let (_num_runs, values_processed, is_last_chunk) = match bits_per_value {
102                8 => self.encode_chunk_rolling::<u8>(
103                    data,
104                    offset,
105                    values_remaining,
106                    &mut all_values,
107                    &mut all_lengths,
108                ),
109                16 => self.encode_chunk_rolling::<u16>(
110                    data,
111                    offset,
112                    values_remaining,
113                    &mut all_values,
114                    &mut all_lengths,
115                ),
116                32 => self.encode_chunk_rolling::<u32>(
117                    data,
118                    offset,
119                    values_remaining,
120                    &mut all_values,
121                    &mut all_lengths,
122                ),
123                64 => self.encode_chunk_rolling::<u64>(
124                    data,
125                    offset,
126                    values_remaining,
127                    &mut all_values,
128                    &mut all_lengths,
129                ),
130                _ => unreachable!("RLE encoding bits_per_value must be 8, 16, 32 or 64"),
131            };
132
133            if values_processed == 0 {
134                break;
135            }
136
137            let log_num_values = if is_last_chunk {
138                0
139            } else {
140                assert!(
141                    values_processed.is_power_of_two(),
142                    "Non-last chunk must have power-of-2 values"
143                );
144                values_processed.ilog2() as u8
145            };
146
147            let values_size = all_values.len() - values_start;
148            let lengths_size = all_lengths.len() - lengths_start;
149
150            let chunk = MiniBlockChunk {
151                buffer_sizes: vec![values_size as u16, lengths_size as u16],
152                log_num_values,
153            };
154
155            chunks.push(chunk);
156
157            offset += values_processed;
158            values_remaining -= values_processed;
159        }
160
161        // Return exactly two buffers: values and lengths
162        Ok((
163            vec![
164                LanceBuffer::Owned(all_values),
165                LanceBuffer::Owned(all_lengths),
166            ],
167            chunks,
168        ))
169    }
170
171    /// Encodes a chunk of data using RLE compression with dynamic boundary detection.
172    ///
173    /// This function processes values sequentially, detecting runs (sequences of identical values)
174    /// and encoding them as (value, length) pairs. It dynamically determines whether this chunk
175    /// should be the last chunk based on how many values were processed.
176    ///
177    /// # Key Features:
178    /// - Tracks byte usage to ensure we don't exceed MAX_MINIBLOCK_BYTES
179    /// - Maintains power-of-2 checkpoints for non-last chunks
180    /// - Splits long runs (>255) into multiple entries
181    /// - Dynamically determines if this is the last chunk
182    ///
183    /// # Returns:
184    /// - num_runs: Number of runs encoded
185    /// - values_processed: Number of input values processed
186    /// - is_last_chunk: Whether this chunk processed all remaining values
187    fn encode_chunk_rolling<T>(
188        &self,
189        data: &LanceBuffer,
190        offset: usize,
191        values_remaining: usize,
192        all_values: &mut Vec<u8>,
193        all_lengths: &mut Vec<u8>,
194    ) -> (usize, usize, bool)
195    where
196        T: bytemuck::Pod + PartialEq + Copy + std::fmt::Debug,
197    {
198        let type_size = std::mem::size_of::<T>();
199        let data_slice = data.as_ref();
200
201        let chunk_start = offset * type_size;
202        let max_by_count = MAX_MINIBLOCK_VALUES as usize;
203        let max_values = values_remaining.min(max_by_count);
204        let chunk_end = chunk_start + max_values * type_size;
205
206        if chunk_start >= data_slice.len() {
207            return (0, 0, false);
208        }
209
210        let chunk_data = &data_slice[chunk_start..chunk_end.min(data_slice.len())];
211        let typed_data: &[T] = bytemuck::cast_slice(chunk_data);
212
213        if typed_data.is_empty() {
214            return (0, 0, false);
215        }
216
217        // Record starting positions for this chunk
218        let values_start = all_values.len();
219
220        let mut current_value = typed_data[0];
221        let mut current_length = 1u64;
222        let mut bytes_used = 0usize;
223        let mut total_values_encoded = 0usize; // Track total encoded values
224
225        // Power-of-2 checkpoints for ensuring non-last chunks have valid sizes
226        // For smaller data types like u8, we can use larger initial checkpoints
227        // since they take less space per value
228        let checkpoints = match type_size {
229            1 => vec![256, 512, 1024, 2048, 4096], // u8 can start from 256
230            2 => vec![128, 256, 512, 1024, 2048, 4096], // u16 can start from 128
231            _ => vec![64, 128, 256, 512, 1024, 2048, 4096], // u32/u64: no difference
232        };
233        let valid_checkpoints: Vec<usize> = checkpoints
234            .into_iter()
235            .filter(|&p| p <= values_remaining)
236            .collect();
237        let mut checkpoint_idx = 0;
238
239        // Save state at checkpoints so we can roll back if needed
240        let mut last_checkpoint_state = None;
241
242        for &value in typed_data[1..].iter() {
243            if value == current_value {
244                current_length += 1;
245            } else {
246                // Calculate space needed (may need multiple u8s if run > 255)
247                let run_chunks = current_length.div_ceil(255) as usize;
248                let bytes_needed = run_chunks * (type_size + 1);
249
250                // Stop if adding this run would exceed byte limit
251                if bytes_used + bytes_needed > MAX_MINIBLOCK_BYTES as usize {
252                    if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
253                        // Roll back to last power-of-2 checkpoint
254                        all_values.truncate(val_pos);
255                        all_lengths.truncate(len_pos);
256                        let num_runs = (val_pos - values_start) / type_size;
257                        return (num_runs, checkpoint_values, false);
258                    }
259                    break;
260                }
261
262                bytes_used += self.add_run(&current_value, current_length, all_values, all_lengths);
263                total_values_encoded += current_length as usize;
264                current_value = value;
265                current_length = 1;
266            }
267
268            // Check if we reached a power-of-2 checkpoint
269            if checkpoint_idx < valid_checkpoints.len()
270                && total_values_encoded >= valid_checkpoints[checkpoint_idx]
271            {
272                last_checkpoint_state = Some((
273                    all_values.len(),
274                    all_lengths.len(),
275                    bytes_used,
276                    valid_checkpoints[checkpoint_idx],
277                ));
278                checkpoint_idx += 1;
279            }
280        }
281
282        // After the loop, we always have a pending run that needs to be added
283        // unless we've exceeded the byte limit
284        if current_length > 0 {
285            let run_chunks = current_length.div_ceil(255) as usize;
286            let bytes_needed = run_chunks * (type_size + 1);
287
288            if bytes_used + bytes_needed <= MAX_MINIBLOCK_BYTES as usize {
289                let _ = self.add_run(&current_value, current_length, all_values, all_lengths);
290                total_values_encoded += current_length as usize;
291            }
292        }
293
294        // Determine if we've processed all remaining values
295        let is_last_chunk = total_values_encoded == values_remaining;
296
297        // Non-last chunks must have power-of-2 values for miniblock format
298        if !is_last_chunk {
299            if total_values_encoded.is_power_of_two() {
300                // Already at power-of-2 boundary
301            } else if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
302                // Roll back to last valid checkpoint
303                all_values.truncate(val_pos);
304                all_lengths.truncate(len_pos);
305                let num_runs = (val_pos - values_start) / type_size;
306                return (num_runs, checkpoint_values, false);
307            } else {
308                // No valid checkpoint, can't create a valid chunk
309                return (0, 0, false);
310            }
311        }
312
313        let num_runs = (all_values.len() - values_start) / type_size;
314        (num_runs, total_values_encoded, is_last_chunk)
315    }
316
317    fn add_run<T>(
318        &self,
319        value: &T,
320        length: u64,
321        all_values: &mut Vec<u8>,
322        all_lengths: &mut Vec<u8>,
323    ) -> usize
324    where
325        T: bytemuck::Pod,
326    {
327        let value_bytes = bytemuck::bytes_of(value);
328        let type_size = std::mem::size_of::<T>();
329        let num_full_chunks = (length / 255) as usize;
330        let remainder = (length % 255) as u8;
331
332        let total_chunks = num_full_chunks + if remainder > 0 { 1 } else { 0 };
333        all_values.reserve(total_chunks * type_size);
334        all_lengths.reserve(total_chunks);
335
336        for _ in 0..num_full_chunks {
337            all_values.extend_from_slice(value_bytes);
338            all_lengths.push(255);
339        }
340
341        if remainder > 0 {
342            all_values.extend_from_slice(value_bytes);
343            all_lengths.push(remainder);
344        }
345
346        total_chunks * (type_size + 1)
347    }
348}
349
350impl MiniBlockCompressor for RleMiniBlockEncoder {
351    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
352        match data {
353            DataBlock::FixedWidth(fixed_width) => {
354                let num_values = fixed_width.num_values;
355                let bits_per_value = fixed_width.bits_per_value;
356
357                let (all_buffers, chunks) =
358                    self.encode_data(&fixed_width.data, num_values, bits_per_value)?;
359
360                let compressed = MiniBlockCompressed {
361                    data: all_buffers,
362                    chunks,
363                    num_values,
364                };
365
366                let encoding = ProtobufUtils::rle(bits_per_value);
367
368                Ok((compressed, encoding))
369            }
370            _ => Err(Error::InvalidInput {
371                location: location!(),
372                source: "RLE encoding only supports FixedWidth data blocks".into(),
373            }),
374        }
375    }
376}
377
378/// RLE decompressor for miniblock format
379#[derive(Debug)]
380pub struct RleMiniBlockDecompressor {
381    bits_per_value: u64,
382}
383
384impl RleMiniBlockDecompressor {
385    pub fn new(bits_per_value: u64) -> Self {
386        Self { bits_per_value }
387    }
388
389    fn decode_data(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
390        if num_values == 0 {
391            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
392                bits_per_value: self.bits_per_value,
393                data: LanceBuffer::Owned(vec![]),
394                num_values: 0,
395                block_info: BlockInfo::default(),
396            }));
397        }
398
399        assert_eq!(
400            data.len(),
401            2,
402            "RLE decompressor expects exactly 2 buffers, got {}",
403            data.len()
404        );
405
406        let values_buffer = &data[0];
407        let lengths_buffer = &data[1];
408
409        let decoded_data = match self.bits_per_value {
410            8 => self.decode_generic::<u8>(values_buffer, lengths_buffer, num_values)?,
411            16 => self.decode_generic::<u16>(values_buffer, lengths_buffer, num_values)?,
412            32 => self.decode_generic::<u32>(values_buffer, lengths_buffer, num_values)?,
413            64 => self.decode_generic::<u64>(values_buffer, lengths_buffer, num_values)?,
414            _ => unreachable!("RLE decoding bits_per_value must be 8, 16, 32, 64, or 128"),
415        };
416
417        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
418            bits_per_value: self.bits_per_value,
419            data: LanceBuffer::Owned(decoded_data),
420            num_values,
421            block_info: BlockInfo::default(),
422        }))
423    }
424
425    fn decode_generic<T>(
426        &self,
427        values_buffer: &LanceBuffer,
428        lengths_buffer: &LanceBuffer,
429        num_values: u64,
430    ) -> Result<Vec<u8>>
431    where
432        T: bytemuck::Pod + Copy + std::fmt::Debug,
433    {
434        let values_bytes = values_buffer.as_ref();
435        let lengths_bytes = lengths_buffer.as_ref();
436
437        let type_size = std::mem::size_of::<T>();
438
439        if values_bytes.is_empty() || lengths_bytes.is_empty() {
440            if num_values == 0 {
441                return Ok(Vec::new());
442            } else {
443                return Err(Error::InvalidInput {
444                    location: location!(),
445                    source: format!("Empty buffers but expected {} values", num_values).into(),
446                });
447            }
448        }
449
450        if values_bytes.len() % type_size != 0 || lengths_bytes.is_empty() {
451            return Err(Error::InvalidInput {
452                location: location!(),
453                source: format!(
454                    "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes",
455                    std::any::type_name::<T>(),
456                    values_bytes.len(),
457                    type_size,
458                    lengths_bytes.len()
459                )
460                .into(),
461            });
462        }
463
464        let num_runs = values_bytes.len() / type_size;
465        let num_length_entries = lengths_bytes.len();
466        assert_eq!(
467            num_runs, num_length_entries,
468            "Inconsistent RLE buffers: {} runs but {} length entries",
469            num_runs, num_length_entries
470        );
471
472        let values: &[T] = bytemuck::cast_slice(values_bytes);
473        let lengths: &[u8] = lengths_bytes;
474
475        let expected_byte_count = num_values as usize * type_size;
476        let mut decoded = Vec::with_capacity(expected_byte_count);
477
478        for (value, &length) in values.iter().zip(lengths.iter()) {
479            let run_length = length as usize;
480            let bytes_to_write = run_length * type_size;
481            let bytes_of_value = bytemuck::bytes_of(value);
482
483            if decoded.len() + bytes_to_write > expected_byte_count {
484                let remaining_bytes = expected_byte_count - decoded.len();
485                let remaining_values = remaining_bytes / type_size;
486
487                for _ in 0..remaining_values {
488                    decoded.extend_from_slice(bytes_of_value);
489                }
490                break;
491            }
492
493            for _ in 0..run_length {
494                decoded.extend_from_slice(bytes_of_value);
495            }
496        }
497
498        if decoded.len() != expected_byte_count {
499            return Err(Error::InvalidInput {
500                location: location!(),
501                source: format!(
502                    "RLE decoding produced {} bytes, expected {}",
503                    decoded.len(),
504                    expected_byte_count
505                )
506                .into(),
507            });
508        }
509
510        trace!(
511            "RLE decoded {} {} values",
512            num_values,
513            std::any::type_name::<T>()
514        );
515        Ok(decoded)
516    }
517}
518
519impl MiniBlockDecompressor for RleMiniBlockDecompressor {
520    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
521        self.decode_data(data, num_values)
522    }
523}
524
525#[cfg(test)]
526mod tests {
527    use super::*;
528    use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
529    use crate::data::DataBlock;
530    use arrow_array::Int32Array;
531    use lance_core::datatypes::Field;
532
533    // ========== Core Functionality Tests ==========
534
535    #[test]
536    fn test_basic_rle_encoding() {
537        let encoder = RleMiniBlockEncoder::new();
538
539        // Test basic RLE pattern: [1, 1, 1, 2, 2, 3, 3, 3, 3]
540        let array = Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]);
541        let data_block = DataBlock::from_array(array);
542
543        let (compressed, _) = encoder.compress(data_block).unwrap();
544
545        assert_eq!(compressed.num_values, 9);
546        assert_eq!(compressed.chunks.len(), 1);
547
548        // Verify compression happened (3 runs instead of 9 values)
549        let values_buffer = &compressed.data[0];
550        let lengths_buffer = &compressed.data[1];
551        assert_eq!(values_buffer.len(), 12); // 3 i32 values
552        assert_eq!(lengths_buffer.len(), 3); // 3 u8 lengths
553    }
554
555    #[test]
556    fn test_long_run_splitting() {
557        let encoder = RleMiniBlockEncoder::new();
558
559        // Create a run longer than 255 to test splitting
560        let mut data = vec![42i32; 1000]; // Will be split into 255+255+255+235
561        data.extend(&[100i32; 300]); // Will be split into 255+45
562
563        let array = Int32Array::from(data);
564        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
565
566        // Should have 6 runs total (4 for first value, 2 for second)
567        let lengths_buffer = &compressed.data[1];
568        assert_eq!(lengths_buffer.len(), 6);
569    }
570
571    #[test]
572    fn test_compression_strategy_selection() {
573        let strategy = DefaultCompressionStrategy::new();
574        let field = Field::new_arrow("test", arrow_schema::DataType::Int32, false).unwrap();
575
576        // High repetition - should select RLE
577        let repetitive_array = Int32Array::from(vec![1; 1000]);
578        let repetitive_block = DataBlock::from_array(repetitive_array);
579
580        let compressor = strategy
581            .create_miniblock_compressor(&field, &repetitive_block)
582            .unwrap();
583        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
584
585        // No repetition - should NOT select RLE
586        let unique_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
587        let unique_block = DataBlock::from_array(unique_array);
588
589        let compressor = strategy
590            .create_miniblock_compressor(&field, &unique_block)
591            .unwrap();
592        assert!(!format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
593    }
594
595    // ========== Round-trip Tests for Different Types ==========
596
597    #[test]
598    fn test_round_trip_all_types() {
599        // Test u8
600        test_round_trip_helper(vec![42u8, 42, 42, 100, 100, 255, 255, 255, 255], 8);
601
602        // Test u16
603        test_round_trip_helper(vec![1000u16, 1000, 2000, 2000, 2000, 3000], 16);
604
605        // Test i32
606        test_round_trip_helper(vec![100i32, 100, 100, -200, -200, 300, 300, 300, 300], 32);
607
608        // Test u64
609        test_round_trip_helper(vec![1_000_000_000u64; 5], 64);
610    }
611
612    fn test_round_trip_helper<T>(data: Vec<T>, bits_per_value: u64)
613    where
614        T: bytemuck::Pod + PartialEq + std::fmt::Debug,
615    {
616        let encoder = RleMiniBlockEncoder::new();
617        let bytes: Vec<u8> = data
618            .iter()
619            .flat_map(|v| bytemuck::bytes_of(v))
620            .copied()
621            .collect();
622
623        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
624            bits_per_value,
625            data: LanceBuffer::Owned(bytes),
626            num_values: data.len() as u64,
627            block_info: BlockInfo::default(),
628        });
629
630        let (compressed, _) = encoder.compress(block).unwrap();
631        let decompressor = RleMiniBlockDecompressor::new(bits_per_value);
632        let decompressed = decompressor
633            .decompress(compressed.data, compressed.num_values)
634            .unwrap();
635
636        match decompressed {
637            DataBlock::FixedWidth(ref block) => {
638                // Verify the decompressed data length matches expected
639                assert_eq!(block.data.len(), data.len() * std::mem::size_of::<T>());
640            }
641            _ => panic!("Expected FixedWidth block"),
642        }
643    }
644
645    // ========== Chunk Boundary Tests ==========
646
647    #[test]
648    fn test_power_of_two_chunking() {
649        let encoder = RleMiniBlockEncoder::new();
650
651        // Create data that will require multiple chunks
652        let test_sizes = vec![1000, 2500, 5000, 10000];
653
654        for size in test_sizes {
655            let data: Vec<i32> = (0..size)
656                .map(|i| i / 50) // Create runs of 50
657                .collect();
658
659            let array = Int32Array::from(data);
660            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
661
662            // Verify all non-last chunks have power-of-2 values
663            for (i, chunk) in compressed.chunks.iter().enumerate() {
664                if i < compressed.chunks.len() - 1 {
665                    assert!(chunk.log_num_values > 0);
666                    let chunk_values = 1u64 << chunk.log_num_values;
667                    assert!(chunk_values.is_power_of_two());
668                    assert!(chunk_values <= MAX_MINIBLOCK_VALUES);
669                } else {
670                    assert_eq!(chunk.log_num_values, 0);
671                }
672            }
673        }
674    }
675
676    // ========== Error Handling Tests ==========
677
678    #[test]
679    #[should_panic(expected = "RLE decompressor expects exactly 2 buffers")]
680    fn test_invalid_buffer_count() {
681        let decompressor = RleMiniBlockDecompressor::new(32);
682        let _ = decompressor.decompress(vec![LanceBuffer::Owned(vec![1, 2, 3, 4])], 10);
683    }
684
685    #[test]
686    #[should_panic(expected = "Inconsistent RLE buffers")]
687    fn test_buffer_consistency() {
688        let decompressor = RleMiniBlockDecompressor::new(32);
689        let values = LanceBuffer::Owned(vec![1, 0, 0, 0]); // 1 i32 value
690        let lengths = LanceBuffer::Owned(vec![5, 10]); // 2 lengths - mismatch!
691        let _ = decompressor.decompress(vec![values, lengths], 15);
692    }
693
694    #[test]
695    fn test_empty_data_handling() {
696        let encoder = RleMiniBlockEncoder::new();
697
698        // Test empty block
699        let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock {
700            bits_per_value: 32,
701            data: LanceBuffer::Owned(vec![]),
702            num_values: 0,
703            block_info: BlockInfo::default(),
704        });
705
706        let (compressed, _) = encoder.compress(empty_block).unwrap();
707        assert_eq!(compressed.num_values, 0);
708        assert!(compressed.data.is_empty());
709
710        // Test decompression of empty data
711        let decompressor = RleMiniBlockDecompressor::new(32);
712        let decompressed = decompressor.decompress(vec![], 0).unwrap();
713
714        match decompressed {
715            DataBlock::FixedWidth(ref block) => {
716                assert_eq!(block.num_values, 0);
717                assert_eq!(block.data.len(), 0);
718            }
719            _ => panic!("Expected FixedWidth block"),
720        }
721    }
722
723    // ========== Integration Test ==========
724
725    #[test]
726    fn test_multi_chunk_round_trip() {
727        let encoder = RleMiniBlockEncoder::new();
728
729        // Create data that spans multiple chunks with mixed patterns
730        let mut data = Vec::new();
731
732        // High compression section
733        data.extend(vec![999i32; 2000]);
734        // Low compression section
735        data.extend(0..1000);
736        // Another high compression section
737        data.extend(vec![777i32; 2000]);
738
739        let array = Int32Array::from(data.clone());
740        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
741
742        // Manually decompress all chunks
743        let mut reconstructed = Vec::new();
744        let mut values_offset = 0usize;
745        let mut lengths_offset = 0usize;
746        let mut values_processed = 0u64;
747
748        // We now have exactly 2 global buffers
749        assert_eq!(compressed.data.len(), 2);
750        let global_values = &compressed.data[0];
751        let global_lengths = &compressed.data[1];
752
753        for chunk in &compressed.chunks {
754            let chunk_values = if chunk.log_num_values > 0 {
755                1u64 << chunk.log_num_values
756            } else {
757                compressed.num_values - values_processed
758            };
759
760            // Extract chunk buffers from global buffers using buffer_sizes
761            let values_size = chunk.buffer_sizes[0] as usize;
762            let lengths_size = chunk.buffer_sizes[1] as usize;
763
764            let chunk_values_buffer = global_values.slice_with_length(values_offset, values_size);
765            let chunk_lengths_buffer =
766                global_lengths.slice_with_length(lengths_offset, lengths_size);
767
768            let decompressor = RleMiniBlockDecompressor::new(32);
769            let chunk_data = decompressor
770                .decompress(
771                    vec![chunk_values_buffer, chunk_lengths_buffer],
772                    chunk_values,
773                )
774                .unwrap();
775
776            values_offset += values_size;
777            lengths_offset += lengths_size;
778            values_processed += chunk_values;
779
780            match chunk_data {
781                DataBlock::FixedWidth(ref block) => {
782                    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
783                    reconstructed.extend_from_slice(values);
784                }
785                _ => panic!("Expected FixedWidth block"),
786            }
787        }
788
789        assert_eq!(reconstructed, data);
790    }
791
792    #[test]
793    fn test_1024_boundary_conditions() {
794        // Comprehensive test for various boundary conditions at 1024 values
795        // This consolidates multiple bug tests that were previously separate
796        let encoder = RleMiniBlockEncoder::new();
797        let decompressor = RleMiniBlockDecompressor::new(32);
798
799        let test_cases = [
800            ("runs_of_2", {
801                let mut data = Vec::new();
802                for i in 0..512 {
803                    data.push(i);
804                    data.push(i);
805                }
806                data
807            }),
808            ("single_run_1024", vec![42i32; 1024]),
809            ("alternating_values", {
810                let mut data = Vec::new();
811                for i in 0..1024 {
812                    data.push(i % 2);
813                }
814                data
815            }),
816            ("run_boundary_255s", {
817                let mut data = Vec::new();
818                data.extend(vec![1i32; 255]);
819                data.extend(vec![2i32; 255]);
820                data.extend(vec![3i32; 255]);
821                data.extend(vec![4i32; 255]);
822                data.extend(vec![5i32; 4]);
823                data
824            }),
825            ("unique_values_1024", (0..1024).collect::<Vec<_>>()),
826            ("unique_plus_duplicate", {
827                // 1023 unique values followed by one duplicate (regression test)
828                let mut data = Vec::new();
829                for i in 0..1023 {
830                    data.push(i);
831                }
832                data.push(1022i32); // Last value same as second-to-last
833                data
834            }),
835            ("bug_4092_pattern", {
836                // Test exact scenario that produces 4092 bytes instead of 4096
837                let mut data = Vec::new();
838                for i in 0..1022 {
839                    data.push(i);
840                }
841                data.push(999999i32);
842                data.push(999999i32);
843                data
844            }),
845        ];
846
847        for (test_name, data) in test_cases.iter() {
848            assert_eq!(data.len(), 1024, "Test case {} has wrong length", test_name);
849
850            // Compress the data
851            let array = Int32Array::from(data.clone());
852            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
853
854            // Decompress and verify
855            match decompressor.decompress(compressed.data, compressed.num_values) {
856                Ok(decompressed) => match decompressed {
857                    DataBlock::FixedWidth(ref block) => {
858                        let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
859                        assert_eq!(
860                            values.len(),
861                            1024,
862                            "Test case {} got {} values, expected 1024",
863                            test_name,
864                            values.len()
865                        );
866                        assert_eq!(
867                            block.data.len(),
868                            4096,
869                            "Test case {} got {} bytes, expected 4096",
870                            test_name,
871                            block.data.len()
872                        );
873                        assert_eq!(values, &data[..], "Test case {} data mismatch", test_name);
874                    }
875                    _ => panic!("Test case {} expected FixedWidth block", test_name),
876                },
877                Err(e) => {
878                    if e.to_string().contains("4092") {
879                        panic!("Test case {} found bug 4092: {}", test_name, e);
880                    }
881                    panic!("Test case {} failed with error: {}", test_name, e);
882                }
883            }
884        }
885    }
886
887    #[test]
888    fn test_low_repetition_50pct_bug() {
889        // Test case that reproduces the 4092 bytes bug with low repetition (50%)
890        // This simulates the 1M benchmark case
891        let encoder = RleMiniBlockEncoder::new();
892
893        // Create 1M values with low repetition (50% chance of change)
894        let num_values = 1_048_576; // 1M values
895        let mut data = Vec::with_capacity(num_values);
896        let mut value = 0i32;
897        let mut rng = 12345u64; // Simple deterministic RNG
898
899        for _ in 0..num_values {
900            data.push(value);
901            // Simple LCG for deterministic randomness
902            rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
903            // 50% chance to increment value
904            if (rng >> 16) & 1 == 1 {
905                value += 1;
906            }
907        }
908
909        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();
910
911        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
912            bits_per_value: 32,
913            data: LanceBuffer::Owned(bytes),
914            num_values: num_values as u64,
915            block_info: BlockInfo::default(),
916        });
917
918        let (compressed, _) = encoder.compress(block).unwrap();
919
920        // Debug first few chunks
921        for (i, chunk) in compressed.chunks.iter().take(5).enumerate() {
922            let _chunk_values = if chunk.log_num_values > 0 {
923                1 << chunk.log_num_values
924            } else {
925                // Last chunk - calculate remaining
926                let prev_total: usize = compressed.chunks[..i]
927                    .iter()
928                    .map(|c| 1usize << c.log_num_values)
929                    .sum();
930                num_values - prev_total
931            };
932        }
933
934        // Try to decompress
935        let decompressor = RleMiniBlockDecompressor::new(32);
936        match decompressor.decompress(compressed.data, compressed.num_values) {
937            Ok(decompressed) => match decompressed {
938                DataBlock::FixedWidth(ref block) => {
939                    assert_eq!(
940                        block.data.len(),
941                        num_values * 4,
942                        "Expected {} bytes but got {}",
943                        num_values * 4,
944                        block.data.len()
945                    );
946                }
947                _ => panic!("Expected FixedWidth block"),
948            },
949            Err(e) => {
950                if e.to_string().contains("4092") {
951                    panic!("Bug reproduced! {}", e);
952                } else {
953                    panic!("Unexpected error: {}", e);
954                }
955            }
956        }
957    }
958
959    // ========== Encoding Verification Tests ==========
960
961    #[test_log::test(tokio::test)]
962    async fn test_rle_encoding_verification() {
963        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
964        use crate::version::LanceFileVersion;
965        use arrow_array::{Array, Int32Array};
966        use lance_datagen::{ArrayGenerator, RowCount};
967        use std::collections::HashMap;
968        use std::sync::Arc;
969
970        let test_cases = TestCases::default()
971            .with_expected_encoding("rle")
972            .with_file_version(LanceFileVersion::V2_1);
973
974        // Test both explicit metadata and automatic selection
975        // 1. Test with explicit RLE threshold metadata
976        let metadata_explicit = HashMap::from([(
977            "lance-encoding:rle-threshold".to_string(),
978            "0.8".to_string(),
979        )]);
980        let mut generator = RleDataGenerator::new(vec![1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]);
981        let data_explicit = generator.generate_default(RowCount::from(10000)).unwrap();
982        check_round_trip_encoding_of_data(vec![data_explicit], &test_cases, metadata_explicit)
983            .await;
984
985        // 2. Test automatic RLE selection based on data characteristics
986        // 80% repetition should trigger RLE (> default 50% threshold)
987        let mut values = vec![42i32; 8000]; // 80% repetition
988        values.extend([1i32, 2i32, 3i32, 4i32, 5i32].repeat(400)); // 20% variety
989        let arr = Arc::new(Int32Array::from(values)) as Arc<dyn Array>;
990        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
991    }
992
993    /// Generator that produces repetitive patterns suitable for RLE
994    #[derive(Debug)]
995    struct RleDataGenerator {
996        pattern: Vec<i32>,
997        idx: usize,
998    }
999
1000    impl RleDataGenerator {
1001        fn new(pattern: Vec<i32>) -> Self {
1002            Self { pattern, idx: 0 }
1003        }
1004    }
1005
1006    impl lance_datagen::ArrayGenerator for RleDataGenerator {
1007        fn generate(
1008            &mut self,
1009            _length: lance_datagen::RowCount,
1010            _rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
1011        ) -> std::result::Result<std::sync::Arc<dyn arrow_array::Array>, arrow_schema::ArrowError>
1012        {
1013            use arrow_array::Int32Array;
1014            use std::sync::Arc;
1015
1016            // Generate enough repetitive data to trigger RLE
1017            let mut values = Vec::new();
1018            for _ in 0..10000 {
1019                values.push(self.pattern[self.idx]);
1020                self.idx = (self.idx + 1) % self.pattern.len();
1021            }
1022            Ok(Arc::new(Int32Array::from(values)))
1023        }
1024
1025        fn data_type(&self) -> &arrow_schema::DataType {
1026            &arrow_schema::DataType::Int32
1027        }
1028
1029        fn element_size_bytes(&self) -> Option<lance_datagen::ByteCount> {
1030            Some(lance_datagen::ByteCount::from(4))
1031        }
1032    }
1033}