Skip to main content

lance_encoding/encodings/physical/
rle.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! # RLE (Run-Length Encoding) Miniblock Format
5//!
6//! RLE compression for Lance miniblock format, optimized for data with repeated values.
7//!
8//! ## Encoding Format
9//!
10//! RLE uses a dual-buffer format to store compressed data:
11//!
//! - **Values Buffer**: Stores one value per run (a value that recurs in a later run is stored again) in the original data type
13//! - **Lengths Buffer**: Stores the repeat count for each value as u8
14//!
15//! ### Example
16//!
17//! Input data: `[1, 1, 1, 2, 2, 3, 3, 3, 3]`
18//!
19//! Encoded as:
20//! - Values buffer: `[1, 2, 3]` (3 × 4 bytes for i32)
21//! - Lengths buffer: `[3, 2, 4]` (3 × 1 byte for u8)
22//!
23//! ### Long Run Handling
24//!
25//! When a run exceeds 255 values, it is split into multiple runs of 255
26//! followed by a final run with the remainder. For example, a run of 1000
27//! identical values becomes 4 runs: [255, 255, 255, 235].
28//!
29//! ## Supported Types
30//!
31//! RLE supports all fixed-width primitive types:
32//! - 8-bit: u8, i8
33//! - 16-bit: u16, i16
34//! - 32-bit: u32, i32, f32
35//! - 64-bit: u64, i64, f64
36//!
37//! ## Compression Strategy
38//!
39//! RLE is automatically selected when:
40//! - The run count (number of value transitions) < 50% of total values
41//! - This indicates sufficient repetition for RLE to be effective
42//!
43//! ## Chunk Handling
44//!
45//! - Maximum chunk size: 4096 values (miniblock constraint)
46//! - All chunks share two global buffers (values and lengths)
47//! - Each chunk's buffer_sizes indicate its portion of the global buffers
//! - Every non-last chunk contains a power-of-2 number of values
49//! - Byte limits are enforced dynamically during encoding
50
51use arrow_buffer::ArrowNativeType;
52use log::trace;
53use snafu::location;
54
55use crate::buffer::LanceBuffer;
56use crate::compression::MiniBlockDecompressor;
57use crate::data::DataBlock;
58use crate::data::{BlockInfo, FixedWidthDataBlock};
59use crate::encodings::logical::primitive::miniblock::{
60    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
61    MAX_MINIBLOCK_VALUES,
62};
63use crate::format::pb21::CompressiveEncoding;
64use crate::format::ProtobufUtils21;
65
66use lance_core::{Error, Result};
67
68/// RLE encoder for miniblock format
69#[derive(Debug, Default)]
70pub struct RleMiniBlockEncoder;
71
72impl RleMiniBlockEncoder {
73    pub fn new() -> Self {
74        Self
75    }
76
77    fn encode_data(
78        &self,
79        data: &LanceBuffer,
80        num_values: u64,
81        bits_per_value: u64,
82    ) -> Result<(Vec<LanceBuffer>, Vec<MiniBlockChunk>)> {
83        if num_values == 0 {
84            return Ok((Vec::new(), Vec::new()));
85        }
86
87        let bytes_per_value = (bits_per_value / 8) as usize;
88
89        // Pre-allocate global buffers with estimated capacity
90        // Assume average compression ratio of ~10:1 (10 values per run)
91        let estimated_runs = num_values as usize / 10;
92        let mut all_values = Vec::with_capacity(estimated_runs * bytes_per_value);
93        let mut all_lengths = Vec::with_capacity(estimated_runs);
94        let mut chunks = Vec::new();
95
96        let mut offset = 0usize;
97        let mut values_remaining = num_values as usize;
98
99        while values_remaining > 0 {
100            let values_start = all_values.len();
101            let lengths_start = all_lengths.len();
102
103            let (_num_runs, values_processed, is_last_chunk) = match bits_per_value {
104                8 => self.encode_chunk_rolling::<u8>(
105                    data,
106                    offset,
107                    values_remaining,
108                    &mut all_values,
109                    &mut all_lengths,
110                ),
111                16 => self.encode_chunk_rolling::<u16>(
112                    data,
113                    offset,
114                    values_remaining,
115                    &mut all_values,
116                    &mut all_lengths,
117                ),
118                32 => self.encode_chunk_rolling::<u32>(
119                    data,
120                    offset,
121                    values_remaining,
122                    &mut all_values,
123                    &mut all_lengths,
124                ),
125                64 => self.encode_chunk_rolling::<u64>(
126                    data,
127                    offset,
128                    values_remaining,
129                    &mut all_values,
130                    &mut all_lengths,
131                ),
132                _ => unreachable!("RLE encoding bits_per_value must be 8, 16, 32 or 64"),
133            };
134
135            if values_processed == 0 {
136                break;
137            }
138
139            let log_num_values = if is_last_chunk {
140                0
141            } else {
142                assert!(
143                    values_processed.is_power_of_two(),
144                    "Non-last chunk must have power-of-2 values"
145                );
146                values_processed.ilog2() as u8
147            };
148
149            let values_size = all_values.len() - values_start;
150            let lengths_size = all_lengths.len() - lengths_start;
151
152            let chunk = MiniBlockChunk {
153                buffer_sizes: vec![values_size as u32, lengths_size as u32],
154                log_num_values,
155            };
156
157            chunks.push(chunk);
158
159            offset += values_processed;
160            values_remaining -= values_processed;
161        }
162
163        // Return exactly two buffers: values and lengths
164        Ok((
165            vec![
166                LanceBuffer::from(all_values),
167                LanceBuffer::from(all_lengths),
168            ],
169            chunks,
170        ))
171    }
172
173    /// Encodes a chunk of data using RLE compression with dynamic boundary detection.
174    ///
175    /// This function processes values sequentially, detecting runs (sequences of identical values)
176    /// and encoding them as (value, length) pairs. It dynamically determines whether this chunk
177    /// should be the last chunk based on how many values were processed.
178    ///
179    /// # Key Features:
180    /// - Tracks byte usage to ensure we don't exceed MAX_MINIBLOCK_BYTES
181    /// - Maintains power-of-2 checkpoints for non-last chunks
182    /// - Splits long runs (>255) into multiple entries
183    /// - Dynamically determines if this is the last chunk
184    ///
185    /// # Returns:
186    /// - num_runs: Number of runs encoded
187    /// - values_processed: Number of input values processed
188    /// - is_last_chunk: Whether this chunk processed all remaining values
189    fn encode_chunk_rolling<T>(
190        &self,
191        data: &LanceBuffer,
192        offset: usize,
193        values_remaining: usize,
194        all_values: &mut Vec<u8>,
195        all_lengths: &mut Vec<u8>,
196    ) -> (usize, usize, bool)
197    where
198        T: bytemuck::Pod + PartialEq + Copy + std::fmt::Debug + ArrowNativeType,
199    {
200        let type_size = std::mem::size_of::<T>();
201
202        let chunk_start = offset * type_size;
203        let max_by_count = MAX_MINIBLOCK_VALUES as usize;
204        let max_values = values_remaining.min(max_by_count);
205        let chunk_end = chunk_start + max_values * type_size;
206
207        if chunk_start >= data.len() {
208            return (0, 0, false);
209        }
210
211        let chunk_len = chunk_end.min(data.len()) - chunk_start;
212        let chunk_buffer = data.slice_with_length(chunk_start, chunk_len);
213        let typed_data_ref = chunk_buffer.borrow_to_typed_slice::<T>();
214        let typed_data: &[T] = typed_data_ref.as_ref();
215
216        if typed_data.is_empty() {
217            return (0, 0, false);
218        }
219
220        // Record starting positions for this chunk
221        let values_start = all_values.len();
222
223        let mut current_value = typed_data[0];
224        let mut current_length = 1u64;
225        let mut bytes_used = 0usize;
226        let mut total_values_encoded = 0usize; // Track total encoded values
227
228        // Power-of-2 checkpoints for ensuring non-last chunks have valid sizes.
229        //
230        // We start from a slightly larger minimum checkpoint for smaller types since
231        // they encode more compactly and are less likely to hit MAX_MINIBLOCK_BYTES.
232        let min_checkpoint_log2 = match type_size {
233            1 => 8, // 256
234            2 => 7, // 128
235            _ => 6, // 64
236        };
237        let max_checkpoint_log2 = (values_remaining.min(MAX_MINIBLOCK_VALUES as usize))
238            .next_power_of_two()
239            .ilog2();
240        let mut checkpoint_log2 = min_checkpoint_log2;
241
242        // Save state at checkpoints so we can roll back if needed
243        let mut last_checkpoint_state = None;
244
245        for &value in typed_data[1..].iter() {
246            if value == current_value {
247                current_length += 1;
248            } else {
249                // Calculate space needed (may need multiple u8s if run > 255)
250                let run_chunks = current_length.div_ceil(255) as usize;
251                let bytes_needed = run_chunks * (type_size + 1);
252
253                // Stop if adding this run would exceed byte limit
254                if bytes_used + bytes_needed > MAX_MINIBLOCK_BYTES as usize {
255                    if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
256                        // Roll back to last power-of-2 checkpoint
257                        all_values.truncate(val_pos);
258                        all_lengths.truncate(len_pos);
259                        let num_runs = (val_pos - values_start) / type_size;
260                        return (num_runs, checkpoint_values, false);
261                    }
262                    break;
263                }
264
265                bytes_used += self.add_run(&current_value, current_length, all_values, all_lengths);
266                total_values_encoded += current_length as usize;
267                current_value = value;
268                current_length = 1;
269            }
270
271            // Check if we reached a power-of-2 checkpoint.
272            while checkpoint_log2 <= max_checkpoint_log2 {
273                let checkpoint_values = 1usize << checkpoint_log2;
274                if checkpoint_values > values_remaining || total_values_encoded < checkpoint_values
275                {
276                    break;
277                }
278                last_checkpoint_state = Some((
279                    all_values.len(),
280                    all_lengths.len(),
281                    bytes_used,
282                    checkpoint_values,
283                ));
284                checkpoint_log2 += 1;
285            }
286        }
287
288        // After the loop, we always have a pending run that needs to be added
289        // unless we've exceeded the byte limit
290        if current_length > 0 {
291            let run_chunks = current_length.div_ceil(255) as usize;
292            let bytes_needed = run_chunks * (type_size + 1);
293
294            if bytes_used + bytes_needed <= MAX_MINIBLOCK_BYTES as usize {
295                let _ = self.add_run(&current_value, current_length, all_values, all_lengths);
296                total_values_encoded += current_length as usize;
297            }
298        }
299
300        // Determine if we've processed all remaining values
301        let is_last_chunk = total_values_encoded == values_remaining;
302
303        // Non-last chunks must have power-of-2 values for miniblock format
304        if !is_last_chunk {
305            if total_values_encoded.is_power_of_two() {
306                // Already at power-of-2 boundary
307            } else if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
308                // Roll back to last valid checkpoint
309                all_values.truncate(val_pos);
310                all_lengths.truncate(len_pos);
311                let num_runs = (val_pos - values_start) / type_size;
312                return (num_runs, checkpoint_values, false);
313            } else {
314                // No valid checkpoint, can't create a valid chunk
315                return (0, 0, false);
316            }
317        }
318
319        let num_runs = (all_values.len() - values_start) / type_size;
320        (num_runs, total_values_encoded, is_last_chunk)
321    }
322
323    fn add_run<T>(
324        &self,
325        value: &T,
326        length: u64,
327        all_values: &mut Vec<u8>,
328        all_lengths: &mut Vec<u8>,
329    ) -> usize
330    where
331        T: bytemuck::Pod,
332    {
333        let value_bytes = bytemuck::bytes_of(value);
334        let type_size = std::mem::size_of::<T>();
335        let num_full_chunks = (length / 255) as usize;
336        let remainder = (length % 255) as u8;
337
338        let total_chunks = num_full_chunks + if remainder > 0 { 1 } else { 0 };
339        all_values.reserve(total_chunks * type_size);
340        all_lengths.reserve(total_chunks);
341
342        for _ in 0..num_full_chunks {
343            all_values.extend_from_slice(value_bytes);
344            all_lengths.push(255);
345        }
346
347        if remainder > 0 {
348            all_values.extend_from_slice(value_bytes);
349            all_lengths.push(remainder);
350        }
351
352        total_chunks * (type_size + 1)
353    }
354}
355
356impl MiniBlockCompressor for RleMiniBlockEncoder {
357    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
358        match data {
359            DataBlock::FixedWidth(fixed_width) => {
360                let num_values = fixed_width.num_values;
361                let bits_per_value = fixed_width.bits_per_value;
362
363                let (all_buffers, chunks) =
364                    self.encode_data(&fixed_width.data, num_values, bits_per_value)?;
365
366                let compressed = MiniBlockCompressed {
367                    data: all_buffers,
368                    chunks,
369                    num_values,
370                };
371
372                let encoding = ProtobufUtils21::rle(
373                    ProtobufUtils21::flat(bits_per_value, None),
374                    ProtobufUtils21::flat(/*bits_per_value=*/ 8, None),
375                );
376
377                Ok((compressed, encoding))
378            }
379            _ => Err(Error::InvalidInput {
380                location: location!(),
381                source: "RLE encoding only supports FixedWidth data blocks".into(),
382            }),
383        }
384    }
385}
386
387/// RLE decompressor for miniblock format
388#[derive(Debug)]
389pub struct RleMiniBlockDecompressor {
390    bits_per_value: u64,
391}
392
393impl RleMiniBlockDecompressor {
394    pub fn new(bits_per_value: u64) -> Self {
395        Self { bits_per_value }
396    }
397
398    fn decode_data(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
399        if num_values == 0 {
400            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
401                bits_per_value: self.bits_per_value,
402                data: LanceBuffer::from(vec![]),
403                num_values: 0,
404                block_info: BlockInfo::default(),
405            }));
406        }
407
408        assert_eq!(
409            data.len(),
410            2,
411            "RLE decompressor expects exactly 2 buffers, got {}",
412            data.len()
413        );
414
415        let values_buffer = &data[0];
416        let lengths_buffer = &data[1];
417
418        let decoded_data = match self.bits_per_value {
419            8 => self.decode_generic::<u8>(values_buffer, lengths_buffer, num_values)?,
420            16 => self.decode_generic::<u16>(values_buffer, lengths_buffer, num_values)?,
421            32 => self.decode_generic::<u32>(values_buffer, lengths_buffer, num_values)?,
422            64 => self.decode_generic::<u64>(values_buffer, lengths_buffer, num_values)?,
423            _ => unreachable!("RLE decoding bits_per_value must be 8, 16, 32, 64, or 128"),
424        };
425
426        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
427            bits_per_value: self.bits_per_value,
428            data: decoded_data,
429            num_values,
430            block_info: BlockInfo::default(),
431        }))
432    }
433
434    fn decode_generic<T>(
435        &self,
436        values_buffer: &LanceBuffer,
437        lengths_buffer: &LanceBuffer,
438        num_values: u64,
439    ) -> Result<LanceBuffer>
440    where
441        T: bytemuck::Pod + Copy + std::fmt::Debug + ArrowNativeType,
442    {
443        let type_size = std::mem::size_of::<T>();
444
445        if values_buffer.is_empty() || lengths_buffer.is_empty() {
446            if num_values == 0 {
447                return Ok(LanceBuffer::empty());
448            } else {
449                return Err(Error::InvalidInput {
450                    location: location!(),
451                    source: format!("Empty buffers but expected {} values", num_values).into(),
452                });
453            }
454        }
455
456        if values_buffer.len() % type_size != 0 || lengths_buffer.is_empty() {
457            return Err(Error::InvalidInput {
458                location: location!(),
459                source: format!(
460                    "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes",
461                    std::any::type_name::<T>(),
462                    values_buffer.len(),
463                    type_size,
464                    lengths_buffer.len()
465                )
466                .into(),
467            });
468        }
469
470        let num_runs = values_buffer.len() / type_size;
471        let num_length_entries = lengths_buffer.len();
472        assert_eq!(
473            num_runs, num_length_entries,
474            "Inconsistent RLE buffers: {} runs but {} length entries",
475            num_runs, num_length_entries
476        );
477
478        let values_ref = values_buffer.borrow_to_typed_slice::<T>();
479        let values: &[T] = values_ref.as_ref();
480        let lengths: &[u8] = lengths_buffer.as_ref();
481
482        let expected_value_count = num_values as usize;
483        let mut decoded: Vec<T> = Vec::with_capacity(expected_value_count);
484
485        for (value, &length) in values.iter().zip(lengths.iter()) {
486            if decoded.len() == expected_value_count {
487                break;
488            }
489
490            if length == 0 {
491                return Err(Error::InvalidInput {
492                    location: location!(),
493                    source: "RLE decoding encountered a zero run length".into(),
494                });
495            }
496
497            let remaining = expected_value_count - decoded.len();
498            let write_len = (length as usize).min(remaining);
499
500            decoded.resize(decoded.len() + write_len, *value);
501        }
502
503        if decoded.len() != expected_value_count {
504            return Err(Error::InvalidInput {
505                location: location!(),
506                source: format!(
507                    "RLE decoding produced {} values, expected {}",
508                    decoded.len(),
509                    expected_value_count
510                )
511                .into(),
512            });
513        }
514
515        trace!(
516            "RLE decoded {} {} values",
517            num_values,
518            std::any::type_name::<T>()
519        );
520        Ok(LanceBuffer::reinterpret_vec(decoded))
521    }
522}
523
524impl MiniBlockDecompressor for RleMiniBlockDecompressor {
525    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
526        self.decode_data(data, num_values)
527    }
528}
529
530#[cfg(test)]
531mod tests {
532    use super::*;
533    use crate::data::DataBlock;
534    use crate::encodings::logical::primitive::miniblock::MAX_MINIBLOCK_VALUES;
535    use arrow_array::Int32Array;
536
537    // ========== Core Functionality Tests ==========
538
539    #[test]
540    fn test_basic_rle_encoding() {
541        let encoder = RleMiniBlockEncoder::new();
542
543        // Test basic RLE pattern: [1, 1, 1, 2, 2, 3, 3, 3, 3]
544        let array = Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]);
545        let data_block = DataBlock::from_array(array);
546
547        let (compressed, _) = encoder.compress(data_block).unwrap();
548
549        assert_eq!(compressed.num_values, 9);
550        assert_eq!(compressed.chunks.len(), 1);
551
552        // Verify compression happened (3 runs instead of 9 values)
553        let values_buffer = &compressed.data[0];
554        let lengths_buffer = &compressed.data[1];
555        assert_eq!(values_buffer.len(), 12); // 3 i32 values
556        assert_eq!(lengths_buffer.len(), 3); // 3 u8 lengths
557    }
558
559    #[test]
560    fn test_long_run_splitting() {
561        let encoder = RleMiniBlockEncoder::new();
562
563        // Create a run longer than 255 to test splitting
564        let mut data = vec![42i32; 1000]; // Will be split into 255+255+255+235
565        data.extend(&[100i32; 300]); // Will be split into 255+45
566
567        let array = Int32Array::from(data);
568        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
569
570        // Should have 6 runs total (4 for first value, 2 for second)
571        let lengths_buffer = &compressed.data[1];
572        assert_eq!(lengths_buffer.len(), 6);
573    }
574
575    // ========== Round-trip Tests for Different Types ==========
576
577    #[test]
578    fn test_round_trip_all_types() {
579        // Test u8
580        test_round_trip_helper(vec![42u8, 42, 42, 100, 100, 255, 255, 255, 255], 8);
581
582        // Test u16
583        test_round_trip_helper(vec![1000u16, 1000, 2000, 2000, 2000, 3000], 16);
584
585        // Test i32
586        test_round_trip_helper(vec![100i32, 100, 100, -200, -200, 300, 300, 300, 300], 32);
587
588        // Test u64
589        test_round_trip_helper(vec![1_000_000_000u64; 5], 64);
590    }
591
592    fn test_round_trip_helper<T>(data: Vec<T>, bits_per_value: u64)
593    where
594        T: bytemuck::Pod + PartialEq + std::fmt::Debug,
595    {
596        let encoder = RleMiniBlockEncoder::new();
597        let bytes: Vec<u8> = data
598            .iter()
599            .flat_map(|v| bytemuck::bytes_of(v))
600            .copied()
601            .collect();
602
603        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
604            bits_per_value,
605            data: LanceBuffer::from(bytes),
606            num_values: data.len() as u64,
607            block_info: BlockInfo::default(),
608        });
609
610        let (compressed, _) = encoder.compress(block).unwrap();
611        let decompressor = RleMiniBlockDecompressor::new(bits_per_value);
612        let decompressed = decompressor
613            .decompress(compressed.data, compressed.num_values)
614            .unwrap();
615
616        match decompressed {
617            DataBlock::FixedWidth(ref block) => {
618                // Verify the decompressed data length matches expected
619                assert_eq!(block.data.len(), data.len() * std::mem::size_of::<T>());
620            }
621            _ => panic!("Expected FixedWidth block"),
622        }
623    }
624
625    // ========== Chunk Boundary Tests ==========
626
627    #[test]
628    fn test_power_of_two_chunking() {
629        let encoder = RleMiniBlockEncoder::new();
630
631        // Create data that will require multiple chunks
632        let test_sizes = vec![1000, 2500, 5000, 10000];
633
634        for size in test_sizes {
635            let data: Vec<i32> = (0..size)
636                .map(|i| i / 50) // Create runs of 50
637                .collect();
638
639            let array = Int32Array::from(data);
640            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
641
642            // Verify all non-last chunks have power-of-2 values
643            for (i, chunk) in compressed.chunks.iter().enumerate() {
644                if i < compressed.chunks.len() - 1 {
645                    assert!(chunk.log_num_values > 0);
646                    let chunk_values = 1u64 << chunk.log_num_values;
647                    assert!(chunk_values.is_power_of_two());
648                    assert!(chunk_values <= MAX_MINIBLOCK_VALUES);
649                } else {
650                    assert_eq!(chunk.log_num_values, 0);
651                }
652            }
653        }
654    }
655
656    // ========== Error Handling Tests ==========
657
658    #[test]
659    #[should_panic(expected = "RLE decompressor expects exactly 2 buffers")]
660    fn test_invalid_buffer_count() {
661        let decompressor = RleMiniBlockDecompressor::new(32);
662        let _ = decompressor.decompress(vec![LanceBuffer::from(vec![1, 2, 3, 4])], 10);
663    }
664
665    #[test]
666    #[should_panic(expected = "Inconsistent RLE buffers")]
667    fn test_buffer_consistency() {
668        let decompressor = RleMiniBlockDecompressor::new(32);
669        let values = LanceBuffer::from(vec![1, 0, 0, 0]); // 1 i32 value
670        let lengths = LanceBuffer::from(vec![5, 10]); // 2 lengths - mismatch!
671        let _ = decompressor.decompress(vec![values, lengths], 15);
672    }
673
674    #[test]
675    fn test_empty_data_handling() {
676        let encoder = RleMiniBlockEncoder::new();
677
678        // Test empty block
679        let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock {
680            bits_per_value: 32,
681            data: LanceBuffer::from(vec![]),
682            num_values: 0,
683            block_info: BlockInfo::default(),
684        });
685
686        let (compressed, _) = encoder.compress(empty_block).unwrap();
687        assert_eq!(compressed.num_values, 0);
688        assert!(compressed.data.is_empty());
689
690        // Test decompression of empty data
691        let decompressor = RleMiniBlockDecompressor::new(32);
692        let decompressed = decompressor.decompress(vec![], 0).unwrap();
693
694        match decompressed {
695            DataBlock::FixedWidth(ref block) => {
696                assert_eq!(block.num_values, 0);
697                assert_eq!(block.data.len(), 0);
698            }
699            _ => panic!("Expected FixedWidth block"),
700        }
701    }
702
703    // ========== Integration Test ==========
704
705    #[test]
706    fn test_multi_chunk_round_trip() {
707        let encoder = RleMiniBlockEncoder::new();
708
709        // Create data that spans multiple chunks with mixed patterns
710        let mut data = Vec::new();
711
712        // High compression section
713        data.extend(vec![999i32; 2000]);
714        // Low compression section
715        data.extend(0..1000);
716        // Another high compression section
717        data.extend(vec![777i32; 2000]);
718
719        let array = Int32Array::from(data.clone());
720        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
721
722        // Manually decompress all chunks
723        let mut reconstructed = Vec::new();
724        let mut values_offset = 0usize;
725        let mut lengths_offset = 0usize;
726        let mut values_processed = 0u64;
727
728        // We now have exactly 2 global buffers
729        assert_eq!(compressed.data.len(), 2);
730        let global_values = &compressed.data[0];
731        let global_lengths = &compressed.data[1];
732
733        for chunk in &compressed.chunks {
734            let chunk_values = if chunk.log_num_values > 0 {
735                1u64 << chunk.log_num_values
736            } else {
737                compressed.num_values - values_processed
738            };
739
740            // Extract chunk buffers from global buffers using buffer_sizes
741            let values_size = chunk.buffer_sizes[0] as usize;
742            let lengths_size = chunk.buffer_sizes[1] as usize;
743
744            let chunk_values_buffer = global_values.slice_with_length(values_offset, values_size);
745            let chunk_lengths_buffer =
746                global_lengths.slice_with_length(lengths_offset, lengths_size);
747
748            let decompressor = RleMiniBlockDecompressor::new(32);
749            let chunk_data = decompressor
750                .decompress(
751                    vec![chunk_values_buffer, chunk_lengths_buffer],
752                    chunk_values,
753                )
754                .unwrap();
755
756            values_offset += values_size;
757            lengths_offset += lengths_size;
758            values_processed += chunk_values;
759
760            match chunk_data {
761                DataBlock::FixedWidth(ref block) => {
762                    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
763                    reconstructed.extend_from_slice(values);
764                }
765                _ => panic!("Expected FixedWidth block"),
766            }
767        }
768
769        assert_eq!(reconstructed, data);
770    }
771
772    #[test]
773    fn test_1024_boundary_conditions() {
774        // Comprehensive test for various boundary conditions at 1024 values
775        // This consolidates multiple bug tests that were previously separate
776        let encoder = RleMiniBlockEncoder::new();
777        let decompressor = RleMiniBlockDecompressor::new(32);
778
779        let test_cases = [
780            ("runs_of_2", {
781                let mut data = Vec::new();
782                for i in 0..512 {
783                    data.push(i);
784                    data.push(i);
785                }
786                data
787            }),
788            ("single_run_1024", vec![42i32; 1024]),
789            ("alternating_values", {
790                let mut data = Vec::new();
791                for i in 0..1024 {
792                    data.push(i % 2);
793                }
794                data
795            }),
796            ("run_boundary_255s", {
797                let mut data = Vec::new();
798                data.extend(vec![1i32; 255]);
799                data.extend(vec![2i32; 255]);
800                data.extend(vec![3i32; 255]);
801                data.extend(vec![4i32; 255]);
802                data.extend(vec![5i32; 4]);
803                data
804            }),
805            ("unique_values_1024", (0..1024).collect::<Vec<_>>()),
806            ("unique_plus_duplicate", {
807                // 1023 unique values followed by one duplicate (regression test)
808                let mut data = Vec::new();
809                for i in 0..1023 {
810                    data.push(i);
811                }
812                data.push(1022i32); // Last value same as second-to-last
813                data
814            }),
815            ("bug_4092_pattern", {
816                // Test exact scenario that produces 4092 bytes instead of 4096
817                let mut data = Vec::new();
818                for i in 0..1022 {
819                    data.push(i);
820                }
821                data.push(999999i32);
822                data.push(999999i32);
823                data
824            }),
825        ];
826
827        for (test_name, data) in test_cases.iter() {
828            assert_eq!(data.len(), 1024, "Test case {} has wrong length", test_name);
829
830            // Compress the data
831            let array = Int32Array::from(data.clone());
832            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
833
834            // Decompress and verify
835            match decompressor.decompress(compressed.data, compressed.num_values) {
836                Ok(decompressed) => match decompressed {
837                    DataBlock::FixedWidth(ref block) => {
838                        let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
839                        assert_eq!(
840                            values.len(),
841                            1024,
842                            "Test case {} got {} values, expected 1024",
843                            test_name,
844                            values.len()
845                        );
846                        assert_eq!(
847                            block.data.len(),
848                            4096,
849                            "Test case {} got {} bytes, expected 4096",
850                            test_name,
851                            block.data.len()
852                        );
853                        assert_eq!(values, &data[..], "Test case {} data mismatch", test_name);
854                    }
855                    _ => panic!("Test case {} expected FixedWidth block", test_name),
856                },
857                Err(e) => {
858                    if e.to_string().contains("4092") {
859                        panic!("Test case {} found bug 4092: {}", test_name, e);
860                    }
861                    panic!("Test case {} failed with error: {}", test_name, e);
862                }
863            }
864        }
865    }
866
867    #[test]
868    fn test_low_repetition_50pct_bug() {
869        // Test case that reproduces the 4092 bytes bug with low repetition (50%)
870        // This simulates the 1M benchmark case
871        let encoder = RleMiniBlockEncoder::new();
872
873        // Create 1M values with low repetition (50% chance of change)
874        let num_values = 1_048_576; // 1M values
875        let mut data = Vec::with_capacity(num_values);
876        let mut value = 0i32;
877        let mut rng = 12345u64; // Simple deterministic RNG
878
879        for _ in 0..num_values {
880            data.push(value);
881            // Simple LCG for deterministic randomness
882            rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
883            // 50% chance to increment value
884            if (rng >> 16) & 1 == 1 {
885                value += 1;
886            }
887        }
888
889        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();
890
891        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
892            bits_per_value: 32,
893            data: LanceBuffer::from(bytes),
894            num_values: num_values as u64,
895            block_info: BlockInfo::default(),
896        });
897
898        let (compressed, _) = encoder.compress(block).unwrap();
899
900        // Debug first few chunks
901        for (i, chunk) in compressed.chunks.iter().take(5).enumerate() {
902            let _chunk_values = if chunk.log_num_values > 0 {
903                1 << chunk.log_num_values
904            } else {
905                // Last chunk - calculate remaining
906                let prev_total: usize = compressed.chunks[..i]
907                    .iter()
908                    .map(|c| 1usize << c.log_num_values)
909                    .sum();
910                num_values - prev_total
911            };
912        }
913
914        // Try to decompress
915        let decompressor = RleMiniBlockDecompressor::new(32);
916        match decompressor.decompress(compressed.data, compressed.num_values) {
917            Ok(decompressed) => match decompressed {
918                DataBlock::FixedWidth(ref block) => {
919                    assert_eq!(
920                        block.data.len(),
921                        num_values * 4,
922                        "Expected {} bytes but got {}",
923                        num_values * 4,
924                        block.data.len()
925                    );
926                }
927                _ => panic!("Expected FixedWidth block"),
928            },
929            Err(e) => {
930                if e.to_string().contains("4092") {
931                    panic!("Bug reproduced! {}", e);
932                } else {
933                    panic!("Unexpected error: {}", e);
934                }
935            }
936        }
937    }
938
939    // ========== Encoding Verification Tests ==========
940
941    #[test_log::test(tokio::test)]
942    async fn test_rle_encoding_verification() {
943        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
944        use crate::version::LanceFileVersion;
945        use arrow_array::{Array, Int32Array};
946        use lance_datagen::{ArrayGenerator, RowCount};
947        use std::collections::HashMap;
948        use std::sync::Arc;
949
950        let test_cases = TestCases::default()
951            .with_expected_encoding("rle")
952            .with_min_file_version(LanceFileVersion::V2_1);
953
954        // Test both explicit metadata and automatic selection
955        // 1. Test with explicit RLE threshold metadata (also disable BSS)
956        let mut metadata_explicit = HashMap::new();
957        metadata_explicit.insert(
958            "lance-encoding:rle-threshold".to_string(),
959            "0.8".to_string(),
960        );
961        metadata_explicit.insert("lance-encoding:bss".to_string(), "off".to_string());
962
963        let mut generator = RleDataGenerator::new(vec![
964            i32::MIN,
965            i32::MIN,
966            i32::MIN,
967            i32::MIN,
968            i32::MIN + 1,
969            i32::MIN + 1,
970            i32::MIN + 1,
971            i32::MIN + 1,
972            i32::MIN + 2,
973            i32::MIN + 2,
974            i32::MIN + 2,
975            i32::MIN + 2,
976        ]);
977        let data_explicit = generator.generate_default(RowCount::from(10000)).unwrap();
978        check_round_trip_encoding_of_data(vec![data_explicit], &test_cases, metadata_explicit)
979            .await;
980
981        // 2. Test automatic RLE selection based on data characteristics
982        // 80% repetition should trigger RLE (> default 50% threshold).
983        //
984        // Use values with the high bit set so bitpacking can't shrink the values.
985        // Explicitly disable BSS to ensure RLE is tested
986        let mut metadata = HashMap::new();
987        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());
988
989        let mut values = vec![i32::MIN; 8000]; // 80% repetition
990        values.extend(
991            [
992                i32::MIN + 1,
993                i32::MIN + 2,
994                i32::MIN + 3,
995                i32::MIN + 4,
996                i32::MIN + 5,
997            ]
998            .repeat(400),
999        ); // 20% variety
1000        let arr = Arc::new(Int32Array::from(values)) as Arc<dyn Array>;
1001        check_round_trip_encoding_of_data(vec![arr], &test_cases, metadata).await;
1002    }
1003
1004    /// Generator that produces repetitive patterns suitable for RLE
1005    #[derive(Debug)]
1006    struct RleDataGenerator {
1007        pattern: Vec<i32>,
1008        idx: usize,
1009    }
1010
1011    impl RleDataGenerator {
1012        fn new(pattern: Vec<i32>) -> Self {
1013            Self { pattern, idx: 0 }
1014        }
1015    }
1016
1017    impl lance_datagen::ArrayGenerator for RleDataGenerator {
1018        fn generate(
1019            &mut self,
1020            _length: lance_datagen::RowCount,
1021            _rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
1022        ) -> std::result::Result<std::sync::Arc<dyn arrow_array::Array>, arrow_schema::ArrowError>
1023        {
1024            use arrow_array::Int32Array;
1025            use std::sync::Arc;
1026
1027            // Generate enough repetitive data to trigger RLE
1028            let mut values = Vec::new();
1029            for _ in 0..10000 {
1030                values.push(self.pattern[self.idx]);
1031                self.idx = (self.idx + 1) % self.pattern.len();
1032            }
1033            Ok(Arc::new(Int32Array::from(values)))
1034        }
1035
1036        fn data_type(&self) -> &arrow_schema::DataType {
1037            &arrow_schema::DataType::Int32
1038        }
1039
1040        fn element_size_bytes(&self) -> Option<lance_datagen::ByteCount> {
1041            Some(lance_datagen::ByteCount::from(4))
1042        }
1043    }
1044}