Skip to main content

lance_encoding/encodings/logical/
primitive.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    collections::{HashMap, VecDeque},
7    env,
8    fmt::Debug,
9    iter,
10    ops::Range,
11    sync::Arc,
12    vec,
13};
14
15use crate::{
16    constants::{
17        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
18    },
19    data::DictionaryDataBlock,
20    encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
21    format::{
22        ProtobufUtils21,
23        pb21::{self, CompressiveEncoding, PageLayout, compressive_encoding::Compression},
24    },
25};
26use arrow_array::{Array, ArrayRef, PrimitiveArray, cast::AsArray, make_array, types::UInt64Type};
27use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
28use arrow_schema::{DataType, Field as ArrowField};
29use bytes::Bytes;
30use futures::{FutureExt, TryStreamExt, future::BoxFuture, stream::FuturesOrdered};
31use itertools::Itertools;
32use lance_arrow::DataTypeExt;
33use lance_arrow::deepcopy::deep_copy_nulls;
34use lance_core::{
35    cache::{CacheKey, Context, DeepSizeOf},
36    error::{Error, LanceOptionExt},
37    utils::bit::pad_bytes,
38};
39use log::trace;
40
41use crate::{
42    compression::{
43        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
44    },
45    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
46    utils::bytepack::BytepackedIntegerEncoder,
47};
48use crate::{
49    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
50    encodings::logical::primitive::fullzip::PerValueDataBlock,
51};
52use crate::{
53    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
54};
55use crate::{
56    encodings::logical::primitive::miniblock::MiniBlockCompressed,
57    statistics::{ComputeStat, GetStat, Stat},
58};
59use crate::{
60    repdef::{
61        CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser, DefinitionInterpretation,
62        RepDefSlicer, build_control_word_iterator,
63    },
64    utils::accumulation::AccumulationQueue,
65};
66use lance_core::{Result, datatypes::Field, utils::tokio::spawn_cpu};
67
68use crate::constants::{
69    COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_DIVISOR_META_KEY,
70    DICT_SIZE_RATIO_META_KEY, DICT_VALUES_COMPRESSION_ENV_VAR,
71    DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
72    DICT_VALUES_COMPRESSION_META_KEY,
73};
74use crate::version::LanceFileVersion;
75use crate::{
76    EncodingsIo,
77    buffer::LanceBuffer,
78    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
79    decoder::{
80        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
81        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
82        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
83        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
84    },
85    encoder::{
86        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
87    },
88    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
89};
90
91pub mod blob;
92pub mod constant;
93pub mod dict;
94pub mod fullzip;
95pub mod miniblock;
96
// NOTE(review): none of these constants are referenced in the visible portion of this
// file.  The descriptions below are inferred from the names only and should be confirmed
// against the use sites.

/// Byte value presumably written into padding / unused bytes — TODO confirm.
const FILL_BYTE: u8 = 0xFE;
/// Fallback dictionary divisor, presumably used when `DICT_DIVISOR_META_KEY` is not set
/// on the field — TODO confirm.
const DEFAULT_DICT_DIVISOR: u64 = 2;
/// Presumably the largest cardinality for which dictionary encoding is considered by
/// default — TODO confirm.
const DEFAULT_DICT_MAX_CARDINALITY: u64 = 100_000;
/// Fallback dictionary size ratio, presumably used when `DICT_SIZE_RATIO_META_KEY` is not
/// set on the field — TODO confirm.
const DEFAULT_DICT_SIZE_RATIO: f64 = 0.8;
/// Name of the compression scheme presumably applied to dictionary values when neither
/// metadata nor environment overrides it — TODO confirm.
const DEFAULT_DICT_VALUES_COMPRESSION: &str = "lz4";
102
/// One unit of scheduled page I/O, as returned by
/// [`StructuralPageScheduler::schedule_ranges`].
struct PageLoadTask {
    /// Future that resolves to the decoder for the scheduled data (or to an error)
    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
    /// Row count associated with this task (presumably the number of rows the resolved
    /// decoder will yield — confirm against the schedulers that build it)
    num_rows: u64,
}
107
/// A trait for figuring out how to schedule the data within
/// a single page.
///
/// Implementations must be `Debug` and `Send`.
trait StructuralPageScheduler: std::fmt::Debug + Send {
    /// Fetches any metadata required for the page
    ///
    /// Uses `io` to perform the reads.  The returned future resolves to the fetched
    /// metadata wrapped as shareable [`CachedPageData`], or to an error.
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    /// Loads metadata from a previous initialize call
    ///
    /// `data` is expected to be a value previously returned by [`Self::initialize`].
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    /// Schedules the read of the given ranges in the page
    ///
    /// The read may be split into multiple "shards" if the page is extremely large.
    /// Each shard maps to one or more rows and can be decoded independently.
    ///
    /// Note: this sharding is for splitting up very large pages into smaller reads to
    /// avoid buffering too much data in memory.  It is not related to the batch size or
    /// compute units in any way.
    ///
    /// Returns one [`PageLoadTask`] per shard.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>>;
}
132
/// Metadata describing the decoded size of a mini-block
///
/// NOTE(review): the field descriptions are inferred from the field names; the code that
/// populates this struct is not visible here.
#[derive(Debug)]
struct ChunkMeta {
    /// Number of values in the chunk
    num_values: u64,
    /// Size of the chunk in bytes
    chunk_size_bytes: u64,
    /// Byte offset of the chunk (presumably relative to the start of the page's chunk
    /// data — TODO confirm)
    offset_bytes: u64,
}
140
/// A mini-block chunk that has been decoded and decompressed
///
/// This is `Clone` so that a decoded chunk can be reused when consecutive drain
/// instructions refer to the same chunk.
#[derive(Debug, Clone)]
struct DecodedMiniBlockChunk {
    /// Repetition levels for the chunk, `None` if the page has no repetition decompressor
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels for the chunk, `None` if the page has no definition decompressor
    def: Option<ScalarBuffer<u16>>,
    /// The decompressed values of the chunk
    values: DataBlock,
}
148
/// A task to decode one or more mini-blocks of data into an output batch
///
/// Note: Two batches might share the same mini-block of data.  When this happens
/// then each batch gets a copy of the block and each batch decodes the block independently.
///
/// This means we have duplicated work but it is necessary to avoid having to synchronize
/// the decoding of the block. (TODO: test this theory)
#[derive(Debug)]
struct DecodeMiniBlockTask {
    /// Decompressor for repetition levels, `None` if chunks carry no repetition levels
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for definition levels, `None` if chunks carry no definition levels
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for the value buffers of each chunk
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    /// If present, the decoded values are dictionary indices and the result is wrapped
    /// in a dictionary block that refers to this data
    dictionary_data: Option<Arc<DataBlock>>,
    /// Interpretation of each definition level (also used to derive the max rep level)
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Number of value buffers (and therefore buffer sizes) stored in each chunk
    num_buffers: u64,
    /// Definition levels greater than this mark items that are "invisible" (have no value)
    max_visible_level: u16,
    /// What to take from which chunk, in output order
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
    /// When true, per-buffer sizes in the chunk header are 4 bytes wide instead of 2
    has_large_chunk: bool,
}
168
169impl DecodeMiniBlockTask {
170    fn decode_levels(
171        rep_decompressor: &dyn BlockDecompressor,
172        levels: LanceBuffer,
173        num_levels: u16,
174    ) -> Result<ScalarBuffer<u16>> {
175        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
176        let rep = rep.as_fixed_width().unwrap();
177        debug_assert_eq!(rep.num_values, num_levels as u64);
178        debug_assert_eq!(rep.bits_per_value, 16);
179        Ok(rep.data.borrow_to_typed_slice::<u16>())
180    }
181
182    // We are building a LevelBuffer (levels) and want to copy into it `total_len`
183    // values from `level_buf` starting at `offset`.
184    //
185    // We need to handle both the case where `levels` is None (no nulls encountered
186    // yet) and the case where `level_buf` is None (the input we are copying from has
187    // no nulls)
188    fn extend_levels(
189        range: Range<u64>,
190        levels: &mut Option<LevelBuffer>,
191        level_buf: &Option<impl AsRef<[u16]>>,
192        dest_offset: usize,
193    ) {
194        if let Some(level_buf) = level_buf {
195            if levels.is_none() {
196                // This is the first non-empty def buf we've hit, fill in the past
197                // with 0 (valid)
198                let mut new_levels_vec =
199                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
200                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
201                *levels = Some(new_levels_vec);
202            }
203            levels.as_mut().unwrap().extend(
204                level_buf.as_ref()[range.start as usize..range.end as usize]
205                    .iter()
206                    .copied(),
207            );
208        } else if let Some(levels) = levels {
209            let num_values = (range.end - range.start) as usize;
210            // This is an all-valid level_buf but we had nulls earlier and so we
211            // need to materialize it
212            levels.extend(iter::repeat_n(0, num_values));
213        }
214    }
215
    /// Maps a range of rows to a range of items and a range of levels
    ///
    /// If there is no repetition information this just returns the range as-is.
    ///
    /// If there is repetition information then we need to do some work to figure out what
    /// range of items corresponds to the requested range of rows.
    ///
    /// For example, if the data is [[1, 2, 3], [4, 5], [6, 7]] and the range is 1..2 (i.e. just row
    /// 1) then the user actually wants items 3..5.  In the above case the rep levels would be:
    ///
    /// Idx: 0 1 2 3 4 5 6
    /// Rep: 1 0 0 1 0 1 0
    ///
    /// So the start (1) maps to the second 1 (idx=3) and the end (2) maps to the third 1 (idx=5)
    ///
    /// If there are invisible items then we don't count them when calculating the range of items we
    /// are interested in but we do count them when calculating the range of levels we are interested
    /// in.  As a result we have to return both the item range (first return value) and the level range
    /// (second return value).
    ///
    /// For example, if the data is [[1, 2, 3], [4, 5], NULL, [6, 7, 8]] and the range is 2..4 then the
    /// user wants items 5..8 but they want levels 5..9.  In the above case the rep/def levels would be
    /// (`_` marks the invisible level at idx=5, which has no item):
    ///
    /// Idx: 0 1 2 3 4 5 6 7 8
    /// Rep: 1 0 0 1 0 1 1 0 0
    /// Def: 0 0 0 0 0 1 0 0 0
    /// Itm: 1 2 3 4 5 _ 6 7 8
    ///
    /// Finally, we have to contend with the fact that chunks may or may not start with a "preamble" of
    /// trailing values that finish up a list from the previous chunk.  In this case the first item does
    /// not start at max_rep because it is a continuation of the previous chunk.  For our purposes we do
    /// not consider this a "row" and so the range 0..1 will refer to the first row AFTER the preamble.
    ///
    /// We have a separate parameter (`preamble_action`) to control whether we want the preamble or not.
    ///
    /// Note that the "trailer" is considered a "row" and if we want it we should include it in the range.
    ///
    /// An empty `range` (start == end) means "only the preamble" and returns the preamble's
    /// item / level ranges.
    ///
    /// # Panics
    ///
    /// Panics if repetition levels are present and `range.start > range.end`, or if the chunk
    /// contains no row start at all and `preamble_action` is not `PreambleAction::Take`.
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        // The total number of items (not rows) in the chunk.  This is not quite the same as
        // rep.len() / def.len() because it doesn't count invisible items
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            // If there is a preamble and we need to skip it then do that first.  The work is the same
            // whether there is def information or not
            let mut items_in_preamble = 0_u64;
            // Level index of the first level that starts a new row (i.e. where the preamble ends)
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        let mut first_row_start = None;
                        // With def levels we must count only the visible levels as preamble items
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx as u64);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        // Without def levels every level is an item, so the counts coincide
                        let first_row_start =
                            rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
                        items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
                        first_row_start
                    };
                    // It is possible for a chunk to be entirely partial values but if it is then it
                    // should never show up as a preamble to skip
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap();
                    // From here on `rep` starts at the first real row; indices computed below are
                    // relative to that point until the preamble adjustment at the end
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            // We hit this case when all we needed was the preamble
            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                debug_assert!(items_in_preamble <= total_items);
                return (0..items_in_preamble, 0..first_row_start);
            }
            assert!(range.start < range.end);

            // Number of row starts encountered so far while walking the rep levels
            let mut rows_seen = 0;
            // Start of the item range (relative to the end of the preamble)
            let mut new_start = 0;
            // Start of the level range (relative to the end of the preamble)
            let mut new_levels_start = 0;

            if let Some(def) = def {
                // Keep def aligned with the (already trimmed) rep slice
                let def = &def.as_ref()[first_row_start as usize..];

                // range.start == 0 always maps to 0 (even with invis items), otherwise we need to walk
                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    // The first level is not visited by the loop below (skip(1)) so account for it here
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    // idx is 0-based after the skip, hence the `+ 1` when converting to a level index
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                // Invisible levels have no item, so they shift the item index left
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                        }
                        if *def > max_visible_def {
                            lead_invis_seen += 1;
                        }
                    }
                }

                // Count the row that begins at new_levels_start itself, which is why the loop
                // below compares against `range.end + 1`
                rows_seen += 1;

                // u64::MAX is a sentinel meaning "the end row boundary was not found"
                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                    }
                    if *def > max_visible_def {
                        tail_invis_seen += 1;
                    }
                }

                // The range runs to the end of the chunk: everything that is left, minus all
                // of the invisible levels seen, is the item end
                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                // Adjust for any skipped preamble
                if preamble_action == PreambleAction::Skip {
                    new_start += items_in_preamble;
                    new_end += items_in_preamble;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    // Taking the preamble: the ranges start at 0 and only the ends shift
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += items_in_preamble;
                    new_levels_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                // Easy case, there are no invisible items, so we don't need to check for them
                // The items range and levels range will be the same.  We do still need to walk
                // the rep levels to find the row boundaries

                // range.start == 0 always maps to 0, otherwise we need to walk
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                // range.end == max_items always maps to rep.len(), otherwise we need to walk
                // NOTE(review): this compares a row index against an item count; it only
                // decides whether the walk can be skipped, the walk itself finds the boundary
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                // Adjust for any skipped preamble
                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_start..new_end)
            }
        } else {
            // No repetition info, easy case, just use the range as-is and the item
            // and level ranges are the same
            (range.clone(), range)
        }
    }
437
438    // read `num_buffers` buffer sizes from `buf` starting at `offset`
439    fn read_buffer_sizes<const LARGE: bool>(
440        buf: &[u8],
441        offset: &mut usize,
442        num_buffers: u64,
443    ) -> Vec<u32> {
444        let read_size = if LARGE { 4 } else { 2 };
445        (0..num_buffers)
446            .map(|_| {
447                let bytes = &buf[*offset..*offset + read_size];
448                let size = if LARGE {
449                    u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
450                } else {
451                    // the buffer size is read from u16 but is stored as u32 after decoding for consistency
452                    u16::from_le_bytes([bytes[0], bytes[1]]) as u32
453                };
454                *offset += read_size;
455                size
456            })
457            .collect()
458    }
459
460    // Unserialize a miniblock into a collection of vectors
461    fn decode_miniblock_chunk(
462        &self,
463        buf: &LanceBuffer,
464        items_in_chunk: u64,
465    ) -> Result<DecodedMiniBlockChunk> {
466        let mut offset = 0;
467        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
468        offset += 2;
469
470        let rep_size = if self.rep_decompressor.is_some() {
471            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
472            offset += 2;
473            Some(rep_size)
474        } else {
475            None
476        };
477        let def_size = if self.def_decompressor.is_some() {
478            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
479            offset += 2;
480            Some(def_size)
481        } else {
482            None
483        };
484
485        let buffer_sizes = if self.has_large_chunk {
486            Self::read_buffer_sizes::<true>(buf, &mut offset, self.num_buffers)
487        } else {
488            Self::read_buffer_sizes::<false>(buf, &mut offset, self.num_buffers)
489        };
490
491        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
492
493        let rep = rep_size.map(|rep_size| {
494            let rep = buf.slice_with_length(offset, rep_size as usize);
495            offset += rep_size as usize;
496            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
497            rep
498        });
499
500        let def = def_size.map(|def_size| {
501            let def = buf.slice_with_length(offset, def_size as usize);
502            offset += def_size as usize;
503            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
504            def
505        });
506
507        let buffers = buffer_sizes
508            .into_iter()
509            .map(|buf_size| {
510                let buf = buf.slice_with_length(offset, buf_size as usize);
511                offset += buf_size as usize;
512                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
513                buf
514            })
515            .collect::<Vec<_>>();
516
517        let values = self
518            .value_decompressor
519            .decompress(buffers, items_in_chunk)?;
520
521        let rep = rep
522            .map(|rep| {
523                Self::decode_levels(
524                    self.rep_decompressor.as_ref().unwrap().as_ref(),
525                    rep,
526                    num_levels,
527                )
528            })
529            .transpose()?;
530        let def = def
531            .map(|def| {
532                Self::decode_levels(
533                    self.def_decompressor.as_ref().unwrap().as_ref(),
534                    def,
535                    num_levels,
536                )
537            })
538            .transpose()?;
539
540        Ok(DecodedMiniBlockChunk { rep, def, values })
541    }
542}
543
544impl DecodePageTask for DecodeMiniBlockTask {
545    fn decode(self: Box<Self>) -> Result<DecodedPage> {
546        // First, we create output buffers for the rep and def and data
547        let mut repbuf: Option<LevelBuffer> = None;
548        let mut defbuf: Option<LevelBuffer> = None;
549
550        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
551
552        // This is probably an over-estimate but it's quick and easy to calculate
553        let estimated_size_bytes = self
554            .instructions
555            .iter()
556            .map(|(_, chunk)| chunk.data.len())
557            .sum::<usize>()
558            * 2;
559        let mut data_builder =
560            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
561
562        // We need to keep track of the offset into repbuf/defbuf that we are building up
563        let mut level_offset = 0;
564
565        // Pre-compute caching needs for each chunk by checking if the next chunk is the same
566        let needs_caching: Vec<bool> = self
567            .instructions
568            .windows(2)
569            .map(|w| w[0].1.chunk_idx == w[1].1.chunk_idx)
570            .chain(std::iter::once(false)) // the last one never needs caching
571            .collect();
572
573        // Cache for storing decoded chunks when beneficial
574        let mut chunk_cache: Option<(usize, DecodedMiniBlockChunk)> = None;
575
576        // Now we iterate through each instruction and process it
577        for (idx, (instructions, chunk)) in self.instructions.iter().enumerate() {
578            let should_cache_this_chunk = needs_caching[idx];
579
580            let decoded_chunk = match &chunk_cache {
581                Some((cached_chunk_idx, cached_chunk)) if *cached_chunk_idx == chunk.chunk_idx => {
582                    // Clone only when we have a cache hit (much cheaper than decoding)
583                    cached_chunk.clone()
584                }
585                _ => {
586                    // Cache miss, need to decode
587                    let decoded = self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;
588
589                    // Only update cache if this chunk will benefit the next access
590                    if should_cache_this_chunk {
591                        chunk_cache = Some((chunk.chunk_idx, decoded.clone()));
592                    }
593                    decoded
594                }
595            };
596
597            let DecodedMiniBlockChunk { rep, def, values } = decoded_chunk;
598
599            // Our instructions tell us which rows we want to take from this chunk
600            let row_range_start =
601                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
602            let row_range_end = row_range_start + instructions.rows_to_take;
603
604            // We use the rep info to map the row range to an item range / levels range
605            let (item_range, level_range) = Self::map_range(
606                row_range_start..row_range_end,
607                rep.as_ref(),
608                def.as_ref(),
609                max_rep,
610                self.max_visible_level,
611                chunk.items_in_chunk,
612                instructions.preamble_action,
613            );
614            if item_range.end - item_range.start > chunk.items_in_chunk {
615                return Err(lance_core::Error::internal(format!(
616                    "Item range {:?} is greater than chunk items in chunk {:?}",
617                    item_range, chunk.items_in_chunk
618                )));
619            }
620
621            // Now we append the data to the output buffers
622            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
623            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
624            level_offset += (level_range.end - level_range.start) as usize;
625            data_builder.append(&values, item_range);
626        }
627
628        let mut data = data_builder.finish();
629
630        let unraveler =
631            RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());
632
633        if let Some(dictionary) = &self.dictionary_data {
634            // Don't decode here, that happens later (if needed)
635            let DataBlock::FixedWidth(indices) = data else {
636                return Err(lance_core::Error::internal(format!(
637                    "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
638                    data
639                )));
640            };
641            data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
642                indices,
643                dictionary.as_ref().clone(),
644            ));
645        }
646
647        Ok(DecodedPage {
648            data,
649            repdef: unraveler,
650        })
651    }
652}
653
654/// A chunk that has been loaded by the miniblock scheduler (but not
655/// yet decoded)
656#[derive(Debug)]
657struct LoadedChunk {
658    data: LanceBuffer,
659    items_in_chunk: u64,
660    byte_range: Range<u64>,
661    chunk_idx: usize,
662}
663
664impl Clone for LoadedChunk {
665    fn clone(&self) -> Self {
666        Self {
667            // Safe as we always create borrowed buffers here
668            data: self.data.clone(),
669            items_in_chunk: self.items_in_chunk,
670            byte_range: self.byte_range.clone(),
671            chunk_idx: self.chunk_idx,
672        }
673    }
674}
675
/// Decodes mini-block formatted data.  See [`PrimitiveStructuralEncoder`] for more
/// details on the different layouts.
#[derive(Debug)]
struct MiniBlockDecoder {
    /// Decompressor for repetition levels, if the page has any
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for definition levels, if the page has any
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for the value buffers
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    /// Interpretation of each definition level
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Chunks that have been loaded; chunks are dropped from the front once the drain
    /// instructions have moved past them
    loaded_chunks: VecDeque<LoadedChunk>,
    /// Remaining instructions, consumed front-to-back by `drain`
    instructions: VecDeque<ChunkInstructions>,
    /// Skip offset carried over between `drain` calls for the instruction at the front
    offset_in_current_chunk: u64,
    /// Value reported by `num_rows`
    num_rows: u64,
    /// Number of value buffers stored in each chunk
    num_buffers: u64,
    /// Dictionary to attach to decoded indices, if the page is dictionary encoded
    dictionary: Option<Arc<DataBlock>>,
    /// When true, per-buffer sizes in the chunk header are 4 bytes wide instead of 2
    has_large_chunk: bool,
}
692
693/// See [`MiniBlockScheduler`] for more details on the scheduling and decoding
694/// process for miniblock encoded data.
695impl StructuralPageDecoder for MiniBlockDecoder {
696    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
697        let mut items_desired = num_rows;
698        let mut need_preamble = false;
699        let mut skip_in_chunk = self.offset_in_current_chunk;
700        let mut drain_instructions = Vec::new();
701        while items_desired > 0 || need_preamble {
702            let (instructions, consumed) = self
703                .instructions
704                .front()
705                .unwrap()
706                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);
707
708            while self.loaded_chunks.front().unwrap().chunk_idx
709                != instructions.chunk_instructions.chunk_idx
710            {
711                self.loaded_chunks.pop_front();
712            }
713            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
714            if consumed {
715                self.instructions.pop_front();
716            }
717        }
718        // We can throw away need_preamble here because it must be false.  If it were true it would mean
719        // we were still in the middle of loading rows.  We do need to latch skip_in_chunk though.
720        self.offset_in_current_chunk = skip_in_chunk;
721
722        let max_visible_level = self
723            .def_meaning
724            .iter()
725            .take_while(|l| !l.is_list())
726            .map(|l| l.num_def_levels())
727            .sum::<u16>();
728
729        Ok(Box::new(DecodeMiniBlockTask {
730            instructions: drain_instructions,
731            def_decompressor: self.def_decompressor.clone(),
732            rep_decompressor: self.rep_decompressor.clone(),
733            value_decompressor: self.value_decompressor.clone(),
734            dictionary_data: self.dictionary.clone(),
735            def_meaning: self.def_meaning.clone(),
736            num_buffers: self.num_buffers,
737            max_visible_level,
738            has_large_chunk: self.has_large_chunk,
739        }))
740    }
741
742    fn num_rows(&self) -> u64 {
743        self.num_rows
744    }
745}
746
/// Repetition and definition levels of an all-null page, in a form that can be stored
/// as [`CachedPageData`]
#[derive(Debug)]
struct CachedComplexAllNullState {
    /// Repetition levels, if any
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels, if any
    def: Option<ScalarBuffer<u16>>,
}
752
753impl DeepSizeOf for CachedComplexAllNullState {
754    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
755        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
756            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
757    }
758}
759
760impl CachedPageData for CachedComplexAllNullState {
761    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
762        self
763    }
764}
765
766/// A scheduler for all-null data that has repetition and definition levels
767///
768/// We still need to do some I/O in this case because we need to figure out what kind of null we
769/// are dealing with (null list, null struct, what level null struct, etc.)
770///
771/// TODO: Right now we just load the entire rep/def at initialization time and cache it.  This is a touch
772/// RAM aggressive and maybe we want something more lazy in the future.  On the other hand, it's simple
773/// and fast so...maybe not :)
774#[derive(Debug)]
775pub struct ComplexAllNullScheduler {
776    // Set from protobuf
777    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
778    def_meaning: Arc<[DefinitionInterpretation]>,
779    repdef: Option<Arc<CachedComplexAllNullState>>,
780    max_rep: u16,
781    max_visible_level: u16,
782    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
783    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
784    num_rep_values: u64,
785    num_def_values: u64,
786}
787
788impl ComplexAllNullScheduler {
789    pub fn new(
790        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
791        def_meaning: Arc<[DefinitionInterpretation]>,
792        rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
793        def_decompressor: Option<Arc<dyn BlockDecompressor>>,
794        num_rep_values: u64,
795        num_def_values: u64,
796    ) -> Self {
797        let max_rep = def_meaning.iter().filter(|l| l.is_list()).count() as u16;
798        let max_visible_level = def_meaning
799            .iter()
800            .take_while(|l| !l.is_list())
801            .map(|l| l.num_def_levels())
802            .sum::<u16>();
803        Self {
804            buffer_offsets_and_sizes,
805            def_meaning,
806            repdef: None,
807            max_rep,
808            max_visible_level,
809            rep_decompressor,
810            def_decompressor,
811            num_rep_values,
812            num_def_values,
813        }
814    }
815}
816
817impl StructuralPageScheduler for ComplexAllNullScheduler {
818    fn initialize<'a>(
819        &'a mut self,
820        io: &Arc<dyn EncodingsIo>,
821    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
822        // Fully load the rep & def buffers, as needed
823        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
824        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
825        let has_rep = rep_size > 0;
826        let has_def = def_size > 0;
827
828        let mut reads = Vec::with_capacity(2);
829        if has_rep {
830            reads.push(rep_pos..rep_pos + rep_size);
831        }
832        if has_def {
833            reads.push(def_pos..def_pos + def_size);
834        }
835
836        let data = io.submit_request(reads, 0);
837        let rep_decompressor = self.rep_decompressor.clone();
838        let def_decompressor = self.def_decompressor.clone();
839        let num_rep_values = self.num_rep_values;
840        let num_def_values = self.num_def_values;
841
842        async move {
843            let data = data.await?;
844            let mut data_iter = data.into_iter();
845
846            let decompress_levels = |compressed_bytes: Bytes,
847                                     decompressor: &Arc<dyn BlockDecompressor>,
848                                     num_values: u64,
849                                     level_type: &str|
850             -> Result<ScalarBuffer<u16>> {
851                let compressed_buffer = LanceBuffer::from_bytes(compressed_bytes, 1);
852                let decompressed = decompressor.decompress(compressed_buffer, num_values)?;
853                match decompressed {
854                    DataBlock::FixedWidth(block) => {
855                        if block.num_values != num_values {
856                            return Err(Error::invalid_input_source(format!(
857                                "Unexpected {} level count after decompression: expected {}, got {}",
858                                level_type, num_values, block.num_values
859                            )
860                            .into()));
861                        }
862                        if block.bits_per_value != 16 {
863                            return Err(Error::invalid_input_source(format!(
864                                "Unexpected {} level bit width after decompression: expected 16, got {}",
865                                level_type, block.bits_per_value
866                            )
867                            .into()));
868                        }
869                        Ok(block.data.borrow_to_typed_slice::<u16>())
870                    }
871                    _ => Err(Error::invalid_input_source(format!(
872                        "Expected fixed-width data block for {} levels",
873                        level_type
874                    )
875                    .into())),
876                }
877            };
878
879            let rep = if has_rep {
880                let rep = data_iter.next().unwrap();
881                if let Some(rep_decompressor) = rep_decompressor.as_ref() {
882                    Some(decompress_levels(
883                        rep,
884                        rep_decompressor,
885                        num_rep_values,
886                        "repetition",
887                    )?)
888                } else {
889                    let rep = LanceBuffer::from_bytes(rep, 2);
890                    let rep = rep.borrow_to_typed_slice::<u16>();
891                    Some(rep)
892                }
893            } else {
894                None
895            };
896
897            let def = if has_def {
898                let def = data_iter.next().unwrap();
899                if let Some(def_decompressor) = def_decompressor.as_ref() {
900                    Some(decompress_levels(
901                        def,
902                        def_decompressor,
903                        num_def_values,
904                        "definition",
905                    )?)
906                } else {
907                    let def = LanceBuffer::from_bytes(def, 2);
908                    let def = def.borrow_to_typed_slice::<u16>();
909                    Some(def)
910                }
911            } else {
912                None
913            };
914
915            let repdef = Arc::new(CachedComplexAllNullState { rep, def });
916
917            self.repdef = Some(repdef.clone());
918
919            Ok(repdef as Arc<dyn CachedPageData>)
920        }
921        .boxed()
922    }
923
924    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
925        self.repdef = Some(
926            data.clone()
927                .as_arc_any()
928                .downcast::<CachedComplexAllNullState>()
929                .unwrap(),
930        );
931    }
932
933    fn schedule_ranges(
934        &self,
935        ranges: &[Range<u64>],
936        _io: &Arc<dyn EncodingsIo>,
937    ) -> Result<Vec<PageLoadTask>> {
938        let ranges = VecDeque::from_iter(ranges.iter().cloned());
939        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
940        let decoder = Box::new(ComplexAllNullPageDecoder {
941            ranges,
942            rep: self.repdef.as_ref().unwrap().rep.clone(),
943            def: self.repdef.as_ref().unwrap().def.clone(),
944            num_rows,
945            def_meaning: self.def_meaning.clone(),
946            max_rep: self.max_rep,
947            max_visible_level: self.max_visible_level,
948            cursor_row: 0,
949            cursor_level: 0,
950        }) as Box<dyn StructuralPageDecoder>;
951        let page_load_task = PageLoadTask {
952            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
953            num_rows,
954        };
955        Ok(vec![page_load_task])
956    }
957}
958
959#[derive(Debug)]
960pub struct ComplexAllNullPageDecoder {
961    ranges: VecDeque<Range<u64>>,
962    rep: Option<ScalarBuffer<u16>>,
963    def: Option<ScalarBuffer<u16>>,
964    num_rows: u64,
965    def_meaning: Arc<[DefinitionInterpretation]>,
966    max_rep: u16,
967    max_visible_level: u16,
968    cursor_row: u64,
969    cursor_level: usize,
970}
971
972impl ComplexAllNullPageDecoder {
973    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
974        let mut rows_desired = num_rows;
975        let mut ranges = Vec::with_capacity(self.ranges.len());
976        while rows_desired > 0 {
977            let front = self.ranges.front_mut().unwrap();
978            let avail = front.end - front.start;
979            if avail > rows_desired {
980                ranges.push(front.start..front.start + rows_desired);
981                front.start += rows_desired;
982                rows_desired = 0;
983            } else {
984                ranges.push(self.ranges.pop_front().unwrap());
985                rows_desired -= avail;
986            }
987        }
988        ranges
989    }
990
991    fn take_row(&mut self) -> Result<(Range<usize>, u64)> {
992        let start = self.cursor_level;
993        let end = if let Some(rep) = &self.rep {
994            if start >= rep.len() {
995                return Err(Error::internal(
996                    "Invalid complex all-null layout: repetition buffer too short",
997                ));
998            }
999            if rep[start] != self.max_rep {
1000                return Err(Error::internal(
1001                    "Invalid complex all-null layout: row did not start at max repetition level",
1002                ));
1003            }
1004            let mut end = start + 1;
1005            while end < rep.len() && rep[end] != self.max_rep {
1006                end += 1;
1007            }
1008            end
1009        } else {
1010            start + 1
1011        };
1012
1013        let visible = if let Some(def) = &self.def {
1014            if end > def.len() {
1015                return Err(Error::internal(
1016                    "Invalid complex all-null layout: definition buffer too short",
1017                ));
1018            }
1019            def[start..end]
1020                .iter()
1021                .filter(|d| **d <= self.max_visible_level)
1022                .count() as u64
1023        } else {
1024            (end - start) as u64
1025        };
1026
1027        self.cursor_level = end;
1028        self.cursor_row += 1;
1029        Ok((start..end, visible))
1030    }
1031
1032    fn skip_to_row(&mut self, target_row: u64) -> Result<()> {
1033        while self.cursor_row < target_row {
1034            self.take_row()?;
1035        }
1036        Ok(())
1037    }
1038}
1039
1040impl StructuralPageDecoder for ComplexAllNullPageDecoder {
1041    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1042        let drained_ranges = self.drain_ranges(num_rows);
1043        let mut level_slices: Vec<Range<usize>> = Vec::new();
1044        let mut visible_items_total = 0;
1045
1046        for range in drained_ranges {
1047            self.skip_to_row(range.start)?;
1048            for _ in range.start..range.end {
1049                let (level_range, visible) = self.take_row()?;
1050                visible_items_total += visible;
1051                if let Some(last) = level_slices.last_mut()
1052                    && last.end == level_range.start
1053                {
1054                    last.end = level_range.end;
1055                    continue;
1056                }
1057                level_slices.push(level_range);
1058            }
1059        }
1060
1061        Ok(Box::new(DecodeComplexAllNullTask {
1062            level_slices,
1063            visible_items_total,
1064            rep: self.rep.clone(),
1065            def: self.def.clone(),
1066            def_meaning: self.def_meaning.clone(),
1067            max_visible_level: self.max_visible_level,
1068        }))
1069    }
1070
1071    fn num_rows(&self) -> u64 {
1072        self.num_rows
1073    }
1074}
1075
1076/// We use `level_slices` to slice into `rep` and `def` and create rep/def buffers
1077/// for the null data.
1078#[derive(Debug)]
1079pub struct DecodeComplexAllNullTask {
1080    level_slices: Vec<Range<usize>>,
1081    visible_items_total: u64,
1082    rep: Option<ScalarBuffer<u16>>,
1083    def: Option<ScalarBuffer<u16>>,
1084    def_meaning: Arc<[DefinitionInterpretation]>,
1085    max_visible_level: u16,
1086}
1087
1088impl DecodeComplexAllNullTask {
1089    fn decode_level(&self, levels: &Option<ScalarBuffer<u16>>) -> Option<Vec<u16>> {
1090        levels.as_ref().map(|levels| {
1091            let num_levels = self
1092                .level_slices
1093                .iter()
1094                .map(|range| range.end - range.start)
1095                .sum();
1096            let mut referenced_levels = Vec::with_capacity(num_levels);
1097            for range in &self.level_slices {
1098                referenced_levels.extend(levels[range.start..range.end].iter().copied());
1099            }
1100            referenced_levels
1101        })
1102    }
1103}
1104
1105impl DecodePageTask for DecodeComplexAllNullTask {
1106    fn decode(self: Box<Self>) -> Result<DecodedPage> {
1107        let rep = self.decode_level(&self.rep);
1108        let def = self.decode_level(&self.def);
1109
1110        // If there are definition levels there may be empty / null lists which are not visible
1111        // in the items array.  We need to account for that here to figure out how many values
1112        // should be in the items array.
1113        let num_values = if let Some(def) = &def {
1114            def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
1115        } else {
1116            self.visible_items_total
1117        };
1118
1119        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
1120        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
1121        Ok(DecodedPage {
1122            data,
1123            repdef: unraveler,
1124        })
1125    }
1126}
1127
1128/// A scheduler for simple all-null data
1129///
1130/// "simple" all-null data is data that is all null and only has a single level of definition and
1131/// no repetition.  We don't need to read any data at all in this case.
1132#[derive(Debug, Default)]
1133pub struct SimpleAllNullScheduler {}
1134
1135impl StructuralPageScheduler for SimpleAllNullScheduler {
1136    fn initialize<'a>(
1137        &'a mut self,
1138        _io: &Arc<dyn EncodingsIo>,
1139    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1140        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
1141    }
1142
1143    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
1144
1145    fn schedule_ranges(
1146        &self,
1147        ranges: &[Range<u64>],
1148        _io: &Arc<dyn EncodingsIo>,
1149    ) -> Result<Vec<PageLoadTask>> {
1150        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1151        let decoder =
1152            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
1153        let page_load_task = PageLoadTask {
1154            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
1155            num_rows,
1156        };
1157        Ok(vec![page_load_task])
1158    }
1159}
1160
1161/// A page decode task for all-null data without any
1162/// repetition and only a single level of definition
1163#[derive(Debug)]
1164struct SimpleAllNullDecodePageTask {
1165    num_values: u64,
1166}
1167impl DecodePageTask for SimpleAllNullDecodePageTask {
1168    fn decode(self: Box<Self>) -> Result<DecodedPage> {
1169        let unraveler = RepDefUnraveler::new(
1170            None,
1171            Some(vec![1; self.num_values as usize]),
1172            Arc::new([DefinitionInterpretation::NullableItem]),
1173            self.num_values,
1174        );
1175        Ok(DecodedPage {
1176            data: DataBlock::AllNull(AllNullDataBlock {
1177                num_values: self.num_values,
1178            }),
1179            repdef: unraveler,
1180        })
1181    }
1182}
1183
1184#[derive(Debug)]
1185pub struct SimpleAllNullPageDecoder {
1186    num_rows: u64,
1187}
1188
1189impl StructuralPageDecoder for SimpleAllNullPageDecoder {
1190    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1191        Ok(Box::new(SimpleAllNullDecodePageTask {
1192            num_values: num_rows,
1193        }))
1194    }
1195
1196    fn num_rows(&self) -> u64 {
1197        self.num_rows
1198    }
1199}
1200
/// What a mini-block scheduler records about a page's dictionary.
#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    // These come from the protobuf
    // Block decompressor created from the layout's dictionary encoding
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    // (position, size) of the page buffer that holds the dictionary
    dictionary_buf_position_and_size: (u64, u64),
    // Alignment of the dictionary data, chosen from the kind of dictionary compression
    dictionary_data_alignment: u64,
    // Number of items in the dictionary, as stated by the layout
    num_dictionary_items: u64,
}
1209
/// Individual block metadata within a MiniBlock repetition index.
#[derive(Debug)]
struct MiniBlockRepIndexBlock {
    // The index of the first row that starts after the beginning of this block.  If the block
    // has a preamble this will be the row after the preamble.  If the block is entirely preamble
    // then this will be a row that starts in some future block.
    first_row: u64,
    // The number of rows that start in the block, including the trailer but not the preamble.
    // Can be 0 if the block is entirely preamble.
    starts_including_trailer: u64,
    // Whether the block has a preamble
    has_preamble: bool,
    // Whether the block has a trailer
    has_trailer: bool,
}

impl DeepSizeOf for MiniBlockRepIndexBlock {
    // Every field is a plain `u64` or `bool`, so a block owns no additional memory to report.
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}
1231
/// Repetition index for MiniBlock encoding.
///
/// Stores block-level offset information to enable efficient random
/// access to nested data structures within mini-blocks.
#[derive(Debug)]
struct MiniBlockRepIndex {
    // One entry per block, in the order the blocks were decoded
    blocks: Vec<MiniBlockRepIndexBlock>,
}

impl DeepSizeOf for MiniBlockRepIndex {
    // The only owned data is the `blocks` vector, so its size is reported as-is.
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}
1246
1247impl MiniBlockRepIndex {
1248    /// Decode repetition index from chunk metadata using default values.
1249    ///
1250    /// This creates a repetition index where each chunk has no partial values
1251    /// and no trailers, suitable for simple sequential data layouts.
1252    pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
1253        let mut blocks = Vec::with_capacity(chunks.len());
1254        let mut offset: u64 = 0;
1255
1256        for c in chunks {
1257            blocks.push(MiniBlockRepIndexBlock {
1258                first_row: offset,
1259                starts_including_trailer: c.num_values,
1260                has_preamble: false,
1261                has_trailer: false,
1262            });
1263
1264            offset += c.num_values;
1265        }
1266
1267        Self { blocks }
1268    }
1269
1270    /// Decode repetition index from raw bytes in little-endian format.
1271    ///
1272    /// The bytes should contain u64 values arranged in groups of `stride` elements,
1273    /// where the first two values of each group represent ends_count and partial_count.
1274    /// Returns an empty index if no bytes are provided.
1275    pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
1276        // Convert bytes to u64 slice, handling alignment automatically
1277        let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
1278        let u64_slice = buffer.borrow_to_typed_slice::<u64>();
1279        let n = u64_slice.len() / stride;
1280
1281        let mut blocks = Vec::with_capacity(n);
1282        let mut chunk_has_preamble = false;
1283        let mut offset: u64 = 0;
1284
1285        // Extract first two values from each block: ends_count and partial_count
1286        for i in 0..n {
1287            let base_idx = i * stride;
1288            let ends = u64_slice[base_idx];
1289            let partial = u64_slice[base_idx + 1];
1290
1291            let has_trailer = partial > 0;
1292            // Convert branches to arithmetic for better compiler optimization
1293            let starts_including_trailer =
1294                ends + (has_trailer as u64) - (chunk_has_preamble as u64);
1295
1296            blocks.push(MiniBlockRepIndexBlock {
1297                first_row: offset,
1298                starts_including_trailer,
1299                has_preamble: chunk_has_preamble,
1300                has_trailer,
1301            });
1302
1303            chunk_has_preamble = has_trailer;
1304            offset += starts_including_trailer;
1305        }
1306
1307        Self { blocks }
1308    }
1309}
1310
1311/// State that is loaded once and cached for future lookups
1312#[derive(Debug)]
1313struct MiniBlockCacheableState {
1314    /// Metadata that describes each chunk in the page
1315    chunk_meta: Vec<ChunkMeta>,
1316    /// The decoded repetition index
1317    rep_index: MiniBlockRepIndex,
1318    /// The dictionary for the page, if any
1319    dictionary: Option<Arc<DataBlock>>,
1320}
1321
1322impl DeepSizeOf for MiniBlockCacheableState {
1323    fn deep_size_of_children(&self, context: &mut Context) -> usize {
1324        self.rep_index.deep_size_of_children(context)
1325            + self
1326                .dictionary
1327                .as_ref()
1328                .map(|dict| dict.data_size() as usize)
1329                .unwrap_or(0)
1330    }
1331}
1332
1333impl CachedPageData for MiniBlockCacheableState {
1334    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
1335        self
1336    }
1337}
1338
/// A scheduler for a page that has been encoded with the mini-block layout
///
/// Scheduling mini-block encoded data is simple in concept and somewhat complex
/// in practice.
///
/// First, during initialization, we load the chunk metadata, the repetition index,
/// and the dictionary (these last two may not be present).
///
/// Then, during scheduling, we use the user's requested row ranges and the repetition
/// index to determine which chunks we need and which rows we need from those chunks.
///
/// For example, if the repetition index is: [50, 3], [50, 0], [10, 0] and the range
/// from the user is 40..60 then we need to:
///
///  - Read the first chunk and skip the first 40 rows, then read 10 full rows, and
///    then read 3 items for the 11th row of our range.
///  - Read the second chunk and read the remaining items in our 11th row and then read
///    the remaining 9 full rows.
///
/// Then, if we are going to decode that in batches of 5, we need to make decode tasks.
/// The first two decode tasks will just need the first chunk.  The third decode task will
/// need the first chunk (for the trailer which has the 11th row in our range) and the second
/// chunk.  The final decode task will just need the second chunk.
///
/// The above prose descriptions are what are represented by `ChunkInstructions` and
/// `ChunkDrainInstructions`.
#[derive(Debug)]
pub struct MiniBlockScheduler {
    // These come from the protobuf
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    priority: u64,
    items_in_page: u64,
    repetition_index_depth: u16,
    num_buffers: u64,
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    dictionary: Option<MiniBlockSchedulerDictionary>,
    // This is set after initialization (it starts out as `None`)
    page_meta: Option<Arc<MiniBlockCacheableState>>,
    // Copied from the layout's `has_large_chunk` when the scheduler is created
    has_large_chunk: bool,
}
1382
1383impl MiniBlockScheduler {
1384    fn try_new(
1385        buffer_offsets_and_sizes: &[(u64, u64)],
1386        priority: u64,
1387        items_in_page: u64,
1388        layout: &pb21::MiniBlockLayout,
1389        decompressors: &dyn DecompressionStrategy,
1390    ) -> Result<Self> {
1391        let rep_decompressor = layout
1392            .rep_compression
1393            .as_ref()
1394            .map(|rep_compression| {
1395                decompressors
1396                    .create_block_decompressor(rep_compression)
1397                    .map(Arc::from)
1398            })
1399            .transpose()?;
1400        let def_decompressor = layout
1401            .def_compression
1402            .as_ref()
1403            .map(|def_compression| {
1404                decompressors
1405                    .create_block_decompressor(def_compression)
1406                    .map(Arc::from)
1407            })
1408            .transpose()?;
1409        let def_meaning = layout
1410            .layers
1411            .iter()
1412            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
1413            .collect::<Vec<_>>();
1414        let value_decompressor = decompressors.create_miniblock_decompressor(
1415            layout.value_compression.as_ref().unwrap(),
1416            decompressors,
1417        )?;
1418
1419        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
1420            let num_dictionary_items = layout.num_dictionary_items;
1421            let dictionary_decompressor = decompressors
1422                .create_block_decompressor(dictionary_encoding)?
1423                .into();
1424            let dictionary_data_alignment = match dictionary_encoding.compression.as_ref().unwrap()
1425            {
1426                Compression::Variable(_) => 4,
1427                Compression::Flat(_) => 16,
1428                Compression::General(_) => 1,
1429                Compression::InlineBitpacking(_) | Compression::OutOfLineBitpacking(_) => {
1430                    crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
1431                }
1432                _ => {
1433                    return Err(Error::invalid_input_source(
1434                        format!(
1435                            "Unsupported mini-block dictionary encoding: {:?}",
1436                            dictionary_encoding.compression.as_ref().unwrap()
1437                        )
1438                        .into(),
1439                    ));
1440                }
1441            };
1442            Some(MiniBlockSchedulerDictionary {
1443                dictionary_decompressor,
1444                dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1445                dictionary_data_alignment,
1446                num_dictionary_items,
1447            })
1448        } else {
1449            None
1450        };
1451
1452        Ok(Self {
1453            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
1454            rep_decompressor,
1455            def_decompressor,
1456            value_decompressor: value_decompressor.into(),
1457            repetition_index_depth: layout.repetition_index_depth as u16,
1458            num_buffers: layout.num_buffers,
1459            priority,
1460            items_in_page,
1461            dictionary,
1462            def_meaning: def_meaning.into(),
1463            page_meta: None,
1464            has_large_chunk: layout.has_large_chunk,
1465        })
1466    }
1467
1468    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
1469        let page_meta = self.page_meta.as_ref().unwrap();
1470        chunk_indices
1471            .iter()
1472            .map(|&chunk_idx| {
1473                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
1474                let bytes_start = chunk_meta.offset_bytes;
1475                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
1476                LoadedChunk {
1477                    byte_range: bytes_start..bytes_end,
1478                    items_in_chunk: chunk_meta.num_values,
1479                    chunk_idx,
1480                    data: LanceBuffer::empty(),
1481                }
1482            })
1483            .collect()
1484    }
1485}
1486
/// What to do with a chunk's preamble, i.e. the part at the start of a chunk that continues
/// a list begun in the previous chunk.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    /// The chunk has a preamble and it should be read.
    Take,
    /// The chunk has a preamble but it should be skipped.
    Skip,
    /// The chunk has no preamble.
    Absent,
}
1493
// When we schedule a chunk we use the repetition index (or, if none exists, just the # of items
// in each chunk) to map a user requested range into a set of ChunkInstruction objects which tell
// us how exactly to read from the chunk.
//
// Examples:
//
// | Chunk 0     | Chunk 1   | Chunk 2   | Chunk 3 |
// | xxxxyyyyzzz | zzzzzzzzz | zzzzzzzzz | aaabbcc |
//
// Full read (0..6)
//
// Chunk 0: (several rows, ends with trailer)
//   preamble: absent
//   rows_to_skip: 0
//   rows_to_take: 3 (x, y, z)
//   take_trailer: true
//
// Chunk 1: (all preamble, ends with trailer)
//   preamble: take
//   rows_to_skip: 0
//   rows_to_take: 0
//   take_trailer: true
//
// Chunk 2: (all preamble, no trailer)
//   preamble: take
//   rows_to_skip: 0
//   rows_to_take: 0
//   take_trailer: false
//
// Chunk 3: (several rows, no trailer or preamble)
//   preamble: absent
//   rows_to_skip: 0
//   rows_to_take: 3 (a, b, c)
//   take_trailer: false
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    // The index of the chunk to read
    chunk_idx: usize,
    // A "preamble" is when a chunk begins with a continuation of the previous chunk's list.  If there
    // is no repetition index there is never a preamble.
    //
    // It's possible for a chunk to be entirely preamble.  For example, if there is a really large list
    // that spans several chunks.
    preamble: PreambleAction,
    // How many complete rows (not including the preamble or trailer) to skip
    //
    // If this is non-zero then preamble must not be Take
    rows_to_skip: u64,
    // How many rows to take.  If a row splits across chunks then we will count the row in the first
    // chunk that contains the row.
    rows_to_take: u64,
    // A "trailer" is when a chunk ends with a partial list.  If there is no repetition index there is
    // never a trailer.
    //
    // A chunk that is all preamble may or may not have a trailer.
    //
    // If this is true then we want to include the trailer
    take_trailer: bool,
}
1553
// First, we schedule a set of [`ChunkInstructions`] based on the user's ranges.  Then we
// start decoding them, based on a batch size, which might not align with what we scheduled.
//
// This results in `ChunkDrainInstructions`, each of which targets a contiguous slice of a
// `ChunkInstructions`.
//
// So if `ChunkInstructions` is "skip preamble, skip 10, take 50, take trailer" and we are decoding in
// batches of size 10 we might have a `ChunkDrainInstructions` that targets that chunk and has its own
// skip of 17 and take of 10.  This would mean we decode the chunk, skip the preamble and 27 rows, and
// then take 10 rows.
//
// One very confusing bit is that `rows_to_take` includes the trailer.  So if we have two chunks:
//  - no preamble, skip 5, take 10, take trailer
//  - take preamble, skip 0, take 50, no trailer
//
// and we are draining 20 rows then the drain instructions for the first batch will be:
//  - no preamble, skip 0 (from chunk 0), take 11 (from chunk 0)
//  - take preamble (from chunk 1), skip 0 (from chunk 1), take 9 (from chunk 1)
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    // The scheduled instructions that this drain step reads from
    chunk_instructions: ChunkInstructions,
    // Rows to skip, in addition to the skip of `chunk_instructions` (see the example above)
    rows_to_skip: u64,
    // Rows to take in this drain step; includes the trailer
    rows_to_take: u64,
    // What this drain step does with the chunk's preamble
    preamble_action: PreambleAction,
}
1578
1579impl ChunkInstructions {
1580    // Given a repetition index and a set of user ranges we need to figure out how to read from the chunks
1581    //
1582    // We assume that `user_ranges` are in sorted order and non-overlapping
1583    //
1584    // The output will be a set of `ChunkInstructions` which tell us how to read from the chunks
1585    fn schedule_instructions(
1586        rep_index: &MiniBlockRepIndex,
1587        user_ranges: &[Range<u64>],
1588    ) -> Vec<Self> {
1589        // This is an in-exact capacity guess but pretty good.  The actual capacity can be
1590        // smaller if instructions are merged.  It can be larger if there are multiple instructions
1591        // per row which can happen with lists.
1592        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1593
1594        for user_range in user_ranges {
1595            let mut rows_needed = user_range.end - user_range.start;
1596            let mut need_preamble = false;
1597
1598            // Need to find the first chunk with a first row >= user_range.start.  If there are
1599            // multiple chunks with the same first row we need to take the first one.
1600            let mut block_index = match rep_index
1601                .blocks
1602                .binary_search_by_key(&user_range.start, |block| block.first_row)
1603            {
1604                Ok(idx) => {
1605                    // Slightly tricky case, we may need to walk backwards a bit to make sure we
1606                    // are grabbing first eligible chunk
1607                    let mut idx = idx;
1608                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1609                        idx -= 1;
1610                    }
1611                    idx
1612                }
1613                // Easy case.  idx is greater, and idx - 1 is smaller, so idx - 1 contains the start
1614                Err(idx) => idx - 1,
1615            };
1616
1617            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1618
1619            while rows_needed > 0 || need_preamble {
1620                // Check if we've gone past the last block (should not happen)
1621                if block_index >= rep_index.blocks.len() {
1622                    log::warn!(
1623                        "schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early"
1624                    );
1625                    break;
1626                }
1627
1628                let chunk = &rep_index.blocks[block_index];
1629                let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);
1630
1631                // Handle blocks that are entirely preamble (rows_avail = 0)
1632                // These blocks have no rows to take but may have a preamble we need
1633                // We only look for preamble if to_skip == 0 (we're not skipping rows)
1634                if rows_avail == 0 && to_skip == 0 {
1635                    // Only process if this chunk has a preamble we need
1636                    if chunk.has_preamble && need_preamble {
1637                        chunk_instructions.push(Self {
1638                            chunk_idx: block_index,
1639                            preamble: PreambleAction::Take,
1640                            rows_to_skip: 0,
1641                            rows_to_take: 0,
1642                            // We still need to look at has_trailer to distinguish between "all preamble
1643                            // and row ends at end of chunk" and "all preamble and row bleeds into next
1644                            // chunk".  Both cases will have 0 rows available.
1645                            take_trailer: chunk.has_trailer,
1646                        });
1647                        // Only set need_preamble = false if the chunk has at least one row,
1648                        // Or we are reaching the last block,
1649                        // Otherwise, the chunk is entirely preamble and we need the next chunk's preamble too
1650                        if chunk.starts_including_trailer > 0
1651                            || block_index == rep_index.blocks.len() - 1
1652                        {
1653                            need_preamble = false;
1654                        }
1655                    }
1656                    // Move to next block
1657                    block_index += 1;
1658                    continue;
1659                }
1660
1661                // Edge case: if rows_avail == 0 but to_skip > 0
1662                // This theoretically shouldn't happen (binary search should avoid it)
1663                // but handle it for safety
1664                if rows_avail == 0 && to_skip > 0 {
1665                    // This block doesn't have enough rows to skip, move to next block
1666                    // Adjust to_skip by the number of rows in this block
1667                    to_skip -= chunk.starts_including_trailer;
1668                    block_index += 1;
1669                    continue;
1670                }
1671
1672                let rows_to_take = rows_avail.min(rows_needed);
1673                rows_needed -= rows_to_take;
1674
1675                let mut take_trailer = false;
1676                let preamble = if chunk.has_preamble {
1677                    if need_preamble {
1678                        PreambleAction::Take
1679                    } else {
1680                        PreambleAction::Skip
1681                    }
1682                } else {
1683                    PreambleAction::Absent
1684                };
1685
1686                // Are we taking the trailer?  If so, make sure we mark that we need the preamble
1687                if rows_to_take == rows_avail && chunk.has_trailer {
1688                    take_trailer = true;
1689                    need_preamble = true;
1690                } else {
1691                    need_preamble = false;
1692                };
1693
1694                chunk_instructions.push(Self {
1695                    preamble,
1696                    chunk_idx: block_index,
1697                    rows_to_skip: to_skip,
1698                    rows_to_take,
1699                    take_trailer,
1700                });
1701
1702                to_skip = 0;
1703                block_index += 1;
1704            }
1705        }
1706
1707        // If there were multiple ranges we may have multiple instructions for a single chunk.  Merge them now if they
1708        // are _adjacent_ (i.e. don't merge "take first row of chunk 0" and "take third row of chunk 0" into "take 2
1709        // rows of chunk 0 starting at 0")
1710        if user_ranges.len() > 1 {
1711            // TODO: Could probably optimize this allocation away
1712            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1713            let mut instructions_iter = chunk_instructions.into_iter();
1714            merged_instructions.push(instructions_iter.next().unwrap());
1715            for instruction in instructions_iter {
1716                let last = merged_instructions.last_mut().unwrap();
1717                if last.chunk_idx == instruction.chunk_idx
1718                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1719                {
1720                    last.rows_to_take += instruction.rows_to_take;
1721                    last.take_trailer |= instruction.take_trailer;
1722                } else {
1723                    merged_instructions.push(instruction);
1724                }
1725            }
1726            merged_instructions
1727        } else {
1728            chunk_instructions
1729        }
1730    }
1731
1732    fn drain_from_instruction(
1733        &self,
1734        rows_desired: &mut u64,
1735        need_preamble: &mut bool,
1736        skip_in_chunk: &mut u64,
1737    ) -> (ChunkDrainInstructions, bool) {
1738        // If we need the premable then we shouldn't be skipping anything
1739        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1740        let rows_avail = self.rows_to_take - *skip_in_chunk;
1741        let has_preamble = self.preamble != PreambleAction::Absent;
1742        let preamble_action = match (*need_preamble, has_preamble) {
1743            (true, true) => PreambleAction::Take,
1744            (true, false) => panic!("Need preamble but there isn't one"),
1745            (false, true) => PreambleAction::Skip,
1746            (false, false) => PreambleAction::Absent,
1747        };
1748
1749        // How many rows are we actually taking in this take step (including the preamble
1750        // and trailer both as individual rows)
1751        let rows_taking = if *rows_desired >= rows_avail {
1752            // We want all the rows.  If there is a trailer we are grabbing it and will need
1753            // the preamble of the next chunk
1754            // If there is a trailer and we are taking all the rows then we need the preamble
1755            // of the next chunk.
1756            //
1757            // Also, if this chunk is entirely preamble (rows_avail == 0 && !take_trailer) then we
1758            // need the preamble of the next chunk.
1759            *need_preamble = self.take_trailer;
1760            rows_avail
1761        } else {
1762            // We aren't taking all the rows.  Even if there is a trailer we aren't taking
1763            // it so we will not need the preamble
1764            *need_preamble = false;
1765            *rows_desired
1766        };
1767        let rows_skipped = *skip_in_chunk;
1768
1769        // Update the state for the next iteration
1770        let consumed_chunk = if *rows_desired >= rows_avail {
1771            *rows_desired -= rows_avail;
1772            *skip_in_chunk = 0;
1773            true
1774        } else {
1775            *skip_in_chunk += *rows_desired;
1776            *rows_desired = 0;
1777            false
1778        };
1779
1780        (
1781            ChunkDrainInstructions {
1782                chunk_instructions: self.clone(),
1783                rows_to_skip: rows_skipped,
1784                rows_to_take: rows_taking,
1785                preamble_action,
1786            },
1787            consumed_chunk,
1788        )
1789    }
1790}
1791
// A buffer of chunk metadata words, stored as either 16-bit or 32-bit values.
//
// `Words::from_bytes` produces the `U32` variant when `has_large_chunk` is set and the `U16`
// variant otherwise.
enum Words {
    U16(ScalarBuffer<u16>),
    U32(ScalarBuffer<u32>),
}
1796
// Iterator over the values of a `Words` buffer.
//
// Items are always `u32`; the boxed inner iterator hides which word width the buffer uses.
struct WordsIter<'a> {
    iter: Box<dyn Iterator<Item = u32> + 'a>,
}
1800
1801impl Words {
1802    pub fn len(&self) -> usize {
1803        match self {
1804            Self::U16(b) => b.len(),
1805            Self::U32(b) => b.len(),
1806        }
1807    }
1808
1809    pub fn iter(&self) -> WordsIter<'_> {
1810        match self {
1811            Self::U16(buf) => WordsIter {
1812                iter: Box::new(buf.iter().map(|&x| x as u32)),
1813            },
1814            Self::U32(buf) => WordsIter {
1815                iter: Box::new(buf.iter().copied()),
1816            },
1817        }
1818    }
1819
1820    pub fn from_bytes(bytes: Bytes, has_large_chunk: bool) -> Result<Self> {
1821        let bytes_per_value = if has_large_chunk { 4 } else { 2 };
1822        assert_eq!(bytes.len() % bytes_per_value, 0);
1823        let buffer = LanceBuffer::from_bytes(bytes, bytes_per_value as u64);
1824        if has_large_chunk {
1825            Ok(Self::U32(buffer.borrow_to_typed_slice::<u32>()))
1826        } else {
1827            Ok(Self::U16(buffer.borrow_to_typed_slice::<u16>()))
1828        }
1829    }
1830}
1831
1832impl<'a> Iterator for WordsIter<'a> {
1833    type Item = u32;
1834
1835    fn next(&mut self) -> Option<Self::Item> {
1836        self.iter.next()
1837    }
1838}
1839
1840impl StructuralPageScheduler for MiniBlockScheduler {
1841    fn initialize<'a>(
1842        &'a mut self,
1843        io: &Arc<dyn EncodingsIo>,
1844    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1845        // We always need to fetch chunk metadata.  We may also need to fetch a dictionary and
1846        // we may also need to fetch the repetition index.  Here, we gather what buffers we
1847        // need.
1848        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
1849        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
1850        let mut bufs_needed = 1;
1851        if self.dictionary.is_some() {
1852            bufs_needed += 1;
1853        }
1854        if self.repetition_index_depth > 0 {
1855            bufs_needed += 1;
1856        }
1857        let mut required_ranges = Vec::with_capacity(bufs_needed);
1858        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
1859        if let Some(ref dictionary) = self.dictionary {
1860            required_ranges.push(
1861                dictionary.dictionary_buf_position_and_size.0
1862                    ..dictionary.dictionary_buf_position_and_size.0
1863                        + dictionary.dictionary_buf_position_and_size.1,
1864            );
1865        }
1866        if self.repetition_index_depth > 0 {
1867            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
1868            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
1869        }
1870        let io_req = io.submit_request(required_ranges, 0);
1871
1872        async move {
1873            let mut buffers = io_req.await?.into_iter().fuse();
1874            let meta_bytes = buffers.next().unwrap();
1875            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
1876            let rep_index_bytes = buffers.next();
1877
1878            // Parse the metadata and build the chunk meta
1879            let words = Words::from_bytes(meta_bytes, self.has_large_chunk)?;
1880            let mut chunk_meta = Vec::with_capacity(words.len());
1881
1882            let mut rows_counter = 0;
1883            let mut offset_bytes = value_buf_position;
1884            for (word_idx, word) in words.iter().enumerate() {
1885                let log_num_values = word & 0x0F;
1886                let divided_bytes = word >> 4;
1887                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
1888                debug_assert!(num_bytes > 0);
1889                let num_values = if word_idx < words.len() - 1 {
1890                    debug_assert!(log_num_values > 0);
1891                    1 << log_num_values
1892                } else {
1893                    debug_assert!(
1894                        log_num_values == 0
1895                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
1896                    );
1897                    self.items_in_page - rows_counter
1898                };
1899                rows_counter += num_values;
1900
1901                chunk_meta.push(ChunkMeta {
1902                    num_values,
1903                    chunk_size_bytes: num_bytes as u64,
1904                    offset_bytes,
1905                });
1906                offset_bytes += num_bytes as u64;
1907            }
1908
1909            // Build the repetition index
1910            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
1911                assert!(rep_index_data.len() % 8 == 0);
1912                let stride = self.repetition_index_depth as usize + 1;
1913                MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
1914            } else {
1915                MiniBlockRepIndex::default_from_chunks(&chunk_meta)
1916            };
1917
1918            let mut page_meta = MiniBlockCacheableState {
1919                chunk_meta,
1920                rep_index,
1921                dictionary: None,
1922            };
1923
1924            // decode dictionary
1925            if let Some(ref mut dictionary) = self.dictionary {
1926                let dictionary_data = dictionary_bytes.unwrap();
1927                page_meta.dictionary =
1928                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
1929                        LanceBuffer::from_bytes(
1930                            dictionary_data,
1931                            dictionary.dictionary_data_alignment,
1932                        ),
1933                        dictionary.num_dictionary_items,
1934                    )?));
1935            };
1936            let page_meta = Arc::new(page_meta);
1937            self.page_meta = Some(page_meta.clone());
1938            Ok(page_meta as Arc<dyn CachedPageData>)
1939        }
1940        .boxed()
1941    }
1942
1943    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1944        self.page_meta = Some(
1945            data.clone()
1946                .as_arc_any()
1947                .downcast::<MiniBlockCacheableState>()
1948                .unwrap(),
1949        );
1950    }
1951
1952    fn schedule_ranges(
1953        &self,
1954        ranges: &[Range<u64>],
1955        io: &Arc<dyn EncodingsIo>,
1956    ) -> Result<Vec<PageLoadTask>> {
1957        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1958
1959        let page_meta = self.page_meta.as_ref().unwrap();
1960
1961        let chunk_instructions =
1962            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
1963
1964        debug_assert_eq!(
1965            num_rows,
1966            chunk_instructions
1967                .iter()
1968                .map(|ci| ci.rows_to_take)
1969                .sum::<u64>()
1970        );
1971
1972        let chunks_needed = chunk_instructions
1973            .iter()
1974            .map(|ci| ci.chunk_idx)
1975            .unique()
1976            .collect::<Vec<_>>();
1977
1978        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1979        let chunk_ranges = loaded_chunks
1980            .iter()
1981            .map(|c| c.byte_range.clone())
1982            .collect::<Vec<_>>();
1983        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1984
1985        let rep_decompressor = self.rep_decompressor.clone();
1986        let def_decompressor = self.def_decompressor.clone();
1987        let value_decompressor = self.value_decompressor.clone();
1988        let num_buffers = self.num_buffers;
1989        let has_large_chunk = self.has_large_chunk;
1990        let dictionary = page_meta
1991            .dictionary
1992            .as_ref()
1993            .map(|dictionary| dictionary.clone());
1994        let def_meaning = self.def_meaning.clone();
1995
1996        let res = async move {
1997            let loaded_chunk_data = loaded_chunk_data.await?;
1998            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1999                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
2000            }
2001
2002            Ok(Box::new(MiniBlockDecoder {
2003                rep_decompressor,
2004                def_decompressor,
2005                value_decompressor,
2006                def_meaning,
2007                loaded_chunks: VecDeque::from_iter(loaded_chunks),
2008                instructions: VecDeque::from(chunk_instructions),
2009                offset_in_current_chunk: 0,
2010                dictionary,
2011                num_rows,
2012                num_buffers,
2013                has_large_chunk,
2014            }) as Box<dyn StructuralPageDecoder>)
2015        }
2016        .boxed();
2017        let page_load_task = PageLoadTask {
2018            decoder_fut: res,
2019            num_rows,
2020        };
2021        Ok(vec![page_load_task])
2022    }
2023}
2024
/// Location and layout of a full-zip page's repetition index buffer.
#[derive(Debug, Clone, Copy)]
struct FullZipRepIndexDetails {
    /// Position of the repetition index buffer
    buf_position: u64,
    /// Width of each repetition index entry, derived from the buffer size divided by
    /// `rows_in_page + 1` entries
    bytes_per_value: u64, // Will be 1, 2, 4, or 8
}
2030
/// The value decompressor used by a full-zip page.
///
/// `Fixed` is created for layouts described by bits-per-value and `Variable` for layouts
/// described by bits-per-offset.
#[derive(Debug)]
enum PerValueDecompressor {
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    Variable(Arc<dyn VariablePerValueDecompressor>),
}
2036
/// Decode settings derived from a full-zip layout and shared by the decoders created for it.
#[derive(Debug)]
struct FullZipDecodeDetails {
    /// Decompressor for the values (fixed or variable width)
    value_decompressor: PerValueDecompressor,
    /// Interpretation of each repetition/definition layer of the layout
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Parser for control words, built from the layout's `bits_rep` and `bits_def`
    ctrl_word_parser: ControlWordParser,
    /// Number of list layers in `def_meaning`
    max_rep: u16,
    /// Sum of the definition levels contributed by the non-list layers
    max_visible_def: u16,
}
2045
/// Describes where FullZip byte ranges should be read from.
///
/// FullZip decoding always needs a list of byte ranges, but those bytes can come
/// from two different places:
/// - Remote I/O (normal path): ranges are fetched from the underlying `EncodingsIo`.
/// - A prefetched full page (full scan fast path): the entire page has already been
///   loaded once and ranges should be sliced from memory.
///
/// This abstraction keeps scheduling code focused on "which ranges are needed"
/// instead of "how bytes are fetched", and it lets full-page scans avoid the
/// two-stage rep-index -> data I/O pipeline.
#[derive(Debug, Clone)]
enum FullZipReadSource {
    /// Fetch ranges from the storage backend through the encoding I/O interface.
    Remote(Arc<dyn EncodingsIo>),
    /// Slice ranges from an already-loaded FullZip page buffer.
    ///
    /// `base_offset` is the position that the first byte of `data` corresponds to; requested
    /// ranges are translated by subtracting it and must lie within
    /// `base_offset..base_offset + data.len()`.
    PrefetchedPage { base_offset: u64, data: LanceBuffer },
}
2064
2065impl FullZipReadSource {
2066    /// Materialize the requested ranges as decode-ready `LanceBuffer`s.
2067    ///
2068    /// The returned buffers preserve the input range order.
2069    fn fetch(
2070        &self,
2071        ranges: &[Range<u64>],
2072        priority: u64,
2073    ) -> BoxFuture<'static, Result<VecDeque<LanceBuffer>>> {
2074        match self {
2075            Self::Remote(io) => {
2076                let io = io.clone();
2077                let ranges = ranges.to_vec();
2078                async move {
2079                    let data = io.submit_request(ranges, priority).await?;
2080                    Ok(data
2081                        .into_iter()
2082                        .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
2083                        .collect::<VecDeque<_>>())
2084                }
2085                .boxed()
2086            }
2087            Self::PrefetchedPage { base_offset, data } => {
2088                let base_offset = *base_offset;
2089                let data = data.clone();
2090                let page_end = base_offset + data.len() as u64;
2091                std::future::ready(
2092                    ranges
2093                        .iter()
2094                        .map(|range| {
2095                            if range.start > range.end
2096                                || range.start < base_offset
2097                                || range.end > page_end
2098                            {
2099                                return Err(Error::internal(format!(
2100                                    "Requested range {:?} is outside page range {}..{}",
2101                                    range, base_offset, page_end
2102                                )));
2103                            }
2104                            let start = (range.start - base_offset) as usize;
2105                            let len = (range.end - range.start) as usize;
2106                            Ok(data.slice_with_length(start, len))
2107                        })
2108                        .collect::<Result<VecDeque<_>>>(),
2109                )
2110                .boxed()
2111            }
2112        }
2113    }
2114}
2115
/// A scheduler for full-zip encoded data
///
/// When the data type has a fixed-width then we simply need to map from
/// row ranges to byte ranges using the fixed-width of the data type.
///
/// When the data type is variable-width or has any repetition then a
/// repetition index is required.
#[derive(Debug)]
pub struct FullZipScheduler {
    /// Position of the data buffer (the page's first buffer)
    data_buf_position: u64,
    /// Size of the data buffer
    data_buf_size: u64,
    /// Details of the repetition index, present when the page has a second buffer
    rep_index: Option<FullZipRepIndexDetails>,
    /// Priority supplied when the scheduler was created
    priority: u64,
    /// Number of rows in the page
    rows_in_page: u64,
    /// Offset width in bits: taken from the layout for bits-per-offset layouts, 32 for
    /// bits-per-value layouts
    bits_per_offset: u8,
    /// Decode settings shared with the decoders this scheduler creates
    details: Arc<FullZipDecodeDetails>,
    /// Cached state containing the decoded repetition index
    cached_state: Option<Arc<FullZipCacheableState>>,
    /// Whether repetition index metadata should be cached during initialize.
    enable_cache: bool,
}
2137
2138impl FullZipScheduler {
2139    fn try_new(
2140        buffer_offsets_and_sizes: &[(u64, u64)],
2141        priority: u64,
2142        rows_in_page: u64,
2143        layout: &pb21::FullZipLayout,
2144        decompressors: &dyn DecompressionStrategy,
2145    ) -> Result<Self> {
2146        let (data_buf_position, data_buf_size) = buffer_offsets_and_sizes[0];
2147        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
2148            let num_reps = rows_in_page + 1;
2149            let bytes_per_rep = len / num_reps;
2150            debug_assert_eq!(len % num_reps, 0);
2151            debug_assert!(
2152                bytes_per_rep == 1
2153                    || bytes_per_rep == 2
2154                    || bytes_per_rep == 4
2155                    || bytes_per_rep == 8
2156            );
2157            FullZipRepIndexDetails {
2158                buf_position: *pos,
2159                bytes_per_value: bytes_per_rep,
2160            }
2161        });
2162
2163        let value_decompressor = match layout.details {
2164            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
2165                let decompressor = decompressors.create_fixed_per_value_decompressor(
2166                    layout.value_compression.as_ref().unwrap(),
2167                )?;
2168                PerValueDecompressor::Fixed(decompressor.into())
2169            }
2170            Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
2171                let decompressor = decompressors.create_variable_per_value_decompressor(
2172                    layout.value_compression.as_ref().unwrap(),
2173                )?;
2174                PerValueDecompressor::Variable(decompressor.into())
2175            }
2176            None => {
2177                panic!("Full-zip layout must have a `details` field");
2178            }
2179        };
2180        let ctrl_word_parser = ControlWordParser::new(
2181            layout.bits_rep.try_into().unwrap(),
2182            layout.bits_def.try_into().unwrap(),
2183        );
2184        let def_meaning = layout
2185            .layers
2186            .iter()
2187            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
2188            .collect::<Vec<_>>();
2189
2190        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
2191        let max_visible_def = def_meaning
2192            .iter()
2193            .filter(|d| !d.is_list())
2194            .map(|d| d.num_def_levels())
2195            .sum();
2196
2197        let bits_per_offset = match layout.details {
2198            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
2199            Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
2200                bits_per_offset as u8
2201            }
2202            None => panic!("Full-zip layout must have a `details` field"),
2203        };
2204
2205        let details = Arc::new(FullZipDecodeDetails {
2206            value_decompressor,
2207            def_meaning: def_meaning.into(),
2208            ctrl_word_parser,
2209            max_rep,
2210            max_visible_def,
2211        });
2212        Ok(Self {
2213            data_buf_position,
2214            data_buf_size,
2215            rep_index,
2216            details,
2217            priority,
2218            rows_in_page,
2219            bits_per_offset,
2220            cached_state: None,
2221            enable_cache: false,
2222        })
2223    }
2224
2225    fn covers_entire_page(ranges: &[Range<u64>], rows_in_page: u64) -> bool {
2226        if ranges.is_empty() {
2227            return false;
2228        }
2229        let mut expected_start = 0;
2230        for range in ranges {
2231            if range.start != expected_start || range.end > rows_in_page || range.end < range.start
2232            {
2233                return false;
2234            }
2235            expected_start = range.end;
2236        }
2237        expected_start == rows_in_page
2238    }
2239
2240    fn create_page_load_task(
2241        io_future: BoxFuture<'static, Result<Vec<Bytes>>>,
2242        num_rows: u64,
2243        details: Arc<FullZipDecodeDetails>,
2244        bits_per_offset: u8,
2245    ) -> PageLoadTask {
2246        let load_task = async move {
2247            let buffers = io_future.await?;
2248            let data = buffers
2249                .into_iter()
2250                .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
2251                .collect::<VecDeque<_>>();
2252            Self::create_decoder(details, data, num_rows, bits_per_offset)
2253        }
2254        .boxed();
2255        PageLoadTask {
2256            decoder_fut: load_task,
2257            num_rows,
2258        }
2259    }
2260
2261    /// Creates a decoder from the loaded data
2262    fn create_decoder(
2263        details: Arc<FullZipDecodeDetails>,
2264        data: VecDeque<LanceBuffer>,
2265        num_rows: u64,
2266        bits_per_offset: u8,
2267    ) -> Result<Box<dyn StructuralPageDecoder>> {
2268        match &details.value_decompressor {
2269            PerValueDecompressor::Fixed(decompressor) => {
2270                let bits_per_value = decompressor.bits_per_value();
2271                if bits_per_value % 8 != 0 {
2272                    return Err(lance_core::Error::not_supported_source("Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into()));
2273                }
2274                let bytes_per_value = bits_per_value / 8;
2275                let total_bytes_per_value =
2276                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
2277                if total_bytes_per_value == 0 {
2278                    return Err(lance_core::Error::internal(
2279                        "Invalid encoding: per-row byte width must be greater than 0",
2280                    ));
2281                }
2282                Ok(Box::new(FixedFullZipDecoder {
2283                    details,
2284                    data,
2285                    num_rows,
2286                    offset_in_current: 0,
2287                    bytes_per_value: bytes_per_value as usize,
2288                    total_bytes_per_value,
2289                }) as Box<dyn StructuralPageDecoder>)
2290            }
2291            PerValueDecompressor::Variable(_decompressor) => {
2292                Ok(Box::new(VariableFullZipDecoder::new(
2293                    details,
2294                    data,
2295                    num_rows,
2296                    bits_per_offset,
2297                    bits_per_offset,
2298                )))
2299            }
2300        }
2301    }
2302
2303    /// Extracts byte ranges from a repetition index buffer
2304    /// The buffer contains pairs of (start, end) values for each range
2305    fn extract_byte_ranges_from_pairs(
2306        buffer: LanceBuffer,
2307        bytes_per_value: u64,
2308        data_buf_position: u64,
2309    ) -> Vec<Range<u64>> {
2310        ByteUnpacker::new(buffer, bytes_per_value as usize)
2311            .chunks(2)
2312            .into_iter()
2313            .map(|mut c| {
2314                let start = c.next().unwrap() + data_buf_position;
2315                let end = c.next().unwrap() + data_buf_position;
2316                start..end
2317            })
2318            .collect::<Vec<_>>()
2319    }
2320
2321    /// Extracts byte ranges from a cached repetition index buffer
2322    /// The buffer contains all values and we need to extract specific ranges
2323    fn extract_byte_ranges_from_cached(
2324        buffer: &LanceBuffer,
2325        ranges: &[Range<u64>],
2326        bytes_per_value: u64,
2327        data_buf_position: u64,
2328    ) -> Vec<Range<u64>> {
2329        ranges
2330            .iter()
2331            .map(|r| {
2332                let start_offset = (r.start * bytes_per_value) as usize;
2333                let end_offset = (r.end * bytes_per_value) as usize;
2334
2335                let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
2336                let start_val =
2337                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
2338                        .next()
2339                        .unwrap();
2340
2341                let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
2342                let end_val =
2343                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
2344                        .next()
2345                        .unwrap();
2346
2347                (data_buf_position + start_val)..(data_buf_position + end_val)
2348            })
2349            .collect()
2350    }
2351
2352    /// Computes the ranges in the repetition index that need to be loaded
2353    fn compute_rep_index_ranges(
2354        ranges: &[Range<u64>],
2355        rep_index: &FullZipRepIndexDetails,
2356    ) -> Vec<Range<u64>> {
2357        ranges
2358            .iter()
2359            .flat_map(|r| {
2360                let first_val_start =
2361                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
2362                let first_val_end = first_val_start + rep_index.bytes_per_value;
2363                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
2364                let last_val_end = last_val_start + rep_index.bytes_per_value;
2365                [first_val_start..first_val_end, last_val_start..last_val_end]
2366            })
2367            .collect()
2368    }
2369
    /// Schedules ranges in the presence of a repetition index
    ///
    /// The byte range of a row range is not computed arithmetically here; it is
    /// looked up in the repetition index.  Three strategies are tried, in order:
    ///
    /// 1. If `covers_entire_page` reports that the request spans the whole page,
    ///    the full data buffer is read with a single request and the repetition
    ///    index is not consulted.
    /// 2. If a cached repetition index is available, the byte ranges are read out
    ///    of it and the data is requested immediately.
    /// 3. Otherwise the needed repetition index entries are requested first and
    ///    the data is requested once they have arrived.
    ///
    /// In every case a single `PageLoadTask` covering all of `ranges` is returned.
    fn schedule_ranges_rep(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
        rep_index: FullZipRepIndexDetails,
    ) -> Result<Vec<PageLoadTask>> {
        // Total number of rows requested across all ranges
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
        // Copy out everything the async blocks below need so they do not borrow `self`
        let data_buf_position = self.data_buf_position;
        let priority = self.priority;
        let details = self.details.clone();
        let bits_per_offset = self.bits_per_offset;

        // Strategy 1: whole page requested, read the entire data buffer at once
        if Self::covers_entire_page(ranges, self.rows_in_page) {
            let full_range = self.data_buf_position..(self.data_buf_position + self.data_buf_size);
            let page_data = io.submit_single(full_range.clone(), priority);
            let load_task = async move {
                let page_data = page_data.await?;
                // Serve the "fetch" from the bytes we just read
                let source = FullZipReadSource::PrefetchedPage {
                    base_offset: full_range.start,
                    data: LanceBuffer::from_bytes(page_data, 1),
                };
                let read_ranges = vec![full_range];
                let data = source.fetch(&read_ranges, priority).await?;
                Self::create_decoder(details, data, num_rows, bits_per_offset)
            }
            .boxed();
            let page_load_task = PageLoadTask {
                decoder_fut: load_task,
                num_rows,
            };
            return Ok(vec![page_load_task]);
        }

        // Strategy 2: the repetition index is already in memory, no extra I/O is
        // needed to find the byte ranges
        if let Some(cached_state) = &self.cached_state {
            let byte_ranges = Self::extract_byte_ranges_from_cached(
                &cached_state.rep_index_buffer,
                ranges,
                rep_index.bytes_per_value,
                data_buf_position,
            );
            let io_future = io.submit_request(byte_ranges, priority);
            let page_load_task =
                Self::create_page_load_task(io_future, num_rows, details, bits_per_offset);
            return Ok(vec![page_load_task]);
        }

        // Strategy 3: read the start / end index entries of every range, then
        // use them to read the data itself
        let rep_ranges = Self::compute_rep_index_ranges(ranges, &rep_index);
        let rep_data = io.submit_request(rep_ranges, priority);
        let io_clone = io.clone();
        let load_task = async move {
            let rep_data = rep_data.await?;
            // Stitch the individual index entries into one buffer of
            // (start, end) pairs
            let rep_buffer = LanceBuffer::concat(
                &rep_data
                    .into_iter()
                    .map(|d| LanceBuffer::from_bytes(d, 1))
                    .collect::<Vec<_>>(),
            );
            let byte_ranges = Self::extract_byte_ranges_from_pairs(
                rep_buffer,
                rep_index.bytes_per_value,
                data_buf_position,
            );
            let source = FullZipReadSource::Remote(io_clone);
            let data = source.fetch(&byte_ranges, priority).await?;
            Self::create_decoder(details, data, num_rows, bits_per_offset)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: load_task,
            num_rows,
        };
        Ok(vec![page_load_task])
    }
2444
2445    // In the simple case there is no repetition and we just have large fixed-width
2446    // rows of data.  We can just map row ranges to byte ranges directly using the
2447    // fixed-width of the data type.
2448    fn schedule_ranges_simple(
2449        &self,
2450        ranges: &[Range<u64>],
2451        io: &Arc<dyn EncodingsIo>,
2452    ) -> Result<Vec<PageLoadTask>> {
2453        // Convert row ranges to item ranges (i.e. multiply by items per row)
2454        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2455
2456        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
2457            unreachable!()
2458        };
2459
2460        // Convert item ranges to byte ranges (i.e. multiply by bytes per item)
2461        let bits_per_value = decompressor.bits_per_value();
2462        assert_eq!(bits_per_value % 8, 0);
2463        let bytes_per_value = bits_per_value / 8;
2464        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
2465        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
2466        let byte_ranges = ranges
2467            .iter()
2468            .map(|r| {
2469                debug_assert!(r.end <= self.rows_in_page);
2470                let start = self.data_buf_position + r.start * total_bytes_per_value;
2471                let end = self.data_buf_position + r.end * total_bytes_per_value;
2472                start..end
2473            })
2474            .collect::<Vec<_>>();
2475
2476        let io_future = io.submit_request(byte_ranges, self.priority);
2477        let page_load_task = Self::create_page_load_task(
2478            io_future,
2479            num_rows,
2480            self.details.clone(),
2481            self.bits_per_offset,
2482        );
2483        Ok(vec![page_load_task])
2484    }
2485}
2486
/// Cacheable state for FullZip encoding, storing the repetition index
///
/// Produced by `FullZipScheduler::initialize` when caching is enabled and the
/// page has a repetition index.  With this state available the scheduler can
/// look up byte ranges without reading the index from storage again.
#[derive(Debug)]
struct FullZipCacheableState {
    /// The raw (still byte-packed) repetition index buffer; values are unpacked
    /// on demand when ranges are scheduled
    rep_index_buffer: LanceBuffer,
}
2493
impl DeepSizeOf for FullZipCacheableState {
    /// Reports the length in bytes of the repetition index buffer, which is the
    /// only child data held by this state
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        self.rep_index_buffer.len()
    }
}
2499
impl CachedPageData for FullZipCacheableState {
    /// Upcasts to `Arc<dyn Any>` so that the concrete type can later be recovered
    /// with `downcast` (see `FullZipScheduler::load`)
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
2505
2506impl StructuralPageScheduler for FullZipScheduler {
2507    fn initialize<'a>(
2508        &'a mut self,
2509        io: &Arc<dyn EncodingsIo>,
2510    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2511        if self.enable_cache
2512            && let Some(rep_index) = self.rep_index
2513        {
2514            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2515            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2516            let io_clone = io.clone();
2517            return async move {
2518                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2519                let state = Arc::new(FullZipCacheableState {
2520                    rep_index_buffer: LanceBuffer::from_bytes(rep_index_data[0].clone(), 1),
2521                });
2522                self.cached_state = Some(state.clone());
2523                Ok(state as Arc<dyn CachedPageData>)
2524            }
2525            .boxed();
2526        }
2527        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2528    }
2529
2530    /// Loads previously cached repetition index data from the cache system.
2531    /// This method is called when a scheduler instance needs to use cached data
2532    /// that was initialized by another instance or in a previous operation.
2533    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2534        // Try to downcast to our specific cache type
2535        if let Ok(cached_state) = cache
2536            .clone()
2537            .as_arc_any()
2538            .downcast::<FullZipCacheableState>()
2539        {
2540            // Store the cached state for use in schedule_ranges
2541            self.cached_state = Some(cached_state);
2542        }
2543    }
2544
2545    fn schedule_ranges(
2546        &self,
2547        ranges: &[Range<u64>],
2548        io: &Arc<dyn EncodingsIo>,
2549    ) -> Result<Vec<PageLoadTask>> {
2550        if let Some(rep_index) = self.rep_index {
2551            self.schedule_ranges_rep(ranges, io, rep_index)
2552        } else {
2553            self.schedule_ranges_simple(ranges, io)
2554        }
2555    }
2556}
2557
/// A decoder for full-zip encoded data when the data has a fixed-width
///
/// Here we need to unzip the control words from the values themselves and
/// then decompress the requested values.
///
/// We use a PerValueDecompressor because we will only be decompressing the
/// requested data.  This decoder / scheduler does not do any read amplification.
#[derive(Debug)]
struct FixedFullZipDecoder {
    /// Shared decode settings (control word parser, decompressor, rep/def limits)
    details: Arc<FullZipDecodeDetails>,
    /// Zipped buffers that have not been fully drained yet, consumed front to back
    data: VecDeque<LanceBuffer>,
    /// Byte offset into the front buffer of `data` where the next drain starts
    offset_in_current: usize,
    /// Width in bytes of one value (without its control word)
    bytes_per_value: usize,
    /// Width in bytes of one value plus its control word
    total_bytes_per_value: usize,
    /// Number of rows this decoder was created for (reported by `num_rows`)
    num_rows: u64,
}
2574
2575impl FixedFullZipDecoder {
2576    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2577        debug_assert!(num_rows > 0);
2578        let cur_buf = self.data.front_mut().unwrap();
2579        let start = self.offset_in_current;
2580        if self.details.ctrl_word_parser.has_rep() {
2581            // This is a slightly slower path.  In order to figure out where to split we need to
2582            // examine the rep index so we can convert num_lists to num_rows
2583            let mut rows_started = 0;
2584            // We always need at least one value.  Now loop through until we have passed num_rows
2585            // values
2586            let mut num_items = 0;
2587            while self.offset_in_current < cur_buf.len() {
2588                let control = self.details.ctrl_word_parser.parse_desc(
2589                    &cur_buf[self.offset_in_current..],
2590                    self.details.max_rep,
2591                    self.details.max_visible_def,
2592                );
2593                if control.is_new_row {
2594                    if rows_started == num_rows {
2595                        break;
2596                    }
2597                    rows_started += 1;
2598                }
2599                num_items += 1;
2600                if control.is_visible {
2601                    self.offset_in_current += self.total_bytes_per_value;
2602                } else {
2603                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2604                }
2605            }
2606
2607            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2608            if self.offset_in_current == cur_buf.len() {
2609                self.data.pop_front();
2610                self.offset_in_current = 0;
2611            }
2612
2613            FullZipDecodeTaskItem {
2614                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2615                    data: task_slice,
2616                    bits_per_value: self.bytes_per_value as u64 * 8,
2617                    num_values: num_items,
2618                    block_info: BlockInfo::new(),
2619                }),
2620                rows_in_buf: rows_started,
2621            }
2622        } else {
2623            // If there's no repetition we can calculate the slicing point by just multiplying
2624            // the number of rows by the total bytes per value
2625            let cur_buf = self.data.front_mut().unwrap();
2626            let bytes_avail = cur_buf.len() - self.offset_in_current;
2627            let offset_in_cur = self.offset_in_current;
2628
2629            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2630            let mut rows_taken = num_rows;
2631            let task_slice = if bytes_needed >= bytes_avail {
2632                self.offset_in_current = 0;
2633                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2634                self.data
2635                    .pop_front()
2636                    .unwrap()
2637                    .slice_with_length(offset_in_cur, bytes_avail)
2638            } else {
2639                self.offset_in_current += bytes_needed;
2640                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2641            };
2642            FullZipDecodeTaskItem {
2643                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2644                    data: task_slice,
2645                    bits_per_value: self.bytes_per_value as u64 * 8,
2646                    num_values: rows_taken,
2647                    block_info: BlockInfo::new(),
2648                }),
2649                rows_in_buf: rows_taken,
2650            }
2651        }
2652    }
2653}
2654
2655impl StructuralPageDecoder for FixedFullZipDecoder {
2656    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2657        let mut task_data = Vec::with_capacity(self.data.len());
2658        let mut remaining = num_rows;
2659        while remaining > 0 {
2660            let task_item = self.slice_next_task(remaining);
2661            remaining -= task_item.rows_in_buf;
2662            task_data.push(task_item);
2663        }
2664        Ok(Box::new(FixedFullZipDecodeTask {
2665            details: self.details.clone(),
2666            data: task_data,
2667            bytes_per_value: self.bytes_per_value,
2668            num_rows: num_rows as usize,
2669        }))
2670    }
2671
2672    fn num_rows(&self) -> u64 {
2673        self.num_rows
2674    }
2675}
2676
/// A decoder for full-zip encoded data when the data has a variable-width
///
/// Here we need to unzip the control words AND lengths from the values and
/// then decompress the requested values.
///
/// All buffers below are filled once by `unzip` (called from `new`); `drain`
/// then hands out slices of them.
#[derive(Debug)]
struct VariableFullZipDecoder {
    /// Shared decode settings (control word parser, decompressor, rep/def limits)
    details: Arc<FullZipDecodeDetails>,
    /// The variable-width decompressor taken out of `details`
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    /// Unzipped value bytes of all valid items, back to back
    data: LanceBuffer,
    /// Little-endian offsets into `data`, `bits_per_offset` bits each: one per
    /// visible item plus a final end offset
    offsets: LanceBuffer,
    /// Repetition levels parsed from the control words (empty if there are none)
    rep: ScalarBuffer<u16>,
    /// Definition levels parsed from the control words (empty if there are none)
    def: ScalarBuffer<u16>,
    /// For every row, the index into `rep` / `def` where the row begins, followed
    /// by one final end entry
    repdef_starts: Vec<usize>,
    /// For every row, the byte position in `data` where the row begins, followed
    /// by one final end entry
    data_starts: Vec<usize>,
    /// For every row, the byte position in `offsets` where the row begins,
    /// followed by one final end entry
    offset_starts: Vec<usize>,
    /// For every row, the number of visible items that precede the row, followed
    /// by the total number of visible items
    visible_item_counts: Vec<u64>,
    /// Width in bits of each entry in `offsets` (32 or 64)
    bits_per_offset: u8,
    /// Index of the first row that has not been drained yet
    current_idx: usize,
    /// Number of rows this decoder was created for (reported by `num_rows`)
    num_rows: u64,
}
2697
2698impl VariableFullZipDecoder {
2699    fn new(
2700        details: Arc<FullZipDecodeDetails>,
2701        data: VecDeque<LanceBuffer>,
2702        num_rows: u64,
2703        in_bits_per_length: u8,
2704        out_bits_per_offset: u8,
2705    ) -> Self {
2706        let decompressor = match details.value_decompressor {
2707            PerValueDecompressor::Variable(ref d) => d.clone(),
2708            _ => unreachable!(),
2709        };
2710
2711        assert_eq!(in_bits_per_length % 8, 0);
2712        assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2713
2714        let mut decoder = Self {
2715            details,
2716            decompressor,
2717            data: LanceBuffer::empty(),
2718            offsets: LanceBuffer::empty(),
2719            rep: LanceBuffer::empty().borrow_to_typed_slice(),
2720            def: LanceBuffer::empty().borrow_to_typed_slice(),
2721            bits_per_offset: out_bits_per_offset,
2722            repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2723            data_starts: Vec::with_capacity(num_rows as usize + 1),
2724            offset_starts: Vec::with_capacity(num_rows as usize + 1),
2725            visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2726            current_idx: 0,
2727            num_rows,
2728        };
2729
2730        // There's no great time to do this and this is the least worst time.  If we don't unzip then
2731        // we can't slice the data during the decode phase.  This is because we need the offsets to be
2732        // unpacked to know where the values start and end.
2733        //
2734        // We don't want to unzip on the decode thread because that is a single-threaded path
2735        // We don't want to unzip on the scheduling thread because that is a single-threaded path
2736        //
2737        // Fortunately, we know variable length data will always be read indirectly and so we can do it
2738        // here, which should be on the indirect thread.  The primary disadvantage to doing it here is that
2739        // we load all the data into memory and then throw it away only to load it all into memory again during
2740        // the decode.
2741        //
2742        // There are some alternatives to investigate:
2743        //   - Instead of just reading the beginning and end of the rep index we could read the entire
2744        //     range in between.  This will give us the break points that we need for slicing and won't increase
2745        //     the number of IOPs but it will mean we are doing more total I/O and we need to load the rep index
2746        //     even when doing a full scan.
2747        //   - We could force each decode task to do a full unzip of all the data.  Each decode task now
2748        //     has to do more work but the work is all fused.
2749        //   - We could just try doing this work on the decode thread and see if it is a problem.
2750        decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2751
2752        decoder
2753    }
2754
2755    fn slice_batch_data_and_rebase_offsets_typed<T>(
2756        data: &LanceBuffer,
2757        offsets: &LanceBuffer,
2758    ) -> Result<(LanceBuffer, LanceBuffer)>
2759    where
2760        T: arrow_buffer::ArrowNativeType
2761            + Copy
2762            + PartialOrd
2763            + std::ops::Sub<Output = T>
2764            + std::fmt::Display
2765            + TryInto<usize>,
2766    {
2767        let offsets_slice = offsets.borrow_to_typed_slice::<T>();
2768        let offsets_slice = offsets_slice.as_ref();
2769        if offsets_slice.is_empty() {
2770            return Err(Error::internal(
2771                "Variable offsets cannot be empty".to_string(),
2772            ));
2773        }
2774
2775        let base = offsets_slice[0];
2776        let end = *offsets_slice.last().unwrap();
2777        if end < base {
2778            return Err(Error::internal(format!(
2779                "Invalid variable offsets: end ({end}) is less than base ({base})"
2780            )));
2781        }
2782
2783        let data_start = base.try_into().map_err(|_| {
2784            Error::internal(format!("Variable offset ({base}) does not fit into usize"))
2785        })?;
2786        let data_end = end.try_into().map_err(|_| {
2787            Error::internal(format!("Variable offset ({end}) does not fit into usize"))
2788        })?;
2789        if data_end > data.len() {
2790            return Err(Error::internal(format!(
2791                "Invalid variable offsets: end ({data_end}) exceeds data len ({})",
2792                data.len()
2793            )));
2794        }
2795
2796        let mut rebased_offsets = Vec::with_capacity(offsets_slice.len());
2797        for &offset in offsets_slice {
2798            if offset < base {
2799                return Err(Error::internal(format!(
2800                    "Invalid variable offsets: offset ({offset}) is less than base ({base})"
2801                )));
2802            }
2803            rebased_offsets.push(offset - base);
2804        }
2805
2806        let sliced_data = data.slice_with_length(data_start, data_end - data_start);
2807        // Copy into a compact buffer so each output batch owns only what it references.
2808        let sliced_data = LanceBuffer::copy_slice(&sliced_data);
2809        let rebased_offsets = LanceBuffer::reinterpret_vec(rebased_offsets);
2810        Ok((sliced_data, rebased_offsets))
2811    }
2812
2813    fn slice_batch_data_and_rebase_offsets(
2814        data: &LanceBuffer,
2815        offsets: &LanceBuffer,
2816        bits_per_offset: u8,
2817    ) -> Result<(LanceBuffer, LanceBuffer)> {
2818        match bits_per_offset {
2819            32 => Self::slice_batch_data_and_rebase_offsets_typed::<u32>(data, offsets),
2820            64 => Self::slice_batch_data_and_rebase_offsets_typed::<u64>(data, offsets),
2821            _ => Err(Error::internal(format!(
2822                "Unsupported bits_per_offset={bits_per_offset}"
2823            ))),
2824        }
2825    }
2826
2827    unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2828        match bits_per_offset {
2829            8 => *data.get_unchecked(0) as u64,
2830            16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2831            32 => u32::from_le_bytes([
2832                *data.get_unchecked(0),
2833                *data.get_unchecked(1),
2834                *data.get_unchecked(2),
2835                *data.get_unchecked(3),
2836            ]) as u64,
2837            64 => u64::from_le_bytes([
2838                *data.get_unchecked(0),
2839                *data.get_unchecked(1),
2840                *data.get_unchecked(2),
2841                *data.get_unchecked(3),
2842                *data.get_unchecked(4),
2843                *data.get_unchecked(5),
2844                *data.get_unchecked(6),
2845                *data.get_unchecked(7),
2846            ]),
2847            _ => unreachable!(),
2848        }
2849    }
2850
2851    fn unzip(
2852        &mut self,
2853        data: VecDeque<LanceBuffer>,
2854        in_bits_per_length: u8,
2855        out_bits_per_offset: u8,
2856        num_rows: u64,
2857    ) {
2858        // This undercounts if there are lists but, at this point, we don't really know how many items we have
2859        let mut rep = Vec::with_capacity(num_rows as usize);
2860        let mut def = Vec::with_capacity(num_rows as usize);
2861        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2862
2863        // This undercounts if there are lists
2864        // It can also overcount if there are invisible items
2865        let bytes_per_offset = out_bits_per_offset as usize / 8;
2866        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2867        let mut offsets_data = Vec::with_capacity(bytes_offsets);
2868
2869        let bytes_per_length = in_bits_per_length as usize / 8;
2870        let bytes_lengths = bytes_per_length * num_rows as usize;
2871
2872        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2873        // This overcounts since bytes_lengths and bytes_cw are undercounts
2874        // It can also undercount if there are invisible items (hence the saturating_sub)
2875        let mut unzipped_data =
2876            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2877
2878        let mut current_offset = 0_u64;
2879        let mut visible_item_count = 0_u64;
2880        for databuf in data.into_iter() {
2881            let mut databuf = databuf.as_ref();
2882            while !databuf.is_empty() {
2883                let data_start = unzipped_data.len();
2884                let offset_start = offsets_data.len();
2885                // We might have only-rep or only-def, neither, or both.  They move at the same
2886                // speed though so we only need one index into it
2887                let repdef_start = rep.len().max(def.len());
2888                // TODO: Kind of inefficient we parse the control word twice here
2889                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2890                    databuf,
2891                    self.details.max_rep,
2892                    self.details.max_visible_def,
2893                );
2894                self.details
2895                    .ctrl_word_parser
2896                    .parse(databuf, &mut rep, &mut def);
2897                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2898
2899                if ctrl_desc.is_new_row {
2900                    self.repdef_starts.push(repdef_start);
2901                    self.data_starts.push(data_start);
2902                    self.offset_starts.push(offset_start);
2903                    self.visible_item_counts.push(visible_item_count);
2904                }
2905                if ctrl_desc.is_visible {
2906                    visible_item_count += 1;
2907                    if ctrl_desc.is_valid_item {
2908                        // Safety: Data should have at least bytes_per_length bytes remaining
2909                        debug_assert!(databuf.len() >= bytes_per_length);
2910                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2911                        match out_bits_per_offset {
2912                            32 => offsets_data
2913                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2914                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2915                            _ => unreachable!(),
2916                        };
2917                        databuf = &databuf[bytes_per_offset..];
2918                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
2919                        databuf = &databuf[length as usize..];
2920                        current_offset += length;
2921                    } else {
2922                        // Null items still get an offset
2923                        match out_bits_per_offset {
2924                            32 => offsets_data
2925                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2926                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2927                            _ => unreachable!(),
2928                        }
2929                    }
2930                }
2931            }
2932        }
2933        self.repdef_starts.push(rep.len().max(def.len()));
2934        self.data_starts.push(unzipped_data.len());
2935        self.offset_starts.push(offsets_data.len());
2936        self.visible_item_counts.push(visible_item_count);
2937        match out_bits_per_offset {
2938            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2939            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2940            _ => unreachable!(),
2941        };
2942        self.rep = ScalarBuffer::from(rep);
2943        self.def = ScalarBuffer::from(def);
2944        self.data = LanceBuffer::from(unzipped_data);
2945        self.offsets = LanceBuffer::from(offsets_data);
2946    }
2947}
2948
2949impl StructuralPageDecoder for VariableFullZipDecoder {
2950    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2951        let start = self.current_idx;
2952        let end = start + num_rows as usize;
2953
2954        let offset_start = self.offset_starts[start];
2955        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2956        let offsets = self
2957            .offsets
2958            .slice_with_length(offset_start, offset_end - offset_start);
2959        // Keep each batch's variable data buffer bounded to the selected rows.
2960        let (data, offsets) =
2961            Self::slice_batch_data_and_rebase_offsets(&self.data, &offsets, self.bits_per_offset)?;
2962
2963        let repdef_start = self.repdef_starts[start];
2964        let repdef_end = self.repdef_starts[end];
2965        let rep = if self.rep.is_empty() {
2966            self.rep.clone()
2967        } else {
2968            self.rep.slice(repdef_start, repdef_end - repdef_start)
2969        };
2970        let def = if self.def.is_empty() {
2971            self.def.clone()
2972        } else {
2973            self.def.slice(repdef_start, repdef_end - repdef_start)
2974        };
2975
2976        let visible_item_counts_start = self.visible_item_counts[start];
2977        let visible_item_counts_end = self.visible_item_counts[end];
2978        let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2979
2980        self.current_idx += num_rows as usize;
2981
2982        Ok(Box::new(VariableFullZipDecodeTask {
2983            details: self.details.clone(),
2984            decompressor: self.decompressor.clone(),
2985            data,
2986            offsets,
2987            bits_per_offset: self.bits_per_offset,
2988            num_visible_items,
2989            rep,
2990            def,
2991        }))
2992    }
2993
2994    fn num_rows(&self) -> u64 {
2995        self.num_rows
2996    }
2997}
2998
/// A task to decompress one batch of already unzipped variable-width data
///
/// Created by `VariableFullZipDecoder::drain`.
#[derive(Debug)]
struct VariableFullZipDecodeTask {
    /// Shared decode settings; `def_meaning` is read when decoding
    details: Arc<FullZipDecodeDetails>,
    /// Decompressor applied to the variable-width block
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    /// Value bytes of this batch
    data: LanceBuffer,
    /// Offsets into `data`, `bits_per_offset` bits each
    offsets: LanceBuffer,
    /// Width in bits of each entry in `offsets`
    bits_per_offset: u8,
    /// Number of visible items in this batch
    num_visible_items: u64,
    /// Repetition levels of this batch (empty if there are none)
    rep: ScalarBuffer<u16>,
    /// Definition levels of this batch (empty if there are none)
    def: ScalarBuffer<u16>,
}
3010
3011impl DecodePageTask for VariableFullZipDecodeTask {
3012    fn decode(self: Box<Self>) -> Result<DecodedPage> {
3013        let block = VariableWidthBlock {
3014            data: self.data,
3015            offsets: self.offsets,
3016            bits_per_offset: self.bits_per_offset,
3017            num_values: self.num_visible_items,
3018            block_info: BlockInfo::new(),
3019        };
3020        let decomopressed = self.decompressor.decompress(block)?;
3021        let rep = if self.rep.is_empty() {
3022            None
3023        } else {
3024            Some(self.rep.to_vec())
3025        };
3026        let def = if self.def.is_empty() {
3027            None
3028        } else {
3029            Some(self.def.to_vec())
3030        };
3031        let unraveler = RepDefUnraveler::new(
3032            rep,
3033            def,
3034            self.details.def_meaning.clone(),
3035            self.num_visible_items,
3036        );
3037        Ok(DecodedPage {
3038            data: decomopressed,
3039            repdef: unraveler,
3040        })
3041    }
3042}
3043
/// One buffer of full-zip data queued for decoding by a `FixedFullZipDecodeTask`.
#[derive(Debug)]
struct FullZipDecodeTaskItem {
    // The zipped (and still compressed) data for this buffer
    data: PerValueDataBlock,
    // Number of rows in `data`; on the no-rep/def path this is the number of
    // values that get decompressed and appended to the output
    rows_in_buf: u64,
}
3049
/// A task to unzip and decompress full-zip encoded data when that data
/// has a fixed-width.
#[derive(Debug)]
struct FixedFullZipDecodeTask {
    // Control word parser, value decompressor, `max_visible_def` and `def_meaning`
    details: Arc<FullZipDecodeDetails>,
    // The buffers to decode, processed in order
    data: Vec<FullZipDecodeTaskItem>,
    // Row count passed to the unraveler; also the initial capacity of the
    // rep/def vectors when unzipping is needed
    num_rows: usize,
    // Number of bytes copied out of the zipped buffer for each visible value
    bytes_per_value: usize,
}
3059
impl DecodePageTask for FixedFullZipDecodeTask {
    /// Decodes every queued buffer and concatenates the decompressed values.
    ///
    /// Two paths exist:
    /// - when the control word is zero bytes wide there is no rep/def
    ///   information, so each buffer is decompressed as-is;
    /// - otherwise each entry is a control word followed (for visible entries
    ///   only) by `bytes_per_value` bytes of value data, and the two are
    ///   separated before the values are decompressed.
    ///
    /// Decompression errors are returned to the caller.  A buffer that is not
    /// `PerValueDataBlock::Fixed`, or a non-fixed value decompressor, hits
    /// `unreachable!()`.
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        // Multiply by 2 to make a stab at the size of the output buffer (which will be decompressed and thus bigger)
        let estimated_size_bytes = self
            .data
            .iter()
            .map(|task_item| task_item.data.data_size() as usize)
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
            // Fast path, no need to unzip because there is no rep/def
            //
            // We decompress each buffer and add it to our output buffer
            for task_item in self.data.into_iter() {
                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
                    unreachable!()
                };
                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
                else {
                    unreachable!()
                };
                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
            }

            let unraveler = RepDefUnraveler::new(
                None,
                None,
                self.details.def_meaning.clone(),
                self.num_rows as u64,
            );

            Ok(DecodedPage {
                data: data_builder.finish(),
                repdef: unraveler,
            })
        } else {
            // Slow path, unzipping needed
            //
            // `rep` and `def` accumulate levels across all buffers
            let mut rep = Vec::with_capacity(self.num_rows);
            let mut def = Vec::with_capacity(self.num_rows);

            for task_item in self.data.into_iter() {
                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
                    unreachable!()
                };
                // `buf_slice` is a cursor that advances through the zipped buffer
                let mut buf_slice = fixed_data.data.as_ref();
                let num_values = fixed_data.num_values as usize;
                // We will be unzipping repdef in to `rep` and `def` and the
                // values into `values` (which contains the compressed values)
                //
                // The capacity is the buffer size minus the space taken by control words
                let mut values = Vec::with_capacity(
                    fixed_data.data.len()
                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
                );
                let mut visible_items = 0;
                for _ in 0..num_values {
                    // Extract rep/def
                    self.details
                        .ctrl_word_parser
                        .parse(buf_slice, &mut rep, &mut def);
                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];

                    // An entry is visible when the most recent def level does not
                    // exceed `max_visible_def`, or when no def level has been recorded
                    let is_visible = def
                        .last()
                        .map(|d| *d <= self.details.max_visible_def)
                        .unwrap_or(true);
                    if is_visible {
                        // Extract value
                        //
                        // Only visible entries consume value bytes from the buffer
                        values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
                        buf_slice = &buf_slice[self.bytes_per_value..];
                        visible_items += 1;
                    }
                }

                // Finally, we decompress the values and add them to our output buffer
                let values_buf = LanceBuffer::from(values);
                let fixed_data = FixedWidthDataBlock {
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    block_info: BlockInfo::new(),
                    data: values_buf,
                    num_values: visible_items,
                };
                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
                else {
                    unreachable!()
                };
                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
                data_builder.append(&decompressed, 0..visible_items);
            }

            // Empty level vectors are reported to the unraveler as absent
            let repetition = if rep.is_empty() { None } else { Some(rep) };
            let definition = if def.is_empty() { None } else { Some(def) };

            let unraveler = RepDefUnraveler::new(
                repetition,
                definition,
                self.details.def_meaning.clone(),
                self.num_rows as u64,
            );
            let data = data_builder.finish();

            Ok(DecodedPage {
                data,
                repdef: unraveler,
            })
        }
    }
}
3171
/// Walks the pages of a primitive field and schedules the requested row
/// ranges one page at a time.
#[derive(Debug)]
struct StructuralPrimitiveFieldSchedulingJob<'a> {
    // The field scheduler that owns the per-page schedulers
    scheduler: &'a StructuralPrimitiveFieldScheduler,
    // The requested row ranges
    ranges: Vec<Range<u64>>,
    // Index of the next page to consider
    page_idx: usize,
    // Index of the next range that has not been fully scheduled
    range_idx: usize,
    // Sum of `num_rows` over all pages before `page_idx`
    global_row_offset: u64,
}
3180
3181impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
3182    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
3183        Self {
3184            scheduler,
3185            ranges,
3186            page_idx: 0,
3187            range_idx: 0,
3188            global_row_offset: 0,
3189        }
3190    }
3191}
3192
impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
    /// Schedules the next page that overlaps the remaining ranges.
    ///
    /// Returns an empty vector once every range has been consumed.  Otherwise
    /// pages that end before the current range are skipped, every range (or
    /// range prefix) that falls inside the first overlapping page is collected,
    /// and that page's scheduler is asked to schedule them.  One
    /// `ScheduledScanLine` is returned per page load task.
    ///
    /// Errors from the page scheduler are returned to the caller.
    ///
    /// NOTE(review): `page_schedulers` is indexed directly, so a range that
    /// extends past the last page would panic — presumably callers guarantee
    /// ranges stay within the column; confirm.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
        if self.range_idx >= self.ranges.len() {
            return Ok(Vec::new());
        }
        // Get our current range
        let mut range = self.ranges[self.range_idx].clone();
        // The start of the first range scheduled is only used in the trace below
        let priority = range.start;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range, cur_page.num_rows
        );
        // Skip entire pages until we have some overlap with our next range
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Now the cur_page has overlap with range.  Continue looping through ranges
        // until we find a range that exceeds the current page

        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            // A range carried over from the previous page starts at this page's first row
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            // Clamp to the page; any remainder is handled by the next call
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // The range continues into the next page so `range_idx` is not advanced
                break;
            }
        }

        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
            ranges_in_page.len(),
            cur_page.num_rows,
            priority,
            self.scheduler.column_index,
            cur_page.page_index,
        );

        // Move past this page before the next call
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        let page_decoders = cur_page
            .scheduler
            .schedule_ranges(&ranges_in_page, context.io())?;

        let cur_path = context.current_path();
        page_decoders
            .into_iter()
            .map(|page_load_task| {
                let cur_path = cur_path.clone();
                let page_decoder = page_load_task.decoder_fut;
                // Wrap the decoder future so the loaded page carries the current path
                let unloaded_page = async move {
                    let page_decoder = page_decoder.await?;
                    Ok(LoadedPageShard {
                        decoder: page_decoder,
                        path: cur_path,
                    })
                }
                .boxed();
                Ok(ScheduledScanLine {
                    decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
                    rows_scheduled: page_load_task.num_rows,
                })
            })
            .collect::<Result<Vec<_>>>()
    }
}
3277
/// A page scheduler together with the page's position and row count.
#[derive(Debug)]
struct PageInfoAndScheduler {
    // Position of the page within the column's list of pages
    page_index: usize,
    // Number of rows in the page
    num_rows: u64,
    // The layout-specific scheduler for the page
    scheduler: Box<dyn StructuralPageScheduler>,
}
3284
/// A scheduler for a leaf node
///
/// Here we look at the layout of the various pages and delegate scheduling to a scheduler
/// appropriate for the layout of the page.
#[derive(Debug)]
pub struct StructuralPrimitiveFieldScheduler {
    // One entry per page of the column, in page order
    page_schedulers: Vec<PageInfoAndScheduler>,
    // Index of the column; also part of the cache key for the field's cached page data
    column_index: u32,
    // Identifies the requested decode shape (e.g. blob descriptor struct vs
    // raw bytes). Blob columns can produce multiple page scheduler variants
    // for the same physical column depending on the target field's data type,
    // and the cached page state types differ per variant. The view tag is
    // mixed into the cache key so different variants do not collide.
    view_tag: String,
}
3300
impl StructuralPrimitiveFieldScheduler {
    /// Builds a field scheduler with one page scheduler per page in `column_info`.
    ///
    /// `target_field` selects the decode shape (see `page_layout_to_scheduler`)
    /// and its data type's `Debug` rendering becomes the `view_tag`.
    ///
    /// Returns the first error produced while building a page scheduler.
    pub fn try_new(
        column_info: &ColumnInfo,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<Self> {
        let page_schedulers = column_info
            .page_infos
            .iter()
            .enumerate()
            .map(|(page_index, page_info)| {
                Self::page_info_to_scheduler(
                    page_info,
                    page_index,
                    decompressors,
                    cache_repetition_index,
                    target_field,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            page_schedulers,
            column_index: column_info.index,
            view_tag: format!("{:?}", target_field.data_type()),
        })
    }

    /// Creates the page scheduler that matches `page_layout`.
    ///
    /// - mini-block layout -> `MiniBlockScheduler`
    /// - full-zip layout -> `FullZipScheduler` (with `enable_cache` set from
    ///   `cache_repetition_index`)
    /// - constant layout -> a constant, simple all-null, or complex all-null
    ///   scheduler (see the comments in that arm)
    /// - blob layout -> a blob scheduler wrapping the scheduler for the inner
    ///   layout; which one depends on whether `target_field` is a struct
    ///
    /// Returns an error if the layout (or a blob's inner layout) is missing or
    /// if any scheduler / decompressor cannot be created.
    fn page_layout_to_scheduler(
        page_info: &PageInfo,
        page_layout: &PageLayout,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<Box<dyn StructuralPageScheduler>> {
        use pb21::page_layout::Layout;
        Ok(match page_layout.layout.as_ref().expect_ok()? {
            Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
                &page_info.buffer_offsets_and_sizes,
                page_info.priority,
                mini_block.num_items,
                mini_block,
                decompressors,
            )?),
            Layout::FullZipLayout(full_zip) => {
                let mut scheduler = FullZipScheduler::try_new(
                    &page_info.buffer_offsets_and_sizes,
                    page_info.priority,
                    page_info.num_rows,
                    full_zip,
                    decompressors,
                )?;
                scheduler.enable_cache = cache_repetition_index;
                Box::new(scheduler)
            }
            Layout::ConstantLayout(constant_layout) => {
                let def_meaning = constant_layout
                    .layers
                    .iter()
                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
                    .collect::<Vec<_>>();
                // A scalar value is considered present when it is stored inline or
                // when the page has exactly 1 or exactly 3 buffers
                let has_scalar_value = constant_layout.inline_value.is_some()
                    || page_info.buffer_offsets_and_sizes.len() == 1
                    || page_info.buffer_offsets_and_sizes.len() == 3;
                if has_scalar_value {
                    Box::new(constant::ConstantPageScheduler::try_new(
                        page_info.buffer_offsets_and_sizes.clone(),
                        constant_layout.inline_value.clone(),
                        target_field.data_type(),
                        def_meaning.into(),
                    )?) as Box<dyn StructuralPageScheduler>
                } else if def_meaning.len() == 1
                    && def_meaning[0] == DefinitionInterpretation::NullableItem
                {
                    // No scalar value and a single nullable-item layer
                    Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
                } else {
                    // No scalar value and a more complex set of layers; rep/def
                    // decompressors are only created when a compression is specified
                    let rep_decompressor = constant_layout
                        .rep_compression
                        .as_ref()
                        .map(|encoding| decompressors.create_block_decompressor(encoding))
                        .transpose()?
                        .map(Arc::from);

                    let def_decompressor = constant_layout
                        .def_compression
                        .as_ref()
                        .map(|encoding| decompressors.create_block_decompressor(encoding))
                        .transpose()?
                        .map(Arc::from);

                    Box::new(ComplexAllNullScheduler::new(
                        page_info.buffer_offsets_and_sizes.clone(),
                        def_meaning.into(),
                        rep_decompressor,
                        def_decompressor,
                        constant_layout.num_rep_values,
                        constant_layout.num_def_values,
                    )) as Box<dyn StructuralPageScheduler>
                }
            }
            Layout::BlobLayout(blob) => {
                // The blob scheduler wraps whatever scheduler the inner layout needs
                let inner_scheduler = Self::page_layout_to_scheduler(
                    page_info,
                    blob.inner_layout.as_ref().expect_ok()?.as_ref(),
                    decompressors,
                    cache_repetition_index,
                    target_field,
                )?;
                let def_meaning = blob
                    .layers
                    .iter()
                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
                    .collect::<Vec<_>>();
                if matches!(target_field.data_type(), DataType::Struct(_)) {
                    // User wants to decode blob into struct
                    Box::new(BlobDescriptionPageScheduler::new(
                        inner_scheduler,
                        def_meaning.into(),
                    ))
                } else {
                    // User wants to decode blob into binary data
                    Box::new(BlobPageScheduler::new(
                        inner_scheduler,
                        page_info.priority,
                        page_info.num_rows,
                        def_meaning.into(),
                    ))
                }
            }
        })
    }

    /// Creates the scheduler for one page and records the page's index and row count.
    fn page_info_to_scheduler(
        page_info: &PageInfo,
        page_index: usize,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<PageInfoAndScheduler> {
        let page_layout = page_info.encoding.as_structural();
        let scheduler = Self::page_layout_to_scheduler(
            page_info,
            page_layout,
            decompressors,
            cache_repetition_index,
            target_field,
        )?;
        Ok(PageInfoAndScheduler {
            page_index,
            num_rows: page_info.num_rows,
            scheduler,
        })
    }
}
3455
/// Per-page state that can be stored in the cache.
///
/// Implementations must report their size (`DeepSizeOf`) and be convertible
/// to `Arc<dyn Any>` so they can be downcast to a concrete type.
pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
    /// Converts the shared pointer into an `Arc<dyn Any>` for downcasting.
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
}
3459
/// A `CachedPageData` implementation that holds no data.
pub struct NoCachedPageData;

impl DeepSizeOf for NoCachedPageData {
    // There are no children, so nothing is added to the size
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        0
    }
}
impl CachedPageData for NoCachedPageData {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
3472
/// The cached state of every page in a field.
pub struct CachedFieldData {
    // One entry per page, in the same order as the field's page schedulers
    pages: Vec<Arc<dyn CachedPageData>>,
}

impl DeepSizeOf for CachedFieldData {
    // The size is delegated entirely to the `pages` vector
    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
        self.pages.deep_size_of_children(ctx)
    }
}
3482
/// Cache key for field data
///
/// Both `column_index` and `view_tag` are part of the key because a single
/// physical column can be decoded under more than one shape — a blob column,
/// for instance, materializes as a `Struct<position, size>` descriptor in one
/// scheduler variant and as the raw `LargeBinary` bytes in another. Each
/// variant builds different `CachedPageData` types per page, so without the
/// view tag two readers that hit the same `column_index` with different
/// shapes would collide and crash with a downcast failure when loading
/// cached state.
#[derive(Debug, Clone)]
pub struct FieldDataCacheKey {
    /// Index of the column the cached data belongs to
    pub column_index: u32,
    /// Tag identifying the decode shape the cached data was built for
    pub view_tag: String,
}
3497
3498impl CacheKey for FieldDataCacheKey {
3499    type ValueType = CachedFieldData;
3500
3501    fn key(&self) -> std::borrow::Cow<'_, str> {
3502        format!("{}:{}", self.column_index, self.view_tag).into()
3503    }
3504
3505    fn type_name() -> &'static str {
3506        "FieldData"
3507    }
3508}
3509
3510impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
3511    fn initialize<'a>(
3512        &'a mut self,
3513        _filter: &'a FilterExpression,
3514        context: &'a SchedulerContext,
3515    ) -> BoxFuture<'a, Result<()>> {
3516        let cache_key = FieldDataCacheKey {
3517            column_index: self.column_index,
3518            view_tag: self.view_tag.clone(),
3519        };
3520        let cache = context.cache().clone();
3521
3522        async move {
3523            if let Some(cached_data) = cache.get_with_key(&cache_key).await {
3524                self.page_schedulers
3525                    .iter_mut()
3526                    .zip(cached_data.pages.iter())
3527                    .for_each(|(page_scheduler, cached_data)| {
3528                        page_scheduler.scheduler.load(cached_data);
3529                    });
3530                return Ok(());
3531            }
3532
3533            let page_data = self
3534                .page_schedulers
3535                .iter_mut()
3536                .map(|s| s.scheduler.initialize(context.io()))
3537                .collect::<FuturesOrdered<_>>();
3538
3539            let page_data = page_data.try_collect::<Vec<_>>().await?;
3540            let cached_data = Arc::new(CachedFieldData { pages: page_data });
3541            cache.insert_with_key(&cache_key, cached_data).await;
3542            Ok(())
3543        }
3544        .boxed()
3545    }
3546
3547    fn schedule_ranges<'a>(
3548        &'a self,
3549        ranges: &[Range<u64>],
3550        _filter: &FilterExpression,
3551    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
3552        let ranges = ranges.to_vec();
3553        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
3554            self, ranges,
3555        )))
3556    }
3557}
3558
/// Takes the output from several page decoders and
/// concatenates them.
#[derive(Debug)]
pub struct StructuralCompositeDecodeArrayTask {
    // The page decode tasks, in the order their output is concatenated
    tasks: Vec<Box<dyn DecodePageTask>>,
    // Passed through to `into_arrow` when converting decoded data to arrays
    should_validate: bool,
    // The Arrow data type each decoded page is converted to
    data_type: DataType,
}
3567
3568impl StructuralCompositeDecodeArrayTask {
3569    fn restore_validity(
3570        array: Arc<dyn Array>,
3571        unraveler: &mut CompositeRepDefUnraveler,
3572    ) -> Arc<dyn Array> {
3573        let validity = unraveler.unravel_validity(array.len());
3574        let Some(validity) = validity else {
3575            return array;
3576        };
3577        if array.data_type() == &DataType::Null {
3578            // We unravel from a null array but we don't add the null buffer because arrow-rs doesn't like it
3579            return array;
3580        }
3581        assert_eq!(validity.len(), array.len());
3582        // SAFETY: We've should have already asserted the buffers are all valid, we are just
3583        // adding null buffers to the array here
3584        make_array(unsafe {
3585            array
3586                .to_data()
3587                .into_builder()
3588                .nulls(Some(validity))
3589                .build_unchecked()
3590        })
3591    }
3592}
3593
3594impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3595    fn decode(self: Box<Self>) -> Result<DecodedArray> {
3596        let mut arrays = Vec::with_capacity(self.tasks.len());
3597        let mut unravelers = Vec::with_capacity(self.tasks.len());
3598        let mut data_size = 0u64;
3599        for task in self.tasks {
3600            let decoded = task.decode()?;
3601            data_size += decoded.data.data_size();
3602            unravelers.push(decoded.repdef);
3603
3604            let array = make_array(
3605                decoded
3606                    .data
3607                    .into_arrow(self.data_type.clone(), self.should_validate)?,
3608            );
3609
3610            arrays.push(array);
3611        }
3612        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3613        let array = arrow_select::concat::concat(&array_refs)?;
3614        let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3615
3616        let array = Self::restore_validity(array, &mut repdef);
3617
3618        Ok(DecodedArray {
3619            array,
3620            repdef,
3621            data_size,
3622        })
3623    }
3624}
3625
/// Decoder for a primitive field; drains rows from a queue of loaded page decoders.
#[derive(Debug)]
pub struct StructuralPrimitiveFieldDecoder {
    // The Arrow field being decoded; supplies the output data type
    field: Arc<ArrowField>,
    // Loaded page decoders, drained front to back
    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
    // Forwarded to the decode task created by `drain`
    should_validate: bool,
    // Rows already drained from the page at the front of `page_decoders`
    rows_drained_in_current: u64,
}
3633
3634impl StructuralPrimitiveFieldDecoder {
3635    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3636        Self {
3637            field: field.clone(),
3638            page_decoders: VecDeque::new(),
3639            should_validate,
3640            rows_drained_in_current: 0,
3641        }
3642    }
3643}
3644
3645impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3646    fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
3647        assert!(child.path.is_empty());
3648        self.page_decoders.push_back(child.decoder);
3649        Ok(())
3650    }
3651
3652    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3653        let mut remaining = num_rows;
3654        let mut tasks = Vec::new();
3655        while remaining > 0 {
3656            let cur_page = self.page_decoders.front_mut().unwrap();
3657            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3658            let to_take = num_in_page.min(remaining);
3659
3660            let task = cur_page.drain(to_take)?;
3661            tasks.push(task);
3662
3663            if to_take == num_in_page {
3664                self.page_decoders.pop_front();
3665                self.rows_drained_in_current = 0;
3666            } else {
3667                self.rows_drained_in_current += to_take;
3668            }
3669
3670            remaining -= to_take;
3671        }
3672        Ok(Box::new(StructuralCompositeDecodeArrayTask {
3673            tasks,
3674            should_validate: self.should_validate,
3675            data_type: self.field.data_type().clone(),
3676        }))
3677    }
3678
3679    fn data_type(&self) -> &DataType {
3680        self.field.data_type()
3681    }
3682}
3683
/// The serialized representation of full-zip data
struct SerializedFullZip {
    /// The zipped values buffer
    values: LanceBuffer,
    /// The repetition index (only present if there is repetition), `None` otherwise
    repetition_index: Option<LanceBuffer>,
}
3691
// We align and pad mini-blocks to 8 byte boundaries for two reasons.
//
// First, it allows us to store a chunk size in 12 bits.
//
// If we directly recorded the size in bytes with 12 bits we would be limited to
// 4KiB which is too small.  Since we know each mini-block consists of 8 byte
// words we can store the # of words instead which gives us 32KiB.  We want
// at least 24KiB so we can handle even the worst case of
// - 4Ki values compressed into an 8186 byte buffer
// - 4 bytes to describe rep & def lengths
// - 16KiB of rep & def buffer (this will almost never happen but life is easier if we
//   plan for it)
//
// Second, each chunk in a mini-block is aligned to 8 bytes.  This allows multi-byte
// values like offsets to be stored in a mini-block and safely read back out.  It also
// helps ensure zero-copy reads in cases where zero-copy is possible (e.g. no decoding
// needed).
//
// Note: by "aligned to 8 bytes" we mean BOTH "aligned to 8 bytes from the start of
// the page" and "aligned to 8 bytes from the start of the file."
const MINIBLOCK_ALIGNMENT: usize = 8;
3712
/// An encoder for primitive (leaf) arrays
///
/// This encoder is fairly complicated and follows a number of paths depending
/// on the data.
///
/// First, we convert the validity & offsets information into repetition and
/// definition levels.  Then we compress the data itself into a single buffer.
///
/// If the data is narrow then we encode the data in small chunks (each chunk
/// should be a few disk sectors and contains a buffer of repetition, a buffer
/// of definition, and a buffer of value data).  This approach is called
/// "mini-block".  These mini-blocks are stored into a single data buffer.
///
/// If the data is wide then we zip together the repetition and definition value
/// with the value data into a single buffer.  This approach is called "zipped".
///
/// If there is any repetition information then we create a repetition index
///
/// In addition, the compression process may create zero or more metadata buffers.
/// For example, a dictionary compression will create dictionary metadata.  Any
/// mini-block approach has a metadata buffer of block sizes.  This metadata is
/// stored in a separate buffer on disk and read at initialization time.
///
/// TODO: We should concatenate metadata buffers from all pages into a single buffer
/// at (roughly) the end of the file so there is, at most, one read per column of
/// metadata per file.
pub struct PrimitiveStructuralEncoder {
    // Accumulates arrays until we have enough data to justify a disk page
    accumulation_queue: AccumulationQueue,

    // Copied from `EncodingOptions::keep_original_array`
    keep_original_array: bool,
    // Result of `EncodingOptions::support_large_chunk()`
    support_large_chunk: bool,
    // Starts out empty
    // NOTE(review): presumably one rep/def builder per accumulated array — confirm
    accumulated_repdefs: Vec<RepDefBuilder>,
    // The compression strategy we will use to compress the data
    compression_strategy: Arc<dyn CompressionStrategy>,
    // Index of the column being encoded
    column_index: u32,
    // The field being encoded
    field: Field,
    // Per-field encoding metadata; consulted for a user-requested structural encoding
    encoding_metadata: Arc<HashMap<String, String>>,
    // Copied from `EncodingOptions::version`
    version: LanceFileVersion,
}
3753
/// One chunk of compressed rep/def levels.
// NOTE(review): `num_levels` presumably counts the levels encoded in `data` — confirm
// against the code that builds these chunks.
struct CompressedLevelsChunk {
    data: LanceBuffer,
    num_levels: u16,
}
3758
/// Compressed rep/def levels, split into chunks.
// NOTE(review): `compression` presumably describes how every chunk in `data` was
// compressed and `rep_index` is presumably only set when a repetition index was
// built — confirm against the code that builds this struct.
struct CompressedLevels {
    data: Vec<CompressedLevelsChunk>,
    compression: CompressiveEncoding,
    rep_index: Option<LanceBuffer>,
}
3764
/// The serialized form of a mini-block page: a data buffer and a metadata buffer.
// NOTE(review): `num_buffers` presumably is the number of buffers stored in each
// mini-block chunk — confirm against the serialization code.
struct SerializedMiniBlockPage {
    num_buffers: u64,
    data: LanceBuffer,
    metadata: LanceBuffer,
}
3770
/// Limits applied when considering dictionary encoding.
// NOTE(review): `max_encoded_size` is presumably measured in bytes — confirm
// against the code that checks the budget.
#[derive(Debug, Clone, Copy)]
struct DictEncodingBudget {
    max_dict_entries: u32,
    max_encoded_size: usize,
}
3776
3777impl PrimitiveStructuralEncoder {
3778    pub fn try_new(
3779        options: &EncodingOptions,
3780        compression_strategy: Arc<dyn CompressionStrategy>,
3781        column_index: u32,
3782        field: Field,
3783        encoding_metadata: Arc<HashMap<String, String>>,
3784    ) -> Result<Self> {
3785        Ok(Self {
3786            accumulation_queue: AccumulationQueue::new(
3787                options.cache_bytes_per_column,
3788                column_index,
3789                options.keep_original_array,
3790            ),
3791            support_large_chunk: options.support_large_chunk(),
3792            keep_original_array: options.keep_original_array,
3793            accumulated_repdefs: Vec::new(),
3794            column_index,
3795            compression_strategy,
3796            field,
3797            encoding_metadata,
3798            version: options.version,
3799        })
3800    }
3801
3802    // TODO: This is a heuristic we may need to tune at some point
3803    //
3804    // As data gets narrow then the "zipping" process gets too expensive
3805    //   and we prefer mini-block
3806    // As data gets wide then the # of values per block shrinks (very wide)
3807    //   data doesn't even fit in a mini-block and the block overhead gets
3808    //   too large and we prefer zipped.
3809    fn is_narrow(data_block: &DataBlock) -> bool {
3810        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3811
3812        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3813            let max_len_array = max_len_array
3814                .as_any()
3815                .downcast_ref::<PrimitiveArray<UInt64Type>>()
3816                .unwrap();
3817            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3818                return true;
3819            }
3820        }
3821        false
3822    }
3823
3824    fn prefers_miniblock(
3825        data_block: &DataBlock,
3826        encoding_metadata: &HashMap<String, String>,
3827    ) -> bool {
3828        // If the user specifically requested miniblock then use it
3829        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3830            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3831        }
3832        // Otherwise only use miniblock if it is narrow
3833        Self::is_narrow(data_block)
3834    }
3835
3836    /// Checks if the rep/def levels are too sparse for miniblock encoding.
3837    ///
3838    /// Miniblock chunks are limited to ~32KiB total. Data can use up to ~16KiB,
3839    /// leaving ~16KiB for both rep and def buffers combined. Each chunk has at most
3840    /// MAX_MINIBLOCK_VALUES (4096) data values, but when data has many empty/null
3841    /// lists, the number of rep/def levels can far exceed the number of data values
3842    /// (each empty list adds a level entry with no corresponding data value).
3843    ///
3844    /// We estimate the compressed bits per level by computing the max value in each
3845    /// buffer and taking ceil(log2(max_val + 1)) — the minimum bits needed to
3846    /// bitpack each level. We then calculate the maximum number of levels that fit
3847    /// in 16KiB and compare against the actual levels-to-values ratio.
3848    fn repdef_too_sparse_for_miniblock(
3849        repdef: &crate::repdef::SerializedRepDefs,
3850        num_values: u64,
3851    ) -> bool {
3852        if num_values == 0 {
3853            return false;
3854        }
3855        let num_levels = repdef
3856            .repetition_levels
3857            .as_ref()
3858            .map(|r| r.len() as u64)
3859            .max(repdef.definition_levels.as_ref().map(|d| d.len() as u64))
3860            .unwrap_or(0);
3861        if num_levels == 0 {
3862            return false;
3863        }
3864
3865        // Compute bits needed per level for each buffer (ceil of log2(max+1))
3866        let bits_per_rep = repdef
3867            .repetition_levels
3868            .as_ref()
3869            .and_then(|r| r.iter().max().copied())
3870            .map(|max_val| u16::BITS - max_val.leading_zeros())
3871            .unwrap_or(0) as u64;
3872        let bits_per_def = repdef
3873            .definition_levels
3874            .as_ref()
3875            .and_then(|d| d.iter().max().copied())
3876            .map(|max_val| u16::BITS - max_val.leading_zeros())
3877            .unwrap_or(0) as u64;
3878
3879        let bits_per_level = bits_per_rep + bits_per_def;
3880        if bits_per_level == 0 {
3881            return false;
3882        }
3883
3884        // 16KiB budget for rep+def combined (half the ~32KiB chunk limit)
3885        const REPDEF_BUDGET_BITS: u64 = 16 * 1024 * 8;
3886        let max_levels_per_chunk = REPDEF_BUDGET_BITS / bits_per_level;
3887
3888        // A chunk has at most MAX_MINIBLOCK_VALUES data values. The levels-to-values
3889        // ratio tells us how many levels a chunk of that size would need.
3890        let levels_per_chunk =
3891            (num_levels as f64 / num_values as f64) * *miniblock::MAX_MINIBLOCK_VALUES as f64;
3892
3893        levels_per_chunk > max_levels_per_chunk as f64
3894    }
3895
3896    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3897        // Fullzip is the backup option so the only reason we wouldn't use it is if the
3898        // user specifically requested not to use it (in which case we're probably going
3899        // to emit an error)
3900        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3901            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3902        }
3903        true
3904    }
3905
3906    // Converts value data, repetition levels, and definition levels into a single
3907    // buffer of mini-blocks.  In addition, creates a buffer of mini-block metadata
3908    // which tells us the size of each block.  Finally, if repetition is present then
3909    // we also create a buffer for the repetition index.
3910    //
3911    // Each chunk is serialized as:
3912    // | num_bufs (1 byte) | buf_lens (2 bytes per buffer) | P | buf0 | P | buf1 | ... | bufN | P |
3913    //
3914    // P - Padding inserted to ensure each buffer is 8-byte aligned and the buffer size is a multiple
3915    //     of 8 bytes (so that the next chunk is 8-byte aligned).
3916    //
3917    // Each block has a u16 word of metadata.  The upper 12 bits contain the
3918    // # of 8-byte words in the block (if the block does not fill the final word
3919    // then up to 7 bytes of padding are added).  The lower 4 bits describe the log_2
3920    // number of values (e.g. if there are 1024 then the lower 4 bits will be
3921    // 0xA)  All blocks except the last must have power-of-two number of values.
3922    // This not only makes metadata smaller but it makes decoding easier since
3923    // batch sizes are typically a power of 2.  4 bits would allow us to express
3924    // up to 16Ki values but we restrict this further to 4Ki values.
3925    //
3926    // This means blocks can have 1 to 4Ki values and 8 - 32Ki bytes.
3927    //
3928    // All metadata words are serialized (as little endian) into a single buffer
3929    // of metadata values.
3930    //
3931    // If there is repetition then we also create a repetition index.  This is a
3932    // single buffer of integer vectors (stored in row major order).  There is one
3933    // entry for each chunk.  The size of the vector is based on the depth of random
3934    // access we want to support.
3935    //
3936    // A vector of size 2 is the minimum and will support row-based random access (e.g.
3937    // "take the 57th row").  A vector of size 3 will support 1 level of nested access
3938    // (e.g. "take the 3rd item in the 57th row").  A vector of size 4 will support 2
3939    // levels of nested access and so on.
3940    //
3941    // The first number in the vector is the number of top-level rows that complete in
3942    // the chunk.  The second number is the number of second-level rows that complete
3943    // after the final top-level row completed (or beginning of the chunk if no top-level
3944    // row completes in the chunk).  And so on.  The final number in the vector is always
3945    // the number of leftover items not covered by earlier entries in the vector.
3946    //
3947    // Currently we are limited to 0 levels of nested access but that will change in the
3948    // future.
3949    //
3950    // The repetition index and the chunk metadata are read at initialization time and
3951    // cached in memory.
3952    fn serialize_miniblocks(
3953        miniblocks: MiniBlockCompressed,
3954        rep: Option<Vec<CompressedLevelsChunk>>,
3955        def: Option<Vec<CompressedLevelsChunk>>,
3956        support_large_chunk: bool,
3957    ) -> Result<SerializedMiniBlockPage> {
3958        let bytes_rep = rep
3959            .as_ref()
3960            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3961            .unwrap_or(0);
3962        let bytes_def = def
3963            .as_ref()
3964            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3965            .unwrap_or(0);
3966        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3967        let mut num_buffers = miniblocks.data.len();
3968        if rep.is_some() {
3969            num_buffers += 1;
3970        }
3971        if def.is_some() {
3972            num_buffers += 1;
3973        }
3974        // 2 bytes for the length of each buffer and up to 7 bytes of padding per buffer
3975        let max_extra = 9 * num_buffers;
3976        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3977        let chunk_size_bytes = if support_large_chunk { 4 } else { 2 };
3978        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * chunk_size_bytes);
3979
3980        let mut rep_iter = rep.map(|r| r.into_iter());
3981        let mut def_iter = def.map(|d| d.into_iter());
3982
3983        let mut buffer_offsets = vec![0; miniblocks.data.len()];
3984        for chunk in miniblocks.chunks {
3985            let start_pos = data_buffer.len();
3986            // Start of chunk should be aligned
3987            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3988
3989            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3990            let def = def_iter.as_mut().map(|d| d.next().unwrap());
3991
3992            // Write the number of levels, or 0 if there is no rep/def
3993            let num_levels = rep
3994                .as_ref()
3995                .map(|r| r.num_levels)
3996                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3997            data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3998
3999            // Write the buffer lengths
4000            if let Some(rep) = rep.as_ref() {
4001                let bytes_rep = u16::try_from(rep.data.len()).map_err(|_| {
4002                    Error::internal(format!(
4003                        "Repetition buffer size ({} bytes) too large",
4004                        rep.data.len()
4005                    ))
4006                })?;
4007                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
4008            }
4009            if let Some(def) = def.as_ref() {
4010                let bytes_def = u16::try_from(def.data.len()).map_err(|_| {
4011                    Error::internal(format!(
4012                        "Definition buffer size ({} bytes) too large",
4013                        def.data.len()
4014                    ))
4015                })?;
4016                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
4017            }
4018
4019            if support_large_chunk {
4020                for &buffer_size in &chunk.buffer_sizes {
4021                    data_buffer.extend_from_slice(&buffer_size.to_le_bytes());
4022                }
4023            } else {
4024                for &buffer_size in &chunk.buffer_sizes {
4025                    data_buffer.extend_from_slice(&(buffer_size as u16).to_le_bytes());
4026                }
4027            }
4028
4029            // Pad
4030            let add_padding = |data_buffer: &mut Vec<u8>| {
4031                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
4032                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
4033            };
4034            add_padding(&mut data_buffer);
4035
4036            // Write the buffers themselves
4037            if let Some(rep) = rep.as_ref() {
4038                data_buffer.extend_from_slice(&rep.data);
4039                add_padding(&mut data_buffer);
4040            }
4041            if let Some(def) = def.as_ref() {
4042                data_buffer.extend_from_slice(&def.data);
4043                add_padding(&mut data_buffer);
4044            }
4045            for (buffer_size, (buffer, buffer_offset)) in chunk
4046                .buffer_sizes
4047                .iter()
4048                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
4049            {
4050                let start = *buffer_offset;
4051                let end = start + *buffer_size as usize;
4052                *buffer_offset += *buffer_size as usize;
4053                data_buffer.extend_from_slice(&buffer[start..end]);
4054                add_padding(&mut data_buffer);
4055            }
4056
4057            let chunk_bytes = data_buffer.len() - start_pos;
4058            let max_chunk_size = if support_large_chunk {
4059                4 * 1024 * 1024 * 1024 // 4GB limit with u32 metadata
4060            } else {
4061                32 * 1024 // 32KiB limit with u16 metadata
4062            };
4063            assert!(chunk_bytes <= max_chunk_size);
4064            assert!(chunk_bytes > 0);
4065            assert_eq!(chunk_bytes % 8, 0);
4066            // 4Ki values max
4067            assert!(chunk.log_num_values <= 12);
4068            // We subtract 1 here from chunk_bytes because we want to be able to express
4069            // a size of 32KiB and not (32Ki - 8)B which is what we'd get otherwise with
4070            // 0xFFF
4071            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
4072            let divided_bytes_minus_one = (divided_bytes - 1) as u64;
4073
4074            let metadata = (divided_bytes_minus_one << 4) | chunk.log_num_values as u64;
4075            if support_large_chunk {
4076                meta_buffer.extend_from_slice(&(metadata as u32).to_le_bytes());
4077            } else {
4078                meta_buffer.extend_from_slice(&(metadata as u16).to_le_bytes());
4079            }
4080        }
4081
4082        let data_buffer = LanceBuffer::from(data_buffer);
4083        let metadata_buffer = LanceBuffer::from(meta_buffer);
4084
4085        Ok(SerializedMiniBlockPage {
4086            num_buffers: miniblocks.data.len() as u64,
4087            data: data_buffer,
4088            metadata: metadata_buffer,
4089        })
4090    }
4091
4092    /// Compresses a buffer of levels into chunks
4093    ///
4094    /// If these are repetition levels then we also calculate the repetition index here (that
4095    /// is the third return value)
4096    fn compress_levels(
4097        mut levels: RepDefSlicer<'_>,
4098        num_elements: u64,
4099        compression_strategy: &dyn CompressionStrategy,
4100        chunks: &[MiniBlockChunk],
4101        // This will be 0 if we are compressing def levels
4102        max_rep: u16,
4103    ) -> Result<CompressedLevels> {
4104        let mut rep_index = if max_rep > 0 {
4105            Vec::with_capacity(chunks.len())
4106        } else {
4107            vec![]
4108        };
4109        // Make the levels into a FixedWidth data block
4110        let num_levels = levels.num_levels() as u64;
4111        let levels_buf = levels.all_levels().clone();
4112
4113        let mut fixed_width_block = FixedWidthDataBlock {
4114            data: levels_buf,
4115            bits_per_value: 16,
4116            num_values: num_levels,
4117            block_info: BlockInfo::new(),
4118        };
4119        // Compute statistics to enable optimal compression for rep/def levels
4120        fixed_width_block.compute_stat();
4121
4122        let levels_block = DataBlock::FixedWidth(fixed_width_block);
4123        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
4124        // Pick a block compressor
4125        let (compressor, compressor_desc) =
4126            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
4127        // Compress blocks of levels (sized according to the chunks)
4128        let mut level_chunks = Vec::with_capacity(chunks.len());
4129        let mut values_counter = 0;
4130        for (chunk_idx, chunk) in chunks.iter().enumerate() {
4131            let chunk_num_values = chunk.num_values(values_counter, num_elements);
4132            debug_assert!(chunk_num_values > 0);
4133            values_counter += chunk_num_values;
4134            let chunk_levels = if chunk_idx < chunks.len() - 1 {
4135                levels.slice_next(chunk_num_values as usize)
4136            } else {
4137                levels.slice_rest()
4138            };
4139            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
4140            if max_rep > 0 {
4141                // If max_rep > 0 then we are working with rep levels and we need
4142                // to calculate the repetition index.  The repetition index for a
4143                // chunk is currently 2 values (in the future it may be more).
4144                //
4145                // The first value is the number of rows that _finish_ in the
4146                // chunk.
4147                //
4148                // The second value is the number of "leftovers" after the last
4149                // finished row in the chunk.
4150                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
4151                let rep_values = rep_values.as_ref();
4152
4153                // We skip 1 here because a max_rep at spot 0 doesn't count as a finished list (we
4154                // will count it in the previous chunk)
4155                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
4156                let num_leftovers = if chunk_idx < chunks.len() - 1 {
4157                    rep_values
4158                        .iter()
4159                        .rev()
4160                        .position(|v| *v == max_rep)
4161                        // # of leftovers includes the max_rep spot
4162                        .map(|pos| pos + 1)
4163                        .unwrap_or(rep_values.len())
4164                } else {
4165                    // Last chunk can't have leftovers
4166                    0
4167                };
4168
4169                if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
4170                    // This chunk starts with a new row and so, if we thought we had leftovers
4171                    // in the previous chunk, we were mistaken
4172                    // TODO: Can use unchecked here
4173                    let rep_len = rep_index.len();
4174                    if rep_index[rep_len - 1] != 0 {
4175                        // We thought we had leftovers but that was actually a full row
4176                        rep_index[rep_len - 2] += 1;
4177                        rep_index[rep_len - 1] = 0;
4178                    }
4179                }
4180
4181                if chunk_idx == chunks.len() - 1 {
4182                    // The final list
4183                    num_rows += 1;
4184                }
4185                rep_index.push(num_rows as u64);
4186                rep_index.push(num_leftovers as u64);
4187            }
4188            let mut chunk_fixed_width = FixedWidthDataBlock {
4189                data: chunk_levels,
4190                bits_per_value: 16,
4191                num_values: num_chunk_levels,
4192                block_info: BlockInfo::new(),
4193            };
4194            chunk_fixed_width.compute_stat();
4195            let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
4196            let compressed_levels = compressor.compress(chunk_levels_block)?;
4197            level_chunks.push(CompressedLevelsChunk {
4198                data: compressed_levels,
4199                num_levels: num_chunk_levels as u16,
4200            });
4201        }
4202        debug_assert_eq!(levels.num_levels_remaining(), 0);
4203        let rep_index = if rep_index.is_empty() {
4204            None
4205        } else {
4206            Some(LanceBuffer::reinterpret_vec(rep_index))
4207        };
4208        Ok(CompressedLevels {
4209            data: level_chunks,
4210            compression: compressor_desc,
4211            rep_index,
4212        })
4213    }
4214
4215    fn encode_simple_all_null(
4216        column_idx: u32,
4217        num_rows: u64,
4218        row_number: u64,
4219    ) -> Result<EncodedPage> {
4220        let description =
4221            ProtobufUtils21::constant_layout(&[DefinitionInterpretation::NullableItem], None);
4222        Ok(EncodedPage {
4223            column_idx,
4224            data: vec![],
4225            description: PageEncoding::Structural(description),
4226            num_rows,
4227            row_number,
4228        })
4229    }
4230
4231    fn encode_complex_all_null_vals(
4232        data: &Arc<[u16]>,
4233        compression_strategy: &dyn CompressionStrategy,
4234    ) -> Result<(LanceBuffer, pb21::CompressiveEncoding)> {
4235        let buffer = LanceBuffer::reinterpret_slice(data.clone());
4236        let mut fixed_width_block = FixedWidthDataBlock {
4237            data: buffer,
4238            bits_per_value: 16,
4239            num_values: data.len() as u64,
4240            block_info: BlockInfo::new(),
4241        };
4242        fixed_width_block.compute_stat();
4243
4244        let levels_block = DataBlock::FixedWidth(fixed_width_block);
4245        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
4246        let (compressor, encoding) =
4247            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
4248        let compressed_buffer = compressor.compress(levels_block)?;
4249        Ok((compressed_buffer, encoding))
4250    }
4251
4252    // Encodes a page where all values are null but we have rep/def
4253    // information that we need to store (e.g. to distinguish between
4254    // different kinds of null)
4255    fn encode_complex_all_null(
4256        column_idx: u32,
4257        repdef: crate::repdef::SerializedRepDefs,
4258        row_number: u64,
4259        num_rows: u64,
4260        version: LanceFileVersion,
4261        compression_strategy: &dyn CompressionStrategy,
4262    ) -> Result<EncodedPage> {
4263        if version.resolve() < LanceFileVersion::V2_2 {
4264            let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
4265                LanceBuffer::reinterpret_slice(rep.clone())
4266            } else {
4267                LanceBuffer::empty()
4268            };
4269
4270            let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
4271                LanceBuffer::reinterpret_slice(def.clone())
4272            } else {
4273                LanceBuffer::empty()
4274            };
4275
4276            let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None);
4277            return Ok(EncodedPage {
4278                column_idx,
4279                data: vec![rep_bytes, def_bytes],
4280                description: PageEncoding::Structural(description),
4281                num_rows,
4282                row_number,
4283            });
4284        }
4285
4286        let (rep_bytes, rep_encoding, num_rep_values) = if let Some(rep) =
4287            repdef.repetition_levels.as_ref()
4288        {
4289            let num_values = rep.len() as u64;
4290            let (buffer, encoding) = Self::encode_complex_all_null_vals(rep, compression_strategy)?;
4291            (buffer, Some(encoding), num_values)
4292        } else {
4293            (LanceBuffer::empty(), None, 0)
4294        };
4295
4296        let (def_bytes, def_encoding, num_def_values) = if let Some(def) =
4297            repdef.definition_levels.as_ref()
4298        {
4299            let num_values = def.len() as u64;
4300            let (buffer, encoding) = Self::encode_complex_all_null_vals(def, compression_strategy)?;
4301            (buffer, Some(encoding), num_values)
4302        } else {
4303            (LanceBuffer::empty(), None, 0)
4304        };
4305
4306        let description = ProtobufUtils21::compressed_all_null_constant_layout(
4307            &repdef.def_meaning,
4308            rep_encoding,
4309            def_encoding,
4310            num_rep_values,
4311            num_def_values,
4312        );
4313        Ok(EncodedPage {
4314            column_idx,
4315            data: vec![rep_bytes, def_bytes],
4316            description: PageEncoding::Structural(description),
4317            num_rows,
4318            row_number,
4319        })
4320    }
4321
4322    fn leaf_validity(
4323        repdef: &crate::repdef::SerializedRepDefs,
4324        num_values: usize,
4325    ) -> Result<Option<BooleanBuffer>> {
4326        let rep = repdef
4327            .repetition_levels
4328            .as_ref()
4329            .map(|rep| rep.as_ref().to_vec());
4330        let def = repdef
4331            .definition_levels
4332            .as_ref()
4333            .map(|def| def.as_ref().to_vec());
4334        let mut unraveler = RepDefUnraveler::new(
4335            rep,
4336            def,
4337            repdef.def_meaning.clone().into(),
4338            num_values as u64,
4339        );
4340        if unraveler.is_all_valid() {
4341            return Ok(None);
4342        }
4343        let mut validity = BooleanBufferBuilder::new(num_values);
4344        unraveler.unravel_validity(&mut validity);
4345        Ok(Some(validity.finish()))
4346    }
4347
4348    fn is_constant_values(
4349        arrays: &[ArrayRef],
4350        scalar: &ArrayRef,
4351        validity: Option<&BooleanBuffer>,
4352    ) -> Result<bool> {
4353        debug_assert_eq!(scalar.len(), 1);
4354        debug_assert_eq!(scalar.null_count(), 0);
4355
4356        match scalar.data_type() {
4357            DataType::Boolean => {
4358                let mut global_idx = 0usize;
4359                let scalar_val = scalar.as_boolean().value(0);
4360                for arr in arrays {
4361                    let bool_arr = arr.as_boolean();
4362                    for i in 0..arr.len() {
4363                        let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4364                        global_idx += 1;
4365                        if !is_valid {
4366                            continue;
4367                        }
4368                        if bool_arr.value(i) != scalar_val {
4369                            return Ok(false);
4370                        }
4371                    }
4372                }
4373                Ok(true)
4374            }
4375            DataType::Utf8 => Self::is_constant_utf8::<i32>(arrays, scalar, validity),
4376            DataType::LargeUtf8 => Self::is_constant_utf8::<i64>(arrays, scalar, validity),
4377            DataType::Binary => Self::is_constant_binary::<i32>(arrays, scalar, validity),
4378            DataType::LargeBinary => Self::is_constant_binary::<i64>(arrays, scalar, validity),
4379            data_type => {
4380                let mut global_idx = 0usize;
4381                let Some(byte_width) = data_type.byte_width_opt() else {
4382                    return Ok(false);
4383                };
4384                let scalar_data = scalar.to_data();
4385                if scalar_data.buffers().len() != 1 || !scalar_data.child_data().is_empty() {
4386                    return Ok(false);
4387                }
4388                let scalar_bytes = scalar_data.buffers()[0].as_slice();
4389                if scalar_bytes.len() != byte_width {
4390                    return Ok(false);
4391                }
4392
4393                for arr in arrays {
4394                    let data = arr.to_data();
4395                    if data.buffers().is_empty() {
4396                        return Ok(false);
4397                    }
4398                    let buf = data.buffers()[0].as_slice();
4399                    let base = data.offset();
4400                    for i in 0..arr.len() {
4401                        let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4402                        global_idx += 1;
4403                        if !is_valid {
4404                            continue;
4405                        }
4406                        let start = (base + i) * byte_width;
4407                        if buf[start..start + byte_width] != scalar_bytes[..] {
4408                            return Ok(false);
4409                        }
4410                    }
4411                }
4412                Ok(true)
4413            }
4414        }
4415    }
4416
4417    fn is_constant_utf8<O: arrow_array::OffsetSizeTrait>(
4418        arrays: &[ArrayRef],
4419        scalar: &ArrayRef,
4420        validity: Option<&BooleanBuffer>,
4421    ) -> Result<bool> {
4422        debug_assert_eq!(scalar.len(), 1);
4423        let scalar_val = scalar.as_string::<O>().value(0).as_bytes();
4424        let mut global_idx = 0usize;
4425        for arr in arrays {
4426            let str_arr = arr.as_string::<O>();
4427            for i in 0..arr.len() {
4428                let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4429                global_idx += 1;
4430                if !is_valid {
4431                    continue;
4432                }
4433                if str_arr.value(i).as_bytes() != scalar_val {
4434                    return Ok(false);
4435                }
4436            }
4437        }
4438        Ok(true)
4439    }
4440
4441    fn is_constant_binary<O: arrow_array::OffsetSizeTrait>(
4442        arrays: &[ArrayRef],
4443        scalar: &ArrayRef,
4444        validity: Option<&BooleanBuffer>,
4445    ) -> Result<bool> {
4446        debug_assert_eq!(scalar.len(), 1);
4447        let scalar_val = scalar.as_binary::<O>().value(0);
4448        let mut global_idx = 0usize;
4449        for arr in arrays {
4450            let bin_arr = arr.as_binary::<O>();
4451            for i in 0..arr.len() {
4452                let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4453                global_idx += 1;
4454                if !is_valid {
4455                    continue;
4456                }
4457                if bin_arr.value(i) != scalar_val {
4458                    return Ok(false);
4459                }
4460            }
4461        }
4462        Ok(true)
4463    }
4464
4465    fn find_constant_scalar(
4466        arrays: &[ArrayRef],
4467        validity: Option<&BooleanBuffer>,
4468    ) -> Result<Option<ArrayRef>> {
4469        if arrays.is_empty() {
4470            return Ok(None);
4471        }
4472
4473        let global_scalar_idx = if let Some(validity) = validity {
4474            let Some(idx) = (0..validity.len()).find(|&i| validity.value(i)) else {
4475                return Ok(None);
4476            };
4477            idx
4478        } else {
4479            0
4480        };
4481
4482        let mut idx_remaining = global_scalar_idx;
4483        let mut scalar_arr_idx = 0usize;
4484        while scalar_arr_idx < arrays.len() {
4485            let len = arrays[scalar_arr_idx].len();
4486            if idx_remaining < len {
4487                break;
4488            }
4489            idx_remaining -= len;
4490            scalar_arr_idx += 1;
4491        }
4492
4493        if scalar_arr_idx >= arrays.len() {
4494            return Ok(None);
4495        }
4496
4497        let scalar =
4498            lance_arrow::scalar::extract_scalar_value(&arrays[scalar_arr_idx], idx_remaining)?;
4499        if scalar.null_count() != 0 {
4500            return Ok(None);
4501        }
4502        if !Self::is_constant_values(arrays, &scalar, validity)? {
4503            return Ok(None);
4504        }
4505        Ok(Some(scalar))
4506    }
4507
4508    fn resolve_dict_values_compression_metadata(
4509        field_metadata: &HashMap<String, String>,
4510        env_compression: Option<String>,
4511        env_compression_level: Option<String>,
4512    ) -> HashMap<String, String> {
4513        let mut metadata = HashMap::new();
4514
4515        let compression = field_metadata
4516            .get(DICT_VALUES_COMPRESSION_META_KEY)
4517            .cloned()
4518            .or(env_compression)
4519            .unwrap_or_else(|| DEFAULT_DICT_VALUES_COMPRESSION.to_string());
4520        metadata.insert(COMPRESSION_META_KEY.to_string(), compression);
4521
4522        if let Some(compression_level) = field_metadata
4523            .get(DICT_VALUES_COMPRESSION_LEVEL_META_KEY)
4524            .cloned()
4525            .or(env_compression_level)
4526        {
4527            metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), compression_level);
4528        }
4529
4530        metadata
4531    }
4532
4533    fn build_dict_values_compressor_field(field: &Field) -> Result<Field> {
4534        // This is an internal synthetic field used only to feed metadata into
4535        // `create_block_compressor` for dictionary values. The concrete type/name here
4536        // are not semantically meaningful; we rely on explicit metadata below to control
4537        // general compression selection for dictionary values.
4538        let mut dict_values_field = Field::new_arrow("", DataType::UInt16, false)?;
4539        dict_values_field.metadata = Self::resolve_dict_values_compression_metadata(
4540            &field.metadata,
4541            env::var(DICT_VALUES_COMPRESSION_ENV_VAR).ok(),
4542            env::var(DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR).ok(),
4543        );
4544        Ok(dict_values_field)
4545    }
4546
4547    #[allow(clippy::too_many_arguments)]
4548    fn encode_miniblock(
4549        column_idx: u32,
4550        field: &Field,
4551        compression_strategy: &dyn CompressionStrategy,
4552        data: DataBlock,
4553        repdef: crate::repdef::SerializedRepDefs,
4554        row_number: u64,
4555        dictionary_data: Option<DataBlock>,
4556        num_rows: u64,
4557        support_large_chunk: bool,
4558    ) -> Result<EncodedPage> {
4559        if let DataBlock::AllNull(_null_block) = data {
4560            // We should not be using mini-block for all-null.  There are other structural
4561            // encodings for that.
4562            unreachable!()
4563        }
4564
4565        let num_items = data.num_values();
4566
4567        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
4568        let (compressed_data, value_encoding) = compressor.compress(data)?;
4569
4570        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
4571
4572        let mut compressed_rep = repdef
4573            .rep_slicer()
4574            .map(|rep_slicer| {
4575                Self::compress_levels(
4576                    rep_slicer,
4577                    num_items,
4578                    compression_strategy,
4579                    &compressed_data.chunks,
4580                    max_rep,
4581                )
4582            })
4583            .transpose()?;
4584
4585        let (rep_index, rep_index_depth) =
4586            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
4587                Some(rep_index) => (Some(rep_index.clone()), 1),
4588                None => (None, 0),
4589            };
4590
4591        let mut compressed_def = repdef
4592            .def_slicer()
4593            .map(|def_slicer| {
4594                Self::compress_levels(
4595                    def_slicer,
4596                    num_items,
4597                    compression_strategy,
4598                    &compressed_data.chunks,
4599                    /*max_rep=*/ 0,
4600                )
4601            })
4602            .transpose()?;
4603
4604        // TODO: Parquet sparsely encodes values here.  We could do the same but
4605        // then we won't have log2 values per chunk.  This means more metadata
4606        // and potentially more decoder asymmetry.  However, it may be worth
4607        // investigating at some point
4608
4609        let rep_data = compressed_rep
4610            .as_mut()
4611            .map(|cr| std::mem::take(&mut cr.data));
4612        let def_data = compressed_def
4613            .as_mut()
4614            .map(|cd| std::mem::take(&mut cd.data));
4615
4616        let serialized =
4617            Self::serialize_miniblocks(compressed_data, rep_data, def_data, support_large_chunk)?;
4618
4619        // Metadata, Data, Dictionary, (maybe) Repetition Index
4620        let mut data = Vec::with_capacity(4);
4621        data.push(serialized.metadata);
4622        data.push(serialized.data);
4623
4624        if let Some(dictionary_data) = dictionary_data {
4625            let num_dictionary_items = dictionary_data.num_values();
4626            let dict_values_field = Self::build_dict_values_compressor_field(field)?;
4627
4628            let (compressor, dictionary_encoding) = compression_strategy
4629                .create_block_compressor(&dict_values_field, &dictionary_data)?;
4630            let dictionary_buffer = compressor.compress(dictionary_data)?;
4631
4632            data.push(dictionary_buffer);
4633            if let Some(rep_index) = rep_index {
4634                data.push(rep_index);
4635            }
4636
4637            let description = ProtobufUtils21::miniblock_layout(
4638                compressed_rep.map(|cr| cr.compression),
4639                compressed_def.map(|cd| cd.compression),
4640                value_encoding,
4641                rep_index_depth,
4642                serialized.num_buffers,
4643                Some((dictionary_encoding, num_dictionary_items)),
4644                &repdef.def_meaning,
4645                num_items,
4646                support_large_chunk,
4647            );
4648            Ok(EncodedPage {
4649                num_rows,
4650                column_idx,
4651                data,
4652                description: PageEncoding::Structural(description),
4653                row_number,
4654            })
4655        } else {
4656            let description = ProtobufUtils21::miniblock_layout(
4657                compressed_rep.map(|cr| cr.compression),
4658                compressed_def.map(|cd| cd.compression),
4659                value_encoding,
4660                rep_index_depth,
4661                serialized.num_buffers,
4662                None,
4663                &repdef.def_meaning,
4664                num_items,
4665                support_large_chunk,
4666            );
4667
4668            if let Some(rep_index) = rep_index {
4669                let view = rep_index.borrow_to_typed_slice::<u64>();
4670                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
4671                debug_assert_eq!(total, num_rows);
4672
4673                data.push(rep_index);
4674            }
4675
4676            Ok(EncodedPage {
4677                num_rows,
4678                column_idx,
4679                data,
4680                description: PageEncoding::Structural(description),
4681                row_number,
4682            })
4683        }
4684    }
4685
4686    // For fixed-size data we encode < control word | data > for each value
4687    fn serialize_full_zip_fixed(
4688        fixed: FixedWidthDataBlock,
4689        mut repdef: ControlWordIterator,
4690        num_values: u64,
4691    ) -> SerializedFullZip {
4692        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
4693        let mut zipped_data = Vec::with_capacity(len);
4694
4695        let max_rep_index_val = if repdef.has_repetition() {
4696            len as u64
4697        } else {
4698            // Setting this to 0 means we won't write a repetition index
4699            0
4700        };
4701        let mut rep_index_builder =
4702            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
4703
4704        // I suppose we can just pad to the nearest byte but I'm not sure we need to worry about this anytime soon
4705        // because it is unlikely compression of large values is going to yield a result that is not byte aligned
4706        assert_eq!(
4707            fixed.bits_per_value % 8,
4708            0,
4709            "Non-byte aligned full-zip compression not yet supported"
4710        );
4711
4712        let bytes_per_value = fixed.bits_per_value as usize / 8;
4713        let mut offset = 0;
4714
4715        if bytes_per_value == 0 {
4716            // No data, just dump the repdef into the buffer
4717            while let Some(control) = repdef.append_next(&mut zipped_data) {
4718                if control.is_new_row {
4719                    // We have finished a row
4720                    debug_assert!(offset <= len);
4721                    // SAFETY: We know that `start <= len`
4722                    unsafe { rep_index_builder.append(offset as u64) };
4723                }
4724                offset = zipped_data.len();
4725            }
4726        } else {
4727            // We have data, zip it with the repdef
4728            let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
4729            while let Some(control) = repdef.append_next(&mut zipped_data) {
4730                if control.is_new_row {
4731                    // We have finished a row
4732                    debug_assert!(offset <= len);
4733                    // SAFETY: We know that `start <= len`
4734                    unsafe { rep_index_builder.append(offset as u64) };
4735                }
4736                if control.is_visible {
4737                    let value = data_iter.next().unwrap();
4738                    zipped_data.extend_from_slice(value);
4739                }
4740                offset = zipped_data.len();
4741            }
4742        }
4743
4744        debug_assert_eq!(zipped_data.len(), len);
4745        // Put the final value in the rep index
4746        // SAFETY: `zipped_data.len() == len`
4747        unsafe {
4748            rep_index_builder.append(zipped_data.len() as u64);
4749        }
4750
4751        let zipped_data = LanceBuffer::from(zipped_data);
4752        let rep_index = rep_index_builder.into_data();
4753        let rep_index = if rep_index.is_empty() {
4754            None
4755        } else {
4756            Some(LanceBuffer::from(rep_index))
4757        };
4758        SerializedFullZip {
4759            values: zipped_data,
4760            repetition_index: rep_index,
4761        }
4762    }
4763
4764    // For variable-size data we encode < control word | length | data > for each value
4765    //
4766    // In addition, we create a second buffer, the repetition index
4767    fn serialize_full_zip_variable(
4768        variable: VariableWidthBlock,
4769        mut repdef: ControlWordIterator,
4770        num_items: u64,
4771    ) -> SerializedFullZip {
4772        let bytes_per_offset = variable.bits_per_offset as usize / 8;
4773        assert_eq!(
4774            variable.bits_per_offset % 8,
4775            0,
4776            "Only byte-aligned offsets supported"
4777        );
4778        let len = variable.data.len()
4779            + repdef.bytes_per_word() * num_items as usize
4780            + bytes_per_offset * variable.num_values as usize;
4781        let mut buf = Vec::with_capacity(len);
4782
4783        let max_rep_index_val = len as u64;
4784        let mut rep_index_builder =
4785            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4786
4787        // TODO: byte pack the item lengths with varint encoding
4788        match bytes_per_offset {
4789            4 => {
4790                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
4791                let mut rep_offset = 0;
4792                let mut windows_iter = offs.as_ref().windows(2);
4793                while let Some(control) = repdef.append_next(&mut buf) {
4794                    if control.is_new_row {
4795                        // We have finished a row
4796                        debug_assert!(rep_offset <= len);
4797                        // SAFETY: We know that `buf.len() <= len`
4798                        unsafe { rep_index_builder.append(rep_offset as u64) };
4799                    }
4800                    if control.is_visible {
4801                        let window = windows_iter.next().unwrap();
4802                        if control.is_valid_item {
4803                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4804                            buf.extend_from_slice(
4805                                &variable.data[window[0] as usize..window[1] as usize],
4806                            );
4807                        }
4808                    }
4809                    rep_offset = buf.len();
4810                }
4811            }
4812            8 => {
4813                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4814                let mut rep_offset = 0;
4815                let mut windows_iter = offs.as_ref().windows(2);
4816                while let Some(control) = repdef.append_next(&mut buf) {
4817                    if control.is_new_row {
4818                        // We have finished a row
4819                        debug_assert!(rep_offset <= len);
4820                        // SAFETY: We know that `buf.len() <= len`
4821                        unsafe { rep_index_builder.append(rep_offset as u64) };
4822                    }
4823                    if control.is_visible {
4824                        let window = windows_iter.next().unwrap();
4825                        if control.is_valid_item {
4826                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4827                            buf.extend_from_slice(
4828                                &variable.data[window[0] as usize..window[1] as usize],
4829                            );
4830                        }
4831                    }
4832                    rep_offset = buf.len();
4833                }
4834            }
4835            _ => panic!("Unsupported offset size"),
4836        }
4837
4838        // We might have saved a few bytes by not copying lengths when the length was zero.  However,
4839        // if we are over `len` then we have a bug.
4840        debug_assert!(buf.len() <= len);
4841        // Put the final value in the rep index
4842        // SAFETY: `zipped_data.len() == len`
4843        unsafe {
4844            rep_index_builder.append(buf.len() as u64);
4845        }
4846
4847        let zipped_data = LanceBuffer::from(buf);
4848        let rep_index = rep_index_builder.into_data();
4849        debug_assert!(!rep_index.is_empty());
4850        let rep_index = Some(LanceBuffer::from(rep_index));
4851        SerializedFullZip {
4852            values: zipped_data,
4853            repetition_index: rep_index,
4854        }
4855    }
4856
4857    /// Serializes data into a single buffer according to the full-zip format which zips
4858    /// together the repetition, definition, and value data into a single buffer.
4859    fn serialize_full_zip(
4860        compressed_data: PerValueDataBlock,
4861        repdef: ControlWordIterator,
4862        num_items: u64,
4863    ) -> SerializedFullZip {
4864        match compressed_data {
4865            PerValueDataBlock::Fixed(fixed) => {
4866                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4867            }
4868            PerValueDataBlock::Variable(var) => {
4869                Self::serialize_full_zip_variable(var, repdef, num_items)
4870            }
4871        }
4872    }
4873
4874    fn expand_boolean_to_bytes(fixed: FixedWidthDataBlock) -> FixedWidthDataBlock {
4875        debug_assert_eq!(fixed.bits_per_value, 1);
4876        let num_values = fixed.num_values as usize;
4877        let bool_buf = BooleanBuffer::new(fixed.data.into_buffer(), 0, num_values);
4878        let expanded: Vec<u8> = (0..num_values).map(|i| bool_buf.value(i) as u8).collect();
4879        FixedWidthDataBlock {
4880            data: LanceBuffer::from(expanded),
4881            bits_per_value: 8,
4882            num_values: fixed.num_values,
4883            block_info: BlockInfo::new(),
4884        }
4885    }
4886
4887    fn encode_full_zip(
4888        column_idx: u32,
4889        field: &Field,
4890        compression_strategy: &dyn CompressionStrategy,
4891        data: DataBlock,
4892        repdef: crate::repdef::SerializedRepDefs,
4893        row_number: u64,
4894        num_lists: u64,
4895    ) -> Result<EncodedPage> {
4896        let max_rep = repdef
4897            .repetition_levels
4898            .as_ref()
4899            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4900        let max_def = repdef
4901            .definition_levels
4902            .as_ref()
4903            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4904
4905        // To handle FSL we just flatten
4906        // let data = data.flatten();
4907
4908        let (num_items, num_visible_items) =
4909            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4910                // If there are rep levels there may be "invisible" items and we need to encode
4911                // rep_levels.len() things which might be larger than data.num_values()
4912                (rep_levels.len() as u64, data.num_values())
4913            } else {
4914                // If there are no rep levels then we encode data.num_values() things
4915                (data.num_values(), data.num_values())
4916            };
4917
4918        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4919
4920        let repdef_iter = build_control_word_iterator(
4921            repdef.repetition_levels.as_deref(),
4922            max_rep,
4923            repdef.definition_levels.as_deref(),
4924            max_def,
4925            max_visible_def,
4926            num_items as usize,
4927        );
4928        let bits_rep = repdef_iter.bits_rep();
4929        let bits_def = repdef_iter.bits_def();
4930
4931        // Full-zip requires byte-aligned values; expand 1-bit booleans to 1 byte each.
4932        let data = match data {
4933            DataBlock::FixedWidth(fixed) if fixed.bits_per_value == 1 => {
4934                DataBlock::FixedWidth(Self::expand_boolean_to_bytes(fixed))
4935            }
4936            other => other,
4937        };
4938
4939        let compressor = compression_strategy.create_per_value(field, &data)?;
4940        let (compressed_data, value_encoding) = compressor.compress(data)?;
4941
4942        let description = match &compressed_data {
4943            PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
4944                bits_rep,
4945                bits_def,
4946                fixed.bits_per_value as u32,
4947                value_encoding,
4948                &repdef.def_meaning,
4949                num_items as u32,
4950                num_visible_items as u32,
4951            ),
4952            PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
4953                bits_rep,
4954                bits_def,
4955                variable.bits_per_offset as u32,
4956                value_encoding,
4957                &repdef.def_meaning,
4958                num_items as u32,
4959                num_visible_items as u32,
4960            ),
4961        };
4962
4963        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4964
4965        let data = if let Some(repindex) = zipped.repetition_index {
4966            vec![zipped.values, repindex]
4967        } else {
4968            vec![zipped.values]
4969        };
4970
4971        Ok(EncodedPage {
4972            num_rows: num_lists,
4973            column_idx,
4974            data,
4975            description: PageEncoding::Structural(description),
4976            row_number,
4977        })
4978    }
4979
4980    fn should_dictionary_encode(
4981        data_block: &DataBlock,
4982        field: &Field,
4983        version: LanceFileVersion,
4984    ) -> Option<DictEncodingBudget> {
4985        const DEFAULT_SAMPLE_SIZE: usize = 4096;
4986        const DEFAULT_SAMPLE_UNIQUE_RATIO: f64 = 0.98;
4987
4988        // Since we only dictionary encode FixedWidth and VariableWidth blocks for now, we skip
4989        // estimating the size for other types.
4990        match data_block {
4991            DataBlock::FixedWidth(fixed) => {
4992                if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 {
4993                    return None;
4994                }
4995                if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 {
4996                    return None;
4997                }
4998                if fixed.bits_per_value % 8 != 0 {
4999                    return None;
5000                }
5001            }
5002            DataBlock::VariableWidth(var) => {
5003                if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
5004                    return None;
5005                }
5006            }
5007            _ => return None,
5008        }
5009
5010        // Don't dictionary encode tiny arrays.
5011        let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
5012            .ok()
5013            .and_then(|val| val.parse().ok())
5014            .unwrap_or(100);
5015        if data_block.num_values() < too_small {
5016            return None;
5017        }
5018
5019        let num_values = data_block.num_values();
5020
5021        // Apply divisor threshold and cap. This is intentionally conservative: the goal is to
5022        // avoid spending too much CPU trying to estimate very high cardinalities.
5023        let divisor: u64 = field
5024            .metadata
5025            .get(DICT_DIVISOR_META_KEY)
5026            .and_then(|val| val.parse().ok())
5027            .or_else(|| {
5028                env::var("LANCE_ENCODING_DICT_DIVISOR")
5029                    .ok()
5030                    .and_then(|val| val.parse().ok())
5031            })
5032            .unwrap_or(DEFAULT_DICT_DIVISOR);
5033
5034        let max_cardinality: u64 = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY")
5035            .ok()
5036            .and_then(|val| val.parse().ok())
5037            .unwrap_or(DEFAULT_DICT_MAX_CARDINALITY);
5038
5039        let threshold_cardinality = num_values
5040            .checked_div(divisor.max(1))
5041            .unwrap_or(0)
5042            .min(max_cardinality);
5043        if threshold_cardinality == 0 {
5044            return None;
5045        }
5046
5047        // Get size ratio from metadata or env var.
5048        let threshold_ratio = field
5049            .metadata
5050            .get(DICT_SIZE_RATIO_META_KEY)
5051            .and_then(|val| val.parse::<f64>().ok())
5052            .or_else(|| {
5053                env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
5054                    .ok()
5055                    .and_then(|val| val.parse().ok())
5056            })
5057            .unwrap_or(DEFAULT_DICT_SIZE_RATIO);
5058
5059        if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
5060            panic!(
5061                "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
5062                threshold_ratio
5063            );
5064        }
5065
5066        let data_size = data_block.data_size();
5067        if data_size == 0 {
5068            return None;
5069        }
5070
5071        let max_encoded_size = (data_size as f64 * threshold_ratio) as u64;
5072        let max_encoded_size = usize::try_from(max_encoded_size).ok()?;
5073
5074        // Avoid probing dictionary encoding on data that appears to be near-unique.
5075        if Self::sample_is_near_unique(
5076            data_block,
5077            DEFAULT_SAMPLE_SIZE,
5078            DEFAULT_SAMPLE_UNIQUE_RATIO,
5079        )? {
5080            return None;
5081        }
5082
5083        let max_dict_entries = u32::try_from(threshold_cardinality.min(i32::MAX as u64)).ok()?;
5084        Some(DictEncodingBudget {
5085            max_dict_entries,
5086            max_encoded_size,
5087        })
5088    }
5089
5090    /// Probe whether a page looks near-unique before attempting dictionary encoding.
5091    ///
5092    /// The probe uses deterministic stride sampling (not RNG sampling), which keeps
5093    /// the check cheap and reproducible across runs. The result is only a gate for
5094    /// whether we try dictionary encoding, not a cardinality statistic.
5095    fn sample_is_near_unique(
5096        data_block: &DataBlock,
5097        max_samples: usize,
5098        unique_ratio_threshold: f64,
5099    ) -> Option<bool> {
5100        use std::collections::HashSet;
5101
5102        if unique_ratio_threshold <= 0.0 || unique_ratio_threshold > 1.0 {
5103            return None;
5104        }
5105
5106        let num_values = usize::try_from(data_block.num_values()).ok()?;
5107        if num_values == 0 {
5108            return Some(false);
5109        }
5110
5111        let sample_count = num_values.min(max_samples).max(1);
5112        // Uniform stride sampling across the page.
5113        let step = (num_values / sample_count).max(1);
5114
5115        match data_block {
5116            DataBlock::FixedWidth(fixed) => match fixed.bits_per_value {
5117                64 => {
5118                    let values = fixed.data.borrow_to_typed_slice::<u64>();
5119                    let values = values.as_ref();
5120                    let mut unique: HashSet<u64> = HashSet::with_capacity(sample_count.min(1024));
5121                    for idx in (0..num_values).step_by(step).take(sample_count) {
5122                        unique.insert(values.get(idx).copied()?);
5123                    }
5124                    let ratio = unique.len() as f64 / sample_count as f64;
5125                    // Avoid overreacting to tiny pages with too few samples.
5126                    Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5127                }
5128                128 => {
5129                    let values = fixed.data.borrow_to_typed_slice::<u128>();
5130                    let values = values.as_ref();
5131                    let mut unique: HashSet<u128> = HashSet::with_capacity(sample_count.min(1024));
5132                    for idx in (0..num_values).step_by(step).take(sample_count) {
5133                        unique.insert(values.get(idx).copied()?);
5134                    }
5135                    let ratio = unique.len() as f64 / sample_count as f64;
5136                    Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5137                }
5138                _ => Some(false),
5139            },
5140            DataBlock::VariableWidth(var) => {
5141                use xxhash_rust::xxh3::xxh3_64;
5142
5143                // Hash variable-width slices instead of storing borrowed slice keys.
5144                let mut unique: HashSet<u64> = HashSet::with_capacity(sample_count.min(1024));
5145                match var.bits_per_offset {
5146                    32 => {
5147                        let offsets_ref = var.offsets.borrow_to_typed_slice::<u32>();
5148                        let offsets: &[u32] = offsets_ref.as_ref();
5149                        for i in (0..num_values).step_by(step).take(sample_count) {
5150                            let start = usize::try_from(*offsets.get(i)?).ok()?;
5151                            let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
5152                            if start > end || end > var.data.len() {
5153                                return None;
5154                            }
5155                            unique.insert(xxh3_64(&var.data[start..end]));
5156                        }
5157                    }
5158                    64 => {
5159                        let offsets_ref = var.offsets.borrow_to_typed_slice::<u64>();
5160                        let offsets: &[u64] = offsets_ref.as_ref();
5161                        for i in (0..num_values).step_by(step).take(sample_count) {
5162                            let start = usize::try_from(*offsets.get(i)?).ok()?;
5163                            let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
5164                            if start > end || end > var.data.len() {
5165                                return None;
5166                            }
5167                            unique.insert(xxh3_64(&var.data[start..end]));
5168                        }
5169                    }
5170                    _ => return Some(false),
5171                }
5172                let ratio = unique.len() as f64 / sample_count as f64;
5173                Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5174            }
5175            _ => Some(false),
5176        }
5177    }
5178
5179    // Creates an encode task, consuming all buffered data
5180    fn do_flush(
5181        &mut self,
5182        arrays: Vec<ArrayRef>,
5183        repdefs: Vec<RepDefBuilder>,
5184        row_number: u64,
5185        num_rows: u64,
5186    ) -> Result<Vec<EncodeTask>> {
5187        let column_idx = self.column_index;
5188        let compression_strategy = self.compression_strategy.clone();
5189        let field = self.field.clone();
5190        let encoding_metadata = self.encoding_metadata.clone();
5191        let support_large_chunk = self.support_large_chunk;
5192        let version = self.version;
5193        let task = spawn_cpu(move || {
5194            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
5195            let is_simple_validity = repdefs.iter().all(|rd| rd.is_simple_validity());
5196            let has_repdef_info = repdefs.iter().any(|rd| !rd.is_empty());
5197            let repdef = RepDefBuilder::serialize(repdefs);
5198
5199            if num_values == 0 {
5200                // We should not encode empty arrays.  So if we get here that should mean that we
5201                // either have all empty lists or all null lists (or a mix).  We still need to encode
5202                // the rep/def information but we can skip the data encoding.
5203                log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows);
5204                return Self::encode_complex_all_null(
5205                    column_idx,
5206                    repdef,
5207                    row_number,
5208                    num_rows,
5209                    version,
5210                    compression_strategy.as_ref(),
5211                );
5212            }
5213
5214            let leaf_validity = Self::leaf_validity(&repdef, num_values as usize)?;
5215            let all_null = leaf_validity
5216                .as_ref()
5217                .map(|validity| validity.count_set_bits() == 0)
5218                .unwrap_or(false);
5219
5220            if all_null {
5221                return if is_simple_validity {
5222                    log::debug!(
5223                        "Encoding column {} with {} items ({} rows) using simple-null layout",
5224                        column_idx,
5225                        num_values,
5226                        num_rows
5227                    );
5228                    Self::encode_simple_all_null(column_idx, num_values, row_number)
5229                } else {
5230                    log::debug!(
5231                        "Encoding column {} with {} items ({} rows) using complex-null layout",
5232                        column_idx,
5233                        num_values,
5234                        num_rows
5235                    );
5236                    Self::encode_complex_all_null(
5237                        column_idx,
5238                        repdef,
5239                        row_number,
5240                        num_rows,
5241                        version,
5242                        compression_strategy.as_ref(),
5243                    )
5244                };
5245            }
5246
5247            if let DataType::Struct(fields) = &field.data_type()
5248                && fields.is_empty()
5249            {
5250                if has_repdef_info {
5251                    return Err(Error::invalid_input_source(format!("Empty structs with rep/def information are not yet supported.  The field {} is an empty struct that either has nulls or is in a list.", field.name).into()));
5252                }
5253                // This is maybe a little confusing but the reader should never look at this anyways and it
5254                // seems like overkill to invent a new layout just for "empty structs".
5255                return Self::encode_simple_all_null(column_idx, num_values, row_number);
5256            }
5257
5258            let data_block = DataBlock::from_arrays(&arrays, num_values);
5259
5260            if version.resolve() >= LanceFileVersion::V2_2
5261                && let Some(scalar) = Self::find_constant_scalar(&arrays, leaf_validity.as_ref())?
5262            {
5263                log::debug!(
5264                    "Encoding column {} with {} items ({} rows) using constant layout",
5265                    column_idx,
5266                    num_values,
5267                    num_rows
5268                );
5269                return constant::encode_constant_page(
5270                    column_idx,
5271                    scalar,
5272                    repdef,
5273                    row_number,
5274                    num_rows,
5275                );
5276            }
5277
5278            let requires_full_zip_packed_struct =
5279                if let DataBlock::Struct(ref struct_data_block) = data_block {
5280                    struct_data_block.has_variable_width_child()
5281                } else {
5282                    false
5283                };
5284
5285            if requires_full_zip_packed_struct {
5286                log::debug!(
5287                    "Encoding column {} with {} items using full-zip packed struct layout",
5288                    column_idx,
5289                    num_values
5290                );
5291                return Self::encode_full_zip(
5292                    column_idx,
5293                    &field,
5294                    compression_strategy.as_ref(),
5295                    data_block,
5296                    repdef,
5297                    row_number,
5298                    num_rows,
5299                );
5300            }
5301
5302            // If the rep/def levels are too sparse for miniblock (e.g. many empty
5303            // lists with very few values), fall back to fullzip to avoid exceeding
5304            // the u16 per-chunk rep/def buffer size limit.
5305            let too_sparse = Self::repdef_too_sparse_for_miniblock(&repdef, num_values);
5306
5307            if !too_sparse {
5308                if let DataBlock::Dictionary(dict) = data_block {
5309                    log::debug!("Encoding column {} with {} items using dictionary encoding (already dictionary encoded)", column_idx, num_values);
5310                    let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
5311                    // TODO: https://github.com/lancedb/lance/issues/4809
5312                    // If we compute stats on dictionary_data_block => panic.
5313                    // If we don't compute stats on indices_data_block => panic.
5314                    // This is messy.  Don't make me call compute_stat ever.
5315                    indices_data_block.compute_stat();
5316                    return Self::encode_miniblock(
5317                        column_idx,
5318                        &field,
5319                        compression_strategy.as_ref(),
5320                        indices_data_block,
5321                        repdef,
5322                        row_number,
5323                        Some(dictionary_data_block),
5324                        num_rows,
5325                        support_large_chunk,
5326                    );
5327                }
5328            } else {
5329                log::debug!(
5330                    "Encoding column {} with {} items using full-zip layout \
5331                     (rep/def too sparse for mini-block)",
5332                    column_idx,
5333                    num_values
5334                );
5335            }
5336
5337            {
5338                // Try dictionary encoding first if applicable. If encoding aborts, fall back to the
5339                // preferred structural encoding.
5340                let dict_result = if too_sparse {
5341                    None
5342                } else {
5343                    Self::should_dictionary_encode(&data_block, &field, version)
5344                        .and_then(|budget| {
5345                            log::debug!(
5346                                "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
5347                                column_idx,
5348                                num_values
5349                            );
5350                            dict::dictionary_encode(
5351                                &data_block,
5352                                budget.max_dict_entries,
5353                                budget.max_encoded_size,
5354                            )
5355                        })
5356                };
5357
5358                if let Some((indices_data_block, dictionary_data_block)) = dict_result {
5359                    Self::encode_miniblock(
5360                        column_idx,
5361                        &field,
5362                        compression_strategy.as_ref(),
5363                        indices_data_block,
5364                        repdef,
5365                        row_number,
5366                        Some(dictionary_data_block),
5367                        num_rows,
5368                        support_large_chunk,
5369                    )
5370                } else if !too_sparse && Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
5371                    log::debug!(
5372                        "Encoding column {} with {} items using mini-block layout",
5373                        column_idx,
5374                        num_values
5375                    );
5376                    Self::encode_miniblock(
5377                        column_idx,
5378                        &field,
5379                        compression_strategy.as_ref(),
5380                        data_block,
5381                        repdef,
5382                        row_number,
5383                        None,
5384                        num_rows,
5385                        support_large_chunk,
5386                    )
5387                } else if too_sparse || Self::prefers_fullzip(encoding_metadata.as_ref()) {
5388                    log::debug!(
5389                        "Encoding column {} with {} items using full-zip layout",
5390                        column_idx,
5391                        num_values
5392                    );
5393                    Self::encode_full_zip(
5394                        column_idx,
5395                        &field,
5396                        compression_strategy.as_ref(),
5397                        data_block,
5398                        repdef,
5399                        row_number,
5400                        num_rows,
5401                    )
5402                } else {
5403                    Err(Error::invalid_input_source(format!("Cannot determine structural encoding for field {}.  This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into()))
5404                }
5405            }
5406        })
5407        .boxed();
5408        Ok(vec![task])
5409    }
5410
5411    fn extract_validity_buf(
5412        array: Arc<dyn Array>,
5413        repdef: &mut RepDefBuilder,
5414        keep_original_array: bool,
5415    ) -> Result<Arc<dyn Array>> {
5416        if let Some(validity) = array.nulls() {
5417            if keep_original_array {
5418                repdef.add_validity_bitmap(validity.clone());
5419            } else {
5420                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
5421            }
5422            let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
5423            Ok(make_array(data_no_nulls))
5424        } else {
5425            repdef.add_no_null(array.len());
5426            Ok(array)
5427        }
5428    }
5429
5430    fn extract_validity(
5431        mut array: Arc<dyn Array>,
5432        repdef: &mut RepDefBuilder,
5433        keep_original_array: bool,
5434    ) -> Result<Arc<dyn Array>> {
5435        match array.data_type() {
5436            DataType::Null => {
5437                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
5438                Ok(array)
5439            }
5440            DataType::Dictionary(_, _) => {
5441                array = dict::normalize_dict_nulls(array)?;
5442                Self::extract_validity_buf(array, repdef, keep_original_array)
5443            }
5444            // Extract our validity buf but NOT any child validity bufs. (they will be encoded in
5445            // as part of the values).  Note: for FSL we do not use repdef.add_fsl because we do
5446            // NOT want to increase the repdef depth.
5447            //
5448            // This would be quite catasrophic for something like vector embeddings.  Imagine we
5449            // had thousands of vectors and some were null but no vector contained null items.  If
5450            // we treated the vectors (primitive FSL) like we treat structural FSL we would end up
5451            // with a rep/def value for every single item in the vector.
5452            _ => Self::extract_validity_buf(array, repdef, keep_original_array),
5453        }
5454    }
5455}
5456
5457impl FieldEncoder for PrimitiveStructuralEncoder {
5458    // Buffers data, if there is enough to write a page then we create an encode task
5459    fn maybe_encode(
5460        &mut self,
5461        array: ArrayRef,
5462        _external_buffers: &mut OutOfLineBuffers,
5463        mut repdef: RepDefBuilder,
5464        row_number: u64,
5465        num_rows: u64,
5466    ) -> Result<Vec<EncodeTask>> {
5467        let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
5468        self.accumulated_repdefs.push(repdef);
5469
5470        if let Some((arrays, row_number, num_rows)) =
5471            self.accumulation_queue.insert(array, row_number, num_rows)
5472        {
5473            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5474            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5475        } else {
5476            Ok(vec![])
5477        }
5478    }
5479
5480    // If there is any data left in the buffer then create an encode task from it
5481    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
5482        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
5483            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5484            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5485        } else {
5486            Ok(vec![])
5487        }
5488    }
5489
5490    fn num_columns(&self) -> u32 {
5491        1
5492    }
5493
5494    fn finish(
5495        &mut self,
5496        _external_buffers: &mut OutOfLineBuffers,
5497    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
5498        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
5499    }
5500}
5501
5502#[cfg(test)]
5503#[allow(clippy::single_range_in_vec_init)]
5504mod tests {
5505    use super::{
5506        ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
5507        FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipReadSource,
5508        FullZipRepIndexDetails, FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor,
5509        PreambleAction, StructuralPageScheduler, VariableFullZipDecoder,
5510    };
5511    use crate::buffer::LanceBuffer;
5512    use crate::compression::DefaultDecompressionStrategy;
5513    use crate::constants::{
5514        COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
5515        DICT_VALUES_COMPRESSION_META_KEY, STRUCTURAL_ENCODING_META_KEY,
5516        STRUCTURAL_ENCODING_MINIBLOCK,
5517    };
5518    use crate::data::BlockInfo;
5519    use crate::decoder::PageEncoding;
5520    use crate::encodings::logical::primitive::{
5521        ChunkDrainInstructions, PrimitiveStructuralEncoder,
5522    };
5523    use crate::format::ProtobufUtils21;
5524    use crate::format::pb21;
5525    use crate::format::pb21::compressive_encoding::Compression;
5526    use crate::testing::{TestCases, check_round_trip_encoding_of_data};
5527    use crate::version::LanceFileVersion;
5528    use arrow_array::{ArrayRef, Int8Array, StringArray};
5529    use arrow_schema::DataType;
5530    use std::collections::HashMap;
5531    use std::{collections::VecDeque, sync::Arc};
5532
5533    #[test]
5534    fn test_is_narrow() {
5535        let int8_array = Int8Array::from(vec![1, 2, 3]);
5536        let array_ref: ArrayRef = Arc::new(int8_array);
5537        let block = DataBlock::from_array(array_ref);
5538
5539        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
5540
5541        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
5542        let block = DataBlock::from_array(string_array);
5543        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
5544
5545        let string_array = StringArray::from(vec![
5546            Some("hello world".repeat(100)),
5547            Some("world".to_string()),
5548        ]);
5549        let block = DataBlock::from_array(string_array);
5550        assert!((!PrimitiveStructuralEncoder::is_narrow(&block)));
5551    }
5552
    #[test]
    fn test_map_range() {
        // Exercises DecodeMiniBlockTask::map_range, which takes a range of rows plus the
        // rep/def levels of a chunk and returns an (item range, level range) pair.
        //
        // Each scenario below rebinds rep / def / max_visible_def / total_items (and
        // sometimes max_rep) and then defines a fresh `check` closure capturing those
        // bindings, so the order of the statements matters.
        //
        // check(rows, expected_item_range, expected_level_range)

        // Null in the middle
        // [[A, B, C], [D, E], NULL, [F, G, H]]
        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 8;
        let max_rep = 1;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(1..2, 3..5, 3..5);
        // The null list occupies a level but contributes no items
        check(2..3, 5..5, 5..6);
        check(3..4, 5..8, 6..9);
        check(0..2, 0..5, 0..5);
        check(1..3, 3..5, 3..6);
        check(2..4, 5..8, 5..9);
        check(0..3, 0..5, 0..6);
        check(1..4, 3..8, 3..9);
        check(0..4, 0..8, 0..9);

        // Null at start
        // [NULL, [A, B], [C]]
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..0, 0..1);
        check(1..2, 0..2, 1..3);
        check(2..3, 2..3, 3..4);
        check(0..2, 0..2, 0..3);
        check(1..3, 0..3, 1..4);
        check(0..3, 0..3, 0..4);

        // Null at end
        // [[A], [B, C], NULL]
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..3, 1..3);
        check(2..3, 3..3, 3..4);
        check(0..2, 0..3, 0..3);
        check(1..3, 1..3, 1..4);
        check(0..3, 0..3, 0..4);

        // No nulls, with repetition
        // [[A, B], [C, D], [E, F]]
        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
        let def: Option<&[u16]> = None;
        let max_visible_def = 0;
        let total_items = 6;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..2, 0..2);
        check(1..2, 2..4, 2..4);
        check(2..3, 4..6, 4..6);
        check(0..2, 0..4, 0..4);
        check(1..3, 2..6, 2..6);
        check(0..3, 0..6, 0..6);

        // No repetition, with nulls (this case is trivial)
        // [A, B, NULL, C]
        //
        // Note: max_rep is still 1 here (carried over from the first scenario) even
        // though there are no repetition levels.
        let rep: Option<&[u16]> = None;
        let def = Some(vec![0, 0, 1, 0]);
        let max_visible_def = 1;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..2, 1..2);
        check(2..3, 2..3, 2..3);
        check(0..2, 0..2, 0..2);
        check(1..3, 1..3, 1..3);
        check(0..3, 0..3, 0..3);

        // Tricky case, this chunk is a continuation and starts with a rep-index = 0
        // [[..., A] [B, C], NULL]
        //
        // What we do will depend on the preamble action
        let rep = Some(vec![0, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        // If we are taking the preamble then the range must start at 0
        check(0..1, 0..3, 0..3);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        // When skipping the preamble, row 0 is the first row after the preamble
        check(0..1, 1..3, 1..3);
        check(1..2, 3..3, 3..4);
        check(0..2, 1..3, 1..4);

        // Another preamble case but now it doesn't end with a new list
        // [[..., A], NULL, [D, E]]
        //
        // What we do will depend on the preamble action
        let rep = Some(vec![0, 1, 1, 0]);
        let def = Some(vec![0, 1, 0, 0]);
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        // If we are taking the preamble then the range must start at 0
        check(0..1, 0..1, 0..2);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        // Here the preamble is skipped, so row 0 is the NULL list that follows it
        // (one level, no items)
        check(0..1, 1..1, 1..2);
        check(1..2, 1..3, 2..4);
        check(0..2, 1..3, 1..4);

        // Now a preamble case without any definition levels
        // [[..., A] [B, C], [D]]
        let rep = Some(vec![0, 1, 0, 1]);
        let def: Option<Vec<u16>> = None;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        // If we are taking the preamble then the range must start at 0
        check(0..1, 0..3, 0..3);
        check(0..2, 0..4, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..4, 3..4);
        check(0..2, 1..4, 1..4);

        // If we have nested lists then non-top level lists may be empty/null
        // and we need to make sure we still handle them as invisible items (we
        // failed to do this previously)
        let rep = Some(vec![2, 1, 2, 0, 1, 2]);
        let def = Some(vec![0, 1, 2, 0, 0, 0]);
        let max_rep = 2;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..3, 0..4, 0..6);
        check(0..1, 0..1, 0..2);
        check(1..2, 1..3, 2..5);
        check(2..3, 3..4, 5..6);

        // Invisible items in a preamble that we are taking (regression test for a
        // previous failure)
        let rep = Some(vec![0, 0, 1, 0, 1, 1]);
        let def = Some(vec![0, 1, 0, 0, 0, 0]);
        let max_rep = 1;
        let max_visible_def = 0;
        let total_items = 5;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        // An empty row range with PreambleAction::Take yields just the preamble
        check(0..0, 0..1, 0..2);
        check(0..1, 0..3, 0..4);
        check(0..2, 0..4, 0..5);

        // Skip preamble (with invisible items) and skip a few rows (with invisible
        // items) and then take a few rows but not all the rows
        let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
        let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
        let max_rep = 1;
        let max_visible_def = 0;
        let total_items = 5;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(2..3, 2..4, 5..7);
    }
5911
5912    #[test]
5913    fn test_slice_batch_data_and_rebase_offsets_u32() {
5914        let data = LanceBuffer::copy_slice(b"0123456789abcdefghij");
5915        let offsets = LanceBuffer::reinterpret_vec(vec![6_u32, 8_u32, 8_u32, 12_u32]);
5916
5917        let (sliced_data, normalized_offsets) =
5918            VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
5919                .unwrap();
5920
5921        assert_eq!(sliced_data.as_ref(), b"6789ab");
5922        let normalized = normalized_offsets.borrow_to_typed_slice::<u32>();
5923        assert_eq!(normalized.as_ref(), &[0, 2, 2, 6]);
5924    }
5925
5926    #[test]
5927    fn test_slice_batch_data_and_rebase_offsets_u64() {
5928        let data = LanceBuffer::copy_slice(b"abcdefghijklmnopqrstuvwxyz");
5929        let offsets = LanceBuffer::reinterpret_vec(vec![10_u64, 12_u64, 16_u64, 20_u64]);
5930
5931        let (sliced_data, normalized_offsets) =
5932            VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 64)
5933                .unwrap();
5934
5935        assert_eq!(sliced_data.as_ref(), b"klmnopqrst");
5936        let normalized = normalized_offsets.borrow_to_typed_slice::<u64>();
5937        assert_eq!(normalized.as_ref(), &[0, 2, 6, 10]);
5938    }
5939
5940    #[test]
5941    fn test_slice_batch_data_and_rebase_offsets_rejects_invalid_offsets() {
5942        let data = LanceBuffer::copy_slice(b"abcd");
5943        let offsets = LanceBuffer::reinterpret_vec(vec![3_u32, 2_u32]);
5944
5945        let err = VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
5946            .expect_err("offset end before start should error");
5947        assert!(err.to_string().contains("less than base"));
5948    }
5949
5950    #[test]
5951    fn test_schedule_instructions() {
5952        // Convert repetition index to bytes for testing
5953        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
5954        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5955        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5956
5957        let check = |user_ranges, expected_instructions| {
5958            let instructions =
5959                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
5960            assert_eq!(instructions, expected_instructions);
5961        };
5962
5963        // The instructions we expect if we're grabbing the whole range
5964        let expected_take_all = vec![
5965            ChunkInstructions {
5966                chunk_idx: 0,
5967                preamble: PreambleAction::Absent,
5968                rows_to_skip: 0,
5969                rows_to_take: 6,
5970                take_trailer: true,
5971            },
5972            ChunkInstructions {
5973                chunk_idx: 1,
5974                preamble: PreambleAction::Take,
5975                rows_to_skip: 0,
5976                rows_to_take: 2,
5977                take_trailer: false,
5978            },
5979            ChunkInstructions {
5980                chunk_idx: 2,
5981                preamble: PreambleAction::Absent,
5982                rows_to_skip: 0,
5983                rows_to_take: 5,
5984                take_trailer: true,
5985            },
5986            ChunkInstructions {
5987                chunk_idx: 3,
5988                preamble: PreambleAction::Take,
5989                rows_to_skip: 0,
5990                rows_to_take: 1,
5991                take_trailer: false,
5992            },
5993        ];
5994
5995        // Take all as 1 range
5996        check(&[0..14], expected_take_all.clone());
5997
5998        // Take all a individual rows
5999        check(
6000            &[
6001                0..1,
6002                1..2,
6003                2..3,
6004                3..4,
6005                4..5,
6006                5..6,
6007                6..7,
6008                7..8,
6009                8..9,
6010                9..10,
6011                10..11,
6012                11..12,
6013                12..13,
6014                13..14,
6015            ],
6016            expected_take_all,
6017        );
6018
6019        // Test some partial takes
6020
6021        // 2 rows in the same chunk but not contiguous
6022        check(
6023            &[0..1, 3..4],
6024            vec![
6025                ChunkInstructions {
6026                    chunk_idx: 0,
6027                    preamble: PreambleAction::Absent,
6028                    rows_to_skip: 0,
6029                    rows_to_take: 1,
6030                    take_trailer: false,
6031                },
6032                ChunkInstructions {
6033                    chunk_idx: 0,
6034                    preamble: PreambleAction::Absent,
6035                    rows_to_skip: 3,
6036                    rows_to_take: 1,
6037                    take_trailer: false,
6038                },
6039            ],
6040        );
6041
6042        // Taking just a trailer/preamble
6043        check(
6044            &[5..6],
6045            vec![
6046                ChunkInstructions {
6047                    chunk_idx: 0,
6048                    preamble: PreambleAction::Absent,
6049                    rows_to_skip: 5,
6050                    rows_to_take: 1,
6051                    take_trailer: true,
6052                },
6053                ChunkInstructions {
6054                    chunk_idx: 1,
6055                    preamble: PreambleAction::Take,
6056                    rows_to_skip: 0,
6057                    rows_to_take: 0,
6058                    take_trailer: false,
6059                },
6060            ],
6061        );
6062
6063        // Skipping an entire chunk
6064        check(
6065            &[7..10],
6066            vec![
6067                ChunkInstructions {
6068                    chunk_idx: 1,
6069                    preamble: PreambleAction::Skip,
6070                    rows_to_skip: 1,
6071                    rows_to_take: 1,
6072                    take_trailer: false,
6073                },
6074                ChunkInstructions {
6075                    chunk_idx: 2,
6076                    preamble: PreambleAction::Absent,
6077                    rows_to_skip: 0,
6078                    rows_to_take: 2,
6079                    take_trailer: false,
6080                },
6081            ],
6082        );
6083    }
6084
/// Checks that scheduled chunk instructions are drained into the expected
/// per-batch `ChunkDrainInstructions`, including the regression case where a
/// batch ends on a trailer and must still pick up the next chunk's preamble.
#[test]
fn test_drain_instructions() {
    /// Keeps draining the front of `instructions` until no more rows are wanted
    /// and no preamble is pending, returning one drain instruction per step.
    fn drain_from_instructions(
        instructions: &mut VecDeque<ChunkInstructions>,
        mut rows_desired: u64,
        need_preamble: &mut bool,
        skip_in_chunk: &mut u64,
    ) -> Vec<ChunkDrainInstructions> {
        // The queue length is only an upper bound; usually far fewer entries are produced.
        let mut drained = Vec::with_capacity(instructions.len());
        loop {
            if rows_desired == 0 && !*need_preamble {
                break drained;
            }
            let (step, chunk_finished) = instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
            if chunk_finished {
                instructions.pop_front();
            }
            drained.push(step);
        }
    }

    // Shorthand for spelling out an expected drain instruction.
    let drain_step = |chunk: &ChunkInstructions, rows_to_take, rows_to_skip, preamble_action| {
        ChunkDrainInstructions {
            chunk_instructions: chunk.clone(),
            rows_to_take,
            rows_to_skip,
            preamble_action,
        }
    };

    // ---- Scenario 1: two user ranges drained in batches of 4, 4 and 2 rows ----

    // The repetition index is handed to the decoder as little-endian u64 bytes.
    let rep_words: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
    let rep_bytes: Vec<u8> = rep_words
        .iter()
        .copied()
        .flat_map(u64::to_le_bytes)
        .collect();
    let rep_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
    let requested = vec![1..7, 10..14];

    let scheduled = ChunkInstructions::schedule_instructions(&rep_index, &requested);
    let mut pending = VecDeque::from(scheduled.clone());

    let mut need_preamble = false;
    let mut skip_in_chunk = 0;

    // First batch of 4: served entirely from the first scheduled chunk.
    let batch = drain_from_instructions(&mut pending, 4, &mut need_preamble, &mut skip_in_chunk);

    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 4);
    assert_eq!(
        batch,
        vec![drain_step(&scheduled[0], 4, 0, PreambleAction::Absent)]
    );

    // Second batch of 4: finishes the first chunk and spills into the next two.
    let batch = drain_from_instructions(&mut pending, 4, &mut need_preamble, &mut skip_in_chunk);

    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 2);

    assert_eq!(
        batch,
        vec![
            drain_step(&scheduled[0], 1, 4, PreambleAction::Absent),
            drain_step(&scheduled[1], 1, 0, PreambleAction::Take),
            drain_step(&scheduled[2], 2, 0, PreambleAction::Absent),
        ]
    );

    // Final batch of 2: the remainder of the third chunk plus the fourth chunk's preamble.
    let batch = drain_from_instructions(&mut pending, 2, &mut need_preamble, &mut skip_in_chunk);

    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 0);

    assert_eq!(
        batch,
        vec![
            drain_step(&scheduled[2], 1, 2, PreambleAction::Absent),
            drain_step(&scheduled[3], 1, 0, PreambleAction::Take),
        ]
    );

    // ---- Scenario 2 (regression): the middle chunk has a preamble, rows and a trailer ----

    let rep_words: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
    let rep_bytes: Vec<u8> = rep_words
        .iter()
        .copied()
        .flat_map(u64::to_le_bytes)
        .collect();
    let rep_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
    let requested = vec![0..28];

    let scheduled = ChunkInstructions::schedule_instructions(&rep_index, &requested);
    let mut pending = VecDeque::from(scheduled.clone());

    let mut need_preamble = false;
    let mut skip_in_chunk = 0;

    // Drain the whole first chunk and the start of the second one.
    let batch = drain_from_instructions(&mut pending, 7, &mut need_preamble, &mut skip_in_chunk);

    assert_eq!(
        batch,
        vec![
            drain_step(&scheduled[0], 6, 0, PreambleAction::Absent),
            drain_step(&scheduled[1], 1, 0, PreambleAction::Take),
        ]
    );

    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 1);

    // The tricky part: finishing the second chunk (trailer included) must also emit a
    // drain task that takes only the preamble of the third chunk.
    let batch = drain_from_instructions(&mut pending, 2, &mut need_preamble, &mut skip_in_chunk);

    assert_eq!(
        batch,
        vec![
            drain_step(&scheduled[1], 2, 1, PreambleAction::Skip),
            drain_step(&scheduled[2], 0, 0, PreambleAction::Take),
        ]
    );

    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 0);
}
6259
6260    #[tokio::test]
6261    async fn test_fullzip_initialize_is_lazy() {
6262        use futures::{FutureExt, future::BoxFuture};
6263        use std::ops::Range;
6264        use std::sync::Mutex;
6265
6266        #[derive(Debug, Clone)]
6267        struct RecordingScheduler {
6268            data: bytes::Bytes,
6269            requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6270        }
6271
6272        impl RecordingScheduler {
6273            fn new(data: bytes::Bytes) -> Self {
6274                Self {
6275                    data,
6276                    requests: Arc::new(Mutex::new(Vec::new())),
6277                }
6278            }
6279
6280            fn requests(&self) -> Vec<Vec<Range<u64>>> {
6281                self.requests.lock().unwrap().clone()
6282            }
6283        }
6284
6285        impl crate::EncodingsIo for RecordingScheduler {
6286            fn submit_request(
6287                &self,
6288                ranges: Vec<Range<u64>>,
6289                _priority: u64,
6290            ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6291                self.requests.lock().unwrap().push(ranges.clone());
6292                let data = ranges
6293                    .into_iter()
6294                    .map(|range| self.data.slice(range.start as usize..range.end as usize))
6295                    .collect::<Vec<_>>();
6296                std::future::ready(Ok(data)).boxed()
6297            }
6298        }
6299
6300        #[derive(Debug)]
6301        struct TestFixedDecompressor;
6302
6303        impl FixedPerValueDecompressor for TestFixedDecompressor {
6304            fn decompress(
6305                &self,
6306                _data: FixedWidthDataBlock,
6307                _num_rows: u64,
6308            ) -> crate::Result<DataBlock> {
6309                unimplemented!("Test decompressor")
6310            }
6311
6312            fn bits_per_value(&self) -> u64 {
6313                32
6314            }
6315        }
6316
6317        let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(vec![
6318            0;
6319            16 * 1024
6320        ])));
6321        let mut scheduler = FullZipScheduler {
6322            data_buf_position: 0,
6323            data_buf_size: 4096,
6324            rep_index: Some(FullZipRepIndexDetails {
6325                buf_position: 1000,
6326                bytes_per_value: 4,
6327            }),
6328            priority: 0,
6329            rows_in_page: 100,
6330            bits_per_offset: 32,
6331            details: Arc::new(FullZipDecodeDetails {
6332                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6333                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6334                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6335                max_rep: 0,
6336                max_visible_def: 0,
6337            }),
6338            cached_state: None,
6339            enable_cache: false,
6340        };
6341
6342        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6343        let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6344
6345        assert!(
6346            cached_data
6347                .as_arc_any()
6348                .downcast_ref::<super::NoCachedPageData>()
6349                .is_some(),
6350            "FullZip initialize should not eagerly load repetition index data"
6351        );
6352        assert!(scheduler.cached_state.is_none());
6353        assert!(
6354            io.requests().is_empty(),
6355            "FullZip initialize should not issue any I/O"
6356        );
6357    }
6358
6359    #[tokio::test]
6360    async fn test_fullzip_read_source_slices_prefetched_page() {
6361        let page_start = 200_u64;
6362        let page_data = LanceBuffer::copy_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);
6363        let source = FullZipReadSource::PrefetchedPage {
6364            base_offset: page_start,
6365            data: page_data,
6366        };
6367        let ranges = vec![
6368            page_start..(page_start + 3),
6369            (page_start + 4)..(page_start + 8),
6370        ];
6371        let mut data = source.fetch(&ranges, 0).await.unwrap();
6372        assert_eq!(data.pop_front().unwrap().as_ref(), &[0, 1, 2]);
6373        assert_eq!(data.pop_front().unwrap().as_ref(), &[4, 5, 6, 7]);
6374    }
6375
6376    #[tokio::test]
6377    async fn test_fullzip_initialize_caches_rep_index_when_enabled() {
6378        use futures::{FutureExt, future::BoxFuture};
6379        use std::ops::Range;
6380        use std::sync::Mutex;
6381
6382        #[derive(Debug, Clone)]
6383        struct RecordingScheduler {
6384            data: bytes::Bytes,
6385            requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6386        }
6387
6388        impl RecordingScheduler {
6389            fn new(data: bytes::Bytes) -> Self {
6390                Self {
6391                    data,
6392                    requests: Arc::new(Mutex::new(Vec::new())),
6393                }
6394            }
6395
6396            fn requests(&self) -> Vec<Vec<Range<u64>>> {
6397                self.requests.lock().unwrap().clone()
6398            }
6399        }
6400
6401        impl crate::EncodingsIo for RecordingScheduler {
6402            fn submit_request(
6403                &self,
6404                ranges: Vec<Range<u64>>,
6405                _priority: u64,
6406            ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6407                self.requests.lock().unwrap().push(ranges.clone());
6408                let data = ranges
6409                    .into_iter()
6410                    .map(|range| self.data.slice(range.start as usize..range.end as usize))
6411                    .collect::<Vec<_>>();
6412                std::future::ready(Ok(data)).boxed()
6413            }
6414        }
6415
6416        #[derive(Debug)]
6417        struct TestFixedDecompressor;
6418
6419        impl FixedPerValueDecompressor for TestFixedDecompressor {
6420            fn decompress(
6421                &self,
6422                _data: FixedWidthDataBlock,
6423                _num_rows: u64,
6424            ) -> crate::Result<DataBlock> {
6425                unimplemented!("Test decompressor")
6426            }
6427
6428            fn bits_per_value(&self) -> u64 {
6429                32
6430            }
6431        }
6432
6433        let rows_in_page = 100_u64;
6434        let bytes_per_value = 4_u64;
6435        let rep_start = 1000_u64;
6436        let rep_size = ((rows_in_page + 1) * bytes_per_value) as usize;
6437        let mut data = vec![0_u8; 16 * 1024];
6438        data[rep_start as usize..rep_start as usize + rep_size].fill(7);
6439        let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(data)));
6440
6441        let mut scheduler = FullZipScheduler {
6442            data_buf_position: 0,
6443            data_buf_size: 4096,
6444            rep_index: Some(FullZipRepIndexDetails {
6445                buf_position: rep_start,
6446                bytes_per_value,
6447            }),
6448            priority: 0,
6449            rows_in_page,
6450            bits_per_offset: 32,
6451            details: Arc::new(FullZipDecodeDetails {
6452                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6453                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6454                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6455                max_rep: 0,
6456                max_visible_def: 0,
6457            }),
6458            cached_state: None,
6459            enable_cache: true,
6460        };
6461
6462        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6463        let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6464        assert!(
6465            cached_data
6466                .as_arc_any()
6467                .downcast_ref::<FullZipCacheableState>()
6468                .is_some()
6469        );
6470        assert!(scheduler.cached_state.is_some());
6471        assert_eq!(
6472            io.requests(),
6473            vec![vec![
6474                rep_start..(rep_start + (rows_in_page + 1) * bytes_per_value)
6475            ]]
6476        );
6477    }
6478
6479    #[tokio::test]
6480    async fn test_fullzip_full_page_bypasses_rep_index_io() {
6481        use futures::{FutureExt, future::BoxFuture};
6482        use std::ops::Range;
6483        use std::sync::Mutex;
6484
6485        #[derive(Debug, Clone)]
6486        struct RecordingScheduler {
6487            data: bytes::Bytes,
6488            requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6489        }
6490
6491        impl RecordingScheduler {
6492            fn new(data: bytes::Bytes) -> Self {
6493                Self {
6494                    data,
6495                    requests: Arc::new(Mutex::new(Vec::new())),
6496                }
6497            }
6498
6499            fn requests(&self) -> Vec<Vec<Range<u64>>> {
6500                self.requests.lock().unwrap().clone()
6501            }
6502        }
6503
6504        impl crate::EncodingsIo for RecordingScheduler {
6505            fn submit_request(
6506                &self,
6507                ranges: Vec<Range<u64>>,
6508                _priority: u64,
6509            ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6510                self.requests.lock().unwrap().push(ranges.clone());
6511                let data = ranges
6512                    .into_iter()
6513                    .map(|range| self.data.slice(range.start as usize..range.end as usize))
6514                    .collect::<Vec<_>>();
6515                std::future::ready(Ok(data)).boxed()
6516            }
6517        }
6518
6519        #[derive(Debug)]
6520        struct TestFixedDecompressor;
6521
6522        impl FixedPerValueDecompressor for TestFixedDecompressor {
6523            fn decompress(
6524                &self,
6525                _data: FixedWidthDataBlock,
6526                _num_rows: u64,
6527            ) -> crate::Result<DataBlock> {
6528                unimplemented!("Test decompressor")
6529            }
6530
6531            fn bits_per_value(&self) -> u64 {
6532                32
6533            }
6534        }
6535
6536        let rows_in_page = 100_u64;
6537        let data_start = 256_u64;
6538        let data_size = 500_u64;
6539        let rep_start = 4096_u64;
6540        let bytes_per_value = 4_u64;
6541
6542        let mut bytes = vec![0_u8; 16 * 1024];
6543        for i in 0..=rows_in_page {
6544            let offset = (i * 5) as u32;
6545            let pos = rep_start as usize + (i * bytes_per_value) as usize;
6546            bytes[pos..pos + 4].copy_from_slice(&offset.to_le_bytes());
6547        }
6548        let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(bytes)));
6549
6550        let scheduler = FullZipScheduler {
6551            data_buf_position: data_start,
6552            data_buf_size: data_size,
6553            rep_index: Some(FullZipRepIndexDetails {
6554                buf_position: rep_start,
6555                bytes_per_value,
6556            }),
6557            priority: 0,
6558            rows_in_page,
6559            bits_per_offset: 32,
6560            details: Arc::new(FullZipDecodeDetails {
6561                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6562                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6563                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6564                max_rep: 0,
6565                max_visible_def: 0,
6566            }),
6567            cached_state: None,
6568            enable_cache: false,
6569        };
6570
6571        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6572        let tasks = scheduler
6573            .schedule_ranges_rep(
6574                &[0..rows_in_page],
6575                &io_dyn,
6576                FullZipRepIndexDetails {
6577                    buf_position: rep_start,
6578                    bytes_per_value,
6579                },
6580            )
6581            .unwrap();
6582
6583        let requests = io.requests();
6584        assert_eq!(requests.len(), 1);
6585        assert_eq!(requests[0], vec![data_start..(data_start + data_size)]);
6586
6587        let _ = tasks.into_iter().next().unwrap().decoder_fut.await.unwrap();
6588        let requests_after_await = io.requests();
6589        assert_eq!(
6590            requests_after_await.len(),
6591            1,
6592            "full page path should not issue rep-index I/O"
6593        );
6594    }
6595
6596    /// This test is used to reproduce fuzz test https://github.com/lancedb/lance/issues/4492
6597    #[tokio::test]
6598    async fn test_fuzz_issue_4492_empty_rep_values() {
6599        use lance_datagen::{RowCount, Seed, array, gen_batch};
6600
6601        let seed = 1823859942947654717u64;
6602        let num_rows = 2741usize;
6603
6604        // Generate the exact same data that caused the failure
6605        let batch_gen = gen_batch().with_seed(Seed::from(seed));
6606        let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
6607        let list_generator = array::rand_list_any(base_generator, false);
6608
6609        let batch = batch_gen
6610            .anon_col(list_generator)
6611            .into_batch_rows(RowCount::from(num_rows as u64))
6612            .unwrap();
6613
6614        let list_array = batch.column(0).clone();
6615
6616        // Force miniblock encoding
6617        let mut metadata = HashMap::new();
6618        metadata.insert(
6619            STRUCTURAL_ENCODING_META_KEY.to_string(),
6620            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6621        );
6622
6623        let test_cases = TestCases::default()
6624            .with_min_file_version(LanceFileVersion::V2_1)
6625            .with_batch_size(100)
6626            .with_range(0..num_rows.min(500) as u64)
6627            .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);
6628
6629        check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
6630    }
6631
6632    async fn test_minichunk_size_helper(
6633        string_data: Vec<Option<String>>,
6634        minichunk_size: u64,
6635        file_version: LanceFileVersion,
6636    ) {
6637        use crate::constants::MINICHUNK_SIZE_META_KEY;
6638        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6639        use arrow_array::{ArrayRef, StringArray};
6640        use std::sync::Arc;
6641
6642        let string_array: ArrayRef = Arc::new(StringArray::from(string_data));
6643
6644        let mut metadata = HashMap::new();
6645        metadata.insert(
6646            MINICHUNK_SIZE_META_KEY.to_string(),
6647            minichunk_size.to_string(),
6648        );
6649        metadata.insert(
6650            STRUCTURAL_ENCODING_META_KEY.to_string(),
6651            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6652        );
6653
6654        let test_cases = TestCases::default()
6655            .with_min_file_version(file_version)
6656            .with_batch_size(1000);
6657
6658        check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await;
6659    }
6660
6661    #[tokio::test]
6662    async fn test_minichunk_size_roundtrip() {
6663        // Test that minichunk size can be configured and works correctly in round-trip encoding
6664        let mut string_data = Vec::new();
6665        for i in 0..100 {
6666            string_data.push(Some(format!("test_string_{}", i).repeat(50)));
6667        }
6668        // configure minichunk size to 64 bytes (smaller than the default 4kb) for Lance 2.1
6669        test_minichunk_size_helper(string_data, 64, LanceFileVersion::V2_1).await;
6670    }
6671
6672    #[tokio::test]
6673    async fn test_minichunk_size_128kb_v2_2() {
6674        // Test that minichunk size can be configured to 128KB and works correctly with Lance 2.2
6675        let mut string_data = Vec::new();
6676        // create a 500kb string array
6677        for i in 0..10000 {
6678            string_data.push(Some(format!("test_string_{}", i).repeat(50)));
6679        }
6680        test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
6681    }
6682
6683    #[tokio::test]
6684    async fn test_binary_large_minichunk_size_over_max_miniblock_values() {
6685        let mut string_data = Vec::new();
6686        // 128kb/chunk / 6 bytes (t_9999) = 21845 > max 4096 items per chunk
6687        for i in 0..10000 {
6688            string_data.push(Some(format!("t_{}", i)));
6689        }
6690        test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
6691    }
6692
6693    #[tokio::test]
6694    async fn test_large_dictionary_general_compression() {
6695        use arrow_array::{ArrayRef, StringArray};
6696        use std::collections::HashMap;
6697        use std::sync::Arc;
6698
6699        // Create large string dictionary data (>32KiB) with low cardinality
6700        // Use 100 unique strings, each 500 bytes long = 50KB dictionary
6701        let unique_values: Vec<String> = (0..100)
6702            .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
6703            .collect();
6704
6705        // Repeat these strings many times to create a large array
6706        let repeated_strings: Vec<_> = unique_values
6707            .iter()
6708            .cycle()
6709            .take(100_000)
6710            .map(|s| Some(s.as_str()))
6711            .collect();
6712
6713        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
6714
6715        // Configure test to use V2_2 and verify encoding
6716        let test_cases = TestCases::default()
6717            .with_min_file_version(LanceFileVersion::V2_2)
6718            .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
6719                assert_eq!(cols.len(), 1);
6720                let col = &cols[0];
6721
6722                // Navigate to the dictionary encoding in the page layout
6723                if let Some(PageEncoding::Structural(page_layout)) =
6724                    &col.final_pages.first().map(|p| &p.description)
6725                    && let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) =
6726                        &page_layout.layout
6727                    && let Some(dictionary_encoding) = &mini_block.dictionary
6728                {
6729                    match dictionary_encoding.compression.as_ref() {
6730                        Some(Compression::General(general)) => {
6731                            // Verify it's using LZ4 or Zstd
6732                            let compression = general.compression.as_ref().unwrap();
6733                            assert!(
6734                                compression.scheme()
6735                                    == pb21::CompressionScheme::CompressionAlgorithmLz4
6736                                    || compression.scheme()
6737                                        == pb21::CompressionScheme::CompressionAlgorithmZstd,
6738                                "Expected LZ4 or Zstd compression for large dictionary"
6739                            );
6740                        }
6741                        _ => panic!("Expected General compression for large dictionary"),
6742                    }
6743                }
6744            }));
6745
6746        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
6747    }
6748
6749    fn dictionary_encoding_from_page(
6750        page: &crate::encoder::EncodedPage,
6751    ) -> &crate::format::pb21::CompressiveEncoding {
6752        let PageEncoding::Structural(layout) = &page.description else {
6753            panic!("Expected structural page encoding");
6754        };
6755        let pb21::page_layout::Layout::MiniBlockLayout(layout) = layout.layout.as_ref().unwrap()
6756        else {
6757            panic!("Expected mini-block layout");
6758        };
6759        layout
6760            .dictionary
6761            .as_ref()
6762            .unwrap_or_else(|| panic!("Expected dictionary encoding"))
6763    }
6764
6765    async fn encode_variable_dict_page(
6766        metadata: HashMap<String, String>,
6767    ) -> crate::encoder::EncodedPage {
6768        use arrow_array::types::Int32Type;
6769        use arrow_array::{ArrayRef, DictionaryArray, Int32Array, StringArray};
6770
6771        let values = Arc::new(StringArray::from(
6772            (0..128)
6773                .map(|i| format!("value_{i:04}_{}", "x".repeat(256)))
6774                .collect::<Vec<_>>(),
6775        )) as ArrayRef;
6776        let keys = Int32Array::from_iter_values((0..20_000).map(|i| i % 128));
6777        let dict_array =
6778            Arc::new(DictionaryArray::<Int32Type>::try_new(keys, values).unwrap()) as ArrayRef;
6779
6780        let field = arrow_schema::Field::new(
6781            "dict_col",
6782            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
6783            false,
6784        )
6785        .with_metadata(metadata);
6786
6787        encode_first_page(field, dict_array, LanceFileVersion::V2_2).await
6788    }
6789
6790    async fn encode_auto_fixed_dict_page(
6791        metadata: HashMap<String, String>,
6792    ) -> crate::encoder::EncodedPage {
6793        use arrow_array::{ArrayRef, Decimal128Array};
6794
6795        // 128-bit fixed-width values with low cardinality to trigger dictionary encoding.
6796        let values = (0..20_000)
6797            .map(|i| match i % 3 {
6798                0 => 10_i128,
6799                1 => 20_i128,
6800                _ => 30_i128,
6801            })
6802            .collect::<Vec<_>>();
6803        let decimal = Decimal128Array::from_iter_values(values)
6804            .with_precision_and_scale(38, 0)
6805            .unwrap();
6806        let decimal = Arc::new(decimal) as ArrayRef;
6807
6808        let mut field_metadata = metadata;
6809        // Strongly encourage dictionary encoding for this synthetic test data.
6810        field_metadata.insert(
6811            "lance-encoding:dict-size-ratio".to_string(),
6812            "0.99".to_string(),
6813        );
6814        let field = arrow_schema::Field::new("fixed_col", DataType::Decimal128(38, 0), false)
6815            .with_metadata(field_metadata);
6816
6817        encode_first_page(field, decimal, LanceFileVersion::V2_2).await
6818    }
6819
6820    #[tokio::test]
6821    async fn test_dict_values_general_compression_default_lz4_for_variable_dict_values() {
6822        let page = encode_variable_dict_page(HashMap::new()).await;
6823        let dictionary_encoding = dictionary_encoding_from_page(&page);
6824        let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6825            panic!("Expected General compression for dictionary values");
6826        };
6827        let compression = general.compression.as_ref().unwrap();
6828        assert_eq!(
6829            compression.scheme(),
6830            pb21::CompressionScheme::CompressionAlgorithmLz4
6831        );
6832    }
6833
6834    #[tokio::test]
6835    async fn test_dict_values_general_compression_default_lz4_for_fixed_dict_values() {
6836        let page = encode_auto_fixed_dict_page(HashMap::new()).await;
6837        let dictionary_encoding = dictionary_encoding_from_page(&page);
6838        let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6839            panic!("Expected General compression for dictionary values");
6840        };
6841        let compression = general.compression.as_ref().unwrap();
6842        assert_eq!(
6843            compression.scheme(),
6844            pb21::CompressionScheme::CompressionAlgorithmLz4
6845        );
6846    }
6847
6848    #[tokio::test]
6849    async fn test_dict_values_general_compression_zstd() {
6850        let mut metadata = HashMap::new();
6851        metadata.insert(
6852            DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6853            "zstd".to_string(),
6854        );
6855        let page = encode_variable_dict_page(metadata).await;
6856        let dictionary_encoding = dictionary_encoding_from_page(&page);
6857        let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6858            panic!("Expected General compression for dictionary values");
6859        };
6860        let compression = general.compression.as_ref().unwrap();
6861        assert_eq!(
6862            compression.scheme(),
6863            pb21::CompressionScheme::CompressionAlgorithmZstd
6864        );
6865    }
6866
6867    #[tokio::test]
6868    async fn test_dict_values_general_compression_none() {
6869        let mut metadata = HashMap::new();
6870        metadata.insert(
6871            DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6872            "none".to_string(),
6873        );
6874        let page = encode_variable_dict_page(metadata).await;
6875        let dictionary_encoding = dictionary_encoding_from_page(&page);
6876        assert!(
6877            !matches!(
6878                dictionary_encoding.compression.as_ref(),
6879                Some(Compression::General(_))
6880            ),
6881            "Expected dictionary values to avoid General compression"
6882        );
6883    }
6884
6885    #[test]
6886    fn test_resolve_dict_values_compression_metadata_defaults_to_lz4() {
6887        let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
6888            &HashMap::new(),
6889            None,
6890            None,
6891        );
6892        assert_eq!(metadata.get(COMPRESSION_META_KEY), Some(&"lz4".to_string()),);
6893        assert!(!metadata.contains_key(COMPRESSION_LEVEL_META_KEY));
6894    }
6895
6896    #[test]
6897    fn test_resolve_dict_values_compression_metadata_metadata_overrides_env() {
6898        let field_metadata = HashMap::from([
6899            (
6900                DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6901                "none".to_string(),
6902            ),
6903            (
6904                DICT_VALUES_COMPRESSION_LEVEL_META_KEY.to_string(),
6905                "7".to_string(),
6906            ),
6907        ]);
6908        let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
6909            &field_metadata,
6910            Some("zstd".to_string()),
6911            Some("3".to_string()),
6912        );
6913        assert_eq!(
6914            metadata.get(COMPRESSION_META_KEY),
6915            Some(&"none".to_string()),
6916        );
6917        assert_eq!(
6918            metadata.get(COMPRESSION_LEVEL_META_KEY),
6919            Some(&"7".to_string()),
6920        );
6921    }
6922
6923    #[test]
6924    fn test_resolve_dict_values_compression_metadata_env_fallback() {
6925        let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
6926            &HashMap::new(),
6927            Some("zstd".to_string()),
6928            Some("9".to_string()),
6929        );
6930        assert_eq!(
6931            metadata.get(COMPRESSION_META_KEY),
6932            Some(&"zstd".to_string()),
6933        );
6934        assert_eq!(
6935            metadata.get(COMPRESSION_LEVEL_META_KEY),
6936            Some(&"9".to_string()),
6937        );
6938    }
6939
6940    #[tokio::test]
6941    async fn test_dictionary_encode_int64() {
6942        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
6943        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6944        use crate::version::LanceFileVersion;
6945        use arrow_array::{ArrayRef, Int64Array};
6946        use std::collections::HashMap;
6947        use std::sync::Arc;
6948
6949        // Low cardinality with poor RLE opportunity.
6950        let values = (0..1000)
6951            .map(|i| match i % 3 {
6952                0 => 10i64,
6953                1 => 20i64,
6954                _ => 30i64,
6955            })
6956            .collect::<Vec<_>>();
6957        let array = Arc::new(Int64Array::from(values)) as ArrayRef;
6958
6959        let mut metadata = HashMap::new();
6960        metadata.insert(
6961            STRUCTURAL_ENCODING_META_KEY.to_string(),
6962            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6963        );
6964        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
6965
6966        let test_cases = TestCases::default()
6967            .with_min_file_version(LanceFileVersion::V2_2)
6968            .with_batch_size(1000)
6969            .with_range(0..1000)
6970            .with_indices(vec![0, 1, 10, 999])
6971            .with_expected_encoding("dictionary");
6972
6973        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
6974    }
6975
6976    #[tokio::test]
6977    async fn test_dictionary_encode_float64() {
6978        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
6979        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6980        use crate::version::LanceFileVersion;
6981        use arrow_array::{ArrayRef, Float64Array};
6982        use std::collections::HashMap;
6983        use std::sync::Arc;
6984
6985        // Low cardinality with poor RLE opportunity.
6986        let values = (0..1000)
6987            .map(|i| match i % 3 {
6988                0 => 0.1f64,
6989                1 => 0.2f64,
6990                _ => 0.3f64,
6991            })
6992            .collect::<Vec<_>>();
6993        let array = Arc::new(Float64Array::from(values)) as ArrayRef;
6994
6995        let mut metadata = HashMap::new();
6996        metadata.insert(
6997            STRUCTURAL_ENCODING_META_KEY.to_string(),
6998            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6999        );
7000        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
7001
7002        let test_cases = TestCases::default()
7003            .with_min_file_version(LanceFileVersion::V2_2)
7004            .with_batch_size(1000)
7005            .with_range(0..1000)
7006            .with_indices(vec![0, 1, 10, 999])
7007            .with_expected_encoding("dictionary");
7008
7009        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
7010    }
7011
7012    #[test]
7013    fn test_miniblock_dictionary_out_of_line_bitpacking_decode() {
7014        let rows = 10_000;
7015        let unique_values = 2_000;
7016
7017        let dictionary_encoding =
7018            ProtobufUtils21::out_of_line_bitpacking(64, ProtobufUtils21::flat(11, None));
7019        let layout = pb21::MiniBlockLayout {
7020            rep_compression: None,
7021            def_compression: None,
7022            value_compression: Some(ProtobufUtils21::flat(64, None)),
7023            dictionary: Some(dictionary_encoding),
7024            num_dictionary_items: unique_values,
7025            layers: vec![pb21::RepDefLayer::RepdefAllValidItem as i32],
7026            num_buffers: 1,
7027            repetition_index_depth: 0,
7028            num_items: rows,
7029            has_large_chunk: false,
7030        };
7031
7032        let buffer_offsets_and_sizes = vec![(0, 0), (0, 0), (0, 0)];
7033        let scheduler = super::MiniBlockScheduler::try_new(
7034            &buffer_offsets_and_sizes,
7035            /*priority=*/ 0,
7036            /*items_in_page=*/ rows,
7037            &layout,
7038            &DefaultDecompressionStrategy::default(),
7039        )
7040        .unwrap();
7041
7042        let dictionary = scheduler.dictionary.unwrap();
7043        assert_eq!(dictionary.num_dictionary_items, unique_values);
7044        assert_eq!(
7045            dictionary.dictionary_data_alignment,
7046            crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
7047        );
7048    }
7049
7050    // Dictionary encoding decision tests
7051    fn create_test_fixed_data_block(
7052        num_values: u64,
7053        cardinality: u64,
7054        bits_per_value: u64,
7055    ) -> DataBlock {
7056        assert!(cardinality > 0);
7057        assert!(cardinality <= num_values);
7058        let block_info = BlockInfo::default();
7059
7060        assert_eq!(bits_per_value % 8, 0);
7061        let data = match bits_per_value {
7062            32 => {
7063                let values = (0..num_values)
7064                    .map(|i| (i % cardinality) as u32)
7065                    .collect::<Vec<_>>();
7066                crate::buffer::LanceBuffer::reinterpret_vec(values)
7067            }
7068            64 => {
7069                let values = (0..num_values).map(|i| i % cardinality).collect::<Vec<_>>();
7070                crate::buffer::LanceBuffer::reinterpret_vec(values)
7071            }
7072            128 => {
7073                let values = (0..num_values)
7074                    .map(|i| (i % cardinality) as u128)
7075                    .collect::<Vec<_>>();
7076                crate::buffer::LanceBuffer::reinterpret_vec(values)
7077            }
7078            _ => unreachable!(),
7079        };
7080        DataBlock::FixedWidth(FixedWidthDataBlock {
7081            bits_per_value,
7082            data,
7083            num_values,
7084            block_info,
7085        })
7086    }
7087
7088    /// Helper to create VariableWidth (string) test data block with exact cardinality
7089    fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
7090        use arrow_array::StringArray;
7091
7092        assert!(cardinality <= num_values && cardinality > 0);
7093
7094        let mut values = Vec::with_capacity(num_values as usize);
7095        for i in 0..num_values {
7096            values.push(format!("value_{:016}", i % cardinality));
7097        }
7098
7099        let array = StringArray::from(values);
7100        DataBlock::from_array(Arc::new(array) as ArrayRef)
7101    }
7102
7103    #[test]
7104    fn test_should_dictionary_encode() {
7105        use crate::constants::DICT_SIZE_RATIO_META_KEY;
7106        use lance_core::datatypes::Field as LanceField;
7107
7108        // Create data where dict encoding saves space
7109        let block = create_test_variable_width_block(1000, 10);
7110
7111        let mut metadata = HashMap::new();
7112        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
7113        let arrow_field =
7114            arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
7115        let field = LanceField::try_from(&arrow_field).unwrap();
7116
7117        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
7118            &block,
7119            &field,
7120            LanceFileVersion::V2_1,
7121        );
7122
7123        assert!(
7124            result.is_some(),
7125            "Should use dictionary encode based on size"
7126        );
7127    }
7128
7129    #[test]
7130    fn test_should_not_dictionary_encode_unsupported_bits() {
7131        use crate::constants::DICT_SIZE_RATIO_META_KEY;
7132        use lance_core::datatypes::Field as LanceField;
7133
7134        let block = create_test_fixed_data_block(1000, 1000, 32);
7135
7136        let mut metadata = HashMap::new();
7137        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
7138        let arrow_field =
7139            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
7140        let field = LanceField::try_from(&arrow_field).unwrap();
7141
7142        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
7143            &block,
7144            &field,
7145            LanceFileVersion::V2_1,
7146        );
7147
7148        assert!(
7149            result.is_none(),
7150            "Should not use dictionary encode for unsupported bit width"
7151        );
7152    }
7153
7154    #[test]
7155    fn test_should_not_dictionary_encode_near_unique_sample() {
7156        use crate::constants::DICT_SIZE_RATIO_META_KEY;
7157        use lance_core::datatypes::Field as LanceField;
7158
7159        let num_values = 5000;
7160        let block = create_test_variable_width_block(num_values, num_values);
7161
7162        let mut metadata = HashMap::new();
7163        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "1.0".to_string());
7164        let arrow_field =
7165            arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
7166        let field = LanceField::try_from(&arrow_field).unwrap();
7167
7168        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
7169            &block,
7170            &field,
7171            LanceFileVersion::V2_1,
7172        );
7173
7174        assert!(
7175            result.is_none(),
7176            "Should not probe dictionary encoding for near-unique data"
7177        );
7178    }
7179
7180    async fn encode_first_page(
7181        field: arrow_schema::Field,
7182        array: ArrayRef,
7183        version: LanceFileVersion,
7184    ) -> crate::encoder::EncodedPage {
7185        use crate::encoder::{
7186            ColumnIndexSequence, EncodingOptions, MIN_PAGE_BUFFER_ALIGNMENT, OutOfLineBuffers,
7187            default_encoding_strategy,
7188        };
7189        use crate::repdef::RepDefBuilder;
7190
7191        let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap();
7192        let encoding_strategy = default_encoding_strategy(version);
7193        let mut column_index_seq = ColumnIndexSequence::default();
7194        let encoding_options = EncodingOptions {
7195            cache_bytes_per_column: 1,
7196            max_page_bytes: 32 * 1024 * 1024,
7197            keep_original_array: true,
7198            buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT,
7199            version,
7200        };
7201
7202        let mut encoder = encoding_strategy
7203            .create_field_encoder(
7204                encoding_strategy.as_ref(),
7205                &lance_field,
7206                &mut column_index_seq,
7207                &encoding_options,
7208            )
7209            .unwrap();
7210
7211        let mut external_buffers = OutOfLineBuffers::new(0, MIN_PAGE_BUFFER_ALIGNMENT);
7212        let repdef = RepDefBuilder::default();
7213        let num_rows = array.len() as u64;
7214        let mut pages = Vec::new();
7215        for task in encoder
7216            .maybe_encode(array, &mut external_buffers, repdef, 0, num_rows)
7217            .unwrap()
7218        {
7219            pages.push(task.await.unwrap());
7220        }
7221        for task in encoder.flush(&mut external_buffers).unwrap() {
7222            pages.push(task.await.unwrap());
7223        }
7224        pages.into_iter().next().unwrap()
7225    }
7226
7227    #[tokio::test]
7228    async fn test_constant_layout_out_of_line_fixed_size_binary_v2_2() {
7229        use crate::format::pb21::page_layout::Layout;
7230
7231        let val = vec![0xABu8; 33];
7232        let arr: ArrayRef = Arc::new(
7233            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
7234                std::iter::repeat_n(Some(val.as_slice()), 256),
7235                33,
7236            )
7237            .unwrap(),
7238        );
7239        let field = arrow_schema::Field::new("c", DataType::FixedSizeBinary(33), true);
7240        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7241
7242        let PageEncoding::Structural(layout) = &page.description else {
7243            panic!("Expected structural encoding");
7244        };
7245        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7246            panic!("Expected constant layout in slot 2");
7247        };
7248        assert!(layout.inline_value.is_none());
7249        assert_eq!(page.data.len(), 1);
7250
7251        let test_cases = TestCases::default()
7252            .with_min_file_version(LanceFileVersion::V2_2)
7253            .with_max_file_version(LanceFileVersion::V2_2)
7254            .with_page_sizes(vec![4096]);
7255        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7256    }
7257
7258    #[tokio::test]
7259    async fn test_constant_layout_out_of_line_utf8_v2_2() {
7260        use crate::format::pb21::page_layout::Layout;
7261
7262        let arr: ArrayRef = Arc::new(arrow_array::StringArray::from_iter_values(
7263            std::iter::repeat_n("hello", 512),
7264        ));
7265        let field = arrow_schema::Field::new("c", DataType::Utf8, true);
7266        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7267
7268        let PageEncoding::Structural(layout) = &page.description else {
7269            panic!("Expected structural encoding");
7270        };
7271        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7272            panic!("Expected constant layout in slot 2");
7273        };
7274        assert!(layout.inline_value.is_none());
7275        assert_eq!(page.data.len(), 1);
7276
7277        let test_cases = TestCases::default()
7278            .with_min_file_version(LanceFileVersion::V2_2)
7279            .with_max_file_version(LanceFileVersion::V2_2)
7280            .with_page_sizes(vec![4096]);
7281        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7282    }
7283
7284    #[tokio::test]
7285    async fn test_constant_layout_nullable_item_v2_2() {
7286        use crate::format::pb21::page_layout::Layout;
7287
7288        let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![
7289            Some(7),
7290            None,
7291            Some(7),
7292            None,
7293            Some(7),
7294        ]));
7295        let field = arrow_schema::Field::new("c", DataType::Int32, true);
7296        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7297
7298        let PageEncoding::Structural(layout) = &page.description else {
7299            panic!("Expected structural encoding");
7300        };
7301        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7302            panic!("Expected constant layout in slot 2");
7303        };
7304        assert!(layout.inline_value.is_some());
7305        assert_eq!(page.data.len(), 2);
7306
7307        let test_cases = TestCases::default()
7308            .with_min_file_version(LanceFileVersion::V2_2)
7309            .with_max_file_version(LanceFileVersion::V2_2)
7310            .with_page_sizes(vec![4096]);
7311        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7312    }
7313
7314    #[tokio::test]
7315    async fn test_constant_layout_list_repdef_v2_2() {
7316        use crate::format::pb21::page_layout::Layout;
7317        use arrow_array::builder::{Int32Builder, ListBuilder};
7318
7319        let mut builder = ListBuilder::new(Int32Builder::new());
7320        builder.values().append_value(7);
7321        builder.values().append_null();
7322        builder.values().append_value(7);
7323        builder.append(true);
7324
7325        builder.append(true);
7326
7327        builder.values().append_value(7);
7328        builder.append(true);
7329
7330        builder.append_null();
7331
7332        let arr: ArrayRef = Arc::new(builder.finish());
7333        let field = arrow_schema::Field::new(
7334            "c",
7335            DataType::List(Arc::new(arrow_schema::Field::new(
7336                "item",
7337                DataType::Int32,
7338                true,
7339            ))),
7340            true,
7341        );
7342        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7343
7344        let PageEncoding::Structural(layout) = &page.description else {
7345            panic!("Expected structural encoding");
7346        };
7347        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7348            panic!("Expected constant layout in slot 2");
7349        };
7350        assert!(layout.inline_value.is_some());
7351        assert_eq!(page.data.len(), 2);
7352
7353        let test_cases = TestCases::default()
7354            .with_min_file_version(LanceFileVersion::V2_2)
7355            .with_max_file_version(LanceFileVersion::V2_2)
7356            .with_page_sizes(vec![4096]);
7357        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7358    }
7359
7360    #[tokio::test]
7361    async fn test_constant_layout_fixed_size_list_not_used_v2_2() {
7362        use crate::format::pb21::page_layout::Layout;
7363        use arrow_array::builder::{FixedSizeListBuilder, Int32Builder};
7364
7365        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
7366        for _ in 0..64 {
7367            builder.values().append_value(1);
7368            builder.values().append_null();
7369            builder.values().append_value(3);
7370            builder.append(true);
7371        }
7372        let arr: ArrayRef = Arc::new(builder.finish());
7373        let field = arrow_schema::Field::new(
7374            "c",
7375            DataType::FixedSizeList(
7376                Arc::new(arrow_schema::Field::new("item", DataType::Int32, true)),
7377                3,
7378            ),
7379            true,
7380        );
7381        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7382
7383        if let PageEncoding::Structural(layout) = &page.description {
7384            assert!(
7385                !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
7386                "FixedSizeList should not use constant layout yet"
7387            );
7388        }
7389
7390        let test_cases = TestCases::default()
7391            .with_min_file_version(LanceFileVersion::V2_2)
7392            .with_max_file_version(LanceFileVersion::V2_2)
7393            .with_page_sizes(vec![4096]);
7394        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7395    }
7396
7397    #[tokio::test]
7398    async fn test_constant_layout_not_written_before_v2_2() {
7399        use crate::format::pb21::page_layout::Layout;
7400
7401        let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![7; 1024]));
7402        let field = arrow_schema::Field::new("c", DataType::Int32, true);
7403        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_1).await;
7404
7405        let PageEncoding::Structural(layout) = &page.description else {
7406            return;
7407        };
7408        assert!(
7409            !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
7410            "Should not emit constant layout before v2.2"
7411        );
7412
7413        let test_cases = TestCases::default()
7414            .with_min_file_version(LanceFileVersion::V2_1)
7415            .with_max_file_version(LanceFileVersion::V2_1)
7416            .with_page_sizes(vec![4096]);
7417        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7418    }
7419
7420    #[tokio::test]
7421    async fn test_all_null_constant_layout_still_works_v2_2() {
7422        use crate::format::pb21::page_layout::Layout;
7423
7424        let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![None, None, None]));
7425        let field = arrow_schema::Field::new("c", DataType::Int32, true);
7426        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7427
7428        let PageEncoding::Structural(layout) = &page.description else {
7429            panic!("Expected structural encoding");
7430        };
7431        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7432            panic!("Expected layout in slot 2");
7433        };
7434        assert!(layout.inline_value.is_none());
7435        assert_eq!(page.data.len(), 0);
7436
7437        let test_cases = TestCases::default()
7438            .with_min_file_version(LanceFileVersion::V2_2)
7439            .with_max_file_version(LanceFileVersion::V2_2)
7440            .with_page_sizes(vec![4096]);
7441        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7442    }
7443
7444    #[test]
7445    fn test_encode_decode_complex_all_null_vals_roundtrip() {
7446        use crate::compression::{
7447            DecompressionStrategy, DefaultCompressionStrategy, DefaultDecompressionStrategy,
7448        };
7449
7450        let values: Arc<[u16]> = Arc::from((0..2048).map(|i| (i % 5) as u16).collect::<Vec<u16>>());
7451
7452        let compression_strategy = DefaultCompressionStrategy::default();
7453        let decompression_strategy = DefaultDecompressionStrategy::default();
7454
7455        let (compressed_buf, encoding) = PrimitiveStructuralEncoder::encode_complex_all_null_vals(
7456            &values,
7457            &compression_strategy,
7458        )
7459        .unwrap();
7460
7461        let decompressor = decompression_strategy
7462            .create_block_decompressor(&encoding)
7463            .unwrap();
7464        let decompressed = decompressor
7465            .decompress(compressed_buf, values.len() as u64)
7466            .unwrap();
7467        let decompressed_fixed_width = decompressed.as_fixed_width().unwrap();
7468        assert_eq!(decompressed_fixed_width.num_values, values.len() as u64);
7469        assert_eq!(decompressed_fixed_width.bits_per_value, 16);
7470        let rep_result = decompressed_fixed_width.data.borrow_to_typed_slice::<u16>();
7471        assert_eq!(rep_result.as_ref(), values.as_ref());
7472    }
7473
7474    #[tokio::test]
7475    async fn test_complex_all_null_compression_gated_by_version() {
7476        use crate::format::pb21::page_layout::Layout;
7477        use arrow_array::ListArray;
7478
7479        let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
7480            (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
7481        );
7482        let arr: ArrayRef = Arc::new(list_array);
7483        let field = arrow_schema::Field::new(
7484            "c",
7485            DataType::List(Arc::new(arrow_schema::Field::new(
7486                "item",
7487                DataType::Int32,
7488                true,
7489            ))),
7490            true,
7491        );
7492
7493        let page_v21 = encode_first_page(field.clone(), arr.clone(), LanceFileVersion::V2_1).await;
7494        let PageEncoding::Structural(layout_v21) = &page_v21.description else {
7495            panic!("Expected structural encoding");
7496        };
7497        let Layout::ConstantLayout(layout_v21) = layout_v21.layout.as_ref().unwrap() else {
7498            panic!("Expected constant layout");
7499        };
7500        assert!(layout_v21.rep_compression.is_none());
7501        assert!(layout_v21.def_compression.is_none());
7502        assert_eq!(layout_v21.num_rep_values, 0);
7503        assert_eq!(layout_v21.num_def_values, 0);
7504
7505        let page_v22 = encode_first_page(field, arr, LanceFileVersion::V2_2).await;
7506        let PageEncoding::Structural(layout_v22) = &page_v22.description else {
7507            panic!("Expected structural encoding");
7508        };
7509        let Layout::ConstantLayout(layout_v22) = layout_v22.layout.as_ref().unwrap() else {
7510            panic!("Expected constant layout");
7511        };
7512        assert!(layout_v22.def_compression.is_some());
7513        assert!(layout_v22.num_def_values > 0);
7514    }
7515
7516    #[tokio::test]
7517    async fn test_complex_all_null_round_trip() {
7518        use arrow_array::ListArray;
7519
7520        let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
7521            (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
7522        );
7523
7524        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_2);
7525        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
7526            .await;
7527    }
7528
7529    // https://github.com/lance-format/lance/issues/6681
7530    #[tokio::test]
7531    async fn test_sparse_boolean_list_roundtrip() {
7532        use arrow_array::builder::{BooleanBuilder, ListBuilder};
7533
7534        let mut list_builder = ListBuilder::new(BooleanBuilder::new());
7535        for i in 0..1000i32 {
7536            if i % 64 == 0 {
7537                // Alternate true/false so the array is not constant (constant path avoids the bug).
7538                list_builder.values().append_value(i % 128 == 0);
7539                list_builder.append(true);
7540            } else {
7541                list_builder.append(false);
7542            }
7543        }
7544        let list_array = Arc::new(list_builder.finish());
7545
7546        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
7547        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
7548    }
7549}