lance_encoding/encodings/physical/
rle.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! # RLE (Run-Length Encoding) Miniblock Format
5//!
6//! RLE compression for Lance miniblock format, optimized for data with repeated values.
7//!
8//! ## Encoding Format
9//!
10//! RLE uses a dual-buffer format to store compressed data:
11//!
//! - **Values Buffer**: Stores one value per run in its original data type
//!   (a value that occurs in several runs is stored once for each run)
//! - **Lengths Buffer**: Stores the length of each run as a u8
14//!
15//! ### Example
16//!
17//! Input data: `[1, 1, 1, 2, 2, 3, 3, 3, 3]`
18//!
19//! Encoded as:
20//! - Values buffer: `[1, 2, 3]` (3 × 4 bytes for i32)
21//! - Lengths buffer: `[3, 2, 4]` (3 × 1 byte for u8)
22//!
23//! ### Long Run Handling
24//!
25//! When a run exceeds 255 values, it is split into multiple runs of 255
26//! followed by a final run with the remainder. For example, a run of 1000
27//! identical values becomes 4 runs: [255, 255, 255, 235].
28//!
29//! ## Supported Types
30//!
31//! RLE supports all fixed-width primitive types:
32//! - 8-bit: u8, i8
33//! - 16-bit: u16, i16
34//! - 32-bit: u32, i32, f32
35//! - 64-bit: u64, i64, f64
36//!
37//! ## Compression Strategy
38//!
39//! RLE is automatically selected when:
40//! - The run count (number of value transitions) < 50% of total values
41//! - This indicates sufficient repetition for RLE to be effective
42//!
43//! ## Chunk Handling
44//!
45//! - Maximum chunk size: 4096 values (miniblock constraint)
46//! - All chunks share two global buffers (values and lengths)
47//! - Each chunk's buffer_sizes indicate its portion of the global buffers
//! - Non-last chunks always contain a power-of-2 number of values
49//! - Byte limits are enforced dynamically during encoding
50
51use log::trace;
52use snafu::location;
53
54use crate::buffer::LanceBuffer;
55use crate::compression::MiniBlockDecompressor;
56use crate::data::DataBlock;
57use crate::data::{BlockInfo, FixedWidthDataBlock};
58use crate::encodings::logical::primitive::miniblock::{
59    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
60    MAX_MINIBLOCK_VALUES,
61};
62use crate::format::{pb, ProtobufUtils};
63
64use lance_core::{Error, Result};
65
/// RLE encoder for miniblock format.
///
/// This is a unit struct with no fields: every buffer used while encoding is
/// created locally inside the encoding call.
#[derive(Debug, Default)]
pub struct RleMiniBlockEncoder;
69
70impl RleMiniBlockEncoder {
    /// Creates a new encoder. The encoder carries no configuration or state.
    pub fn new() -> Self {
        Self
    }
74
75    fn encode_data(
76        &self,
77        data: &LanceBuffer,
78        num_values: u64,
79        bits_per_value: u64,
80    ) -> Result<(Vec<LanceBuffer>, Vec<MiniBlockChunk>)> {
81        if num_values == 0 {
82            return Ok((Vec::new(), Vec::new()));
83        }
84
85        let bytes_per_value = (bits_per_value / 8) as usize;
86
87        // Pre-allocate global buffers with estimated capacity
88        // Assume average compression ratio of ~10:1 (10 values per run)
89        let estimated_runs = num_values as usize / 10;
90        let mut all_values = Vec::with_capacity(estimated_runs * bytes_per_value);
91        let mut all_lengths = Vec::with_capacity(estimated_runs);
92        let mut chunks = Vec::new();
93
94        let mut offset = 0usize;
95        let mut values_remaining = num_values as usize;
96
97        while values_remaining > 0 {
98            let values_start = all_values.len();
99            let lengths_start = all_lengths.len();
100
101            let (_num_runs, values_processed, is_last_chunk) = match bits_per_value {
102                8 => self.encode_chunk_rolling::<u8>(
103                    data,
104                    offset,
105                    values_remaining,
106                    &mut all_values,
107                    &mut all_lengths,
108                ),
109                16 => self.encode_chunk_rolling::<u16>(
110                    data,
111                    offset,
112                    values_remaining,
113                    &mut all_values,
114                    &mut all_lengths,
115                ),
116                32 => self.encode_chunk_rolling::<u32>(
117                    data,
118                    offset,
119                    values_remaining,
120                    &mut all_values,
121                    &mut all_lengths,
122                ),
123                64 => self.encode_chunk_rolling::<u64>(
124                    data,
125                    offset,
126                    values_remaining,
127                    &mut all_values,
128                    &mut all_lengths,
129                ),
130                _ => unreachable!("RLE encoding bits_per_value must be 8, 16, 32 or 64"),
131            };
132
133            if values_processed == 0 {
134                break;
135            }
136
137            let log_num_values = if is_last_chunk {
138                0
139            } else {
140                assert!(
141                    values_processed.is_power_of_two(),
142                    "Non-last chunk must have power-of-2 values"
143                );
144                values_processed.ilog2() as u8
145            };
146
147            let values_size = all_values.len() - values_start;
148            let lengths_size = all_lengths.len() - lengths_start;
149
150            let chunk = MiniBlockChunk {
151                buffer_sizes: vec![values_size as u16, lengths_size as u16],
152                log_num_values,
153            };
154
155            chunks.push(chunk);
156
157            offset += values_processed;
158            values_remaining -= values_processed;
159        }
160
161        // Return exactly two buffers: values and lengths
162        Ok((
163            vec![
164                LanceBuffer::Owned(all_values),
165                LanceBuffer::Owned(all_lengths),
166            ],
167            chunks,
168        ))
169    }
170
    /// Encodes one chunk of data using RLE compression with dynamic boundary detection.
    ///
    /// Starting at value index `offset`, this looks at up to `MAX_MINIBLOCK_VALUES`
    /// values (or `values_remaining`, whichever is smaller), detects runs of identical
    /// values and appends them as (value, length) entries to `all_values` and
    /// `all_lengths`. Whether this is the last chunk is decided from how many values
    /// ended up encoded.
    ///
    /// # Key Features:
    /// - Tracks byte usage so a chunk does not exceed `MAX_MINIBLOCK_BYTES`
    ///   (each run entry costs `size_of::<T>() + 1` bytes)
    /// - Records power-of-2 checkpoints so a non-last chunk can be rolled back to one
    /// - Splits long runs (>255) into multiple entries (see `add_run`)
    /// - Dynamically determines if this is the last chunk
    ///
    /// # Returns:
    /// - num_runs: Number of run entries left in `all_values` for this chunk
    /// - values_processed: Number of input values this chunk accounts for
    /// - is_last_chunk: Whether this chunk processed all remaining values
    ///
    /// `(0, 0, false)` is returned when `offset` lies at or past the end of `data`,
    /// when the selected slice is empty, or when a non-last chunk ends on a count that
    /// is not a power of two and no checkpoint was recorded.
    ///
    /// # Panics:
    /// `bytemuck::cast_slice` panics if the selected byte range is not suitably
    /// aligned or sized for `T`.
    fn encode_chunk_rolling<T>(
        &self,
        data: &LanceBuffer,
        offset: usize,
        values_remaining: usize,
        all_values: &mut Vec<u8>,
        all_lengths: &mut Vec<u8>,
    ) -> (usize, usize, bool)
    where
        T: bytemuck::Pod + PartialEq + Copy + std::fmt::Debug,
    {
        let type_size = std::mem::size_of::<T>();
        let data_slice = data.as_ref();

        // Byte window of this chunk: at most MAX_MINIBLOCK_VALUES values from `offset`.
        let chunk_start = offset * type_size;
        let max_by_count = MAX_MINIBLOCK_VALUES as usize;
        let max_values = values_remaining.min(max_by_count);
        let chunk_end = chunk_start + max_values * type_size;

        if chunk_start >= data_slice.len() {
            return (0, 0, false);
        }

        // The end is clamped to the buffer, so fewer than `max_values` may be visible.
        let chunk_data = &data_slice[chunk_start..chunk_end.min(data_slice.len())];
        let typed_data: &[T] = bytemuck::cast_slice(chunk_data);

        if typed_data.is_empty() {
            return (0, 0, false);
        }

        // Record starting positions for this chunk
        let values_start = all_values.len();

        // `current_value`/`current_length` describe the run still being accumulated;
        // it is only written to the buffers once a different value is seen (or after
        // the loop).
        let mut current_value = typed_data[0];
        let mut current_length = 1u64;
        let mut bytes_used = 0usize;
        let mut total_values_encoded = 0usize; // Values covered by runs already written

        // Power-of-2 checkpoints for ensuring non-last chunks have valid sizes
        // For smaller data types like u8, we can use larger initial checkpoints
        // since they take less space per value
        let checkpoints = match type_size {
            1 => vec![256, 512, 1024, 2048, 4096], // u8 can start from 256
            2 => vec![128, 256, 512, 1024, 2048, 4096], // u16 can start from 128
            _ => vec![64, 128, 256, 512, 1024, 2048, 4096], // u32/u64: no difference
        };
        // Only checkpoints that do not exceed the remaining value count are usable.
        let valid_checkpoints: Vec<usize> = checkpoints
            .into_iter()
            .filter(|&p| p <= values_remaining)
            .collect();
        let mut checkpoint_idx = 0;

        // Save state at checkpoints so we can roll back if needed.
        // Tuple layout: (all_values.len(), all_lengths.len(), bytes_used, checkpoint value count).
        let mut last_checkpoint_state = None;

        for &value in typed_data[1..].iter() {
            if value == current_value {
                current_length += 1;
            } else {
                // Calculate space needed (may need multiple u8s if run > 255)
                let run_chunks = current_length.div_ceil(255) as usize;
                let bytes_needed = run_chunks * (type_size + 1);

                // Stop if adding this run would exceed byte limit
                if bytes_used + bytes_needed > MAX_MINIBLOCK_BYTES as usize {
                    if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
                        // Roll back to last power-of-2 checkpoint: drop everything written
                        // after the snapshot and report the checkpoint's value count.
                        all_values.truncate(val_pos);
                        all_lengths.truncate(len_pos);
                        let num_runs = (val_pos - values_start) / type_size;
                        return (num_runs, checkpoint_values, false);
                    }
                    // No checkpoint yet: leave the loop; the pending run is re-checked below.
                    break;
                }

                bytes_used += self.add_run(&current_value, current_length, all_values, all_lengths);
                total_values_encoded += current_length as usize;
                current_value = value;
                current_length = 1;
            }

            // Check if we reached a power-of-2 checkpoint.
            // At most one checkpoint is advanced per input value. The comparison is `>=`,
            // so the buffer positions saved here can already include runs that extend
            // past the checkpoint's nominal value count, while the count reported on
            // rollback is the checkpoint itself.
            // NOTE(review): this appears to rely on the decoder stopping once it has
            // produced the requested number of values - confirm.
            if checkpoint_idx < valid_checkpoints.len()
                && total_values_encoded >= valid_checkpoints[checkpoint_idx]
            {
                last_checkpoint_state = Some((
                    all_values.len(),
                    all_lengths.len(),
                    bytes_used,
                    valid_checkpoints[checkpoint_idx],
                ));
                checkpoint_idx += 1;
            }
        }

        // After the loop, we always have a pending run that needs to be added
        // unless we've exceeded the byte limit
        if current_length > 0 {
            let run_chunks = current_length.div_ceil(255) as usize;
            let bytes_needed = run_chunks * (type_size + 1);

            if bytes_used + bytes_needed <= MAX_MINIBLOCK_BYTES as usize {
                // The returned byte count is not needed any more at this point.
                let _ = self.add_run(&current_value, current_length, all_values, all_lengths);
                total_values_encoded += current_length as usize;
            }
        }

        // Determine if we've processed all remaining values
        let is_last_chunk = total_values_encoded == values_remaining;

        // Non-last chunks must have power-of-2 values for miniblock format
        if !is_last_chunk {
            if total_values_encoded.is_power_of_two() {
                // Already at power-of-2 boundary
            } else if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state {
                // Roll back to last valid checkpoint
                all_values.truncate(val_pos);
                all_lengths.truncate(len_pos);
                let num_runs = (val_pos - values_start) / type_size;
                return (num_runs, checkpoint_values, false);
            } else {
                // No valid checkpoint, can't create a valid chunk
                return (0, 0, false);
            }
        }

        let num_runs = (all_values.len() - values_start) / type_size;
        (num_runs, total_values_encoded, is_last_chunk)
    }
316
317    fn add_run<T>(
318        &self,
319        value: &T,
320        length: u64,
321        all_values: &mut Vec<u8>,
322        all_lengths: &mut Vec<u8>,
323    ) -> usize
324    where
325        T: bytemuck::Pod,
326    {
327        let value_bytes = bytemuck::bytes_of(value);
328        let type_size = std::mem::size_of::<T>();
329        let num_full_chunks = (length / 255) as usize;
330        let remainder = (length % 255) as u8;
331
332        let total_chunks = num_full_chunks + if remainder > 0 { 1 } else { 0 };
333        all_values.reserve(total_chunks * type_size);
334        all_lengths.reserve(total_chunks);
335
336        for _ in 0..num_full_chunks {
337            all_values.extend_from_slice(value_bytes);
338            all_lengths.push(255);
339        }
340
341        if remainder > 0 {
342            all_values.extend_from_slice(value_bytes);
343            all_lengths.push(remainder);
344        }
345
346        total_chunks * (type_size + 1)
347    }
348}
349
350impl MiniBlockCompressor for RleMiniBlockEncoder {
351    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
352        match data {
353            DataBlock::FixedWidth(fixed_width) => {
354                let num_values = fixed_width.num_values;
355                let bits_per_value = fixed_width.bits_per_value;
356
357                let (all_buffers, chunks) =
358                    self.encode_data(&fixed_width.data, num_values, bits_per_value)?;
359
360                let compressed = MiniBlockCompressed {
361                    data: all_buffers,
362                    chunks,
363                    num_values,
364                };
365
366                let encoding = ProtobufUtils::rle(bits_per_value);
367
368                Ok((compressed, encoding))
369            }
370            _ => Err(Error::InvalidInput {
371                location: location!(),
372                source: "RLE encoding only supports FixedWidth data blocks".into(),
373            }),
374        }
375    }
376}
377
/// RLE decompressor for miniblock format
#[derive(Debug)]
pub struct RleMiniBlockDecompressor {
    /// Width in bits of each decoded value; decoding dispatches on 8, 16, 32 and 64.
    bits_per_value: u64,
}
383
384impl RleMiniBlockDecompressor {
    /// Creates a decompressor for values that are `bits_per_value` bits wide.
    ///
    /// The width is stored as given; it is only checked when data is decoded.
    pub fn new(bits_per_value: u64) -> Self {
        Self { bits_per_value }
    }
388
389    fn decode_data(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
390        if num_values == 0 {
391            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
392                bits_per_value: self.bits_per_value,
393                data: LanceBuffer::Owned(vec![]),
394                num_values: 0,
395                block_info: BlockInfo::default(),
396            }));
397        }
398
399        assert_eq!(
400            data.len(),
401            2,
402            "RLE decompressor expects exactly 2 buffers, got {}",
403            data.len()
404        );
405
406        let values_buffer = &data[0];
407        let lengths_buffer = &data[1];
408
409        let decoded_data = match self.bits_per_value {
410            8 => self.decode_generic::<u8>(values_buffer, lengths_buffer, num_values)?,
411            16 => self.decode_generic::<u16>(values_buffer, lengths_buffer, num_values)?,
412            32 => self.decode_generic::<u32>(values_buffer, lengths_buffer, num_values)?,
413            64 => self.decode_generic::<u64>(values_buffer, lengths_buffer, num_values)?,
414            _ => unreachable!("RLE decoding bits_per_value must be 8, 16, 32, 64, or 128"),
415        };
416
417        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
418            bits_per_value: self.bits_per_value,
419            data: LanceBuffer::Owned(decoded_data),
420            num_values,
421            block_info: BlockInfo::default(),
422        }))
423    }
424
425    fn decode_generic<T>(
426        &self,
427        values_buffer: &LanceBuffer,
428        lengths_buffer: &LanceBuffer,
429        num_values: u64,
430    ) -> Result<Vec<u8>>
431    where
432        T: bytemuck::Pod + Copy + std::fmt::Debug,
433    {
434        let values_bytes = values_buffer.as_ref();
435        let lengths_bytes = lengths_buffer.as_ref();
436
437        let type_size = std::mem::size_of::<T>();
438
439        if values_bytes.is_empty() || lengths_bytes.is_empty() {
440            if num_values == 0 {
441                return Ok(Vec::new());
442            } else {
443                return Err(Error::InvalidInput {
444                    location: location!(),
445                    source: format!("Empty buffers but expected {} values", num_values).into(),
446                });
447            }
448        }
449
450        if values_bytes.len() % type_size != 0 || lengths_bytes.is_empty() {
451            return Err(Error::InvalidInput {
452                location: location!(),
453                source: format!(
454                    "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes",
455                    std::any::type_name::<T>(),
456                    values_bytes.len(),
457                    type_size,
458                    lengths_bytes.len()
459                )
460                .into(),
461            });
462        }
463
464        let num_runs = values_bytes.len() / type_size;
465        let num_length_entries = lengths_bytes.len();
466        assert_eq!(
467            num_runs, num_length_entries,
468            "Inconsistent RLE buffers: {} runs but {} length entries",
469            num_runs, num_length_entries
470        );
471
472        let values: &[T] = bytemuck::cast_slice(values_bytes);
473        let lengths: &[u8] = lengths_bytes;
474
475        let expected_byte_count = num_values as usize * type_size;
476        let mut decoded = Vec::with_capacity(expected_byte_count);
477
478        for (value, &length) in values.iter().zip(lengths.iter()) {
479            let run_length = length as usize;
480            let bytes_to_write = run_length * type_size;
481            let bytes_of_value = bytemuck::bytes_of(value);
482
483            if decoded.len() + bytes_to_write > expected_byte_count {
484                let remaining_bytes = expected_byte_count - decoded.len();
485                let remaining_values = remaining_bytes / type_size;
486
487                for _ in 0..remaining_values {
488                    decoded.extend_from_slice(bytes_of_value);
489                }
490                break;
491            }
492
493            for _ in 0..run_length {
494                decoded.extend_from_slice(bytes_of_value);
495            }
496        }
497
498        if decoded.len() != expected_byte_count {
499            return Err(Error::InvalidInput {
500                location: location!(),
501                source: format!(
502                    "RLE decoding produced {} bytes, expected {}",
503                    decoded.len(),
504                    expected_byte_count
505                )
506                .into(),
507            });
508        }
509
510        trace!(
511            "RLE decoded {} {} values",
512            num_values,
513            std::any::type_name::<T>()
514        );
515        Ok(decoded)
516    }
517}
518
impl MiniBlockDecompressor for RleMiniBlockDecompressor {
    /// Decompresses RLE buffers into a data block holding `num_values` values.
    ///
    /// Delegates directly to `decode_data`, which expects `data` to contain the
    /// values buffer followed by the lengths buffer.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        self.decode_data(data, num_values)
    }
}
524
525#[cfg(test)]
526mod tests {
527    use super::*;
528    use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
529    use crate::data::DataBlock;
530    use arrow_array::Int32Array;
531    use lance_core::datatypes::Field;
532
533    // ========== Core Functionality Tests ==========
534
535    #[test]
536    fn test_basic_rle_encoding() {
537        let encoder = RleMiniBlockEncoder::new();
538
539        // Test basic RLE pattern: [1, 1, 1, 2, 2, 3, 3, 3, 3]
540        let array = Int32Array::from(vec![1, 1, 1, 2, 2, 3, 3, 3, 3]);
541        let data_block = DataBlock::from_array(array);
542
543        let (compressed, _) = encoder.compress(data_block).unwrap();
544
545        assert_eq!(compressed.num_values, 9);
546        assert_eq!(compressed.chunks.len(), 1);
547
548        // Verify compression happened (3 runs instead of 9 values)
549        let values_buffer = &compressed.data[0];
550        let lengths_buffer = &compressed.data[1];
551        assert_eq!(values_buffer.len(), 12); // 3 i32 values
552        assert_eq!(lengths_buffer.len(), 3); // 3 u8 lengths
553    }
554
555    #[test]
556    fn test_long_run_splitting() {
557        let encoder = RleMiniBlockEncoder::new();
558
559        // Create a run longer than 255 to test splitting
560        let mut data = vec![42i32; 1000]; // Will be split into 255+255+255+235
561        data.extend(&[100i32; 300]); // Will be split into 255+45
562
563        let array = Int32Array::from(data);
564        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
565
566        // Should have 6 runs total (4 for first value, 2 for second)
567        let lengths_buffer = &compressed.data[1];
568        assert_eq!(lengths_buffer.len(), 6);
569    }
570
571    #[test]
572    fn test_compression_strategy_selection() {
573        let strategy = DefaultCompressionStrategy;
574        let field = Field::new_arrow("test", arrow_schema::DataType::Int32, false).unwrap();
575
576        // High repetition - should select RLE
577        let repetitive_array = Int32Array::from(vec![1; 1000]);
578        let repetitive_block = DataBlock::from_array(repetitive_array);
579
580        let compressor = strategy
581            .create_miniblock_compressor(&field, &repetitive_block)
582            .unwrap();
583        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
584
585        // No repetition - should NOT select RLE
586        let unique_array = Int32Array::from((0..1000).collect::<Vec<i32>>());
587        let unique_block = DataBlock::from_array(unique_array);
588
589        let compressor = strategy
590            .create_miniblock_compressor(&field, &unique_block)
591            .unwrap();
592        assert!(!format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
593    }
594
595    // ========== Round-trip Tests for Different Types ==========
596
597    #[test]
598    fn test_round_trip_all_types() {
599        // Test u8
600        test_round_trip_helper(vec![42u8, 42, 42, 100, 100, 255, 255, 255, 255], 8);
601
602        // Test u16
603        test_round_trip_helper(vec![1000u16, 1000, 2000, 2000, 2000, 3000], 16);
604
605        // Test i32
606        test_round_trip_helper(vec![100i32, 100, 100, -200, -200, 300, 300, 300, 300], 32);
607
608        // Test u64
609        test_round_trip_helper(vec![1_000_000_000u64; 5], 64);
610    }
611
612    fn test_round_trip_helper<T>(data: Vec<T>, bits_per_value: u64)
613    where
614        T: bytemuck::Pod + PartialEq + std::fmt::Debug,
615    {
616        let encoder = RleMiniBlockEncoder::new();
617        let bytes: Vec<u8> = data
618            .iter()
619            .flat_map(|v| bytemuck::bytes_of(v))
620            .copied()
621            .collect();
622
623        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
624            bits_per_value,
625            data: LanceBuffer::Owned(bytes),
626            num_values: data.len() as u64,
627            block_info: BlockInfo::default(),
628        });
629
630        let (compressed, _) = encoder.compress(block).unwrap();
631        let decompressor = RleMiniBlockDecompressor::new(bits_per_value);
632        let decompressed = decompressor
633            .decompress(compressed.data, compressed.num_values)
634            .unwrap();
635
636        match decompressed {
637            DataBlock::FixedWidth(ref block) => {
638                // Verify the decompressed data length matches expected
639                assert_eq!(block.data.len(), data.len() * std::mem::size_of::<T>());
640            }
641            _ => panic!("Expected FixedWidth block"),
642        }
643    }
644
645    // ========== Chunk Boundary Tests ==========
646
647    #[test]
648    fn test_power_of_two_chunking() {
649        let encoder = RleMiniBlockEncoder::new();
650
651        // Create data that will require multiple chunks
652        let test_sizes = vec![1000, 2500, 5000, 10000];
653
654        for size in test_sizes {
655            let data: Vec<i32> = (0..size)
656                .map(|i| i / 50) // Create runs of 50
657                .collect();
658
659            let array = Int32Array::from(data);
660            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
661
662            // Verify all non-last chunks have power-of-2 values
663            for (i, chunk) in compressed.chunks.iter().enumerate() {
664                if i < compressed.chunks.len() - 1 {
665                    assert!(chunk.log_num_values > 0);
666                    let chunk_values = 1u64 << chunk.log_num_values;
667                    assert!(chunk_values.is_power_of_two());
668                    assert!(chunk_values <= MAX_MINIBLOCK_VALUES);
669                } else {
670                    assert_eq!(chunk.log_num_values, 0);
671                }
672            }
673        }
674    }
675
676    // ========== Error Handling Tests ==========
677
678    #[test]
679    #[should_panic(expected = "RLE decompressor expects exactly 2 buffers")]
680    fn test_invalid_buffer_count() {
681        let decompressor = RleMiniBlockDecompressor::new(32);
682        let _ = decompressor.decompress(vec![LanceBuffer::Owned(vec![1, 2, 3, 4])], 10);
683    }
684
685    #[test]
686    #[should_panic(expected = "Inconsistent RLE buffers")]
687    fn test_buffer_consistency() {
688        let decompressor = RleMiniBlockDecompressor::new(32);
689        let values = LanceBuffer::Owned(vec![1, 0, 0, 0]); // 1 i32 value
690        let lengths = LanceBuffer::Owned(vec![5, 10]); // 2 lengths - mismatch!
691        let _ = decompressor.decompress(vec![values, lengths], 15);
692    }
693
694    #[test]
695    fn test_empty_data_handling() {
696        let encoder = RleMiniBlockEncoder::new();
697
698        // Test empty block
699        let empty_block = DataBlock::FixedWidth(FixedWidthDataBlock {
700            bits_per_value: 32,
701            data: LanceBuffer::Owned(vec![]),
702            num_values: 0,
703            block_info: BlockInfo::default(),
704        });
705
706        let (compressed, _) = encoder.compress(empty_block).unwrap();
707        assert_eq!(compressed.num_values, 0);
708        assert!(compressed.data.is_empty());
709
710        // Test decompression of empty data
711        let decompressor = RleMiniBlockDecompressor::new(32);
712        let decompressed = decompressor.decompress(vec![], 0).unwrap();
713
714        match decompressed {
715            DataBlock::FixedWidth(ref block) => {
716                assert_eq!(block.num_values, 0);
717                assert_eq!(block.data.len(), 0);
718            }
719            _ => panic!("Expected FixedWidth block"),
720        }
721    }
722
723    // ========== Integration Test ==========
724
725    #[test]
726    fn test_multi_chunk_round_trip() {
727        let encoder = RleMiniBlockEncoder::new();
728
729        // Create data that spans multiple chunks with mixed patterns
730        let mut data = Vec::new();
731
732        // High compression section
733        data.extend(vec![999i32; 2000]);
734        // Low compression section
735        data.extend(0..1000);
736        // Another high compression section
737        data.extend(vec![777i32; 2000]);
738
739        let array = Int32Array::from(data.clone());
740        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
741
742        // Manually decompress all chunks
743        let mut reconstructed = Vec::new();
744        let mut values_offset = 0usize;
745        let mut lengths_offset = 0usize;
746        let mut values_processed = 0u64;
747
748        // We now have exactly 2 global buffers
749        assert_eq!(compressed.data.len(), 2);
750        let global_values = &compressed.data[0];
751        let global_lengths = &compressed.data[1];
752
753        for chunk in &compressed.chunks {
754            let chunk_values = if chunk.log_num_values > 0 {
755                1u64 << chunk.log_num_values
756            } else {
757                compressed.num_values - values_processed
758            };
759
760            // Extract chunk buffers from global buffers using buffer_sizes
761            let values_size = chunk.buffer_sizes[0] as usize;
762            let lengths_size = chunk.buffer_sizes[1] as usize;
763
764            let chunk_values_buffer = global_values.slice_with_length(values_offset, values_size);
765            let chunk_lengths_buffer =
766                global_lengths.slice_with_length(lengths_offset, lengths_size);
767
768            let decompressor = RleMiniBlockDecompressor::new(32);
769            let chunk_data = decompressor
770                .decompress(
771                    vec![chunk_values_buffer, chunk_lengths_buffer],
772                    chunk_values,
773                )
774                .unwrap();
775
776            values_offset += values_size;
777            lengths_offset += lengths_size;
778            values_processed += chunk_values;
779
780            match chunk_data {
781                DataBlock::FixedWidth(ref block) => {
782                    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
783                    reconstructed.extend_from_slice(values);
784                }
785                _ => panic!("Expected FixedWidth block"),
786            }
787        }
788
789        assert_eq!(reconstructed, data);
790    }
791
792    #[test]
793    fn test_exact_1024_values_bug() {
794        let encoder = RleMiniBlockEncoder::new();
795
796        // Try different patterns that might trigger the bug
797        let test_patterns = [
798            // Pattern 1: runs of 2
799            {
800                let mut data = Vec::new();
801                for i in 0..512 {
802                    data.push(i);
803                    data.push(i);
804                }
805                data
806            },
807            // Pattern 2: single run that ends at exactly 1024
808            vec![42i32; 1024],
809            // Pattern 3: alternating values
810            {
811                let mut data = Vec::new();
812                for i in 0..1024 {
813                    data.push(i % 2);
814                }
815                data
816            },
817            // Pattern 4: runs that end right at boundary
818            {
819                let mut data = Vec::new();
820                // 255 + 255 + 255 + 255 + 4 = 1024
821                data.extend(vec![1i32; 255]);
822                data.extend(vec![2i32; 255]);
823                data.extend(vec![3i32; 255]);
824                data.extend(vec![4i32; 255]);
825                data.extend(vec![5i32; 4]);
826                data
827            },
828            // Pattern 5: unique values ending exactly at 1024
829            { (0..1024).collect::<Vec<_>>() },
830        ];
831
832        for (idx, data) in test_patterns.iter().enumerate() {
833            assert_eq!(data.len(), 1024);
834
835            let array = Int32Array::from(data.clone());
836            let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
837
838            // Decompress and verify
839            let decompressor = RleMiniBlockDecompressor::new(32);
840            match decompressor.decompress(compressed.data, compressed.num_values) {
841                Ok(decompressed) => match decompressed {
842                    DataBlock::FixedWidth(ref block) => {
843                        let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
844                        assert_eq!(
845                            values.len(),
846                            1024,
847                            "Pattern {} failed: got {} values",
848                            idx,
849                            values.len()
850                        );
851                        assert_eq!(values, &data[..], "Pattern {} data mismatch", idx);
852                    }
853                    _ => panic!("Expected FixedWidth block"),
854                },
855                Err(e) => {
856                    panic!("Pattern {} failed with error: {}", idx, e);
857                }
858            }
859        }
860    }
861
862    #[test]
863    fn test_unique_values_at_boundary() {
864        let encoder = RleMiniBlockEncoder::new();
865
866        // This specific pattern should trigger the bug:
867        // 1023 unique values followed by one that's the same
868        let mut data = Vec::new();
869        for i in 0..1023 {
870            data.push(i);
871        }
872        data.push(1022i32); // Last value is the same as second-to-last
873        assert_eq!(data.len(), 1024);
874
875        let array = Int32Array::from(data.clone());
876        let (compressed, _) = encoder.compress(DataBlock::from_array(array)).unwrap();
877
878        // Decompress and verify
879        let decompressor = RleMiniBlockDecompressor::new(32);
880        match decompressor.decompress(compressed.data, compressed.num_values) {
881            Ok(decompressed) => match decompressed {
882                DataBlock::FixedWidth(ref block) => {
883                    let values: &[i32] = bytemuck::cast_slice(block.data.as_ref());
884                    assert_eq!(
885                        values.len(),
886                        1024,
887                        "Got {} values, expected 1024",
888                        values.len()
889                    );
890                    assert_eq!(values, &data[..]);
891                }
892                _ => panic!("Expected FixedWidth block"),
893            },
894            Err(e) => {
895                panic!("Decompression failed: {}", e);
896            }
897        }
898    }
899
900    #[test]
901    fn test_bug_4092_bytes() {
902        // Test the exact scenario that produces 4092 bytes instead of 4096
903        let encoder = RleMiniBlockEncoder::new();
904
905        // Create pattern: 1022 unique values, then one value repeated 3 times
906        let mut data = Vec::new();
907        for i in 0..1022 {
908            data.push(i);
909        }
910        data.push(999999i32);
911        data.push(999999i32);
912        assert_eq!(data.len(), 1024);
913
914        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();
915        assert_eq!(bytes.len(), 4096);
916
917        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
918            bits_per_value: 32,
919            data: LanceBuffer::Owned(bytes),
920            num_values: 1024,
921            block_info: BlockInfo::default(),
922        });
923
924        let (compressed, _) = encoder.compress(block).unwrap();
925
926        // Try to decode and see what happens
927        let decompressor = RleMiniBlockDecompressor::new(32);
928        match decompressor.decompress(compressed.data, compressed.num_values) {
929            Ok(decompressed) => match decompressed {
930                DataBlock::FixedWidth(ref block) => {
931                    assert_eq!(
932                        block.data.len(),
933                        4096,
934                        "Expected 4096 bytes but got {}",
935                        block.data.len()
936                    );
937                }
938                _ => panic!("Expected FixedWidth block"),
939            },
940            Err(e) => {
941                if e.to_string().contains("4092") {
942                    panic!("Found the bug! {}", e);
943                }
944            }
945        }
946    }
947
948    #[test]
949    fn test_low_repetition_50pct_bug() {
950        // Test case that reproduces the 4092 bytes bug with low repetition (50%)
951        // This simulates the 1M benchmark case
952        let encoder = RleMiniBlockEncoder::new();
953
954        // Create 1M values with low repetition (50% chance of change)
955        let num_values = 1_048_576; // 1M values
956        let mut data = Vec::with_capacity(num_values);
957        let mut value = 0i32;
958        let mut rng = 12345u64; // Simple deterministic RNG
959
960        for _ in 0..num_values {
961            data.push(value);
962            // Simple LCG for deterministic randomness
963            rng = rng.wrapping_mul(1664525).wrapping_add(1013904223);
964            // 50% chance to increment value
965            if (rng >> 16) & 1 == 1 {
966                value += 1;
967            }
968        }
969
970        let bytes: Vec<u8> = data.iter().flat_map(|v| v.to_le_bytes()).collect();
971
972        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
973            bits_per_value: 32,
974            data: LanceBuffer::Owned(bytes),
975            num_values: num_values as u64,
976            block_info: BlockInfo::default(),
977        });
978
979        let (compressed, _) = encoder.compress(block).unwrap();
980
981        // Debug first few chunks
982        for (i, chunk) in compressed.chunks.iter().take(5).enumerate() {
983            let _chunk_values = if chunk.log_num_values > 0 {
984                1 << chunk.log_num_values
985            } else {
986                // Last chunk - calculate remaining
987                let prev_total: usize = compressed.chunks[..i]
988                    .iter()
989                    .map(|c| 1usize << c.log_num_values)
990                    .sum();
991                num_values - prev_total
992            };
993        }
994
995        // Try to decompress
996        let decompressor = RleMiniBlockDecompressor::new(32);
997        match decompressor.decompress(compressed.data, compressed.num_values) {
998            Ok(decompressed) => match decompressed {
999                DataBlock::FixedWidth(ref block) => {
1000                    assert_eq!(
1001                        block.data.len(),
1002                        num_values * 4,
1003                        "Expected {} bytes but got {}",
1004                        num_values * 4,
1005                        block.data.len()
1006                    );
1007                }
1008                _ => panic!("Expected FixedWidth block"),
1009            },
1010            Err(e) => {
1011                if e.to_string().contains("4092") {
1012                    panic!("Bug reproduced! {}", e);
1013                } else {
1014                    panic!("Unexpected error: {}", e);
1015                }
1016            }
1017        }
1018    }
1019}