Skip to main content

lance_encoding/encodings/logical/
primitive.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    collections::{HashMap, VecDeque},
7    env,
8    fmt::Debug,
9    iter,
10    ops::Range,
11    sync::Arc,
12    vec,
13};
14
15use crate::{
16    constants::{
17        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
18    },
19    data::DictionaryDataBlock,
20    encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
21    format::{
22        ProtobufUtils21,
23        pb21::{self, CompressiveEncoding, PageLayout, compressive_encoding::Compression},
24    },
25};
26use arrow_array::{Array, ArrayRef, PrimitiveArray, cast::AsArray, make_array, types::UInt64Type};
27use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
28use arrow_schema::{DataType, Field as ArrowField};
29use bytes::Bytes;
30use futures::{FutureExt, TryStreamExt, future::BoxFuture, stream::FuturesOrdered};
31use itertools::Itertools;
32use lance_arrow::DataTypeExt;
33use lance_arrow::deepcopy::deep_copy_nulls;
34use lance_core::{
35    cache::{CacheKey, Context, DeepSizeOf},
36    error::{Error, LanceOptionExt},
37    utils::bit::pad_bytes,
38};
39use log::trace;
40
41use crate::{
42    compression::{
43        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
44    },
45    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
46    utils::bytepack::BytepackedIntegerEncoder,
47};
48use crate::{
49    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
50    encodings::logical::primitive::fullzip::PerValueDataBlock,
51};
52use crate::{
53    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
54};
55use crate::{
56    encodings::logical::primitive::miniblock::MiniBlockCompressed,
57    statistics::{ComputeStat, GetStat, Stat},
58};
59use crate::{
60    repdef::{
61        CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser, DefinitionInterpretation,
62        RepDefSlicer, build_control_word_iterator,
63    },
64    utils::accumulation::AccumulationQueue,
65};
66use lance_core::{Result, datatypes::Field, utils::tokio::spawn_cpu};
67
68use crate::constants::{
69    COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_DIVISOR_META_KEY,
70    DICT_SIZE_RATIO_META_KEY, DICT_VALUES_COMPRESSION_ENV_VAR,
71    DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
72    DICT_VALUES_COMPRESSION_META_KEY,
73};
74use crate::version::LanceFileVersion;
75use crate::{
76    EncodingsIo,
77    buffer::LanceBuffer,
78    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
79    decoder::{
80        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
81        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
82        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
83        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
84    },
85    encoder::{
86        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
87    },
88    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
89};
90
91pub mod blob;
92pub mod constant;
93pub mod dict;
94pub mod fullzip;
95pub mod miniblock;
96
// NOTE(review): none of these constants is used in the visible part of the file, so the
// descriptions below are inferred from the names only and should be confirmed at the use sites.

// Presumably the byte value written into padding regions -- TODO confirm
const FILL_BYTE: u8 = 0xFE;
// Presumably the fallback used when `DICT_DIVISOR_META_KEY` is not set -- TODO confirm
const DEFAULT_DICT_DIVISOR: u64 = 2;
// Presumably an upper bound on the number of distinct values considered for dictionary
// encoding -- TODO confirm
const DEFAULT_DICT_MAX_CARDINALITY: u64 = 100_000;
// Presumably the fallback used when `DICT_SIZE_RATIO_META_KEY` is not set -- TODO confirm
const DEFAULT_DICT_SIZE_RATIO: f64 = 0.8;
// Presumably the fallback compression scheme name for dictionary values when neither
// `DICT_VALUES_COMPRESSION_META_KEY` nor `DICT_VALUES_COMPRESSION_ENV_VAR` is set -- TODO confirm
const DEFAULT_DICT_VALUES_COMPRESSION: &str = "lz4";
102
/// One unit of scheduled page loading, as returned by
/// [`StructuralPageScheduler::schedule_ranges`].
struct PageLoadTask {
    /// Future that resolves to the decoder for the loaded data (or to an error)
    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
    /// Row count associated with this task; presumably the number of rows the resolved
    /// decoder will yield -- TODO confirm against the schedulers that build this
    num_rows: u64,
}
107
/// A trait for figuring out how to schedule the data within
/// a single page.
trait StructuralPageScheduler: std::fmt::Debug + Send {
    /// Fetches any metadata required for the page
    ///
    /// The returned future borrows the scheduler mutably and resolves to the page's
    /// cacheable metadata (as a [`CachedPageData`]) or to an error.
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    /// Loads metadata from a previous initialize call
    ///
    /// `data` is a value of the kind returned by [`Self::initialize`].
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    /// Schedules the read of the given ranges in the page
    ///
    /// The read may be split into multiple "shards" if the page is extremely large.
    /// Each shard maps to one or more rows and can be decoded independently.
    ///
    /// Note: this sharding is for splitting up very large pages into smaller reads to
    /// avoid buffering too much data in memory.  It is not related to the batch size or
    /// compute units in any way.
    ///
    /// Returns one [`PageLoadTask`] per shard, or an error if scheduling fails.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>>;
}
132
/// Metadata describing the decoded size of a mini-block
///
/// NOTE(review): this struct is not used in the visible part of the file; the field notes
/// below are inferred from the names and should be confirmed at the use sites.
#[derive(Debug)]
struct ChunkMeta {
    // Presumably the number of values stored in the chunk
    num_values: u64,
    // Presumably the size of the chunk in bytes
    chunk_size_bytes: u64,
    // Presumably the byte offset at which the chunk starts
    offset_bytes: u64,
}
140
/// A mini-block chunk that has been decoded and decompressed
///
/// Produced by `DecodeMiniBlockTask::decode_miniblock_chunk`.
#[derive(Debug, Clone)]
struct DecodedMiniBlockChunk {
    /// Repetition levels; `None` when the task has no repetition decompressor
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels; `None` when the task has no definition decompressor
    def: Option<ScalarBuffer<u16>>,
    /// The decompressed values of the chunk
    values: DataBlock,
}
148
/// A task to decode one or more mini-blocks of data into an output batch
///
/// Note: Two batches might share the same mini-block of data.  When this happens
/// then each batch gets a copy of the block and each batch decodes the block independently.
///
/// This means we have duplicated work but it is necessary to avoid having to synchronize
/// the decoding of the block. (TODO: test this theory)
#[derive(Debug)]
struct DecodeMiniBlockTask {
    /// Decompressor for repetition levels.  When `None` the chunk header is read without a
    /// repetition size and no repetition buffer is decoded.
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for definition levels.  When `None` the chunk header is read without a
    /// definition size and no definition buffer is decoded.
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor applied to the value buffers of each chunk
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    /// When present, the decoded values are treated as indices and wrapped, together with
    /// this block, in a `DataBlock::Dictionary`
    dictionary_data: Option<Arc<DataBlock>>,
    /// Handed to the `RepDefUnraveler`; the number of list entries is used as the maximum
    /// repetition level
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// The number of value buffer sizes read from each chunk header
    num_buffers: u64,
    /// Passed to `map_range` as the largest definition level that still counts as a
    /// visible item
    max_visible_level: u16,
    /// The work list: each drain instruction paired with the loaded chunk it applies to
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
    /// Selects the width of the buffer sizes in the chunk header: 4 bytes when `true`,
    /// 2 bytes when `false`
    has_large_chunk: bool,
}
168
169impl DecodeMiniBlockTask {
170    fn decode_levels(
171        rep_decompressor: &dyn BlockDecompressor,
172        levels: LanceBuffer,
173        num_levels: u16,
174    ) -> Result<ScalarBuffer<u16>> {
175        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
176        let rep = rep.as_fixed_width().unwrap();
177        debug_assert_eq!(rep.num_values, num_levels as u64);
178        debug_assert_eq!(rep.bits_per_value, 16);
179        Ok(rep.data.borrow_to_typed_slice::<u16>())
180    }
181
182    // We are building a LevelBuffer (levels) and want to copy into it `total_len`
183    // values from `level_buf` starting at `offset`.
184    //
185    // We need to handle both the case where `levels` is None (no nulls encountered
186    // yet) and the case where `level_buf` is None (the input we are copying from has
187    // no nulls)
188    fn extend_levels(
189        range: Range<u64>,
190        levels: &mut Option<LevelBuffer>,
191        level_buf: &Option<impl AsRef<[u16]>>,
192        dest_offset: usize,
193    ) {
194        if let Some(level_buf) = level_buf {
195            if levels.is_none() {
196                // This is the first non-empty def buf we've hit, fill in the past
197                // with 0 (valid)
198                let mut new_levels_vec =
199                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
200                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
201                *levels = Some(new_levels_vec);
202            }
203            levels.as_mut().unwrap().extend(
204                level_buf.as_ref()[range.start as usize..range.end as usize]
205                    .iter()
206                    .copied(),
207            );
208        } else if let Some(levels) = levels {
209            let num_values = (range.end - range.start) as usize;
210            // This is an all-valid level_buf but we had nulls earlier and so we
211            // need to materialize it
212            levels.extend(iter::repeat_n(0, num_values));
213        }
214    }
215
    /// Maps a range of rows to a range of items and a range of levels
    ///
    /// If there is no repetition information this just returns the range as-is.
    ///
    /// If there is repetition information then we need to do some work to figure out what
    /// range of items corresponds to the requested range of rows.
    ///
    /// For example, if the data is [[1, 2, 3], [4, 5], [6, 7]] and the range is 1..2 (i.e. just row
    /// 1) then the user actually wants items 3..5.  In the above case the rep levels would be:
    ///
    /// Idx: 0 1 2 3 4 5 6
    /// Rep: 1 0 0 1 0 1 0
    ///
    /// So the start (1) maps to the second 1 (idx=3) and the end (2) maps to the third 1 (idx=5)
    ///
    /// If there are invisible items then we don't count them when calculating the range of items we
    /// are interested in but we do count them when calculating the range of levels we are interested
    /// in.  As a result we have to return both the item range (first return value) and the level range
    /// (second return value).
    ///
    /// For example, if the data is [[1, 2, 3], [4, 5], NULL, [6, 7, 8]] and the range is 2..4 then the
    /// user wants items 5..8 but they want levels 5..9.  In the above case the rep/def levels would be
    /// (the NULL list at idx=5 has a level but no item):
    ///
    /// Idx: 0 1 2 3 4 5 6 7 8
    /// Rep: 1 0 0 1 0 1 1 0 0
    /// Def: 0 0 0 0 0 1 0 0 0
    /// Itm: 1 2 3 4 5   6 7 8
    ///
    /// Finally, we have to contend with the fact that chunks may or may not start with a "preamble" of
    /// trailing values that finish up a list from the previous chunk.  In this case the first item does
    /// not start at max_rep because it is a continuation of the previous chunk.  For our purposes we do
    /// not consider this a "row" and so the range 0..1 will refer to the first row AFTER the preamble.
    ///
    /// We have a separate parameter (`preamble_action`) to control whether we want the preamble or not.
    ///
    /// Note that the "trailer" is considered a "row" and if we want it we should include it in the range.
    ///
    /// # Returns
    ///
    /// `(item_range, level_range)`.  The two are identical when there is no repetition
    /// information or no definition information.
    ///
    /// # Panics
    ///
    /// Panics if the chunk contains no row start and `preamble_action` is not `Take`, or if
    /// repetition information is present and `range.start > range.end`.
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        // The total number of items (not rows) in the chunk.  This is not quite the same as
        // rep.len() / def.len() because it doesn't count invisible items
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            // If there is a preamble and we need to skip it then do that first.  The work is the same
            // whether there is def information or not
            let mut items_in_preamble = 0_u64;
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        // With def levels only visible levels count as preamble items
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx as u64);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        // Without def levels every level before the first row start is an item
                        let first_row_start =
                            rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
                        items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
                        first_row_start
                    };
                    // It is possible for a chunk to be entirely partial values but if it is then it
                    // should never show up as a preamble to skip
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap();
                    // From here on `rep` starts at the first full row; indices computed below are
                    // relative to that point and are shifted back at the end
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            // We hit this case when all we needed was the preamble
            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                debug_assert!(items_in_preamble <= total_items);
                return (0..items_in_preamble, 0..first_row_start);
            }
            assert!(range.start < range.end);

            // Number of row starts passed so far (the row at index 0 is counted separately)
            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                // Keep `def` aligned with the (possibly preamble-stripped) `rep`
                let def = &def.as_ref()[first_row_start as usize..];

                // range.start == 0 always maps to 0 (even with invis items), otherwise we need to walk
                // Invisible levels seen before the start of the requested rows
                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    // skip(1) comes before enumerate() so `idx` is one less than the level index
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                        }
                        if *def > max_visible_def {
                            lead_invis_seen += 1;
                        }
                    }
                }

                // Account for the row that begins at `new_levels_start`
                rows_seen += 1;

                // u64::MAX marks "end not found yet"
                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                // Invisible levels seen inside the requested rows
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                    }
                    if *def > max_visible_def {
                        tail_invis_seen += 1;
                    }
                }

                // No further row start was found, so the range runs to the end of the chunk
                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                // Adjust for any skipped preamble
                if preamble_action == PreambleAction::Skip {
                    new_start += items_in_preamble;
                    new_end += items_in_preamble;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    // Taking the preamble: the ranges start at 0 and only the ends are shifted
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += items_in_preamble;
                    new_levels_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                // Easy case, there are no invisible items, so we don't need to check for them
                // The items range and levels range will be the same.  We do still need to walk
                // the rep levels to find the row boundaries

                // range.start == 0 always maps to 0, otherwise we need to walk
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                // When range.end >= total_items the walk is skipped and new_end stays at rep.len();
                // otherwise we need to walk to find where row `range.end` begins
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                // Adjust for any skipped preamble
                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_start..new_end)
            }
        } else {
            // No repetition info, easy case, just use the range as-is and the item
            // and level ranges are the same
            (range.clone(), range)
        }
    }
437
438    // read `num_buffers` buffer sizes from `buf` starting at `offset`
439    fn read_buffer_sizes<const LARGE: bool>(
440        buf: &[u8],
441        offset: &mut usize,
442        num_buffers: u64,
443    ) -> Vec<u32> {
444        let read_size = if LARGE { 4 } else { 2 };
445        (0..num_buffers)
446            .map(|_| {
447                let bytes = &buf[*offset..*offset + read_size];
448                let size = if LARGE {
449                    u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
450                } else {
451                    // the buffer size is read from u16 but is stored as u32 after decoding for consistency
452                    u16::from_le_bytes([bytes[0], bytes[1]]) as u32
453                };
454                *offset += read_size;
455                size
456            })
457            .collect()
458    }
459
460    // Unserialize a miniblock into a collection of vectors
461    fn decode_miniblock_chunk(
462        &self,
463        buf: &LanceBuffer,
464        items_in_chunk: u64,
465    ) -> Result<DecodedMiniBlockChunk> {
466        let mut offset = 0;
467        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
468        offset += 2;
469
470        let rep_size = if self.rep_decompressor.is_some() {
471            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
472            offset += 2;
473            Some(rep_size)
474        } else {
475            None
476        };
477        let def_size = if self.def_decompressor.is_some() {
478            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
479            offset += 2;
480            Some(def_size)
481        } else {
482            None
483        };
484
485        let buffer_sizes = if self.has_large_chunk {
486            Self::read_buffer_sizes::<true>(buf, &mut offset, self.num_buffers)
487        } else {
488            Self::read_buffer_sizes::<false>(buf, &mut offset, self.num_buffers)
489        };
490
491        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
492
493        let rep = rep_size.map(|rep_size| {
494            let rep = buf.slice_with_length(offset, rep_size as usize);
495            offset += rep_size as usize;
496            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
497            rep
498        });
499
500        let def = def_size.map(|def_size| {
501            let def = buf.slice_with_length(offset, def_size as usize);
502            offset += def_size as usize;
503            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
504            def
505        });
506
507        let buffers = buffer_sizes
508            .into_iter()
509            .map(|buf_size| {
510                let buf = buf.slice_with_length(offset, buf_size as usize);
511                offset += buf_size as usize;
512                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
513                buf
514            })
515            .collect::<Vec<_>>();
516
517        let values = self
518            .value_decompressor
519            .decompress(buffers, items_in_chunk)?;
520
521        let rep = rep
522            .map(|rep| {
523                Self::decode_levels(
524                    self.rep_decompressor.as_ref().unwrap().as_ref(),
525                    rep,
526                    num_levels,
527                )
528            })
529            .transpose()?;
530        let def = def
531            .map(|def| {
532                Self::decode_levels(
533                    self.def_decompressor.as_ref().unwrap().as_ref(),
534                    def,
535                    num_levels,
536                )
537            })
538            .transpose()?;
539
540        Ok(DecodedMiniBlockChunk { rep, def, values })
541    }
542}
543
544impl DecodePageTask for DecodeMiniBlockTask {
545    fn decode(self: Box<Self>) -> Result<DecodedPage> {
546        // First, we create output buffers for the rep and def and data
547        let mut repbuf: Option<LevelBuffer> = None;
548        let mut defbuf: Option<LevelBuffer> = None;
549
550        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
551
552        // This is probably an over-estimate but it's quick and easy to calculate
553        let estimated_size_bytes = self
554            .instructions
555            .iter()
556            .map(|(_, chunk)| chunk.data.len())
557            .sum::<usize>()
558            * 2;
559        let mut data_builder =
560            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
561
562        // We need to keep track of the offset into repbuf/defbuf that we are building up
563        let mut level_offset = 0;
564
565        // Pre-compute caching needs for each chunk by checking if the next chunk is the same
566        let needs_caching: Vec<bool> = self
567            .instructions
568            .windows(2)
569            .map(|w| w[0].1.chunk_idx == w[1].1.chunk_idx)
570            .chain(std::iter::once(false)) // the last one never needs caching
571            .collect();
572
573        // Cache for storing decoded chunks when beneficial
574        let mut chunk_cache: Option<(usize, DecodedMiniBlockChunk)> = None;
575
576        // Now we iterate through each instruction and process it
577        for (idx, (instructions, chunk)) in self.instructions.iter().enumerate() {
578            let should_cache_this_chunk = needs_caching[idx];
579
580            let decoded_chunk = match &chunk_cache {
581                Some((cached_chunk_idx, cached_chunk)) if *cached_chunk_idx == chunk.chunk_idx => {
582                    // Clone only when we have a cache hit (much cheaper than decoding)
583                    cached_chunk.clone()
584                }
585                _ => {
586                    // Cache miss, need to decode
587                    let decoded = self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;
588
589                    // Only update cache if this chunk will benefit the next access
590                    if should_cache_this_chunk {
591                        chunk_cache = Some((chunk.chunk_idx, decoded.clone()));
592                    }
593                    decoded
594                }
595            };
596
597            let DecodedMiniBlockChunk { rep, def, values } = decoded_chunk;
598
599            // Our instructions tell us which rows we want to take from this chunk
600            let row_range_start =
601                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
602            let row_range_end = row_range_start + instructions.rows_to_take;
603
604            // We use the rep info to map the row range to an item range / levels range
605            let (item_range, level_range) = Self::map_range(
606                row_range_start..row_range_end,
607                rep.as_ref(),
608                def.as_ref(),
609                max_rep,
610                self.max_visible_level,
611                chunk.items_in_chunk,
612                instructions.preamble_action,
613            );
614            if item_range.end - item_range.start > chunk.items_in_chunk {
615                return Err(lance_core::Error::internal(format!(
616                    "Item range {:?} is greater than chunk items in chunk {:?}",
617                    item_range, chunk.items_in_chunk
618                )));
619            }
620
621            // Now we append the data to the output buffers
622            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
623            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
624            level_offset += (level_range.end - level_range.start) as usize;
625            data_builder.append(&values, item_range);
626        }
627
628        let mut data = data_builder.finish();
629
630        let unraveler =
631            RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());
632
633        if let Some(dictionary) = &self.dictionary_data {
634            // Don't decode here, that happens later (if needed)
635            let DataBlock::FixedWidth(indices) = data else {
636                return Err(lance_core::Error::internal(format!(
637                    "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
638                    data
639                )));
640            };
641            data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
642                indices,
643                dictionary.as_ref().clone(),
644            ));
645        }
646
647        Ok(DecodedPage {
648            data,
649            repdef: unraveler,
650        })
651    }
652}
653
/// A chunk that has been loaded by the miniblock scheduler (but not
/// yet decoded)
#[derive(Debug)]
struct LoadedChunk {
    /// The still-encoded bytes of the chunk, as consumed by `decode_miniblock_chunk`
    data: LanceBuffer,
    /// Number of items in the chunk; passed to the value decompressor and to `map_range`
    items_in_chunk: u64,
    /// Presumably the byte range the chunk was read from -- not used in the visible code,
    /// TODO confirm
    byte_range: Range<u64>,
    /// Index of the chunk; used to match chunks to instructions and as the decode cache key
    chunk_idx: usize,
}
663
664impl Clone for LoadedChunk {
665    fn clone(&self) -> Self {
666        Self {
667            // Safe as we always create borrowed buffers here
668            data: self.data.clone(),
669            items_in_chunk: self.items_in_chunk,
670            byte_range: self.byte_range.clone(),
671            chunk_idx: self.chunk_idx,
672        }
673    }
674}
675
/// Decodes mini-block formatted data.  See [`PrimitiveStructuralEncoder`] for more
/// details on the different layouts.
#[derive(Debug)]
struct MiniBlockDecoder {
    // The three decompressors, `def_meaning`, `num_buffers`, `dictionary` and
    // `has_large_chunk` are copied into every `DecodeMiniBlockTask` created by `drain`
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Chunks waiting to be drained; `drain` pops from the front until it reaches the chunk
    /// named by the current instruction
    loaded_chunks: VecDeque<LoadedChunk>,
    /// Pending instructions; `drain` works on the front one and pops it once it is consumed
    instructions: VecDeque<ChunkInstructions>,
    /// Position carried over between `drain` calls when a call stops part-way through a chunk
    offset_in_current_chunk: u64,
    /// The value reported by `num_rows()`
    num_rows: u64,
    num_buffers: u64,
    dictionary: Option<Arc<DataBlock>>,
    has_large_chunk: bool,
}
692
693/// See [`MiniBlockScheduler`] for more details on the scheduling and decoding
694/// process for miniblock encoded data.
695impl StructuralPageDecoder for MiniBlockDecoder {
696    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
697        let mut items_desired = num_rows;
698        let mut need_preamble = false;
699        let mut skip_in_chunk = self.offset_in_current_chunk;
700        let mut drain_instructions = Vec::new();
701        while items_desired > 0 || need_preamble {
702            let (instructions, consumed) = self
703                .instructions
704                .front()
705                .unwrap()
706                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);
707
708            while self.loaded_chunks.front().unwrap().chunk_idx
709                != instructions.chunk_instructions.chunk_idx
710            {
711                self.loaded_chunks.pop_front();
712            }
713            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
714            if consumed {
715                self.instructions.pop_front();
716            }
717        }
718        // We can throw away need_preamble here because it must be false.  If it were true it would mean
719        // we were still in the middle of loading rows.  We do need to latch skip_in_chunk though.
720        self.offset_in_current_chunk = skip_in_chunk;
721
722        let max_visible_level = self
723            .def_meaning
724            .iter()
725            .take_while(|l| !l.is_list())
726            .map(|l| l.num_def_levels())
727            .sum::<u16>();
728
729        Ok(Box::new(DecodeMiniBlockTask {
730            instructions: drain_instructions,
731            def_decompressor: self.def_decompressor.clone(),
732            rep_decompressor: self.rep_decompressor.clone(),
733            value_decompressor: self.value_decompressor.clone(),
734            dictionary_data: self.dictionary.clone(),
735            def_meaning: self.def_meaning.clone(),
736            num_buffers: self.num_buffers,
737            max_visible_level,
738            has_large_chunk: self.has_large_chunk,
739        }))
740    }
741
742    fn num_rows(&self) -> u64 {
743        self.num_rows
744    }
745}
746
/// Cached repetition / definition levels for a [`ComplexAllNullScheduler`]
#[derive(Debug)]
struct CachedComplexAllNullState {
    /// Repetition levels, if any
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels, if any
    def: Option<ScalarBuffer<u16>>,
}
752
753impl DeepSizeOf for CachedComplexAllNullState {
754    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
755        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
756            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
757    }
758}
759
impl CachedPageData for CachedComplexAllNullState {
    /// Upcasts to `Arc<dyn Any>` so the value can later be downcast back to
    /// `CachedComplexAllNullState` (as `ComplexAllNullScheduler::load` does).
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
765
/// A scheduler for all-null data that has repetition and definition levels
///
/// We still need to do some I/O in this case because we need to figure out what kind of null we
/// are dealing with (null list, null struct, what level null struct, etc.)
///
/// TODO: Right now we just load the entire rep/def at initialization time and cache it.  This is a touch
/// RAM aggressive and maybe we want something more lazy in the future.  On the other hand, it's simple
/// and fast so...maybe not :)
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    // Set from protobuf
    //
    // (position, size) of each page buffer.  Index 0 is read as the repetition buffer and
    // index 1 as the definition buffer.  A size of zero means the buffer is absent.
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    // The meaning of each definition layer
    def_meaning: Arc<[DefinitionInterpretation]>,
    // The loaded rep/def levels.  `None` until `initialize` or `load` has been called.
    repdef: Option<Arc<CachedComplexAllNullState>>,
    // The number of list layers in `def_meaning`
    max_rep: u16,
    // The sum of `num_def_levels` over the layers that come before the first list layer
    max_visible_level: u16,
    // When present, the repetition buffer is decompressed with this.  Otherwise the buffer
    // is reinterpreted directly as u16 values.
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    // When present, the definition buffer is decompressed with this.  Otherwise the buffer
    // is reinterpreted directly as u16 values.
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    // The number of repetition levels expected after decompression
    num_rep_values: u64,
    // The number of definition levels expected after decompression
    num_def_values: u64,
}
787
788impl ComplexAllNullScheduler {
789    pub fn new(
790        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
791        def_meaning: Arc<[DefinitionInterpretation]>,
792        rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
793        def_decompressor: Option<Arc<dyn BlockDecompressor>>,
794        num_rep_values: u64,
795        num_def_values: u64,
796    ) -> Self {
797        let max_rep = def_meaning.iter().filter(|l| l.is_list()).count() as u16;
798        let max_visible_level = def_meaning
799            .iter()
800            .take_while(|l| !l.is_list())
801            .map(|l| l.num_def_levels())
802            .sum::<u16>();
803        Self {
804            buffer_offsets_and_sizes,
805            def_meaning,
806            repdef: None,
807            max_rep,
808            max_visible_level,
809            rep_decompressor,
810            def_decompressor,
811            num_rep_values,
812            num_def_values,
813        }
814    }
815}
816
817impl StructuralPageScheduler for ComplexAllNullScheduler {
818    fn initialize<'a>(
819        &'a mut self,
820        io: &Arc<dyn EncodingsIo>,
821    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
822        // Fully load the rep & def buffers, as needed
823        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
824        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
825        let has_rep = rep_size > 0;
826        let has_def = def_size > 0;
827
828        let mut reads = Vec::with_capacity(2);
829        if has_rep {
830            reads.push(rep_pos..rep_pos + rep_size);
831        }
832        if has_def {
833            reads.push(def_pos..def_pos + def_size);
834        }
835
836        let data = io.submit_request(reads, 0);
837        let rep_decompressor = self.rep_decompressor.clone();
838        let def_decompressor = self.def_decompressor.clone();
839        let num_rep_values = self.num_rep_values;
840        let num_def_values = self.num_def_values;
841
842        async move {
843            let data = data.await?;
844            let mut data_iter = data.into_iter();
845
846            let decompress_levels = |compressed_bytes: Bytes,
847                                     decompressor: &Arc<dyn BlockDecompressor>,
848                                     num_values: u64,
849                                     level_type: &str|
850             -> Result<ScalarBuffer<u16>> {
851                let compressed_buffer = LanceBuffer::from_bytes(compressed_bytes, 1);
852                let decompressed = decompressor.decompress(compressed_buffer, num_values)?;
853                match decompressed {
854                    DataBlock::FixedWidth(block) => {
855                        if block.num_values != num_values {
856                            return Err(Error::invalid_input_source(format!(
857                                "Unexpected {} level count after decompression: expected {}, got {}",
858                                level_type, num_values, block.num_values
859                            )
860                            .into()));
861                        }
862                        if block.bits_per_value != 16 {
863                            return Err(Error::invalid_input_source(format!(
864                                "Unexpected {} level bit width after decompression: expected 16, got {}",
865                                level_type, block.bits_per_value
866                            )
867                            .into()));
868                        }
869                        Ok(block.data.borrow_to_typed_slice::<u16>())
870                    }
871                    _ => Err(Error::invalid_input_source(format!(
872                        "Expected fixed-width data block for {} levels",
873                        level_type
874                    )
875                    .into())),
876                }
877            };
878
879            let rep = if has_rep {
880                let rep = data_iter.next().unwrap();
881                if let Some(rep_decompressor) = rep_decompressor.as_ref() {
882                    Some(decompress_levels(
883                        rep,
884                        rep_decompressor,
885                        num_rep_values,
886                        "repetition",
887                    )?)
888                } else {
889                    let rep = LanceBuffer::from_bytes(rep, 2);
890                    let rep = rep.borrow_to_typed_slice::<u16>();
891                    Some(rep)
892                }
893            } else {
894                None
895            };
896
897            let def = if has_def {
898                let def = data_iter.next().unwrap();
899                if let Some(def_decompressor) = def_decompressor.as_ref() {
900                    Some(decompress_levels(
901                        def,
902                        def_decompressor,
903                        num_def_values,
904                        "definition",
905                    )?)
906                } else {
907                    let def = LanceBuffer::from_bytes(def, 2);
908                    let def = def.borrow_to_typed_slice::<u16>();
909                    Some(def)
910                }
911            } else {
912                None
913            };
914
915            let repdef = Arc::new(CachedComplexAllNullState { rep, def });
916
917            self.repdef = Some(repdef.clone());
918
919            Ok(repdef as Arc<dyn CachedPageData>)
920        }
921        .boxed()
922    }
923
924    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
925        self.repdef = Some(
926            data.clone()
927                .as_arc_any()
928                .downcast::<CachedComplexAllNullState>()
929                .unwrap(),
930        );
931    }
932
933    fn schedule_ranges(
934        &self,
935        ranges: &[Range<u64>],
936        _io: &Arc<dyn EncodingsIo>,
937    ) -> Result<Vec<PageLoadTask>> {
938        let ranges = VecDeque::from_iter(ranges.iter().cloned());
939        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
940        let decoder = Box::new(ComplexAllNullPageDecoder {
941            ranges,
942            rep: self.repdef.as_ref().unwrap().rep.clone(),
943            def: self.repdef.as_ref().unwrap().def.clone(),
944            num_rows,
945            def_meaning: self.def_meaning.clone(),
946            max_rep: self.max_rep,
947            max_visible_level: self.max_visible_level,
948            cursor_row: 0,
949            cursor_level: 0,
950        }) as Box<dyn StructuralPageDecoder>;
951        let page_load_task = PageLoadTask {
952            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
953            num_rows,
954        };
955        Ok(vec![page_load_task])
956    }
957}
958
/// Decoder for a complex all-null page.
///
/// Walks the rep/def levels row by row to find the slice of levels that belongs to each
/// requested row.
#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    /// The row ranges that have been scheduled but not yet drained
    ranges: VecDeque<Range<u64>>,
    /// Repetition levels for the whole page, if any
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels for the whole page, if any
    def: Option<ScalarBuffer<u16>>,
    /// The total number of rows that were scheduled
    num_rows: u64,
    /// The meaning of each definition layer, passed through to the decode task
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// The repetition level that marks the start of a new row
    max_rep: u16,
    /// Definition levels at or below this value are counted as visible items
    max_visible_level: u16,
    /// The number of rows that `take_row` has walked past so far
    cursor_row: u64,
    /// The index into the level buffers where the next row begins
    cursor_level: usize,
}
971
972impl ComplexAllNullPageDecoder {
973    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
974        let mut rows_desired = num_rows;
975        let mut ranges = Vec::with_capacity(self.ranges.len());
976        while rows_desired > 0 {
977            let front = self.ranges.front_mut().unwrap();
978            let avail = front.end - front.start;
979            if avail > rows_desired {
980                ranges.push(front.start..front.start + rows_desired);
981                front.start += rows_desired;
982                rows_desired = 0;
983            } else {
984                ranges.push(self.ranges.pop_front().unwrap());
985                rows_desired -= avail;
986            }
987        }
988        ranges
989    }
990
991    fn take_row(&mut self) -> Result<(Range<usize>, u64)> {
992        let start = self.cursor_level;
993        let end = if let Some(rep) = &self.rep {
994            if start >= rep.len() {
995                return Err(Error::internal(
996                    "Invalid complex all-null layout: repetition buffer too short",
997                ));
998            }
999            if rep[start] != self.max_rep {
1000                return Err(Error::internal(
1001                    "Invalid complex all-null layout: row did not start at max repetition level",
1002                ));
1003            }
1004            let mut end = start + 1;
1005            while end < rep.len() && rep[end] != self.max_rep {
1006                end += 1;
1007            }
1008            end
1009        } else {
1010            start + 1
1011        };
1012
1013        let visible = if let Some(def) = &self.def {
1014            if end > def.len() {
1015                return Err(Error::internal(
1016                    "Invalid complex all-null layout: definition buffer too short",
1017                ));
1018            }
1019            def[start..end]
1020                .iter()
1021                .filter(|d| **d <= self.max_visible_level)
1022                .count() as u64
1023        } else {
1024            (end - start) as u64
1025        };
1026
1027        self.cursor_level = end;
1028        self.cursor_row += 1;
1029        Ok((start..end, visible))
1030    }
1031
1032    fn skip_to_row(&mut self, target_row: u64) -> Result<()> {
1033        while self.cursor_row < target_row {
1034            self.take_row()?;
1035        }
1036        Ok(())
1037    }
1038}
1039
1040impl StructuralPageDecoder for ComplexAllNullPageDecoder {
1041    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1042        let drained_ranges = self.drain_ranges(num_rows);
1043        let mut level_slices: Vec<Range<usize>> = Vec::new();
1044        let mut visible_items_total = 0;
1045
1046        for range in drained_ranges {
1047            self.skip_to_row(range.start)?;
1048            for _ in range.start..range.end {
1049                let (level_range, visible) = self.take_row()?;
1050                visible_items_total += visible;
1051                if let Some(last) = level_slices.last_mut()
1052                    && last.end == level_range.start
1053                {
1054                    last.end = level_range.end;
1055                    continue;
1056                }
1057                level_slices.push(level_range);
1058            }
1059        }
1060
1061        Ok(Box::new(DecodeComplexAllNullTask {
1062            level_slices,
1063            visible_items_total,
1064            rep: self.rep.clone(),
1065            def: self.def.clone(),
1066            def_meaning: self.def_meaning.clone(),
1067            max_visible_level: self.max_visible_level,
1068        }))
1069    }
1070
1071    fn num_rows(&self) -> u64 {
1072        self.num_rows
1073    }
1074}
1075
/// We use `level_slices` to slice into `rep` and `def` and create rep/def buffers
/// for the null data.
#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    /// Ranges of level indices, applied to both `rep` and `def`, that this task covers
    level_slices: Vec<Range<usize>>,
    /// The number of visible items counted while draining.  Only used as the value count
    /// when there are no definition levels.
    visible_items_total: u64,
    /// Repetition levels for the whole page, if any
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels for the whole page, if any
    def: Option<ScalarBuffer<u16>>,
    /// The meaning of each definition layer, handed to the `RepDefUnraveler`
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Definition levels at or below this value are counted as values
    max_visible_level: u16,
}
1087
1088impl DecodeComplexAllNullTask {
1089    fn decode_level(&self, levels: &Option<ScalarBuffer<u16>>) -> Option<Vec<u16>> {
1090        levels.as_ref().map(|levels| {
1091            let num_levels = self
1092                .level_slices
1093                .iter()
1094                .map(|range| range.end - range.start)
1095                .sum();
1096            let mut referenced_levels = Vec::with_capacity(num_levels);
1097            for range in &self.level_slices {
1098                referenced_levels.extend(levels[range.start..range.end].iter().copied());
1099            }
1100            referenced_levels
1101        })
1102    }
1103}
1104
1105impl DecodePageTask for DecodeComplexAllNullTask {
1106    fn decode(self: Box<Self>) -> Result<DecodedPage> {
1107        let rep = self.decode_level(&self.rep);
1108        let def = self.decode_level(&self.def);
1109
1110        // If there are definition levels there may be empty / null lists which are not visible
1111        // in the items array.  We need to account for that here to figure out how many values
1112        // should be in the items array.
1113        let num_values = if let Some(def) = &def {
1114            def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
1115        } else {
1116            self.visible_items_total
1117        };
1118
1119        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
1120        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
1121        Ok(DecodedPage {
1122            data,
1123            repdef: unraveler,
1124        })
1125    }
1126}
1127
/// A scheduler for simple all-null data
///
/// "simple" all-null data is data that is all null and only has a single level of definition and
/// no repetition.  We don't need to read any data at all in this case.
///
/// The scheduler has no fields; everything it needs comes from the requested ranges.
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}
1134
1135impl StructuralPageScheduler for SimpleAllNullScheduler {
1136    fn initialize<'a>(
1137        &'a mut self,
1138        _io: &Arc<dyn EncodingsIo>,
1139    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1140        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
1141    }
1142
1143    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
1144
1145    fn schedule_ranges(
1146        &self,
1147        ranges: &[Range<u64>],
1148        _io: &Arc<dyn EncodingsIo>,
1149    ) -> Result<Vec<PageLoadTask>> {
1150        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1151        let decoder =
1152            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
1153        let page_load_task = PageLoadTask {
1154            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
1155            num_rows,
1156        };
1157        Ok(vec![page_load_task])
1158    }
1159}
1160
/// A page decode task for all-null data without any
/// repetition and only a single level of definition
#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    /// The number of null values this task produces
    num_values: u64,
}
1167impl DecodePageTask for SimpleAllNullDecodePageTask {
1168    fn decode(self: Box<Self>) -> Result<DecodedPage> {
1169        let unraveler = RepDefUnraveler::new(
1170            None,
1171            Some(vec![1; self.num_values as usize]),
1172            Arc::new([DefinitionInterpretation::NullableItem]),
1173            self.num_values,
1174        );
1175        Ok(DecodedPage {
1176            data: DataBlock::AllNull(AllNullDataBlock {
1177                num_values: self.num_values,
1178            }),
1179            repdef: unraveler,
1180        })
1181    }
1182}
1183
/// Decoder for simple all-null pages.
///
/// Draining only needs to know how many rows were asked for, so no other state is kept.
#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    /// The total number of rows that were scheduled
    num_rows: u64,
}
1188
1189impl StructuralPageDecoder for SimpleAllNullPageDecoder {
1190    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1191        Ok(Box::new(SimpleAllNullDecodePageTask {
1192            num_values: num_rows,
1193        }))
1194    }
1195
1196    fn num_rows(&self) -> u64 {
1197        self.num_rows
1198    }
1199}
1200
/// Describes where a mini-block page's dictionary lives and how to decode it.
#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    // These come from the protobuf
    //
    // Decompressor created from the layout's dictionary encoding
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    // (position, size) of the dictionary buffer.  This is page buffer index 2.
    dictionary_buf_position_and_size: (u64, u64),
    // Alignment for the dictionary data, chosen from the kind of dictionary compression
    dictionary_data_alignment: u64,
    // The number of items in the dictionary, as recorded in the layout
    num_dictionary_items: u64,
}
1209
/// Individual block metadata within a MiniBlock repetition index.
///
/// When an index is built, `first_row` of each block is the running total of
/// `starts_including_trailer` over all of the blocks that come before it.
#[derive(Debug)]
struct MiniBlockRepIndexBlock {
    // The index of the first row that starts after the beginning of this block.  If the block
    // has a preamble this will be the row after the preamble.  If the block is entirely preamble
    // then this will be a row that starts in some future block.
    first_row: u64,
    // The number of rows in the block, including the trailer but not the preamble.
    // Can be 0 if the block is entirely preamble
    starts_including_trailer: u64,
    // Whether the block has a preamble (i.e. the previous block ended with a trailer)
    has_preamble: bool,
    // Whether the block has a trailer
    has_trailer: bool,
}
1225
impl DeepSizeOf for MiniBlockRepIndexBlock {
    /// Every field is a plain integer or boolean stored inline so there are no children
    /// to account for.
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}
1231
/// Repetition index for MiniBlock encoding.
///
/// Stores block-level offset information to enable efficient random
/// access to nested data structures within mini-blocks.
#[derive(Debug)]
struct MiniBlockRepIndex {
    /// One entry per block, in the order the blocks were decoded.  The constructors assign
    /// `first_row` from a running total so the values never decrease from one block to the next.
    blocks: Vec<MiniBlockRepIndexBlock>,
}
1240
impl DeepSizeOf for MiniBlockRepIndex {
    /// The only child is the `blocks` vector so its size is reported directly.
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}
1246
1247impl MiniBlockRepIndex {
1248    /// Decode repetition index from chunk metadata using default values.
1249    ///
1250    /// This creates a repetition index where each chunk has no partial values
1251    /// and no trailers, suitable for simple sequential data layouts.
1252    pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
1253        let mut blocks = Vec::with_capacity(chunks.len());
1254        let mut offset: u64 = 0;
1255
1256        for c in chunks {
1257            blocks.push(MiniBlockRepIndexBlock {
1258                first_row: offset,
1259                starts_including_trailer: c.num_values,
1260                has_preamble: false,
1261                has_trailer: false,
1262            });
1263
1264            offset += c.num_values;
1265        }
1266
1267        Self { blocks }
1268    }
1269
1270    /// Decode repetition index from raw bytes in little-endian format.
1271    ///
1272    /// The bytes should contain u64 values arranged in groups of `stride` elements,
1273    /// where the first two values of each group represent ends_count and partial_count.
1274    /// Returns an empty index if no bytes are provided.
1275    pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
1276        // Convert bytes to u64 slice, handling alignment automatically
1277        let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
1278        let u64_slice = buffer.borrow_to_typed_slice::<u64>();
1279        let n = u64_slice.len() / stride;
1280
1281        let mut blocks = Vec::with_capacity(n);
1282        let mut chunk_has_preamble = false;
1283        let mut offset: u64 = 0;
1284
1285        // Extract first two values from each block: ends_count and partial_count
1286        for i in 0..n {
1287            let base_idx = i * stride;
1288            let ends = u64_slice[base_idx];
1289            let partial = u64_slice[base_idx + 1];
1290
1291            let has_trailer = partial > 0;
1292            // Convert branches to arithmetic for better compiler optimization
1293            let starts_including_trailer =
1294                ends + (has_trailer as u64) - (chunk_has_preamble as u64);
1295
1296            blocks.push(MiniBlockRepIndexBlock {
1297                first_row: offset,
1298                starts_including_trailer,
1299                has_preamble: chunk_has_preamble,
1300                has_trailer,
1301            });
1302
1303            chunk_has_preamble = has_trailer;
1304            offset += starts_including_trailer;
1305        }
1306
1307        Self { blocks }
1308    }
1309}
1310
/// State that is loaded once and cached for future lookups
///
/// Implements `CachedPageData` so it can be stored in, and restored from, the page cache.
#[derive(Debug)]
struct MiniBlockCacheableState {
    /// Metadata that describes each chunk in the page
    chunk_meta: Vec<ChunkMeta>,
    /// The decoded repetition index
    rep_index: MiniBlockRepIndex,
    /// The dictionary for the page, if any
    dictionary: Option<Arc<DataBlock>>,
}
1321
1322impl DeepSizeOf for MiniBlockCacheableState {
1323    fn deep_size_of_children(&self, context: &mut Context) -> usize {
1324        self.rep_index.deep_size_of_children(context)
1325            + self
1326                .dictionary
1327                .as_ref()
1328                .map(|dict| dict.data_size() as usize)
1329                .unwrap_or(0)
1330    }
1331}
1332
impl CachedPageData for MiniBlockCacheableState {
    /// Upcasts to `Arc<dyn Any>` so the value can later be downcast back to its concrete type.
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
1338
/// A scheduler for a page that has been encoded with the mini-block layout
///
/// Scheduling mini-block encoded data is simple in concept and somewhat complex
/// in practice.
///
/// First, during initialization, we load the chunk metadata, the repetition index,
/// and the dictionary (these last two may not be present)
///
/// Then, during scheduling, we use the user's requested row ranges and the repetition
/// index to determine which chunks we need and which rows we need from those chunks.
///
/// For example, if the repetition index is: [50, 3], [50, 0], [10, 0] and the range
/// from the user is 40..60 then we need to:
///
///  - Read the first chunk and skip the first 40 rows, then read 10 full rows, and
///    then read 3 items for the 11th row of our range.
///  - Read the second chunk and read the remaining items in our 11th row and then read
///    the remaining 9 full rows.
///
/// Then, if we are going to decode that in batches of 5, we need to make decode tasks.
/// The first two decode tasks will just need the first chunk.  The third decode task will
/// need the first chunk (for the trailer which has the 11th row in our range) and the second
/// chunk.  The final decode task will just need the second chunk.
///
/// The above prose descriptions are what are represented by `ChunkInstructions` and
/// `ChunkDrainInstructions`.
#[derive(Debug)]
pub struct MiniBlockScheduler {
    // These come from the protobuf
    //
    // (position, size) of each page buffer
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    // The priority given to the scheduler when it was created
    priority: u64,
    // The number of items in the page, given to the scheduler when it was created
    items_in_page: u64,
    // The layout's repetition index depth
    repetition_index_depth: u16,
    // The layout's buffer count
    num_buffers: u64,
    // Present only if the layout specifies a repetition compression
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    // Present only if the layout specifies a definition compression
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    // Created from the layout's value compression
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    // The meaning of each rep/def layer in the layout
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Present only if the layout has a dictionary
    dictionary: Option<MiniBlockSchedulerDictionary>,
    // This is set after initialization
    page_meta: Option<Arc<MiniBlockCacheableState>>,
    // Copied from the layout's `has_large_chunk` flag
    has_large_chunk: bool,
}
1382
1383impl MiniBlockScheduler {
1384    fn try_new(
1385        buffer_offsets_and_sizes: &[(u64, u64)],
1386        priority: u64,
1387        items_in_page: u64,
1388        layout: &pb21::MiniBlockLayout,
1389        decompressors: &dyn DecompressionStrategy,
1390    ) -> Result<Self> {
1391        let rep_decompressor = layout
1392            .rep_compression
1393            .as_ref()
1394            .map(|rep_compression| {
1395                decompressors
1396                    .create_block_decompressor(rep_compression)
1397                    .map(Arc::from)
1398            })
1399            .transpose()?;
1400        let def_decompressor = layout
1401            .def_compression
1402            .as_ref()
1403            .map(|def_compression| {
1404                decompressors
1405                    .create_block_decompressor(def_compression)
1406                    .map(Arc::from)
1407            })
1408            .transpose()?;
1409        let def_meaning = layout
1410            .layers
1411            .iter()
1412            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
1413            .collect::<Vec<_>>();
1414        let value_decompressor = decompressors.create_miniblock_decompressor(
1415            layout.value_compression.as_ref().unwrap(),
1416            decompressors,
1417        )?;
1418
1419        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
1420            let num_dictionary_items = layout.num_dictionary_items;
1421            let dictionary_decompressor = decompressors
1422                .create_block_decompressor(dictionary_encoding)?
1423                .into();
1424            let dictionary_data_alignment = match dictionary_encoding.compression.as_ref().unwrap()
1425            {
1426                Compression::Variable(_) => 4,
1427                Compression::Flat(_) => 16,
1428                Compression::General(_) => 1,
1429                Compression::InlineBitpacking(_) | Compression::OutOfLineBitpacking(_) => {
1430                    crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
1431                }
1432                _ => {
1433                    return Err(Error::invalid_input_source(
1434                        format!(
1435                            "Unsupported mini-block dictionary encoding: {:?}",
1436                            dictionary_encoding.compression.as_ref().unwrap()
1437                        )
1438                        .into(),
1439                    ));
1440                }
1441            };
1442            Some(MiniBlockSchedulerDictionary {
1443                dictionary_decompressor,
1444                dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1445                dictionary_data_alignment,
1446                num_dictionary_items,
1447            })
1448        } else {
1449            None
1450        };
1451
1452        Ok(Self {
1453            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
1454            rep_decompressor,
1455            def_decompressor,
1456            value_decompressor: value_decompressor.into(),
1457            repetition_index_depth: layout.repetition_index_depth as u16,
1458            num_buffers: layout.num_buffers,
1459            priority,
1460            items_in_page,
1461            dictionary,
1462            def_meaning: def_meaning.into(),
1463            page_meta: None,
1464            has_large_chunk: layout.has_large_chunk,
1465        })
1466    }
1467
1468    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
1469        let page_meta = self.page_meta.as_ref().unwrap();
1470        chunk_indices
1471            .iter()
1472            .map(|&chunk_idx| {
1473                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
1474                let bytes_start = chunk_meta.offset_bytes;
1475                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
1476                LoadedChunk {
1477                    byte_range: bytes_start..bytes_end,
1478                    items_in_chunk: chunk_meta.num_values,
1479                    chunk_idx,
1480                    data: LanceBuffer::empty(),
1481                }
1482            })
1483            .collect()
1484    }
1485}
1486
/// What to do with the preamble of a chunk (the part of a chunk that continues a list which
/// started in the previous chunk)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    /// Include the preamble
    Take,
    /// Skip over the preamble
    Skip,
    /// There is no preamble
    Absent,
}
1493
// When we schedule a chunk we use the repetition index (or, if none exists, just the # of items
// in each chunk) to map a user requested range into a set of ChunkInstruction objects which tell
// us how exactly to read from the chunk.
//
// Examples:
//
// | Chunk 0     | Chunk 1   | Chunk 2   | Chunk 3 |
// | xxxxyyyyzzz | zzzzzzzzz | zzzzzzzzz | aaabbcc |
//
// Full read (0..6)
//
// Chunk 0: (several rows, ends with trailer)
//   preamble: absent
//   rows_to_skip: 0
//   rows_to_take: 3 (x, y, z)
//   take_trailer: true
//
// Chunk 1: (all preamble, ends with trailer)
//   preamble: take
//   rows_to_skip: 0
//   rows_to_take: 0
//   take_trailer: true
//
// Chunk 2: (all preamble, no trailer)
//   preamble: take
//   rows_to_skip: 0
//   rows_to_take: 0
//   take_trailer: false
//
// Chunk 3: (several rows, no trailer or preamble)
//   preamble: absent
//   rows_to_skip: 0
//   rows_to_take: 3 (a, b, c)
//   take_trailer: false
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    // The index of the chunk to read
    chunk_idx: usize,
    // A "preamble" is when a chunk begins with a continuation of the previous chunk's list.  If there
    // is no repetition index there is never a preamble.
    //
    // It's possible for a chunk to be entirely preamble.  For example, if there is a really large list
    // that spans several chunks.
    preamble: PreambleAction,
    // How many complete rows (not including the preamble or trailer) to skip
    //
    // If this is non-zero then preamble must not be Take
    rows_to_skip: u64,
    // How many rows to take.  If a row splits across chunks then we will count the row in the first
    // chunk that contains the row.
    rows_to_take: u64,
    // A "trailer" is when a chunk ends with a partial list.  If there is no repetition index there is
    // never a trailer.
    //
    // A chunk that is all preamble may or may not have a trailer.
    //
    // If this is true then we want to include the trailer
    take_trailer: bool,
}
1553
1554// First, we schedule a bunch of [`ChunkInstructions`] based on the users ranges.  Then we
1555// start decoding them, based on a batch size, which might not align with what we scheduled.
1556//
1557// This results in `ChunkDrainInstructions` which targets a contiguous slice of a `ChunkInstructions`
1558//
1559// So if `ChunkInstructions` is "skip preamble, skip 10, take 50, take trailer" and we are decoding in
1560// batches of size 10 we might have a `ChunkDrainInstructions` that targets that chunk and has its own
1561// skip of 17 and take of 10.  This would mean we decode the chunk, skip the preamble and 27 rows, and
1562// then take 10 rows.
1563//
1564// One very confusing bit is that `rows_to_take` includes the trailer.  So if we have two chunks:
1565//  -no preamble, skip 5, take 10, take trailer
1566//  -take preamble, skip 0, take 50, no trailer
1567//
1568// and we are draining 20 rows then the drain instructions for the first batch will be:
1569//  - no preamble, skip 0 (from chunk 0), take 11 (from chunk 0)
1570//  - take preamble (from chunk 1), skip 0 (from chunk 1), take 9 (from chunk 1)
/// One batch-sized slice of a scheduled [`ChunkInstructions`].
///
/// Built by `ChunkInstructions::drain_from_instruction`.  The skip and take counts here
/// are relative to the rows selected by `chunk_instructions`.
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    /// Copy of the scheduled instruction this drain step reads from
    chunk_instructions: ChunkInstructions,
    /// Rows of the instruction's selection to pass over before taking (the value of
    /// `skip_in_chunk` on entry to `drain_from_instruction`)
    rows_to_skip: u64,
    /// Rows taken by this drain step
    rows_to_take: u64,
    /// What this drain step does with the chunk's preamble
    preamble_action: PreambleAction,
}
1578
1579impl ChunkInstructions {
1580    // Given a repetition index and a set of user ranges we need to figure out how to read from the chunks
1581    //
1582    // We assume that `user_ranges` are in sorted order and non-overlapping
1583    //
1584    // The output will be a set of `ChunkInstructions` which tell us how to read from the chunks
1585    fn schedule_instructions(
1586        rep_index: &MiniBlockRepIndex,
1587        user_ranges: &[Range<u64>],
1588    ) -> Vec<Self> {
1589        // This is an in-exact capacity guess but pretty good.  The actual capacity can be
1590        // smaller if instructions are merged.  It can be larger if there are multiple instructions
1591        // per row which can happen with lists.
1592        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1593
1594        for user_range in user_ranges {
1595            let mut rows_needed = user_range.end - user_range.start;
1596            let mut need_preamble = false;
1597
1598            // Need to find the first chunk with a first row >= user_range.start.  If there are
1599            // multiple chunks with the same first row we need to take the first one.
1600            let mut block_index = match rep_index
1601                .blocks
1602                .binary_search_by_key(&user_range.start, |block| block.first_row)
1603            {
1604                Ok(idx) => {
1605                    // Slightly tricky case, we may need to walk backwards a bit to make sure we
1606                    // are grabbing first eligible chunk
1607                    let mut idx = idx;
1608                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1609                        idx -= 1;
1610                    }
1611                    idx
1612                }
1613                // Easy case.  idx is greater, and idx - 1 is smaller, so idx - 1 contains the start
1614                Err(idx) => idx - 1,
1615            };
1616
1617            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1618
1619            while rows_needed > 0 || need_preamble {
1620                // Check if we've gone past the last block (should not happen)
1621                if block_index >= rep_index.blocks.len() {
1622                    log::warn!(
1623                        "schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early"
1624                    );
1625                    break;
1626                }
1627
1628                let chunk = &rep_index.blocks[block_index];
1629                let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);
1630
1631                // Handle blocks that are entirely preamble (rows_avail = 0)
1632                // These blocks have no rows to take but may have a preamble we need
1633                // We only look for preamble if to_skip == 0 (we're not skipping rows)
1634                if rows_avail == 0 && to_skip == 0 {
1635                    // Only process if this chunk has a preamble we need
1636                    if chunk.has_preamble && need_preamble {
1637                        chunk_instructions.push(Self {
1638                            chunk_idx: block_index,
1639                            preamble: PreambleAction::Take,
1640                            rows_to_skip: 0,
1641                            rows_to_take: 0,
1642                            // We still need to look at has_trailer to distinguish between "all preamble
1643                            // and row ends at end of chunk" and "all preamble and row bleeds into next
1644                            // chunk".  Both cases will have 0 rows available.
1645                            take_trailer: chunk.has_trailer,
1646                        });
1647                        // Only set need_preamble = false if the chunk has at least one row,
1648                        // Or we are reaching the last block,
1649                        // Otherwise, the chunk is entirely preamble and we need the next chunk's preamble too
1650                        if chunk.starts_including_trailer > 0
1651                            || block_index == rep_index.blocks.len() - 1
1652                        {
1653                            need_preamble = false;
1654                        }
1655                    }
1656                    // Move to next block
1657                    block_index += 1;
1658                    continue;
1659                }
1660
1661                // Edge case: if rows_avail == 0 but to_skip > 0
1662                // This theoretically shouldn't happen (binary search should avoid it)
1663                // but handle it for safety
1664                if rows_avail == 0 && to_skip > 0 {
1665                    // This block doesn't have enough rows to skip, move to next block
1666                    // Adjust to_skip by the number of rows in this block
1667                    to_skip -= chunk.starts_including_trailer;
1668                    block_index += 1;
1669                    continue;
1670                }
1671
1672                let rows_to_take = rows_avail.min(rows_needed);
1673                rows_needed -= rows_to_take;
1674
1675                let mut take_trailer = false;
1676                let preamble = if chunk.has_preamble {
1677                    if need_preamble {
1678                        PreambleAction::Take
1679                    } else {
1680                        PreambleAction::Skip
1681                    }
1682                } else {
1683                    PreambleAction::Absent
1684                };
1685
1686                // Are we taking the trailer?  If so, make sure we mark that we need the preamble
1687                if rows_to_take == rows_avail && chunk.has_trailer {
1688                    take_trailer = true;
1689                    need_preamble = true;
1690                } else {
1691                    need_preamble = false;
1692                };
1693
1694                chunk_instructions.push(Self {
1695                    preamble,
1696                    chunk_idx: block_index,
1697                    rows_to_skip: to_skip,
1698                    rows_to_take,
1699                    take_trailer,
1700                });
1701
1702                to_skip = 0;
1703                block_index += 1;
1704            }
1705        }
1706
1707        // If there were multiple ranges we may have multiple instructions for a single chunk.  Merge them now if they
1708        // are _adjacent_ (i.e. don't merge "take first row of chunk 0" and "take third row of chunk 0" into "take 2
1709        // rows of chunk 0 starting at 0")
1710        if user_ranges.len() > 1 {
1711            // TODO: Could probably optimize this allocation away
1712            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1713            let mut instructions_iter = chunk_instructions.into_iter();
1714            merged_instructions.push(instructions_iter.next().unwrap());
1715            for instruction in instructions_iter {
1716                let last = merged_instructions.last_mut().unwrap();
1717                if last.chunk_idx == instruction.chunk_idx
1718                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1719                {
1720                    last.rows_to_take += instruction.rows_to_take;
1721                    last.take_trailer |= instruction.take_trailer;
1722                } else {
1723                    merged_instructions.push(instruction);
1724                }
1725            }
1726            merged_instructions
1727        } else {
1728            chunk_instructions
1729        }
1730    }
1731
1732    fn drain_from_instruction(
1733        &self,
1734        rows_desired: &mut u64,
1735        need_preamble: &mut bool,
1736        skip_in_chunk: &mut u64,
1737    ) -> (ChunkDrainInstructions, bool) {
1738        // If we need the premable then we shouldn't be skipping anything
1739        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1740        let rows_avail = self.rows_to_take - *skip_in_chunk;
1741        let has_preamble = self.preamble != PreambleAction::Absent;
1742        let preamble_action = match (*need_preamble, has_preamble) {
1743            (true, true) => PreambleAction::Take,
1744            (true, false) => panic!("Need preamble but there isn't one"),
1745            (false, true) => PreambleAction::Skip,
1746            (false, false) => PreambleAction::Absent,
1747        };
1748
1749        // How many rows are we actually taking in this take step (including the preamble
1750        // and trailer both as individual rows)
1751        let rows_taking = if *rows_desired >= rows_avail {
1752            // We want all the rows.  If there is a trailer we are grabbing it and will need
1753            // the preamble of the next chunk
1754            // If there is a trailer and we are taking all the rows then we need the preamble
1755            // of the next chunk.
1756            //
1757            // Also, if this chunk is entirely preamble (rows_avail == 0 && !take_trailer) then we
1758            // need the preamble of the next chunk.
1759            *need_preamble = self.take_trailer;
1760            rows_avail
1761        } else {
1762            // We aren't taking all the rows.  Even if there is a trailer we aren't taking
1763            // it so we will not need the preamble
1764            *need_preamble = false;
1765            *rows_desired
1766        };
1767        let rows_skipped = *skip_in_chunk;
1768
1769        // Update the state for the next iteration
1770        let consumed_chunk = if *rows_desired >= rows_avail {
1771            *rows_desired -= rows_avail;
1772            *skip_in_chunk = 0;
1773            true
1774        } else {
1775            *skip_in_chunk += *rows_desired;
1776            *rows_desired = 0;
1777            false
1778        };
1779
1780        (
1781            ChunkDrainInstructions {
1782                chunk_instructions: self.clone(),
1783                rows_to_skip: rows_skipped,
1784                rows_to_take: rows_taking,
1785                preamble_action,
1786            },
1787            consumed_chunk,
1788        )
1789    }
1790}
1791
/// Raw chunk metadata words of a mini-block page, one word per chunk.
///
/// `Words::from_bytes` produces `U32` when the page has large chunks and `U16` otherwise.
enum Words {
    /// 16-bit words
    U16(ScalarBuffer<u16>),
    /// 32-bit words
    U32(ScalarBuffer<u32>),
}
1796
/// Iterator over the entries of a [`Words`] buffer, yielding every word as a `u32`.
struct WordsIter<'a> {
    // Boxed so that the `u16` and `u32` backed iterators share one concrete type
    iter: Box<dyn Iterator<Item = u32> + 'a>,
}
1800
1801impl Words {
1802    pub fn len(&self) -> usize {
1803        match self {
1804            Self::U16(b) => b.len(),
1805            Self::U32(b) => b.len(),
1806        }
1807    }
1808
1809    pub fn iter(&self) -> WordsIter<'_> {
1810        match self {
1811            Self::U16(buf) => WordsIter {
1812                iter: Box::new(buf.iter().map(|&x| x as u32)),
1813            },
1814            Self::U32(buf) => WordsIter {
1815                iter: Box::new(buf.iter().copied()),
1816            },
1817        }
1818    }
1819
1820    pub fn from_bytes(bytes: Bytes, has_large_chunk: bool) -> Result<Self> {
1821        let bytes_per_value = if has_large_chunk { 4 } else { 2 };
1822        assert_eq!(bytes.len() % bytes_per_value, 0);
1823        let buffer = LanceBuffer::from_bytes(bytes, bytes_per_value as u64);
1824        if has_large_chunk {
1825            Ok(Self::U32(buffer.borrow_to_typed_slice::<u32>()))
1826        } else {
1827            Ok(Self::U16(buffer.borrow_to_typed_slice::<u16>()))
1828        }
1829    }
1830}
1831
1832impl<'a> Iterator for WordsIter<'a> {
1833    type Item = u32;
1834
1835    fn next(&mut self) -> Option<Self::Item> {
1836        self.iter.next()
1837    }
1838}
1839
1840impl StructuralPageScheduler for MiniBlockScheduler {
    /// Fetches and parses the page-level metadata needed before any rows can be scheduled.
    ///
    /// Requests the chunk metadata buffer and, when present, the dictionary buffer and the
    /// repetition index buffer.  From these it builds a `MiniBlockCacheableState`, stores it
    /// in `self.page_meta` and also returns it as `CachedPageData`.
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        // We always need to fetch chunk metadata.  We may also need to fetch a dictionary and
        // we may also need to fetch the repetition index.  Here, we gather what buffers we
        // need.
        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
        // Only used to size `required_ranges`
        let mut bufs_needed = 1;
        if self.dictionary.is_some() {
            bufs_needed += 1;
        }
        if self.repetition_index_depth > 0 {
            bufs_needed += 1;
        }
        let mut required_ranges = Vec::with_capacity(bufs_needed);
        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
        if let Some(ref dictionary) = self.dictionary {
            required_ranges.push(
                dictionary.dictionary_buf_position_and_size.0
                    ..dictionary.dictionary_buf_position_and_size.0
                        + dictionary.dictionary_buf_position_and_size.1,
            );
        }
        if self.repetition_index_depth > 0 {
            // The repetition index is read from the last buffer of the page
            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
        }
        let io_req = io.submit_request(required_ranges, 0);

        async move {
            // The buffers are consumed in the order the ranges were pushed above: metadata,
            // then the dictionary (if any), then the repetition index (if any).  `fuse` keeps
            // `next` returning `None` once the response is exhausted.
            let mut buffers = io_req.await?.into_iter().fuse();
            let meta_bytes = buffers.next().unwrap();
            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
            let rep_index_bytes = buffers.next();

            // Parse the metadata and build the chunk meta
            let words = Words::from_bytes(meta_bytes, self.has_large_chunk)?;
            let mut chunk_meta = Vec::with_capacity(words.len());

            let mut rows_counter = 0;
            let mut offset_bytes = value_buf_position;
            for (word_idx, word) in words.iter().enumerate() {
                // Low 4 bits: log2 of the number of values in the chunk.
                // Remaining bits: chunk size in units of MINIBLOCK_ALIGNMENT, minus one.
                let log_num_values = word & 0x0F;
                let divided_bytes = word >> 4;
                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
                debug_assert!(num_bytes > 0);
                let num_values = if word_idx < words.len() - 1 {
                    debug_assert!(log_num_values > 0);
                    1 << log_num_values
                } else {
                    // The last chunk holds whatever is left of the page, so its value count is
                    // derived from the page total rather than from the word
                    debug_assert!(
                        log_num_values == 0
                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
                    );
                    self.items_in_page - rows_counter
                };
                rows_counter += num_values;

                chunk_meta.push(ChunkMeta {
                    num_values,
                    chunk_size_bytes: num_bytes as u64,
                    offset_bytes,
                });
                // Chunks are laid out back to back starting at the value buffer position
                offset_bytes += num_bytes as u64;
            }

            // Build the repetition index
            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
                assert!(rep_index_data.len() % 8 == 0);
                let stride = self.repetition_index_depth as usize + 1;
                MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
            } else {
                // No stored repetition index, derive one from the chunk metadata
                MiniBlockRepIndex::default_from_chunks(&chunk_meta)
            };

            let mut page_meta = MiniBlockCacheableState {
                chunk_meta,
                rep_index,
                dictionary: None,
            };

            // decode dictionary
            if let Some(ref mut dictionary) = self.dictionary {
                let dictionary_data = dictionary_bytes.unwrap();
                page_meta.dictionary =
                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
                        LanceBuffer::from_bytes(
                            dictionary_data,
                            dictionary.dictionary_data_alignment,
                        ),
                        dictionary.num_dictionary_items,
                    )?));
            };
            let page_meta = Arc::new(page_meta);
            self.page_meta = Some(page_meta.clone());
            Ok(page_meta as Arc<dyn CachedPageData>)
        }
        .boxed()
    }
1942
1943    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1944        self.page_meta = Some(
1945            data.clone()
1946                .as_arc_any()
1947                .downcast::<MiniBlockCacheableState>()
1948                .unwrap(),
1949        );
1950    }
1951
1952    fn schedule_ranges(
1953        &self,
1954        ranges: &[Range<u64>],
1955        io: &Arc<dyn EncodingsIo>,
1956    ) -> Result<Vec<PageLoadTask>> {
1957        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1958
1959        let page_meta = self.page_meta.as_ref().unwrap();
1960
1961        let chunk_instructions =
1962            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
1963
1964        debug_assert_eq!(
1965            num_rows,
1966            chunk_instructions
1967                .iter()
1968                .map(|ci| ci.rows_to_take)
1969                .sum::<u64>()
1970        );
1971
1972        let chunks_needed = chunk_instructions
1973            .iter()
1974            .map(|ci| ci.chunk_idx)
1975            .unique()
1976            .collect::<Vec<_>>();
1977
1978        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1979        let chunk_ranges = loaded_chunks
1980            .iter()
1981            .map(|c| c.byte_range.clone())
1982            .collect::<Vec<_>>();
1983        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1984
1985        let rep_decompressor = self.rep_decompressor.clone();
1986        let def_decompressor = self.def_decompressor.clone();
1987        let value_decompressor = self.value_decompressor.clone();
1988        let num_buffers = self.num_buffers;
1989        let has_large_chunk = self.has_large_chunk;
1990        let dictionary = page_meta
1991            .dictionary
1992            .as_ref()
1993            .map(|dictionary| dictionary.clone());
1994        let def_meaning = self.def_meaning.clone();
1995
1996        let res = async move {
1997            let loaded_chunk_data = loaded_chunk_data.await?;
1998            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1999                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
2000            }
2001
2002            Ok(Box::new(MiniBlockDecoder {
2003                rep_decompressor,
2004                def_decompressor,
2005                value_decompressor,
2006                def_meaning,
2007                loaded_chunks: VecDeque::from_iter(loaded_chunks),
2008                instructions: VecDeque::from(chunk_instructions),
2009                offset_in_current_chunk: 0,
2010                dictionary,
2011                num_rows,
2012                num_buffers,
2013                has_large_chunk,
2014            }) as Box<dyn StructuralPageDecoder>)
2015        }
2016        .boxed();
2017        let page_load_task = PageLoadTask {
2018            decoder_fut: res,
2019            num_rows,
2020        };
2021        Ok(vec![page_load_task])
2022    }
2023}
2024
/// Location and element width of a full-zip page's repetition index buffer.
#[derive(Debug, Clone, Copy)]
struct FullZipRepIndexDetails {
    /// Position of the repetition index buffer
    buf_position: u64,
    /// Width of one repetition index entry: the buffer size divided by `rows_in_page + 1`
    bytes_per_value: u64, // Will be 1, 2, 4, or 8
}
2030
/// The value decompressor of a full-zip page.
///
/// `FullZipScheduler::try_new` picks the variant from the layout's `details` field.
#[derive(Debug)]
enum PerValueDecompressor {
    /// Used when the layout details are `BitsPerValue`
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    /// Used when the layout details are `BitsPerOffset`
    Variable(Arc<dyn VariablePerValueDecompressor>),
}
2036
/// Decode-time settings derived from a full-zip layout, shared by the scheduler and the
/// decoders it creates.
#[derive(Debug)]
struct FullZipDecodeDetails {
    /// Decompressor for the values of the page
    value_decompressor: PerValueDecompressor,
    /// Interpretation of each rep/def layer of the layout
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Parser built from the layout's `bits_rep` and `bits_def`
    ctrl_word_parser: ControlWordParser,
    /// Number of list layers in `def_meaning`
    max_rep: u16,
    /// Sum of the definition levels of all non-list layers in `def_meaning`
    max_visible_def: u16,
}
2045
/// Describes where FullZip byte ranges should be read from.
///
/// FullZip decoding always needs a list of byte ranges, but those bytes can come
/// from two different places:
/// - Remote I/O (normal path): ranges are fetched from the underlying `EncodingsIo`.
/// - A prefetched full page (full scan fast path): the entire page has already been
///   loaded once and ranges should be sliced from memory.
///
/// This abstraction keeps scheduling code focused on "which ranges are needed"
/// instead of "how bytes are fetched", and it lets full-page scans avoid the
/// two-stage rep-index -> data I/O pipeline.
#[derive(Debug, Clone)]
enum FullZipReadSource {
    /// Fetch ranges from the storage backend through the encoding I/O interface.
    Remote(Arc<dyn EncodingsIo>),
    /// Slice ranges from an already-loaded FullZip page buffer.
    ///
    /// `base_offset` is the position that the first byte of `data` corresponds to;
    /// requested ranges are rebased by it and must lie within
    /// `base_offset..base_offset + data.len()`.
    PrefetchedPage { base_offset: u64, data: LanceBuffer },
}
2064
2065impl FullZipReadSource {
2066    /// Materialize the requested ranges as decode-ready `LanceBuffer`s.
2067    ///
2068    /// The returned buffers preserve the input range order.
2069    fn fetch(
2070        &self,
2071        ranges: &[Range<u64>],
2072        priority: u64,
2073    ) -> BoxFuture<'static, Result<VecDeque<LanceBuffer>>> {
2074        match self {
2075            Self::Remote(io) => {
2076                let io = io.clone();
2077                let ranges = ranges.to_vec();
2078                async move {
2079                    let data = io.submit_request(ranges, priority).await?;
2080                    Ok(data
2081                        .into_iter()
2082                        .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
2083                        .collect::<VecDeque<_>>())
2084                }
2085                .boxed()
2086            }
2087            Self::PrefetchedPage { base_offset, data } => {
2088                let base_offset = *base_offset;
2089                let data = data.clone();
2090                let page_end = base_offset + data.len() as u64;
2091                std::future::ready(
2092                    ranges
2093                        .iter()
2094                        .map(|range| {
2095                            if range.start > range.end
2096                                || range.start < base_offset
2097                                || range.end > page_end
2098                            {
2099                                return Err(Error::internal(format!(
2100                                    "Requested range {:?} is outside page range {}..{}",
2101                                    range, base_offset, page_end
2102                                )));
2103                            }
2104                            let start = (range.start - base_offset) as usize;
2105                            let len = (range.end - range.start) as usize;
2106                            Ok(data.slice_with_length(start, len))
2107                        })
2108                        .collect::<Result<VecDeque<_>>>(),
2109                )
2110                .boxed()
2111            }
2112        }
2113    }
2114}
2115
/// A scheduler for full-zip encoded data
///
/// When the data type has a fixed-width then we simply need to map from
/// row ranges to byte ranges using the fixed-width of the data type.
///
/// When the data type is variable-width or has any repetition then a
/// repetition index is required.
#[derive(Debug)]
pub struct FullZipScheduler {
    /// Position of the data buffer (the first buffer of the page)
    data_buf_position: u64,
    /// Size of the data buffer
    data_buf_size: u64,
    /// Repetition index details, present only when the page has a second buffer
    rep_index: Option<FullZipRepIndexDetails>,
    /// Priority supplied at construction
    priority: u64,
    /// Number of rows in the page
    rows_in_page: u64,
    /// Taken from the layout for `BitsPerOffset` layouts, 32 for `BitsPerValue` layouts
    bits_per_offset: u8,
    /// Decode settings shared with the decoders created by this scheduler
    details: Arc<FullZipDecodeDetails>,
    /// Cached state containing the decoded repetition index
    cached_state: Option<Arc<FullZipCacheableState>>,
    /// Whether repetition index metadata should be cached during initialize.
    enable_cache: bool,
}
2137
2138impl FullZipScheduler {
2139    fn try_new(
2140        buffer_offsets_and_sizes: &[(u64, u64)],
2141        priority: u64,
2142        rows_in_page: u64,
2143        layout: &pb21::FullZipLayout,
2144        decompressors: &dyn DecompressionStrategy,
2145    ) -> Result<Self> {
2146        let (data_buf_position, data_buf_size) = buffer_offsets_and_sizes[0];
2147        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
2148            let num_reps = rows_in_page + 1;
2149            let bytes_per_rep = len / num_reps;
2150            debug_assert_eq!(len % num_reps, 0);
2151            debug_assert!(
2152                bytes_per_rep == 1
2153                    || bytes_per_rep == 2
2154                    || bytes_per_rep == 4
2155                    || bytes_per_rep == 8
2156            );
2157            FullZipRepIndexDetails {
2158                buf_position: *pos,
2159                bytes_per_value: bytes_per_rep,
2160            }
2161        });
2162
2163        let value_decompressor = match layout.details {
2164            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
2165                let decompressor = decompressors.create_fixed_per_value_decompressor(
2166                    layout.value_compression.as_ref().unwrap(),
2167                )?;
2168                PerValueDecompressor::Fixed(decompressor.into())
2169            }
2170            Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
2171                let decompressor = decompressors.create_variable_per_value_decompressor(
2172                    layout.value_compression.as_ref().unwrap(),
2173                )?;
2174                PerValueDecompressor::Variable(decompressor.into())
2175            }
2176            None => {
2177                panic!("Full-zip layout must have a `details` field");
2178            }
2179        };
2180        let ctrl_word_parser = ControlWordParser::new(
2181            layout.bits_rep.try_into().unwrap(),
2182            layout.bits_def.try_into().unwrap(),
2183        );
2184        let def_meaning = layout
2185            .layers
2186            .iter()
2187            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
2188            .collect::<Vec<_>>();
2189
2190        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
2191        let max_visible_def = def_meaning
2192            .iter()
2193            .filter(|d| !d.is_list())
2194            .map(|d| d.num_def_levels())
2195            .sum();
2196
2197        let bits_per_offset = match layout.details {
2198            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
2199            Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
2200                bits_per_offset as u8
2201            }
2202            None => panic!("Full-zip layout must have a `details` field"),
2203        };
2204
2205        let details = Arc::new(FullZipDecodeDetails {
2206            value_decompressor,
2207            def_meaning: def_meaning.into(),
2208            ctrl_word_parser,
2209            max_rep,
2210            max_visible_def,
2211        });
2212        Ok(Self {
2213            data_buf_position,
2214            data_buf_size,
2215            rep_index,
2216            details,
2217            priority,
2218            rows_in_page,
2219            bits_per_offset,
2220            cached_state: None,
2221            enable_cache: false,
2222        })
2223    }
2224
2225    fn covers_entire_page(ranges: &[Range<u64>], rows_in_page: u64) -> bool {
2226        if ranges.is_empty() {
2227            return false;
2228        }
2229        let mut expected_start = 0;
2230        for range in ranges {
2231            if range.start != expected_start || range.end > rows_in_page || range.end < range.start
2232            {
2233                return false;
2234            }
2235            expected_start = range.end;
2236        }
2237        expected_start == rows_in_page
2238    }
2239
2240    fn create_page_load_task(
2241        io_future: BoxFuture<'static, Result<Vec<Bytes>>>,
2242        num_rows: u64,
2243        details: Arc<FullZipDecodeDetails>,
2244        bits_per_offset: u8,
2245    ) -> PageLoadTask {
2246        let load_task = async move {
2247            let buffers = io_future.await?;
2248            let data = buffers
2249                .into_iter()
2250                .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
2251                .collect::<VecDeque<_>>();
2252            Self::create_decoder(details, data, num_rows, bits_per_offset)
2253        }
2254        .boxed();
2255        PageLoadTask {
2256            decoder_fut: load_task,
2257            num_rows,
2258        }
2259    }
2260
2261    /// Creates a decoder from the loaded data
2262    fn create_decoder(
2263        details: Arc<FullZipDecodeDetails>,
2264        data: VecDeque<LanceBuffer>,
2265        num_rows: u64,
2266        bits_per_offset: u8,
2267    ) -> Result<Box<dyn StructuralPageDecoder>> {
2268        match &details.value_decompressor {
2269            PerValueDecompressor::Fixed(decompressor) => {
2270                let bits_per_value = decompressor.bits_per_value();
2271                if bits_per_value % 8 != 0 {
2272                    return Err(lance_core::Error::not_supported_source("Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into()));
2273                }
2274                let bytes_per_value = bits_per_value / 8;
2275                let total_bytes_per_value =
2276                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
2277                if total_bytes_per_value == 0 {
2278                    return Err(lance_core::Error::internal(
2279                        "Invalid encoding: per-row byte width must be greater than 0",
2280                    ));
2281                }
2282                Ok(Box::new(FixedFullZipDecoder {
2283                    details,
2284                    data,
2285                    num_rows,
2286                    offset_in_current: 0,
2287                    bytes_per_value: bytes_per_value as usize,
2288                    total_bytes_per_value,
2289                }) as Box<dyn StructuralPageDecoder>)
2290            }
2291            PerValueDecompressor::Variable(_decompressor) => {
2292                Ok(Box::new(VariableFullZipDecoder::new(
2293                    details,
2294                    data,
2295                    num_rows,
2296                    bits_per_offset,
2297                    bits_per_offset,
2298                )))
2299            }
2300        }
2301    }
2302
2303    /// Extracts byte ranges from a repetition index buffer
2304    /// The buffer contains pairs of (start, end) values for each range
2305    fn extract_byte_ranges_from_pairs(
2306        buffer: LanceBuffer,
2307        bytes_per_value: u64,
2308        data_buf_position: u64,
2309    ) -> Vec<Range<u64>> {
2310        ByteUnpacker::new(buffer, bytes_per_value as usize)
2311            .chunks(2)
2312            .into_iter()
2313            .map(|mut c| {
2314                let start = c.next().unwrap() + data_buf_position;
2315                let end = c.next().unwrap() + data_buf_position;
2316                start..end
2317            })
2318            .collect::<Vec<_>>()
2319    }
2320
2321    /// Extracts byte ranges from a cached repetition index buffer
2322    /// The buffer contains all values and we need to extract specific ranges
2323    fn extract_byte_ranges_from_cached(
2324        buffer: &LanceBuffer,
2325        ranges: &[Range<u64>],
2326        bytes_per_value: u64,
2327        data_buf_position: u64,
2328    ) -> Vec<Range<u64>> {
2329        ranges
2330            .iter()
2331            .map(|r| {
2332                let start_offset = (r.start * bytes_per_value) as usize;
2333                let end_offset = (r.end * bytes_per_value) as usize;
2334
2335                let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
2336                let start_val =
2337                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
2338                        .next()
2339                        .unwrap();
2340
2341                let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
2342                let end_val =
2343                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
2344                        .next()
2345                        .unwrap();
2346
2347                (data_buf_position + start_val)..(data_buf_position + end_val)
2348            })
2349            .collect()
2350    }
2351
2352    /// Computes the ranges in the repetition index that need to be loaded
2353    fn compute_rep_index_ranges(
2354        ranges: &[Range<u64>],
2355        rep_index: &FullZipRepIndexDetails,
2356    ) -> Vec<Range<u64>> {
2357        ranges
2358            .iter()
2359            .flat_map(|r| {
2360                let first_val_start =
2361                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
2362                let first_val_end = first_val_start + rep_index.bytes_per_value;
2363                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
2364                let last_val_end = last_val_start + rep_index.bytes_per_value;
2365                [first_val_start..first_val_end, last_val_start..last_val_end]
2366            })
2367            .collect()
2368    }
2369
2370    /// Schedules ranges in the presence of a repetition index
2371    fn schedule_ranges_rep(
2372        &self,
2373        ranges: &[Range<u64>],
2374        io: &Arc<dyn EncodingsIo>,
2375        rep_index: FullZipRepIndexDetails,
2376    ) -> Result<Vec<PageLoadTask>> {
2377        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2378        let data_buf_position = self.data_buf_position;
2379        let priority = self.priority;
2380        let details = self.details.clone();
2381        let bits_per_offset = self.bits_per_offset;
2382
2383        if Self::covers_entire_page(ranges, self.rows_in_page) {
2384            let full_range = self.data_buf_position..(self.data_buf_position + self.data_buf_size);
2385            let page_data = io.submit_single(full_range.clone(), priority);
2386            let load_task = async move {
2387                let page_data = page_data.await?;
2388                let source = FullZipReadSource::PrefetchedPage {
2389                    base_offset: full_range.start,
2390                    data: LanceBuffer::from_bytes(page_data, 1),
2391                };
2392                let read_ranges = vec![full_range];
2393                let data = source.fetch(&read_ranges, priority).await?;
2394                Self::create_decoder(details, data, num_rows, bits_per_offset)
2395            }
2396            .boxed();
2397            let page_load_task = PageLoadTask {
2398                decoder_fut: load_task,
2399                num_rows,
2400            };
2401            return Ok(vec![page_load_task]);
2402        }
2403
2404        if let Some(cached_state) = &self.cached_state {
2405            let byte_ranges = Self::extract_byte_ranges_from_cached(
2406                &cached_state.rep_index_buffer,
2407                ranges,
2408                rep_index.bytes_per_value,
2409                data_buf_position,
2410            );
2411            let io_future = io.submit_request(byte_ranges, priority);
2412            let page_load_task =
2413                Self::create_page_load_task(io_future, num_rows, details, bits_per_offset);
2414            return Ok(vec![page_load_task]);
2415        }
2416
2417        let rep_ranges = Self::compute_rep_index_ranges(ranges, &rep_index);
2418        let rep_data = io.submit_request(rep_ranges, priority);
2419        let io_clone = io.clone();
2420        let load_task = async move {
2421            let rep_data = rep_data.await?;
2422            let rep_buffer = LanceBuffer::concat(
2423                &rep_data
2424                    .into_iter()
2425                    .map(|d| LanceBuffer::from_bytes(d, 1))
2426                    .collect::<Vec<_>>(),
2427            );
2428            let byte_ranges = Self::extract_byte_ranges_from_pairs(
2429                rep_buffer,
2430                rep_index.bytes_per_value,
2431                data_buf_position,
2432            );
2433            let source = FullZipReadSource::Remote(io_clone);
2434            let data = source.fetch(&byte_ranges, priority).await?;
2435            Self::create_decoder(details, data, num_rows, bits_per_offset)
2436        }
2437        .boxed();
2438        let page_load_task = PageLoadTask {
2439            decoder_fut: load_task,
2440            num_rows,
2441        };
2442        Ok(vec![page_load_task])
2443    }
2444
2445    // In the simple case there is no repetition and we just have large fixed-width
2446    // rows of data.  We can just map row ranges to byte ranges directly using the
2447    // fixed-width of the data type.
2448    fn schedule_ranges_simple(
2449        &self,
2450        ranges: &[Range<u64>],
2451        io: &Arc<dyn EncodingsIo>,
2452    ) -> Result<Vec<PageLoadTask>> {
2453        // Convert row ranges to item ranges (i.e. multiply by items per row)
2454        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2455
2456        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
2457            unreachable!()
2458        };
2459
2460        // Convert item ranges to byte ranges (i.e. multiply by bytes per item)
2461        let bits_per_value = decompressor.bits_per_value();
2462        assert_eq!(bits_per_value % 8, 0);
2463        let bytes_per_value = bits_per_value / 8;
2464        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
2465        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
2466        let byte_ranges = ranges
2467            .iter()
2468            .map(|r| {
2469                debug_assert!(r.end <= self.rows_in_page);
2470                let start = self.data_buf_position + r.start * total_bytes_per_value;
2471                let end = self.data_buf_position + r.end * total_bytes_per_value;
2472                start..end
2473            })
2474            .collect::<Vec<_>>();
2475
2476        let io_future = io.submit_request(byte_ranges, self.priority);
2477        let page_load_task = Self::create_page_load_task(
2478            io_future,
2479            num_rows,
2480            self.details.clone(),
2481            self.bits_per_offset,
2482        );
2483        Ok(vec![page_load_task])
2484    }
2485}
2486
2487/// Cacheable state for FullZip encoding, storing the decoded repetition index
2488#[derive(Debug)]
2489struct FullZipCacheableState {
2490    /// The raw repetition index buffer for future decoding
2491    rep_index_buffer: LanceBuffer,
2492}
2493
2494impl DeepSizeOf for FullZipCacheableState {
2495    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
2496        self.rep_index_buffer.len()
2497    }
2498}
2499
2500impl CachedPageData for FullZipCacheableState {
2501    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2502        self
2503    }
2504}
2505
2506impl StructuralPageScheduler for FullZipScheduler {
2507    fn initialize<'a>(
2508        &'a mut self,
2509        io: &Arc<dyn EncodingsIo>,
2510    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2511        if self.enable_cache
2512            && let Some(rep_index) = self.rep_index
2513        {
2514            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2515            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2516            let io_clone = io.clone();
2517            return async move {
2518                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2519                let state = Arc::new(FullZipCacheableState {
2520                    rep_index_buffer: LanceBuffer::from_bytes(rep_index_data[0].clone(), 1),
2521                });
2522                self.cached_state = Some(state.clone());
2523                Ok(state as Arc<dyn CachedPageData>)
2524            }
2525            .boxed();
2526        }
2527        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2528    }
2529
2530    /// Loads previously cached repetition index data from the cache system.
2531    /// This method is called when a scheduler instance needs to use cached data
2532    /// that was initialized by another instance or in a previous operation.
2533    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2534        // Try to downcast to our specific cache type
2535        if let Ok(cached_state) = cache
2536            .clone()
2537            .as_arc_any()
2538            .downcast::<FullZipCacheableState>()
2539        {
2540            // Store the cached state for use in schedule_ranges
2541            self.cached_state = Some(cached_state);
2542        }
2543    }
2544
2545    fn schedule_ranges(
2546        &self,
2547        ranges: &[Range<u64>],
2548        io: &Arc<dyn EncodingsIo>,
2549    ) -> Result<Vec<PageLoadTask>> {
2550        if let Some(rep_index) = self.rep_index {
2551            self.schedule_ranges_rep(ranges, io, rep_index)
2552        } else {
2553            self.schedule_ranges_simple(ranges, io)
2554        }
2555    }
2556}
2557
/// A decoder for full-zip encoded data when the data has a fixed-width
///
/// Here we need to unzip the control words from the values themselves and
/// then decompress the requested values.
///
/// We use a PerValueDecompressor because we will only be decompressing the
/// requested data.  This decoder / scheduler does not do any read amplification.
#[derive(Debug)]
struct FixedFullZipDecoder {
    /// Shared decode details (control word parser, value decompressor, rep/def limits)
    details: Arc<FullZipDecodeDetails>,
    /// Zipped buffers that have not been fully handed out yet; tasks are sliced from the front
    data: VecDeque<LanceBuffer>,
    /// Byte offset into the front buffer at which the next task starts
    offset_in_current: usize,
    /// Width in bytes of a single value, not counting its control word
    bytes_per_value: usize,
    /// Width in bytes of a value together with its control word
    total_bytes_per_value: usize,
    /// Number of rows this decoder was created with (reported by `num_rows`)
    num_rows: u64,
}
2574
2575impl FixedFullZipDecoder {
2576    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2577        debug_assert!(num_rows > 0);
2578        let cur_buf = self.data.front_mut().unwrap();
2579        let start = self.offset_in_current;
2580        if self.details.ctrl_word_parser.has_rep() {
2581            // This is a slightly slower path.  In order to figure out where to split we need to
2582            // examine the rep index so we can convert num_lists to num_rows
2583            let mut rows_started = 0;
2584            // We always need at least one value.  Now loop through until we have passed num_rows
2585            // values
2586            let mut num_items = 0;
2587            while self.offset_in_current < cur_buf.len() {
2588                let control = self.details.ctrl_word_parser.parse_desc(
2589                    &cur_buf[self.offset_in_current..],
2590                    self.details.max_rep,
2591                    self.details.max_visible_def,
2592                );
2593                if control.is_new_row {
2594                    if rows_started == num_rows {
2595                        break;
2596                    }
2597                    rows_started += 1;
2598                }
2599                num_items += 1;
2600                if control.is_visible {
2601                    self.offset_in_current += self.total_bytes_per_value;
2602                } else {
2603                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2604                }
2605            }
2606
2607            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2608            if self.offset_in_current == cur_buf.len() {
2609                self.data.pop_front();
2610                self.offset_in_current = 0;
2611            }
2612
2613            FullZipDecodeTaskItem {
2614                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2615                    data: task_slice,
2616                    bits_per_value: self.bytes_per_value as u64 * 8,
2617                    num_values: num_items,
2618                    block_info: BlockInfo::new(),
2619                }),
2620                rows_in_buf: rows_started,
2621            }
2622        } else {
2623            // If there's no repetition we can calculate the slicing point by just multiplying
2624            // the number of rows by the total bytes per value
2625            let cur_buf = self.data.front_mut().unwrap();
2626            let bytes_avail = cur_buf.len() - self.offset_in_current;
2627            let offset_in_cur = self.offset_in_current;
2628
2629            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2630            let mut rows_taken = num_rows;
2631            let task_slice = if bytes_needed >= bytes_avail {
2632                self.offset_in_current = 0;
2633                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2634                self.data
2635                    .pop_front()
2636                    .unwrap()
2637                    .slice_with_length(offset_in_cur, bytes_avail)
2638            } else {
2639                self.offset_in_current += bytes_needed;
2640                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2641            };
2642            FullZipDecodeTaskItem {
2643                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2644                    data: task_slice,
2645                    bits_per_value: self.bytes_per_value as u64 * 8,
2646                    num_values: rows_taken,
2647                    block_info: BlockInfo::new(),
2648                }),
2649                rows_in_buf: rows_taken,
2650            }
2651        }
2652    }
2653}
2654
2655impl StructuralPageDecoder for FixedFullZipDecoder {
2656    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2657        let mut task_data = Vec::with_capacity(self.data.len());
2658        let mut remaining = num_rows;
2659        while remaining > 0 {
2660            let task_item = self.slice_next_task(remaining);
2661            remaining -= task_item.rows_in_buf;
2662            task_data.push(task_item);
2663        }
2664        Ok(Box::new(FixedFullZipDecodeTask {
2665            details: self.details.clone(),
2666            data: task_data,
2667            bytes_per_value: self.bytes_per_value,
2668            num_rows: num_rows as usize,
2669        }))
2670    }
2671
2672    fn num_rows(&self) -> u64 {
2673        self.num_rows
2674    }
2675}
2676
/// A decoder for full-zip encoded data when the data has a variable-width
///
/// Here we need to unzip the control words AND lengths from the values and
/// then decompress the requested values.
///
/// The unzipping happens once, when the decoder is created.  The per-row `*_starts`
/// vectors record where every row begins in the unzipped buffers (plus one final
/// entry marking the end) so that batches can be sliced out later.
#[derive(Debug)]
struct VariableFullZipDecoder {
    /// Shared decode details (control word parser, value decompressor, rep/def limits)
    details: Arc<FullZipDecodeDetails>,
    /// The variable-width decompressor, cloned out of `details`
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    /// Value bytes with the control words and lengths removed
    data: LanceBuffer,
    /// Little-endian offsets into `data`, each `bits_per_offset` bits wide; one per
    /// visible item plus a final end offset
    offsets: LanceBuffer,
    /// Repetition levels parsed from the control words
    rep: ScalarBuffer<u16>,
    /// Definition levels parsed from the control words
    def: ScalarBuffer<u16>,
    /// For each row, the index into `rep` / `def` at which the row starts
    repdef_starts: Vec<usize>,
    /// For each row, the byte position in `data` at which the row starts
    data_starts: Vec<usize>,
    /// For each row, the byte position in `offsets` at which the row starts
    offset_starts: Vec<usize>,
    /// For each row, the number of visible items that precede the row
    visible_item_counts: Vec<u64>,
    /// Width in bits of the entries in `offsets` (32 or 64)
    bits_per_offset: u8,
    /// Index of the next row that has not been drained yet
    current_idx: usize,
    /// Number of rows this decoder was created with (reported by `num_rows`)
    num_rows: u64,
}
2697
2698impl VariableFullZipDecoder {
2699    fn new(
2700        details: Arc<FullZipDecodeDetails>,
2701        data: VecDeque<LanceBuffer>,
2702        num_rows: u64,
2703        in_bits_per_length: u8,
2704        out_bits_per_offset: u8,
2705    ) -> Self {
2706        let decompressor = match details.value_decompressor {
2707            PerValueDecompressor::Variable(ref d) => d.clone(),
2708            _ => unreachable!(),
2709        };
2710
2711        assert_eq!(in_bits_per_length % 8, 0);
2712        assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2713
2714        let mut decoder = Self {
2715            details,
2716            decompressor,
2717            data: LanceBuffer::empty(),
2718            offsets: LanceBuffer::empty(),
2719            rep: LanceBuffer::empty().borrow_to_typed_slice(),
2720            def: LanceBuffer::empty().borrow_to_typed_slice(),
2721            bits_per_offset: out_bits_per_offset,
2722            repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2723            data_starts: Vec::with_capacity(num_rows as usize + 1),
2724            offset_starts: Vec::with_capacity(num_rows as usize + 1),
2725            visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2726            current_idx: 0,
2727            num_rows,
2728        };
2729
2730        // There's no great time to do this and this is the least worst time.  If we don't unzip then
2731        // we can't slice the data during the decode phase.  This is because we need the offsets to be
2732        // unpacked to know where the values start and end.
2733        //
2734        // We don't want to unzip on the decode thread because that is a single-threaded path
2735        // We don't want to unzip on the scheduling thread because that is a single-threaded path
2736        //
2737        // Fortunately, we know variable length data will always be read indirectly and so we can do it
2738        // here, which should be on the indirect thread.  The primary disadvantage to doing it here is that
2739        // we load all the data into memory and then throw it away only to load it all into memory again during
2740        // the decode.
2741        //
2742        // There are some alternatives to investigate:
2743        //   - Instead of just reading the beginning and end of the rep index we could read the entire
2744        //     range in between.  This will give us the break points that we need for slicing and won't increase
2745        //     the number of IOPs but it will mean we are doing more total I/O and we need to load the rep index
2746        //     even when doing a full scan.
2747        //   - We could force each decode task to do a full unzip of all the data.  Each decode task now
2748        //     has to do more work but the work is all fused.
2749        //   - We could just try doing this work on the decode thread and see if it is a problem.
2750        decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2751
2752        decoder
2753    }
2754
2755    fn slice_batch_data_and_rebase_offsets_typed<T>(
2756        data: &LanceBuffer,
2757        offsets: &LanceBuffer,
2758    ) -> Result<(LanceBuffer, LanceBuffer)>
2759    where
2760        T: arrow_buffer::ArrowNativeType
2761            + Copy
2762            + PartialOrd
2763            + std::ops::Sub<Output = T>
2764            + std::fmt::Display
2765            + TryInto<usize>,
2766    {
2767        let offsets_slice = offsets.borrow_to_typed_slice::<T>();
2768        let offsets_slice = offsets_slice.as_ref();
2769        if offsets_slice.is_empty() {
2770            return Err(Error::internal(
2771                "Variable offsets cannot be empty".to_string(),
2772            ));
2773        }
2774
2775        let base = offsets_slice[0];
2776        let end = *offsets_slice.last().unwrap();
2777        if end < base {
2778            return Err(Error::internal(format!(
2779                "Invalid variable offsets: end ({end}) is less than base ({base})"
2780            )));
2781        }
2782
2783        let data_start = base.try_into().map_err(|_| {
2784            Error::internal(format!("Variable offset ({base}) does not fit into usize"))
2785        })?;
2786        let data_end = end.try_into().map_err(|_| {
2787            Error::internal(format!("Variable offset ({end}) does not fit into usize"))
2788        })?;
2789        if data_end > data.len() {
2790            return Err(Error::internal(format!(
2791                "Invalid variable offsets: end ({data_end}) exceeds data len ({})",
2792                data.len()
2793            )));
2794        }
2795
2796        let mut rebased_offsets = Vec::with_capacity(offsets_slice.len());
2797        for &offset in offsets_slice {
2798            if offset < base {
2799                return Err(Error::internal(format!(
2800                    "Invalid variable offsets: offset ({offset}) is less than base ({base})"
2801                )));
2802            }
2803            rebased_offsets.push(offset - base);
2804        }
2805
2806        let sliced_data = data.slice_with_length(data_start, data_end - data_start);
2807        // Copy into a compact buffer so each output batch owns only what it references.
2808        let sliced_data = LanceBuffer::copy_slice(&sliced_data);
2809        let rebased_offsets = LanceBuffer::reinterpret_vec(rebased_offsets);
2810        Ok((sliced_data, rebased_offsets))
2811    }
2812
2813    fn slice_batch_data_and_rebase_offsets(
2814        data: &LanceBuffer,
2815        offsets: &LanceBuffer,
2816        bits_per_offset: u8,
2817    ) -> Result<(LanceBuffer, LanceBuffer)> {
2818        match bits_per_offset {
2819            32 => Self::slice_batch_data_and_rebase_offsets_typed::<u32>(data, offsets),
2820            64 => Self::slice_batch_data_and_rebase_offsets_typed::<u64>(data, offsets),
2821            _ => Err(Error::internal(format!(
2822                "Unsupported bits_per_offset={bits_per_offset}"
2823            ))),
2824        }
2825    }
2826
2827    unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2828        match bits_per_offset {
2829            8 => *data.get_unchecked(0) as u64,
2830            16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2831            32 => u32::from_le_bytes([
2832                *data.get_unchecked(0),
2833                *data.get_unchecked(1),
2834                *data.get_unchecked(2),
2835                *data.get_unchecked(3),
2836            ]) as u64,
2837            64 => u64::from_le_bytes([
2838                *data.get_unchecked(0),
2839                *data.get_unchecked(1),
2840                *data.get_unchecked(2),
2841                *data.get_unchecked(3),
2842                *data.get_unchecked(4),
2843                *data.get_unchecked(5),
2844                *data.get_unchecked(6),
2845                *data.get_unchecked(7),
2846            ]),
2847            _ => unreachable!(),
2848        }
2849    }
2850
2851    fn unzip(
2852        &mut self,
2853        data: VecDeque<LanceBuffer>,
2854        in_bits_per_length: u8,
2855        out_bits_per_offset: u8,
2856        num_rows: u64,
2857    ) {
2858        // This undercounts if there are lists but, at this point, we don't really know how many items we have
2859        let mut rep = Vec::with_capacity(num_rows as usize);
2860        let mut def = Vec::with_capacity(num_rows as usize);
2861        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2862
2863        // This undercounts if there are lists
2864        // It can also overcount if there are invisible items
2865        let bytes_per_offset = out_bits_per_offset as usize / 8;
2866        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2867        let mut offsets_data = Vec::with_capacity(bytes_offsets);
2868
2869        let bytes_per_length = in_bits_per_length as usize / 8;
2870        let bytes_lengths = bytes_per_length * num_rows as usize;
2871
2872        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2873        // This overcounts since bytes_lengths and bytes_cw are undercounts
2874        // It can also undercount if there are invisible items (hence the saturating_sub)
2875        let mut unzipped_data =
2876            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2877
2878        let mut current_offset = 0_u64;
2879        let mut visible_item_count = 0_u64;
2880        for databuf in data.into_iter() {
2881            let mut databuf = databuf.as_ref();
2882            while !databuf.is_empty() {
2883                let data_start = unzipped_data.len();
2884                let offset_start = offsets_data.len();
2885                // We might have only-rep or only-def, neither, or both.  They move at the same
2886                // speed though so we only need one index into it
2887                let repdef_start = rep.len().max(def.len());
2888                // TODO: Kind of inefficient we parse the control word twice here
2889                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2890                    databuf,
2891                    self.details.max_rep,
2892                    self.details.max_visible_def,
2893                );
2894                self.details
2895                    .ctrl_word_parser
2896                    .parse(databuf, &mut rep, &mut def);
2897                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2898
2899                if ctrl_desc.is_new_row {
2900                    self.repdef_starts.push(repdef_start);
2901                    self.data_starts.push(data_start);
2902                    self.offset_starts.push(offset_start);
2903                    self.visible_item_counts.push(visible_item_count);
2904                }
2905                if ctrl_desc.is_visible {
2906                    visible_item_count += 1;
2907                    if ctrl_desc.is_valid_item {
2908                        // Safety: Data should have at least bytes_per_length bytes remaining
2909                        debug_assert!(databuf.len() >= bytes_per_length);
2910                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2911                        match out_bits_per_offset {
2912                            32 => offsets_data
2913                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2914                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2915                            _ => unreachable!(),
2916                        };
2917                        databuf = &databuf[bytes_per_offset..];
2918                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
2919                        databuf = &databuf[length as usize..];
2920                        current_offset += length;
2921                    } else {
2922                        // Null items still get an offset
2923                        match out_bits_per_offset {
2924                            32 => offsets_data
2925                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2926                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2927                            _ => unreachable!(),
2928                        }
2929                    }
2930                }
2931            }
2932        }
2933        self.repdef_starts.push(rep.len().max(def.len()));
2934        self.data_starts.push(unzipped_data.len());
2935        self.offset_starts.push(offsets_data.len());
2936        self.visible_item_counts.push(visible_item_count);
2937        match out_bits_per_offset {
2938            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2939            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2940            _ => unreachable!(),
2941        };
2942        self.rep = ScalarBuffer::from(rep);
2943        self.def = ScalarBuffer::from(def);
2944        self.data = LanceBuffer::from(unzipped_data);
2945        self.offsets = LanceBuffer::from(offsets_data);
2946    }
2947}
2948
2949impl StructuralPageDecoder for VariableFullZipDecoder {
2950    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2951        let start = self.current_idx;
2952        let end = start + num_rows as usize;
2953
2954        let offset_start = self.offset_starts[start];
2955        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2956        let offsets = self
2957            .offsets
2958            .slice_with_length(offset_start, offset_end - offset_start);
2959        // Keep each batch's variable data buffer bounded to the selected rows.
2960        let (data, offsets) =
2961            Self::slice_batch_data_and_rebase_offsets(&self.data, &offsets, self.bits_per_offset)?;
2962
2963        let repdef_start = self.repdef_starts[start];
2964        let repdef_end = self.repdef_starts[end];
2965        let rep = if self.rep.is_empty() {
2966            self.rep.clone()
2967        } else {
2968            self.rep.slice(repdef_start, repdef_end - repdef_start)
2969        };
2970        let def = if self.def.is_empty() {
2971            self.def.clone()
2972        } else {
2973            self.def.slice(repdef_start, repdef_end - repdef_start)
2974        };
2975
2976        let visible_item_counts_start = self.visible_item_counts[start];
2977        let visible_item_counts_end = self.visible_item_counts[end];
2978        let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2979
2980        self.current_idx += num_rows as usize;
2981
2982        Ok(Box::new(VariableFullZipDecodeTask {
2983            details: self.details.clone(),
2984            decompressor: self.decompressor.clone(),
2985            data,
2986            offsets,
2987            bits_per_offset: self.bits_per_offset,
2988            num_visible_items,
2989            rep,
2990            def,
2991        }))
2992    }
2993
2994    fn num_rows(&self) -> u64 {
2995        self.num_rows
2996    }
2997}
2998
/// A task to decompress one batch of variable-width data that was already unzipped
/// by `VariableFullZipDecoder`
#[derive(Debug)]
struct VariableFullZipDecodeTask {
    /// Shared decode details; the definition meaning is taken from here
    details: Arc<FullZipDecodeDetails>,
    /// Decompressor applied to the batch's variable-width block
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    /// Value bytes of the batch
    data: LanceBuffer,
    /// Offsets into `data`, each `bits_per_offset` bits wide
    offsets: LanceBuffer,
    /// Width in bits of the entries in `offsets`
    bits_per_offset: u8,
    /// Number of visible items in the batch
    num_visible_items: u64,
    /// Repetition levels of the batch (empty if there are none)
    rep: ScalarBuffer<u16>,
    /// Definition levels of the batch (empty if there are none)
    def: ScalarBuffer<u16>,
}
3010
3011impl DecodePageTask for VariableFullZipDecodeTask {
3012    fn decode(self: Box<Self>) -> Result<DecodedPage> {
3013        let block = VariableWidthBlock {
3014            data: self.data,
3015            offsets: self.offsets,
3016            bits_per_offset: self.bits_per_offset,
3017            num_values: self.num_visible_items,
3018            block_info: BlockInfo::new(),
3019        };
3020        let decomopressed = self.decompressor.decompress(block)?;
3021        let rep = if self.rep.is_empty() {
3022            None
3023        } else {
3024            Some(self.rep.to_vec())
3025        };
3026        let def = if self.def.is_empty() {
3027            None
3028        } else {
3029            Some(self.def.to_vec())
3030        };
3031        let unraveler = RepDefUnraveler::new(
3032            rep,
3033            def,
3034            self.details.def_meaning.clone(),
3035            self.num_visible_items,
3036        );
3037        Ok(DecodedPage {
3038            data: decomopressed,
3039            repdef: unraveler,
3040        })
3041    }
3042}
3043
3044#[derive(Debug)]
3045struct FullZipDecodeTaskItem {
3046    data: PerValueDataBlock,
3047    rows_in_buf: u64,
3048}
3049
3050/// A task to unzip and decompress full-zip encoded data when that data
3051/// has a fixed-width.
3052#[derive(Debug)]
3053struct FixedFullZipDecodeTask {
3054    details: Arc<FullZipDecodeDetails>,
3055    data: Vec<FullZipDecodeTaskItem>,
3056    num_rows: usize,
3057    bytes_per_value: usize,
3058}
3059
3060impl DecodePageTask for FixedFullZipDecodeTask {
3061    fn decode(self: Box<Self>) -> Result<DecodedPage> {
3062        // Multiply by 2 to make a stab at the size of the output buffer (which will be decompressed and thus bigger)
3063        let estimated_size_bytes = self
3064            .data
3065            .iter()
3066            .map(|task_item| task_item.data.data_size() as usize)
3067            .sum::<usize>()
3068            * 2;
3069        let mut data_builder =
3070            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
3071
3072        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
3073            // Fast path, no need to unzip because there is no rep/def
3074            //
3075            // We decompress each buffer and add it to our output buffer
3076            for task_item in self.data.into_iter() {
3077                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
3078                    unreachable!()
3079                };
3080                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
3081                else {
3082                    unreachable!()
3083                };
3084                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
3085                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
3086                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
3087            }
3088
3089            let unraveler = RepDefUnraveler::new(
3090                None,
3091                None,
3092                self.details.def_meaning.clone(),
3093                self.num_rows as u64,
3094            );
3095
3096            Ok(DecodedPage {
3097                data: data_builder.finish(),
3098                repdef: unraveler,
3099            })
3100        } else {
3101            // Slow path, unzipping needed
3102            let mut rep = Vec::with_capacity(self.num_rows);
3103            let mut def = Vec::with_capacity(self.num_rows);
3104
3105            for task_item in self.data.into_iter() {
3106                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
3107                    unreachable!()
3108                };
3109                let mut buf_slice = fixed_data.data.as_ref();
3110                let num_values = fixed_data.num_values as usize;
3111                // We will be unzipping repdef in to `rep` and `def` and the
3112                // values into `values` (which contains the compressed values)
3113                let mut values = Vec::with_capacity(
3114                    fixed_data.data.len()
3115                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
3116                );
3117                let mut visible_items = 0;
3118                for _ in 0..num_values {
3119                    // Extract rep/def
3120                    self.details
3121                        .ctrl_word_parser
3122                        .parse(buf_slice, &mut rep, &mut def);
3123                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
3124
3125                    let is_visible = def
3126                        .last()
3127                        .map(|d| *d <= self.details.max_visible_def)
3128                        .unwrap_or(true);
3129                    if is_visible {
3130                        // Extract value
3131                        values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
3132                        buf_slice = &buf_slice[self.bytes_per_value..];
3133                        visible_items += 1;
3134                    }
3135                }
3136
3137                // Finally, we decompress the values and add them to our output buffer
3138                let values_buf = LanceBuffer::from(values);
3139                let fixed_data = FixedWidthDataBlock {
3140                    bits_per_value: self.bytes_per_value as u64 * 8,
3141                    block_info: BlockInfo::new(),
3142                    data: values_buf,
3143                    num_values: visible_items,
3144                };
3145                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
3146                else {
3147                    unreachable!()
3148                };
3149                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
3150                data_builder.append(&decompressed, 0..visible_items);
3151            }
3152
3153            let repetition = if rep.is_empty() { None } else { Some(rep) };
3154            let definition = if def.is_empty() { None } else { Some(def) };
3155
3156            let unraveler = RepDefUnraveler::new(
3157                repetition,
3158                definition,
3159                self.details.def_meaning.clone(),
3160                self.num_rows as u64,
3161            );
3162            let data = data_builder.finish();
3163
3164            Ok(DecodedPage {
3165                data,
3166                repdef: unraveler,
3167            })
3168        }
3169    }
3170}
3171
3172#[derive(Debug)]
3173struct StructuralPrimitiveFieldSchedulingJob<'a> {
3174    scheduler: &'a StructuralPrimitiveFieldScheduler,
3175    ranges: Vec<Range<u64>>,
3176    page_idx: usize,
3177    range_idx: usize,
3178    global_row_offset: u64,
3179}
3180
3181impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
3182    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
3183        Self {
3184            scheduler,
3185            ranges,
3186            page_idx: 0,
3187            range_idx: 0,
3188            global_row_offset: 0,
3189        }
3190    }
3191}
3192
3193impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
3194    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
3195        if self.range_idx >= self.ranges.len() {
3196            return Ok(Vec::new());
3197        }
3198        // Get our current range
3199        let mut range = self.ranges[self.range_idx].clone();
3200        let priority = range.start;
3201
3202        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
3203        trace!(
3204            "Current range is {:?} and current page has {} rows",
3205            range, cur_page.num_rows
3206        );
3207        // Skip entire pages until we have some overlap with our next range
3208        while cur_page.num_rows + self.global_row_offset <= range.start {
3209            self.global_row_offset += cur_page.num_rows;
3210            self.page_idx += 1;
3211            trace!("Skipping entire page of {} rows", cur_page.num_rows);
3212            cur_page = &self.scheduler.page_schedulers[self.page_idx];
3213        }
3214
3215        // Now the cur_page has overlap with range.  Continue looping through ranges
3216        // until we find a range that exceeds the current page
3217
3218        let mut ranges_in_page = Vec::new();
3219        while cur_page.num_rows + self.global_row_offset > range.start {
3220            range.start = range.start.max(self.global_row_offset);
3221            let start_in_page = range.start - self.global_row_offset;
3222            let end_in_page = start_in_page + (range.end - range.start);
3223            let end_in_page = end_in_page.min(cur_page.num_rows);
3224            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
3225
3226            ranges_in_page.push(start_in_page..end_in_page);
3227            if last_in_range {
3228                self.range_idx += 1;
3229                if self.range_idx == self.ranges.len() {
3230                    break;
3231                }
3232                range = self.ranges[self.range_idx].clone();
3233            } else {
3234                break;
3235            }
3236        }
3237
3238        trace!(
3239            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
3240            ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
3241            ranges_in_page.len(),
3242            cur_page.num_rows,
3243            priority,
3244            self.scheduler.column_index,
3245            cur_page.page_index,
3246        );
3247
3248        self.global_row_offset += cur_page.num_rows;
3249        self.page_idx += 1;
3250
3251        let page_decoders = cur_page
3252            .scheduler
3253            .schedule_ranges(&ranges_in_page, context.io())?;
3254
3255        let cur_path = context.current_path();
3256        page_decoders
3257            .into_iter()
3258            .map(|page_load_task| {
3259                let cur_path = cur_path.clone();
3260                let page_decoder = page_load_task.decoder_fut;
3261                let unloaded_page = async move {
3262                    let page_decoder = page_decoder.await?;
3263                    Ok(LoadedPageShard {
3264                        decoder: page_decoder,
3265                        path: cur_path,
3266                    })
3267                }
3268                .boxed();
3269                Ok(ScheduledScanLine {
3270                    decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
3271                    rows_scheduled: page_load_task.num_rows,
3272                })
3273            })
3274            .collect::<Result<Vec<_>>>()
3275    }
3276}
3277
3278#[derive(Debug)]
3279struct PageInfoAndScheduler {
3280    page_index: usize,
3281    num_rows: u64,
3282    scheduler: Box<dyn StructuralPageScheduler>,
3283}
3284
3285/// A scheduler for a leaf node
3286///
3287/// Here we look at the layout of the various pages and delegate scheduling to a scheduler
3288/// appropriate for the layout of the page.
3289#[derive(Debug)]
3290pub struct StructuralPrimitiveFieldScheduler {
3291    page_schedulers: Vec<PageInfoAndScheduler>,
3292    column_index: u32,
3293}
3294
3295impl StructuralPrimitiveFieldScheduler {
3296    pub fn try_new(
3297        column_info: &ColumnInfo,
3298        decompressors: &dyn DecompressionStrategy,
3299        cache_repetition_index: bool,
3300        target_field: &Field,
3301    ) -> Result<Self> {
3302        let page_schedulers = column_info
3303            .page_infos
3304            .iter()
3305            .enumerate()
3306            .map(|(page_index, page_info)| {
3307                Self::page_info_to_scheduler(
3308                    page_info,
3309                    page_index,
3310                    decompressors,
3311                    cache_repetition_index,
3312                    target_field,
3313                )
3314            })
3315            .collect::<Result<Vec<_>>>()?;
3316        Ok(Self {
3317            page_schedulers,
3318            column_index: column_info.index,
3319        })
3320    }
3321
3322    fn page_layout_to_scheduler(
3323        page_info: &PageInfo,
3324        page_layout: &PageLayout,
3325        decompressors: &dyn DecompressionStrategy,
3326        cache_repetition_index: bool,
3327        target_field: &Field,
3328    ) -> Result<Box<dyn StructuralPageScheduler>> {
3329        use pb21::page_layout::Layout;
3330        Ok(match page_layout.layout.as_ref().expect_ok()? {
3331            Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
3332                &page_info.buffer_offsets_and_sizes,
3333                page_info.priority,
3334                mini_block.num_items,
3335                mini_block,
3336                decompressors,
3337            )?),
3338            Layout::FullZipLayout(full_zip) => {
3339                let mut scheduler = FullZipScheduler::try_new(
3340                    &page_info.buffer_offsets_and_sizes,
3341                    page_info.priority,
3342                    page_info.num_rows,
3343                    full_zip,
3344                    decompressors,
3345                )?;
3346                scheduler.enable_cache = cache_repetition_index;
3347                Box::new(scheduler)
3348            }
3349            Layout::ConstantLayout(constant_layout) => {
3350                let def_meaning = constant_layout
3351                    .layers
3352                    .iter()
3353                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3354                    .collect::<Vec<_>>();
3355                let has_scalar_value = constant_layout.inline_value.is_some()
3356                    || page_info.buffer_offsets_and_sizes.len() == 1
3357                    || page_info.buffer_offsets_and_sizes.len() == 3;
3358                if has_scalar_value {
3359                    Box::new(constant::ConstantPageScheduler::try_new(
3360                        page_info.buffer_offsets_and_sizes.clone(),
3361                        constant_layout.inline_value.clone(),
3362                        target_field.data_type(),
3363                        def_meaning.into(),
3364                    )?) as Box<dyn StructuralPageScheduler>
3365                } else if def_meaning.len() == 1
3366                    && def_meaning[0] == DefinitionInterpretation::NullableItem
3367                {
3368                    Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
3369                } else {
3370                    let rep_decompressor = constant_layout
3371                        .rep_compression
3372                        .as_ref()
3373                        .map(|encoding| decompressors.create_block_decompressor(encoding))
3374                        .transpose()?
3375                        .map(Arc::from);
3376
3377                    let def_decompressor = constant_layout
3378                        .def_compression
3379                        .as_ref()
3380                        .map(|encoding| decompressors.create_block_decompressor(encoding))
3381                        .transpose()?
3382                        .map(Arc::from);
3383
3384                    Box::new(ComplexAllNullScheduler::new(
3385                        page_info.buffer_offsets_and_sizes.clone(),
3386                        def_meaning.into(),
3387                        rep_decompressor,
3388                        def_decompressor,
3389                        constant_layout.num_rep_values,
3390                        constant_layout.num_def_values,
3391                    )) as Box<dyn StructuralPageScheduler>
3392                }
3393            }
3394            Layout::BlobLayout(blob) => {
3395                let inner_scheduler = Self::page_layout_to_scheduler(
3396                    page_info,
3397                    blob.inner_layout.as_ref().expect_ok()?.as_ref(),
3398                    decompressors,
3399                    cache_repetition_index,
3400                    target_field,
3401                )?;
3402                let def_meaning = blob
3403                    .layers
3404                    .iter()
3405                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3406                    .collect::<Vec<_>>();
3407                if matches!(target_field.data_type(), DataType::Struct(_)) {
3408                    // User wants to decode blob into struct
3409                    Box::new(BlobDescriptionPageScheduler::new(
3410                        inner_scheduler,
3411                        def_meaning.into(),
3412                    ))
3413                } else {
3414                    // User wants to decode blob into binary data
3415                    Box::new(BlobPageScheduler::new(
3416                        inner_scheduler,
3417                        page_info.priority,
3418                        page_info.num_rows,
3419                        def_meaning.into(),
3420                    ))
3421                }
3422            }
3423        })
3424    }
3425
3426    fn page_info_to_scheduler(
3427        page_info: &PageInfo,
3428        page_index: usize,
3429        decompressors: &dyn DecompressionStrategy,
3430        cache_repetition_index: bool,
3431        target_field: &Field,
3432    ) -> Result<PageInfoAndScheduler> {
3433        let page_layout = page_info.encoding.as_structural();
3434        let scheduler = Self::page_layout_to_scheduler(
3435            page_info,
3436            page_layout,
3437            decompressors,
3438            cache_repetition_index,
3439            target_field,
3440        )?;
3441        Ok(PageInfoAndScheduler {
3442            page_index,
3443            num_rows: page_info.num_rows,
3444            scheduler,
3445        })
3446    }
3447}
3448
3449pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
3450    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
3451}
3452
3453pub struct NoCachedPageData;
3454
3455impl DeepSizeOf for NoCachedPageData {
3456    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
3457        0
3458    }
3459}
3460impl CachedPageData for NoCachedPageData {
3461    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
3462        self
3463    }
3464}
3465
3466pub struct CachedFieldData {
3467    pages: Vec<Arc<dyn CachedPageData>>,
3468}
3469
3470impl DeepSizeOf for CachedFieldData {
3471    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
3472        self.pages.deep_size_of_children(ctx)
3473    }
3474}
3475
3476// Cache key for field data
3477#[derive(Debug, Clone)]
3478pub struct FieldDataCacheKey {
3479    pub column_index: u32,
3480}
3481
3482impl CacheKey for FieldDataCacheKey {
3483    type ValueType = CachedFieldData;
3484
3485    fn key(&self) -> std::borrow::Cow<'_, str> {
3486        self.column_index.to_string().into()
3487    }
3488
3489    fn type_name() -> &'static str {
3490        "FieldData"
3491    }
3492}
3493
3494impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
3495    fn initialize<'a>(
3496        &'a mut self,
3497        _filter: &'a FilterExpression,
3498        context: &'a SchedulerContext,
3499    ) -> BoxFuture<'a, Result<()>> {
3500        let cache_key = FieldDataCacheKey {
3501            column_index: self.column_index,
3502        };
3503        let cache = context.cache().clone();
3504
3505        async move {
3506            if let Some(cached_data) = cache.get_with_key(&cache_key).await {
3507                self.page_schedulers
3508                    .iter_mut()
3509                    .zip(cached_data.pages.iter())
3510                    .for_each(|(page_scheduler, cached_data)| {
3511                        page_scheduler.scheduler.load(cached_data);
3512                    });
3513                return Ok(());
3514            }
3515
3516            let page_data = self
3517                .page_schedulers
3518                .iter_mut()
3519                .map(|s| s.scheduler.initialize(context.io()))
3520                .collect::<FuturesOrdered<_>>();
3521
3522            let page_data = page_data.try_collect::<Vec<_>>().await?;
3523            let cached_data = Arc::new(CachedFieldData { pages: page_data });
3524            cache.insert_with_key(&cache_key, cached_data).await;
3525            Ok(())
3526        }
3527        .boxed()
3528    }
3529
3530    fn schedule_ranges<'a>(
3531        &'a self,
3532        ranges: &[Range<u64>],
3533        _filter: &FilterExpression,
3534    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
3535        let ranges = ranges.to_vec();
3536        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
3537            self, ranges,
3538        )))
3539    }
3540}
3541
3542/// Takes the output from several pages decoders and
3543/// concatenates them.
3544#[derive(Debug)]
3545pub struct StructuralCompositeDecodeArrayTask {
3546    tasks: Vec<Box<dyn DecodePageTask>>,
3547    should_validate: bool,
3548    data_type: DataType,
3549}
3550
3551impl StructuralCompositeDecodeArrayTask {
3552    fn restore_validity(
3553        array: Arc<dyn Array>,
3554        unraveler: &mut CompositeRepDefUnraveler,
3555    ) -> Arc<dyn Array> {
3556        let validity = unraveler.unravel_validity(array.len());
3557        let Some(validity) = validity else {
3558            return array;
3559        };
3560        if array.data_type() == &DataType::Null {
3561            // We unravel from a null array but we don't add the null buffer because arrow-rs doesn't like it
3562            return array;
3563        }
3564        assert_eq!(validity.len(), array.len());
3565        // SAFETY: We've should have already asserted the buffers are all valid, we are just
3566        // adding null buffers to the array here
3567        make_array(unsafe {
3568            array
3569                .to_data()
3570                .into_builder()
3571                .nulls(Some(validity))
3572                .build_unchecked()
3573        })
3574    }
3575}
3576
3577impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3578    fn decode(self: Box<Self>) -> Result<DecodedArray> {
3579        let mut arrays = Vec::with_capacity(self.tasks.len());
3580        let mut unravelers = Vec::with_capacity(self.tasks.len());
3581        let mut data_size = 0u64;
3582        for task in self.tasks {
3583            let decoded = task.decode()?;
3584            data_size += decoded.data.data_size();
3585            unravelers.push(decoded.repdef);
3586
3587            let array = make_array(
3588                decoded
3589                    .data
3590                    .into_arrow(self.data_type.clone(), self.should_validate)?,
3591            );
3592
3593            arrays.push(array);
3594        }
3595        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3596        let array = arrow_select::concat::concat(&array_refs)?;
3597        let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3598
3599        let array = Self::restore_validity(array, &mut repdef);
3600
3601        Ok(DecodedArray {
3602            array,
3603            repdef,
3604            data_size,
3605        })
3606    }
3607}
3608
3609#[derive(Debug)]
3610pub struct StructuralPrimitiveFieldDecoder {
3611    field: Arc<ArrowField>,
3612    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
3613    should_validate: bool,
3614    rows_drained_in_current: u64,
3615}
3616
3617impl StructuralPrimitiveFieldDecoder {
3618    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3619        Self {
3620            field: field.clone(),
3621            page_decoders: VecDeque::new(),
3622            should_validate,
3623            rows_drained_in_current: 0,
3624        }
3625    }
3626}
3627
3628impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3629    fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
3630        assert!(child.path.is_empty());
3631        self.page_decoders.push_back(child.decoder);
3632        Ok(())
3633    }
3634
3635    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3636        let mut remaining = num_rows;
3637        let mut tasks = Vec::new();
3638        while remaining > 0 {
3639            let cur_page = self.page_decoders.front_mut().unwrap();
3640            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3641            let to_take = num_in_page.min(remaining);
3642
3643            let task = cur_page.drain(to_take)?;
3644            tasks.push(task);
3645
3646            if to_take == num_in_page {
3647                self.page_decoders.pop_front();
3648                self.rows_drained_in_current = 0;
3649            } else {
3650                self.rows_drained_in_current += to_take;
3651            }
3652
3653            remaining -= to_take;
3654        }
3655        Ok(Box::new(StructuralCompositeDecodeArrayTask {
3656            tasks,
3657            should_validate: self.should_validate,
3658            data_type: self.field.data_type().clone(),
3659        }))
3660    }
3661
3662    fn data_type(&self) -> &DataType {
3663        self.field.data_type()
3664    }
3665}
3666
3667/// The serialized representation of full-zip data
3668struct SerializedFullZip {
3669    /// The zipped values buffer
3670    values: LanceBuffer,
3671    /// The repetition index (only present if there is repetition)
3672    repetition_index: Option<LanceBuffer>,
3673}
3674
/// Alignment, in bytes, of mini-block chunks.
///
/// Mini-blocks are aligned and padded to 8 byte boundaries for two reasons.
///
/// First, it lets us store a chunk size in 12 bits.  Recording the size in bytes
/// with 12 bits would limit a chunk to 4KiB, which is too small.  Since every
/// mini-block consists of 8 byte words we can store the number of words instead,
/// which gives us 32KiB.  We want at least 24KiB so that we can handle even the
/// worst case of
/// - 4Ki values compressed into an 8186 byte buffer
/// - 4 bytes to describe rep & def lengths
/// - 16KiB of rep & def buffer (this will almost never happen but life is easier
///   if we plan for it)
///
/// Second, each chunk in a mini-block is aligned to 8 bytes.  This allows
/// multi-byte values like offsets to be stored in a mini-block and safely read
/// back out.  It also helps ensure zero-copy reads in cases where zero-copy is
/// possible (e.g. no decoding needed).
///
/// Note: "aligned to 8 bytes" means BOTH "aligned to 8 bytes from the start of
/// the page" and "aligned to 8 bytes from the start of the file."
const MINIBLOCK_ALIGNMENT: usize = 8;
3695
3696/// An encoder for primitive (leaf) arrays
3697///
3698/// This encoder is fairly complicated and follows a number of paths depending
3699/// on the data.
3700///
3701/// First, we convert the validity & offsets information into repetition and
3702/// definition levels.  Then we compress the data itself into a single buffer.
3703///
3704/// If the data is narrow then we encode the data in small chunks (each chunk
3705/// should be a few disk sectors and contains a buffer of repetition, a buffer
3706/// of definition, and a buffer of value data).  This approach is called
3707/// "mini-block".  These mini-blocks are stored into a single data buffer.
3708///
3709/// If the data is wide then we zip together the repetition and definition value
3710/// with the value data into a single buffer.  This approach is called "zipped".
3711///
3712/// If there is any repetition information then we create a repetition index
3713///
3714/// In addition, the compression process may create zero or more metadata buffers.
3715/// For example, a dictionary compression will create dictionary metadata.  Any
3716/// mini-block approach has a metadata buffer of block sizes.  This metadata is
3717/// stored in a separate buffer on disk and read at initialization time.
3718///
3719/// TODO: We should concatenate metadata buffers from all pages into a single buffer
3720/// at (roughly) the end of the file so there is, at most, one read per column of
3721/// metadata per file.
3722pub struct PrimitiveStructuralEncoder {
3723    // Accumulates arrays until we have enough data to justify a disk page
3724    accumulation_queue: AccumulationQueue,
3725
3726    keep_original_array: bool,
3727    support_large_chunk: bool,
3728    accumulated_repdefs: Vec<RepDefBuilder>,
3729    // The compression strategy we will use to compress the data
3730    compression_strategy: Arc<dyn CompressionStrategy>,
3731    column_index: u32,
3732    field: Field,
3733    encoding_metadata: Arc<HashMap<String, String>>,
3734    version: LanceFileVersion,
3735}
3736
3737struct CompressedLevelsChunk {
3738    data: LanceBuffer,
3739    num_levels: u16,
3740}
3741
3742struct CompressedLevels {
3743    data: Vec<CompressedLevelsChunk>,
3744    compression: CompressiveEncoding,
3745    rep_index: Option<LanceBuffer>,
3746}
3747
3748struct SerializedMiniBlockPage {
3749    num_buffers: u64,
3750    data: LanceBuffer,
3751    metadata: LanceBuffer,
3752}
3753
3754#[derive(Debug, Clone, Copy)]
3755struct DictEncodingBudget {
3756    max_dict_entries: u32,
3757    max_encoded_size: usize,
3758}
3759
3760impl PrimitiveStructuralEncoder {
3761    pub fn try_new(
3762        options: &EncodingOptions,
3763        compression_strategy: Arc<dyn CompressionStrategy>,
3764        column_index: u32,
3765        field: Field,
3766        encoding_metadata: Arc<HashMap<String, String>>,
3767    ) -> Result<Self> {
3768        Ok(Self {
3769            accumulation_queue: AccumulationQueue::new(
3770                options.cache_bytes_per_column,
3771                column_index,
3772                options.keep_original_array,
3773            ),
3774            support_large_chunk: options.support_large_chunk(),
3775            keep_original_array: options.keep_original_array,
3776            accumulated_repdefs: Vec::new(),
3777            column_index,
3778            compression_strategy,
3779            field,
3780            encoding_metadata,
3781            version: options.version,
3782        })
3783    }
3784
3785    // TODO: This is a heuristic we may need to tune at some point
3786    //
3787    // As data gets narrow then the "zipping" process gets too expensive
3788    //   and we prefer mini-block
3789    // As data gets wide then the # of values per block shrinks (very wide)
3790    //   data doesn't even fit in a mini-block and the block overhead gets
3791    //   too large and we prefer zipped.
3792    fn is_narrow(data_block: &DataBlock) -> bool {
3793        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3794
3795        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3796            let max_len_array = max_len_array
3797                .as_any()
3798                .downcast_ref::<PrimitiveArray<UInt64Type>>()
3799                .unwrap();
3800            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3801                return true;
3802            }
3803        }
3804        false
3805    }
3806
3807    fn prefers_miniblock(
3808        data_block: &DataBlock,
3809        encoding_metadata: &HashMap<String, String>,
3810    ) -> bool {
3811        // If the user specifically requested miniblock then use it
3812        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3813            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3814        }
3815        // Otherwise only use miniblock if it is narrow
3816        Self::is_narrow(data_block)
3817    }
3818
3819    /// Checks if the rep/def levels are too sparse for miniblock encoding.
3820    ///
3821    /// Miniblock chunks are limited to ~32KiB total. Data can use up to ~16KiB,
3822    /// leaving ~16KiB for both rep and def buffers combined. Each chunk has at most
3823    /// MAX_MINIBLOCK_VALUES (4096) data values, but when data has many empty/null
3824    /// lists, the number of rep/def levels can far exceed the number of data values
3825    /// (each empty list adds a level entry with no corresponding data value).
3826    ///
3827    /// We estimate the compressed bits per level by computing the max value in each
3828    /// buffer and taking ceil(log2(max_val + 1)) — the minimum bits needed to
3829    /// bitpack each level. We then calculate the maximum number of levels that fit
3830    /// in 16KiB and compare against the actual levels-to-values ratio.
3831    fn repdef_too_sparse_for_miniblock(
3832        repdef: &crate::repdef::SerializedRepDefs,
3833        num_values: u64,
3834    ) -> bool {
3835        if num_values == 0 {
3836            return false;
3837        }
3838        let num_levels = repdef
3839            .repetition_levels
3840            .as_ref()
3841            .map(|r| r.len() as u64)
3842            .max(repdef.definition_levels.as_ref().map(|d| d.len() as u64))
3843            .unwrap_or(0);
3844        if num_levels == 0 {
3845            return false;
3846        }
3847
3848        // Compute bits needed per level for each buffer (ceil of log2(max+1))
3849        let bits_per_rep = repdef
3850            .repetition_levels
3851            .as_ref()
3852            .and_then(|r| r.iter().max().copied())
3853            .map(|max_val| u16::BITS - max_val.leading_zeros())
3854            .unwrap_or(0) as u64;
3855        let bits_per_def = repdef
3856            .definition_levels
3857            .as_ref()
3858            .and_then(|d| d.iter().max().copied())
3859            .map(|max_val| u16::BITS - max_val.leading_zeros())
3860            .unwrap_or(0) as u64;
3861
3862        let bits_per_level = bits_per_rep + bits_per_def;
3863        if bits_per_level == 0 {
3864            return false;
3865        }
3866
3867        // 16KiB budget for rep+def combined (half the ~32KiB chunk limit)
3868        const REPDEF_BUDGET_BITS: u64 = 16 * 1024 * 8;
3869        let max_levels_per_chunk = REPDEF_BUDGET_BITS / bits_per_level;
3870
3871        // A chunk has at most MAX_MINIBLOCK_VALUES data values. The levels-to-values
3872        // ratio tells us how many levels a chunk of that size would need.
3873        let levels_per_chunk =
3874            (num_levels as f64 / num_values as f64) * *miniblock::MAX_MINIBLOCK_VALUES as f64;
3875
3876        levels_per_chunk > max_levels_per_chunk as f64
3877    }
3878
3879    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3880        // Fullzip is the backup option so the only reason we wouldn't use it is if the
3881        // user specifically requested not to use it (in which case we're probably going
3882        // to emit an error)
3883        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3884            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3885        }
3886        true
3887    }
3888
3889    // Converts value data, repetition levels, and definition levels into a single
3890    // buffer of mini-blocks.  In addition, creates a buffer of mini-block metadata
3891    // which tells us the size of each block.  Finally, if repetition is present then
3892    // we also create a buffer for the repetition index.
3893    //
3894    // Each chunk is serialized as:
3895    // | num_bufs (1 byte) | buf_lens (2 bytes per buffer) | P | buf0 | P | buf1 | ... | bufN | P |
3896    //
3897    // P - Padding inserted to ensure each buffer is 8-byte aligned and the buffer size is a multiple
3898    //     of 8 bytes (so that the next chunk is 8-byte aligned).
3899    //
3900    // Each block has a u16 word of metadata.  The upper 12 bits contain the
3901    // # of 8-byte words in the block (if the block does not fill the final word
3902    // then up to 7 bytes of padding are added).  The lower 4 bits describe the log_2
3903    // number of values (e.g. if there are 1024 then the lower 4 bits will be
3904    // 0xA)  All blocks except the last must have power-of-two number of values.
3905    // This not only makes metadata smaller but it makes decoding easier since
3906    // batch sizes are typically a power of 2.  4 bits would allow us to express
3907    // up to 16Ki values but we restrict this further to 4Ki values.
3908    //
3909    // This means blocks can have 1 to 4Ki values and 8 - 32Ki bytes.
3910    //
3911    // All metadata words are serialized (as little endian) into a single buffer
3912    // of metadata values.
3913    //
3914    // If there is repetition then we also create a repetition index.  This is a
3915    // single buffer of integer vectors (stored in row major order).  There is one
3916    // entry for each chunk.  The size of the vector is based on the depth of random
3917    // access we want to support.
3918    //
3919    // A vector of size 2 is the minimum and will support row-based random access (e.g.
3920    // "take the 57th row").  A vector of size 3 will support 1 level of nested access
3921    // (e.g. "take the 3rd item in the 57th row").  A vector of size 4 will support 2
3922    // levels of nested access and so on.
3923    //
3924    // The first number in the vector is the number of top-level rows that complete in
3925    // the chunk.  The second number is the number of second-level rows that complete
3926    // after the final top-level row completed (or beginning of the chunk if no top-level
3927    // row completes in the chunk).  And so on.  The final number in the vector is always
3928    // the number of leftover items not covered by earlier entries in the vector.
3929    //
3930    // Currently we are limited to 0 levels of nested access but that will change in the
3931    // future.
3932    //
3933    // The repetition index and the chunk metadata are read at initialization time and
3934    // cached in memory.
3935    fn serialize_miniblocks(
3936        miniblocks: MiniBlockCompressed,
3937        rep: Option<Vec<CompressedLevelsChunk>>,
3938        def: Option<Vec<CompressedLevelsChunk>>,
3939        support_large_chunk: bool,
3940    ) -> Result<SerializedMiniBlockPage> {
3941        let bytes_rep = rep
3942            .as_ref()
3943            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3944            .unwrap_or(0);
3945        let bytes_def = def
3946            .as_ref()
3947            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3948            .unwrap_or(0);
3949        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3950        let mut num_buffers = miniblocks.data.len();
3951        if rep.is_some() {
3952            num_buffers += 1;
3953        }
3954        if def.is_some() {
3955            num_buffers += 1;
3956        }
3957        // 2 bytes for the length of each buffer and up to 7 bytes of padding per buffer
3958        let max_extra = 9 * num_buffers;
3959        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3960        let chunk_size_bytes = if support_large_chunk { 4 } else { 2 };
3961        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * chunk_size_bytes);
3962
3963        let mut rep_iter = rep.map(|r| r.into_iter());
3964        let mut def_iter = def.map(|d| d.into_iter());
3965
3966        let mut buffer_offsets = vec![0; miniblocks.data.len()];
3967        for chunk in miniblocks.chunks {
3968            let start_pos = data_buffer.len();
3969            // Start of chunk should be aligned
3970            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3971
3972            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3973            let def = def_iter.as_mut().map(|d| d.next().unwrap());
3974
3975            // Write the number of levels, or 0 if there is no rep/def
3976            let num_levels = rep
3977                .as_ref()
3978                .map(|r| r.num_levels)
3979                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3980            data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3981
3982            // Write the buffer lengths
3983            if let Some(rep) = rep.as_ref() {
3984                let bytes_rep = u16::try_from(rep.data.len()).map_err(|_| {
3985                    Error::internal(format!(
3986                        "Repetition buffer size ({} bytes) too large",
3987                        rep.data.len()
3988                    ))
3989                })?;
3990                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3991            }
3992            if let Some(def) = def.as_ref() {
3993                let bytes_def = u16::try_from(def.data.len()).map_err(|_| {
3994                    Error::internal(format!(
3995                        "Definition buffer size ({} bytes) too large",
3996                        def.data.len()
3997                    ))
3998                })?;
3999                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
4000            }
4001
4002            if support_large_chunk {
4003                for &buffer_size in &chunk.buffer_sizes {
4004                    data_buffer.extend_from_slice(&buffer_size.to_le_bytes());
4005                }
4006            } else {
4007                for &buffer_size in &chunk.buffer_sizes {
4008                    data_buffer.extend_from_slice(&(buffer_size as u16).to_le_bytes());
4009                }
4010            }
4011
4012            // Pad
4013            let add_padding = |data_buffer: &mut Vec<u8>| {
4014                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
4015                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
4016            };
4017            add_padding(&mut data_buffer);
4018
4019            // Write the buffers themselves
4020            if let Some(rep) = rep.as_ref() {
4021                data_buffer.extend_from_slice(&rep.data);
4022                add_padding(&mut data_buffer);
4023            }
4024            if let Some(def) = def.as_ref() {
4025                data_buffer.extend_from_slice(&def.data);
4026                add_padding(&mut data_buffer);
4027            }
4028            for (buffer_size, (buffer, buffer_offset)) in chunk
4029                .buffer_sizes
4030                .iter()
4031                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
4032            {
4033                let start = *buffer_offset;
4034                let end = start + *buffer_size as usize;
4035                *buffer_offset += *buffer_size as usize;
4036                data_buffer.extend_from_slice(&buffer[start..end]);
4037                add_padding(&mut data_buffer);
4038            }
4039
4040            let chunk_bytes = data_buffer.len() - start_pos;
4041            let max_chunk_size = if support_large_chunk {
4042                4 * 1024 * 1024 * 1024 // 4GB limit with u32 metadata
4043            } else {
4044                32 * 1024 // 32KiB limit with u16 metadata
4045            };
4046            assert!(chunk_bytes <= max_chunk_size);
4047            assert!(chunk_bytes > 0);
4048            assert_eq!(chunk_bytes % 8, 0);
4049            // 4Ki values max
4050            assert!(chunk.log_num_values <= 12);
4051            // We subtract 1 here from chunk_bytes because we want to be able to express
4052            // a size of 32KiB and not (32Ki - 8)B which is what we'd get otherwise with
4053            // 0xFFF
4054            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
4055            let divided_bytes_minus_one = (divided_bytes - 1) as u64;
4056
4057            let metadata = (divided_bytes_minus_one << 4) | chunk.log_num_values as u64;
4058            if support_large_chunk {
4059                meta_buffer.extend_from_slice(&(metadata as u32).to_le_bytes());
4060            } else {
4061                meta_buffer.extend_from_slice(&(metadata as u16).to_le_bytes());
4062            }
4063        }
4064
4065        let data_buffer = LanceBuffer::from(data_buffer);
4066        let metadata_buffer = LanceBuffer::from(meta_buffer);
4067
4068        Ok(SerializedMiniBlockPage {
4069            num_buffers: miniblocks.data.len() as u64,
4070            data: data_buffer,
4071            metadata: metadata_buffer,
4072        })
4073    }
4074
4075    /// Compresses a buffer of levels into chunks
4076    ///
4077    /// If these are repetition levels then we also calculate the repetition index here (that
4078    /// is the third return value)
4079    fn compress_levels(
4080        mut levels: RepDefSlicer<'_>,
4081        num_elements: u64,
4082        compression_strategy: &dyn CompressionStrategy,
4083        chunks: &[MiniBlockChunk],
4084        // This will be 0 if we are compressing def levels
4085        max_rep: u16,
4086    ) -> Result<CompressedLevels> {
4087        let mut rep_index = if max_rep > 0 {
4088            Vec::with_capacity(chunks.len())
4089        } else {
4090            vec![]
4091        };
4092        // Make the levels into a FixedWidth data block
4093        let num_levels = levels.num_levels() as u64;
4094        let levels_buf = levels.all_levels().clone();
4095
4096        let mut fixed_width_block = FixedWidthDataBlock {
4097            data: levels_buf,
4098            bits_per_value: 16,
4099            num_values: num_levels,
4100            block_info: BlockInfo::new(),
4101        };
4102        // Compute statistics to enable optimal compression for rep/def levels
4103        fixed_width_block.compute_stat();
4104
4105        let levels_block = DataBlock::FixedWidth(fixed_width_block);
4106        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
4107        // Pick a block compressor
4108        let (compressor, compressor_desc) =
4109            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
4110        // Compress blocks of levels (sized according to the chunks)
4111        let mut level_chunks = Vec::with_capacity(chunks.len());
4112        let mut values_counter = 0;
4113        for (chunk_idx, chunk) in chunks.iter().enumerate() {
4114            let chunk_num_values = chunk.num_values(values_counter, num_elements);
4115            debug_assert!(chunk_num_values > 0);
4116            values_counter += chunk_num_values;
4117            let chunk_levels = if chunk_idx < chunks.len() - 1 {
4118                levels.slice_next(chunk_num_values as usize)
4119            } else {
4120                levels.slice_rest()
4121            };
4122            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
4123            if max_rep > 0 {
4124                // If max_rep > 0 then we are working with rep levels and we need
4125                // to calculate the repetition index.  The repetition index for a
4126                // chunk is currently 2 values (in the future it may be more).
4127                //
4128                // The first value is the number of rows that _finish_ in the
4129                // chunk.
4130                //
4131                // The second value is the number of "leftovers" after the last
4132                // finished row in the chunk.
4133                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
4134                let rep_values = rep_values.as_ref();
4135
4136                // We skip 1 here because a max_rep at spot 0 doesn't count as a finished list (we
4137                // will count it in the previous chunk)
4138                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
4139                let num_leftovers = if chunk_idx < chunks.len() - 1 {
4140                    rep_values
4141                        .iter()
4142                        .rev()
4143                        .position(|v| *v == max_rep)
4144                        // # of leftovers includes the max_rep spot
4145                        .map(|pos| pos + 1)
4146                        .unwrap_or(rep_values.len())
4147                } else {
4148                    // Last chunk can't have leftovers
4149                    0
4150                };
4151
4152                if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
4153                    // This chunk starts with a new row and so, if we thought we had leftovers
4154                    // in the previous chunk, we were mistaken
4155                    // TODO: Can use unchecked here
4156                    let rep_len = rep_index.len();
4157                    if rep_index[rep_len - 1] != 0 {
4158                        // We thought we had leftovers but that was actually a full row
4159                        rep_index[rep_len - 2] += 1;
4160                        rep_index[rep_len - 1] = 0;
4161                    }
4162                }
4163
4164                if chunk_idx == chunks.len() - 1 {
4165                    // The final list
4166                    num_rows += 1;
4167                }
4168                rep_index.push(num_rows as u64);
4169                rep_index.push(num_leftovers as u64);
4170            }
4171            let mut chunk_fixed_width = FixedWidthDataBlock {
4172                data: chunk_levels,
4173                bits_per_value: 16,
4174                num_values: num_chunk_levels,
4175                block_info: BlockInfo::new(),
4176            };
4177            chunk_fixed_width.compute_stat();
4178            let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
4179            let compressed_levels = compressor.compress(chunk_levels_block)?;
4180            level_chunks.push(CompressedLevelsChunk {
4181                data: compressed_levels,
4182                num_levels: num_chunk_levels as u16,
4183            });
4184        }
4185        debug_assert_eq!(levels.num_levels_remaining(), 0);
4186        let rep_index = if rep_index.is_empty() {
4187            None
4188        } else {
4189            Some(LanceBuffer::reinterpret_vec(rep_index))
4190        };
4191        Ok(CompressedLevels {
4192            data: level_chunks,
4193            compression: compressor_desc,
4194            rep_index,
4195        })
4196    }
4197
4198    fn encode_simple_all_null(
4199        column_idx: u32,
4200        num_rows: u64,
4201        row_number: u64,
4202    ) -> Result<EncodedPage> {
4203        let description =
4204            ProtobufUtils21::constant_layout(&[DefinitionInterpretation::NullableItem], None);
4205        Ok(EncodedPage {
4206            column_idx,
4207            data: vec![],
4208            description: PageEncoding::Structural(description),
4209            num_rows,
4210            row_number,
4211        })
4212    }
4213
4214    fn encode_complex_all_null_vals(
4215        data: &Arc<[u16]>,
4216        compression_strategy: &dyn CompressionStrategy,
4217    ) -> Result<(LanceBuffer, pb21::CompressiveEncoding)> {
4218        let buffer = LanceBuffer::reinterpret_slice(data.clone());
4219        let mut fixed_width_block = FixedWidthDataBlock {
4220            data: buffer,
4221            bits_per_value: 16,
4222            num_values: data.len() as u64,
4223            block_info: BlockInfo::new(),
4224        };
4225        fixed_width_block.compute_stat();
4226
4227        let levels_block = DataBlock::FixedWidth(fixed_width_block);
4228        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
4229        let (compressor, encoding) =
4230            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
4231        let compressed_buffer = compressor.compress(levels_block)?;
4232        Ok((compressed_buffer, encoding))
4233    }
4234
4235    // Encodes a page where all values are null but we have rep/def
4236    // information that we need to store (e.g. to distinguish between
4237    // different kinds of null)
4238    fn encode_complex_all_null(
4239        column_idx: u32,
4240        repdef: crate::repdef::SerializedRepDefs,
4241        row_number: u64,
4242        num_rows: u64,
4243        version: LanceFileVersion,
4244        compression_strategy: &dyn CompressionStrategy,
4245    ) -> Result<EncodedPage> {
4246        if version.resolve() < LanceFileVersion::V2_2 {
4247            let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
4248                LanceBuffer::reinterpret_slice(rep.clone())
4249            } else {
4250                LanceBuffer::empty()
4251            };
4252
4253            let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
4254                LanceBuffer::reinterpret_slice(def.clone())
4255            } else {
4256                LanceBuffer::empty()
4257            };
4258
4259            let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None);
4260            return Ok(EncodedPage {
4261                column_idx,
4262                data: vec![rep_bytes, def_bytes],
4263                description: PageEncoding::Structural(description),
4264                num_rows,
4265                row_number,
4266            });
4267        }
4268
4269        let (rep_bytes, rep_encoding, num_rep_values) = if let Some(rep) =
4270            repdef.repetition_levels.as_ref()
4271        {
4272            let num_values = rep.len() as u64;
4273            let (buffer, encoding) = Self::encode_complex_all_null_vals(rep, compression_strategy)?;
4274            (buffer, Some(encoding), num_values)
4275        } else {
4276            (LanceBuffer::empty(), None, 0)
4277        };
4278
4279        let (def_bytes, def_encoding, num_def_values) = if let Some(def) =
4280            repdef.definition_levels.as_ref()
4281        {
4282            let num_values = def.len() as u64;
4283            let (buffer, encoding) = Self::encode_complex_all_null_vals(def, compression_strategy)?;
4284            (buffer, Some(encoding), num_values)
4285        } else {
4286            (LanceBuffer::empty(), None, 0)
4287        };
4288
4289        let description = ProtobufUtils21::compressed_all_null_constant_layout(
4290            &repdef.def_meaning,
4291            rep_encoding,
4292            def_encoding,
4293            num_rep_values,
4294            num_def_values,
4295        );
4296        Ok(EncodedPage {
4297            column_idx,
4298            data: vec![rep_bytes, def_bytes],
4299            description: PageEncoding::Structural(description),
4300            num_rows,
4301            row_number,
4302        })
4303    }
4304
4305    fn leaf_validity(
4306        repdef: &crate::repdef::SerializedRepDefs,
4307        num_values: usize,
4308    ) -> Result<Option<BooleanBuffer>> {
4309        let rep = repdef
4310            .repetition_levels
4311            .as_ref()
4312            .map(|rep| rep.as_ref().to_vec());
4313        let def = repdef
4314            .definition_levels
4315            .as_ref()
4316            .map(|def| def.as_ref().to_vec());
4317        let mut unraveler = RepDefUnraveler::new(
4318            rep,
4319            def,
4320            repdef.def_meaning.clone().into(),
4321            num_values as u64,
4322        );
4323        if unraveler.is_all_valid() {
4324            return Ok(None);
4325        }
4326        let mut validity = BooleanBufferBuilder::new(num_values);
4327        unraveler.unravel_validity(&mut validity);
4328        Ok(Some(validity.finish()))
4329    }
4330
4331    fn is_constant_values(
4332        arrays: &[ArrayRef],
4333        scalar: &ArrayRef,
4334        validity: Option<&BooleanBuffer>,
4335    ) -> Result<bool> {
4336        debug_assert_eq!(scalar.len(), 1);
4337        debug_assert_eq!(scalar.null_count(), 0);
4338
4339        match scalar.data_type() {
4340            DataType::Boolean => {
4341                let mut global_idx = 0usize;
4342                let scalar_val = scalar.as_boolean().value(0);
4343                for arr in arrays {
4344                    let bool_arr = arr.as_boolean();
4345                    for i in 0..arr.len() {
4346                        let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4347                        global_idx += 1;
4348                        if !is_valid {
4349                            continue;
4350                        }
4351                        if bool_arr.value(i) != scalar_val {
4352                            return Ok(false);
4353                        }
4354                    }
4355                }
4356                Ok(true)
4357            }
4358            DataType::Utf8 => Self::is_constant_utf8::<i32>(arrays, scalar, validity),
4359            DataType::LargeUtf8 => Self::is_constant_utf8::<i64>(arrays, scalar, validity),
4360            DataType::Binary => Self::is_constant_binary::<i32>(arrays, scalar, validity),
4361            DataType::LargeBinary => Self::is_constant_binary::<i64>(arrays, scalar, validity),
4362            data_type => {
4363                let mut global_idx = 0usize;
4364                let Some(byte_width) = data_type.byte_width_opt() else {
4365                    return Ok(false);
4366                };
4367                let scalar_data = scalar.to_data();
4368                if scalar_data.buffers().len() != 1 || !scalar_data.child_data().is_empty() {
4369                    return Ok(false);
4370                }
4371                let scalar_bytes = scalar_data.buffers()[0].as_slice();
4372                if scalar_bytes.len() != byte_width {
4373                    return Ok(false);
4374                }
4375
4376                for arr in arrays {
4377                    let data = arr.to_data();
4378                    if data.buffers().is_empty() {
4379                        return Ok(false);
4380                    }
4381                    let buf = data.buffers()[0].as_slice();
4382                    let base = data.offset();
4383                    for i in 0..arr.len() {
4384                        let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4385                        global_idx += 1;
4386                        if !is_valid {
4387                            continue;
4388                        }
4389                        let start = (base + i) * byte_width;
4390                        if buf[start..start + byte_width] != scalar_bytes[..] {
4391                            return Ok(false);
4392                        }
4393                    }
4394                }
4395                Ok(true)
4396            }
4397        }
4398    }
4399
4400    fn is_constant_utf8<O: arrow_array::OffsetSizeTrait>(
4401        arrays: &[ArrayRef],
4402        scalar: &ArrayRef,
4403        validity: Option<&BooleanBuffer>,
4404    ) -> Result<bool> {
4405        debug_assert_eq!(scalar.len(), 1);
4406        let scalar_val = scalar.as_string::<O>().value(0).as_bytes();
4407        let mut global_idx = 0usize;
4408        for arr in arrays {
4409            let str_arr = arr.as_string::<O>();
4410            for i in 0..arr.len() {
4411                let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4412                global_idx += 1;
4413                if !is_valid {
4414                    continue;
4415                }
4416                if str_arr.value(i).as_bytes() != scalar_val {
4417                    return Ok(false);
4418                }
4419            }
4420        }
4421        Ok(true)
4422    }
4423
4424    fn is_constant_binary<O: arrow_array::OffsetSizeTrait>(
4425        arrays: &[ArrayRef],
4426        scalar: &ArrayRef,
4427        validity: Option<&BooleanBuffer>,
4428    ) -> Result<bool> {
4429        debug_assert_eq!(scalar.len(), 1);
4430        let scalar_val = scalar.as_binary::<O>().value(0);
4431        let mut global_idx = 0usize;
4432        for arr in arrays {
4433            let bin_arr = arr.as_binary::<O>();
4434            for i in 0..arr.len() {
4435                let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4436                global_idx += 1;
4437                if !is_valid {
4438                    continue;
4439                }
4440                if bin_arr.value(i) != scalar_val {
4441                    return Ok(false);
4442                }
4443            }
4444        }
4445        Ok(true)
4446    }
4447
4448    fn find_constant_scalar(
4449        arrays: &[ArrayRef],
4450        validity: Option<&BooleanBuffer>,
4451    ) -> Result<Option<ArrayRef>> {
4452        if arrays.is_empty() {
4453            return Ok(None);
4454        }
4455
4456        let global_scalar_idx = if let Some(validity) = validity {
4457            let Some(idx) = (0..validity.len()).find(|&i| validity.value(i)) else {
4458                return Ok(None);
4459            };
4460            idx
4461        } else {
4462            0
4463        };
4464
4465        let mut idx_remaining = global_scalar_idx;
4466        let mut scalar_arr_idx = 0usize;
4467        while scalar_arr_idx < arrays.len() {
4468            let len = arrays[scalar_arr_idx].len();
4469            if idx_remaining < len {
4470                break;
4471            }
4472            idx_remaining -= len;
4473            scalar_arr_idx += 1;
4474        }
4475
4476        if scalar_arr_idx >= arrays.len() {
4477            return Ok(None);
4478        }
4479
4480        let scalar =
4481            lance_arrow::scalar::extract_scalar_value(&arrays[scalar_arr_idx], idx_remaining)?;
4482        if scalar.null_count() != 0 {
4483            return Ok(None);
4484        }
4485        if !Self::is_constant_values(arrays, &scalar, validity)? {
4486            return Ok(None);
4487        }
4488        Ok(Some(scalar))
4489    }
4490
4491    fn resolve_dict_values_compression_metadata(
4492        field_metadata: &HashMap<String, String>,
4493        env_compression: Option<String>,
4494        env_compression_level: Option<String>,
4495    ) -> HashMap<String, String> {
4496        let mut metadata = HashMap::new();
4497
4498        let compression = field_metadata
4499            .get(DICT_VALUES_COMPRESSION_META_KEY)
4500            .cloned()
4501            .or(env_compression)
4502            .unwrap_or_else(|| DEFAULT_DICT_VALUES_COMPRESSION.to_string());
4503        metadata.insert(COMPRESSION_META_KEY.to_string(), compression);
4504
4505        if let Some(compression_level) = field_metadata
4506            .get(DICT_VALUES_COMPRESSION_LEVEL_META_KEY)
4507            .cloned()
4508            .or(env_compression_level)
4509        {
4510            metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), compression_level);
4511        }
4512
4513        metadata
4514    }
4515
4516    fn build_dict_values_compressor_field(field: &Field) -> Result<Field> {
4517        // This is an internal synthetic field used only to feed metadata into
4518        // `create_block_compressor` for dictionary values. The concrete type/name here
4519        // are not semantically meaningful; we rely on explicit metadata below to control
4520        // general compression selection for dictionary values.
4521        let mut dict_values_field = Field::new_arrow("", DataType::UInt16, false)?;
4522        dict_values_field.metadata = Self::resolve_dict_values_compression_metadata(
4523            &field.metadata,
4524            env::var(DICT_VALUES_COMPRESSION_ENV_VAR).ok(),
4525            env::var(DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR).ok(),
4526        );
4527        Ok(dict_values_field)
4528    }
4529
4530    #[allow(clippy::too_many_arguments)]
4531    fn encode_miniblock(
4532        column_idx: u32,
4533        field: &Field,
4534        compression_strategy: &dyn CompressionStrategy,
4535        data: DataBlock,
4536        repdef: crate::repdef::SerializedRepDefs,
4537        row_number: u64,
4538        dictionary_data: Option<DataBlock>,
4539        num_rows: u64,
4540        support_large_chunk: bool,
4541    ) -> Result<EncodedPage> {
4542        if let DataBlock::AllNull(_null_block) = data {
4543            // We should not be using mini-block for all-null.  There are other structural
4544            // encodings for that.
4545            unreachable!()
4546        }
4547
4548        let num_items = data.num_values();
4549
4550        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
4551        let (compressed_data, value_encoding) = compressor.compress(data)?;
4552
4553        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
4554
4555        let mut compressed_rep = repdef
4556            .rep_slicer()
4557            .map(|rep_slicer| {
4558                Self::compress_levels(
4559                    rep_slicer,
4560                    num_items,
4561                    compression_strategy,
4562                    &compressed_data.chunks,
4563                    max_rep,
4564                )
4565            })
4566            .transpose()?;
4567
4568        let (rep_index, rep_index_depth) =
4569            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
4570                Some(rep_index) => (Some(rep_index.clone()), 1),
4571                None => (None, 0),
4572            };
4573
4574        let mut compressed_def = repdef
4575            .def_slicer()
4576            .map(|def_slicer| {
4577                Self::compress_levels(
4578                    def_slicer,
4579                    num_items,
4580                    compression_strategy,
4581                    &compressed_data.chunks,
4582                    /*max_rep=*/ 0,
4583                )
4584            })
4585            .transpose()?;
4586
4587        // TODO: Parquet sparsely encodes values here.  We could do the same but
4588        // then we won't have log2 values per chunk.  This means more metadata
4589        // and potentially more decoder asymmetry.  However, it may be worth
4590        // investigating at some point
4591
4592        let rep_data = compressed_rep
4593            .as_mut()
4594            .map(|cr| std::mem::take(&mut cr.data));
4595        let def_data = compressed_def
4596            .as_mut()
4597            .map(|cd| std::mem::take(&mut cd.data));
4598
4599        let serialized =
4600            Self::serialize_miniblocks(compressed_data, rep_data, def_data, support_large_chunk)?;
4601
4602        // Metadata, Data, Dictionary, (maybe) Repetition Index
4603        let mut data = Vec::with_capacity(4);
4604        data.push(serialized.metadata);
4605        data.push(serialized.data);
4606
4607        if let Some(dictionary_data) = dictionary_data {
4608            let num_dictionary_items = dictionary_data.num_values();
4609            let dict_values_field = Self::build_dict_values_compressor_field(field)?;
4610
4611            let (compressor, dictionary_encoding) = compression_strategy
4612                .create_block_compressor(&dict_values_field, &dictionary_data)?;
4613            let dictionary_buffer = compressor.compress(dictionary_data)?;
4614
4615            data.push(dictionary_buffer);
4616            if let Some(rep_index) = rep_index {
4617                data.push(rep_index);
4618            }
4619
4620            let description = ProtobufUtils21::miniblock_layout(
4621                compressed_rep.map(|cr| cr.compression),
4622                compressed_def.map(|cd| cd.compression),
4623                value_encoding,
4624                rep_index_depth,
4625                serialized.num_buffers,
4626                Some((dictionary_encoding, num_dictionary_items)),
4627                &repdef.def_meaning,
4628                num_items,
4629                support_large_chunk,
4630            );
4631            Ok(EncodedPage {
4632                num_rows,
4633                column_idx,
4634                data,
4635                description: PageEncoding::Structural(description),
4636                row_number,
4637            })
4638        } else {
4639            let description = ProtobufUtils21::miniblock_layout(
4640                compressed_rep.map(|cr| cr.compression),
4641                compressed_def.map(|cd| cd.compression),
4642                value_encoding,
4643                rep_index_depth,
4644                serialized.num_buffers,
4645                None,
4646                &repdef.def_meaning,
4647                num_items,
4648                support_large_chunk,
4649            );
4650
4651            if let Some(rep_index) = rep_index {
4652                let view = rep_index.borrow_to_typed_slice::<u64>();
4653                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
4654                debug_assert_eq!(total, num_rows);
4655
4656                data.push(rep_index);
4657            }
4658
4659            Ok(EncodedPage {
4660                num_rows,
4661                column_idx,
4662                data,
4663                description: PageEncoding::Structural(description),
4664                row_number,
4665            })
4666        }
4667    }
4668
4669    // For fixed-size data we encode < control word | data > for each value
4670    fn serialize_full_zip_fixed(
4671        fixed: FixedWidthDataBlock,
4672        mut repdef: ControlWordIterator,
4673        num_values: u64,
4674    ) -> SerializedFullZip {
4675        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
4676        let mut zipped_data = Vec::with_capacity(len);
4677
4678        let max_rep_index_val = if repdef.has_repetition() {
4679            len as u64
4680        } else {
4681            // Setting this to 0 means we won't write a repetition index
4682            0
4683        };
4684        let mut rep_index_builder =
4685            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
4686
4687        // I suppose we can just pad to the nearest byte but I'm not sure we need to worry about this anytime soon
4688        // because it is unlikely compression of large values is going to yield a result that is not byte aligned
4689        assert_eq!(
4690            fixed.bits_per_value % 8,
4691            0,
4692            "Non-byte aligned full-zip compression not yet supported"
4693        );
4694
4695        let bytes_per_value = fixed.bits_per_value as usize / 8;
4696        let mut offset = 0;
4697
4698        if bytes_per_value == 0 {
4699            // No data, just dump the repdef into the buffer
4700            while let Some(control) = repdef.append_next(&mut zipped_data) {
4701                if control.is_new_row {
4702                    // We have finished a row
4703                    debug_assert!(offset <= len);
4704                    // SAFETY: We know that `start <= len`
4705                    unsafe { rep_index_builder.append(offset as u64) };
4706                }
4707                offset = zipped_data.len();
4708            }
4709        } else {
4710            // We have data, zip it with the repdef
4711            let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
4712            while let Some(control) = repdef.append_next(&mut zipped_data) {
4713                if control.is_new_row {
4714                    // We have finished a row
4715                    debug_assert!(offset <= len);
4716                    // SAFETY: We know that `start <= len`
4717                    unsafe { rep_index_builder.append(offset as u64) };
4718                }
4719                if control.is_visible {
4720                    let value = data_iter.next().unwrap();
4721                    zipped_data.extend_from_slice(value);
4722                }
4723                offset = zipped_data.len();
4724            }
4725        }
4726
4727        debug_assert_eq!(zipped_data.len(), len);
4728        // Put the final value in the rep index
4729        // SAFETY: `zipped_data.len() == len`
4730        unsafe {
4731            rep_index_builder.append(zipped_data.len() as u64);
4732        }
4733
4734        let zipped_data = LanceBuffer::from(zipped_data);
4735        let rep_index = rep_index_builder.into_data();
4736        let rep_index = if rep_index.is_empty() {
4737            None
4738        } else {
4739            Some(LanceBuffer::from(rep_index))
4740        };
4741        SerializedFullZip {
4742            values: zipped_data,
4743            repetition_index: rep_index,
4744        }
4745    }
4746
4747    // For variable-size data we encode < control word | length | data > for each value
4748    //
4749    // In addition, we create a second buffer, the repetition index
4750    fn serialize_full_zip_variable(
4751        variable: VariableWidthBlock,
4752        mut repdef: ControlWordIterator,
4753        num_items: u64,
4754    ) -> SerializedFullZip {
4755        let bytes_per_offset = variable.bits_per_offset as usize / 8;
4756        assert_eq!(
4757            variable.bits_per_offset % 8,
4758            0,
4759            "Only byte-aligned offsets supported"
4760        );
4761        let len = variable.data.len()
4762            + repdef.bytes_per_word() * num_items as usize
4763            + bytes_per_offset * variable.num_values as usize;
4764        let mut buf = Vec::with_capacity(len);
4765
4766        let max_rep_index_val = len as u64;
4767        let mut rep_index_builder =
4768            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4769
4770        // TODO: byte pack the item lengths with varint encoding
4771        match bytes_per_offset {
4772            4 => {
4773                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
4774                let mut rep_offset = 0;
4775                let mut windows_iter = offs.as_ref().windows(2);
4776                while let Some(control) = repdef.append_next(&mut buf) {
4777                    if control.is_new_row {
4778                        // We have finished a row
4779                        debug_assert!(rep_offset <= len);
4780                        // SAFETY: We know that `buf.len() <= len`
4781                        unsafe { rep_index_builder.append(rep_offset as u64) };
4782                    }
4783                    if control.is_visible {
4784                        let window = windows_iter.next().unwrap();
4785                        if control.is_valid_item {
4786                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4787                            buf.extend_from_slice(
4788                                &variable.data[window[0] as usize..window[1] as usize],
4789                            );
4790                        }
4791                    }
4792                    rep_offset = buf.len();
4793                }
4794            }
4795            8 => {
4796                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4797                let mut rep_offset = 0;
4798                let mut windows_iter = offs.as_ref().windows(2);
4799                while let Some(control) = repdef.append_next(&mut buf) {
4800                    if control.is_new_row {
4801                        // We have finished a row
4802                        debug_assert!(rep_offset <= len);
4803                        // SAFETY: We know that `buf.len() <= len`
4804                        unsafe { rep_index_builder.append(rep_offset as u64) };
4805                    }
4806                    if control.is_visible {
4807                        let window = windows_iter.next().unwrap();
4808                        if control.is_valid_item {
4809                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4810                            buf.extend_from_slice(
4811                                &variable.data[window[0] as usize..window[1] as usize],
4812                            );
4813                        }
4814                    }
4815                    rep_offset = buf.len();
4816                }
4817            }
4818            _ => panic!("Unsupported offset size"),
4819        }
4820
4821        // We might have saved a few bytes by not copying lengths when the length was zero.  However,
4822        // if we are over `len` then we have a bug.
4823        debug_assert!(buf.len() <= len);
4824        // Put the final value in the rep index
4825        // SAFETY: `zipped_data.len() == len`
4826        unsafe {
4827            rep_index_builder.append(buf.len() as u64);
4828        }
4829
4830        let zipped_data = LanceBuffer::from(buf);
4831        let rep_index = rep_index_builder.into_data();
4832        debug_assert!(!rep_index.is_empty());
4833        let rep_index = Some(LanceBuffer::from(rep_index));
4834        SerializedFullZip {
4835            values: zipped_data,
4836            repetition_index: rep_index,
4837        }
4838    }
4839
4840    /// Serializes data into a single buffer according to the full-zip format which zips
4841    /// together the repetition, definition, and value data into a single buffer.
4842    fn serialize_full_zip(
4843        compressed_data: PerValueDataBlock,
4844        repdef: ControlWordIterator,
4845        num_items: u64,
4846    ) -> SerializedFullZip {
4847        match compressed_data {
4848            PerValueDataBlock::Fixed(fixed) => {
4849                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4850            }
4851            PerValueDataBlock::Variable(var) => {
4852                Self::serialize_full_zip_variable(var, repdef, num_items)
4853            }
4854        }
4855    }
4856
4857    fn expand_boolean_to_bytes(fixed: FixedWidthDataBlock) -> FixedWidthDataBlock {
4858        debug_assert_eq!(fixed.bits_per_value, 1);
4859        let num_values = fixed.num_values as usize;
4860        let bool_buf = BooleanBuffer::new(fixed.data.into_buffer(), 0, num_values);
4861        let expanded: Vec<u8> = (0..num_values).map(|i| bool_buf.value(i) as u8).collect();
4862        FixedWidthDataBlock {
4863            data: LanceBuffer::from(expanded),
4864            bits_per_value: 8,
4865            num_values: fixed.num_values,
4866            block_info: BlockInfo::new(),
4867        }
4868    }
4869
4870    fn encode_full_zip(
4871        column_idx: u32,
4872        field: &Field,
4873        compression_strategy: &dyn CompressionStrategy,
4874        data: DataBlock,
4875        repdef: crate::repdef::SerializedRepDefs,
4876        row_number: u64,
4877        num_lists: u64,
4878    ) -> Result<EncodedPage> {
4879        let max_rep = repdef
4880            .repetition_levels
4881            .as_ref()
4882            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4883        let max_def = repdef
4884            .definition_levels
4885            .as_ref()
4886            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4887
4888        // To handle FSL we just flatten
4889        // let data = data.flatten();
4890
4891        let (num_items, num_visible_items) =
4892            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4893                // If there are rep levels there may be "invisible" items and we need to encode
4894                // rep_levels.len() things which might be larger than data.num_values()
4895                (rep_levels.len() as u64, data.num_values())
4896            } else {
4897                // If there are no rep levels then we encode data.num_values() things
4898                (data.num_values(), data.num_values())
4899            };
4900
4901        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4902
4903        let repdef_iter = build_control_word_iterator(
4904            repdef.repetition_levels.as_deref(),
4905            max_rep,
4906            repdef.definition_levels.as_deref(),
4907            max_def,
4908            max_visible_def,
4909            num_items as usize,
4910        );
4911        let bits_rep = repdef_iter.bits_rep();
4912        let bits_def = repdef_iter.bits_def();
4913
4914        // Full-zip requires byte-aligned values; expand 1-bit booleans to 1 byte each.
4915        let data = match data {
4916            DataBlock::FixedWidth(fixed) if fixed.bits_per_value == 1 => {
4917                DataBlock::FixedWidth(Self::expand_boolean_to_bytes(fixed))
4918            }
4919            other => other,
4920        };
4921
4922        let compressor = compression_strategy.create_per_value(field, &data)?;
4923        let (compressed_data, value_encoding) = compressor.compress(data)?;
4924
4925        let description = match &compressed_data {
4926            PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
4927                bits_rep,
4928                bits_def,
4929                fixed.bits_per_value as u32,
4930                value_encoding,
4931                &repdef.def_meaning,
4932                num_items as u32,
4933                num_visible_items as u32,
4934            ),
4935            PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
4936                bits_rep,
4937                bits_def,
4938                variable.bits_per_offset as u32,
4939                value_encoding,
4940                &repdef.def_meaning,
4941                num_items as u32,
4942                num_visible_items as u32,
4943            ),
4944        };
4945
4946        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4947
4948        let data = if let Some(repindex) = zipped.repetition_index {
4949            vec![zipped.values, repindex]
4950        } else {
4951            vec![zipped.values]
4952        };
4953
4954        Ok(EncodedPage {
4955            num_rows: num_lists,
4956            column_idx,
4957            data,
4958            description: PageEncoding::Structural(description),
4959            row_number,
4960        })
4961    }
4962
4963    fn should_dictionary_encode(
4964        data_block: &DataBlock,
4965        field: &Field,
4966        version: LanceFileVersion,
4967    ) -> Option<DictEncodingBudget> {
4968        const DEFAULT_SAMPLE_SIZE: usize = 4096;
4969        const DEFAULT_SAMPLE_UNIQUE_RATIO: f64 = 0.98;
4970
4971        // Since we only dictionary encode FixedWidth and VariableWidth blocks for now, we skip
4972        // estimating the size for other types.
4973        match data_block {
4974            DataBlock::FixedWidth(fixed) => {
4975                if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 {
4976                    return None;
4977                }
4978                if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 {
4979                    return None;
4980                }
4981                if fixed.bits_per_value % 8 != 0 {
4982                    return None;
4983                }
4984            }
4985            DataBlock::VariableWidth(var) => {
4986                if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
4987                    return None;
4988                }
4989            }
4990            _ => return None,
4991        }
4992
4993        // Don't dictionary encode tiny arrays.
4994        let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
4995            .ok()
4996            .and_then(|val| val.parse().ok())
4997            .unwrap_or(100);
4998        if data_block.num_values() < too_small {
4999            return None;
5000        }
5001
5002        let num_values = data_block.num_values();
5003
5004        // Apply divisor threshold and cap. This is intentionally conservative: the goal is to
5005        // avoid spending too much CPU trying to estimate very high cardinalities.
5006        let divisor: u64 = field
5007            .metadata
5008            .get(DICT_DIVISOR_META_KEY)
5009            .and_then(|val| val.parse().ok())
5010            .or_else(|| {
5011                env::var("LANCE_ENCODING_DICT_DIVISOR")
5012                    .ok()
5013                    .and_then(|val| val.parse().ok())
5014            })
5015            .unwrap_or(DEFAULT_DICT_DIVISOR);
5016
5017        let max_cardinality: u64 = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY")
5018            .ok()
5019            .and_then(|val| val.parse().ok())
5020            .unwrap_or(DEFAULT_DICT_MAX_CARDINALITY);
5021
5022        let threshold_cardinality = num_values
5023            .checked_div(divisor.max(1))
5024            .unwrap_or(0)
5025            .min(max_cardinality);
5026        if threshold_cardinality == 0 {
5027            return None;
5028        }
5029
5030        // Get size ratio from metadata or env var.
5031        let threshold_ratio = field
5032            .metadata
5033            .get(DICT_SIZE_RATIO_META_KEY)
5034            .and_then(|val| val.parse::<f64>().ok())
5035            .or_else(|| {
5036                env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
5037                    .ok()
5038                    .and_then(|val| val.parse().ok())
5039            })
5040            .unwrap_or(DEFAULT_DICT_SIZE_RATIO);
5041
5042        if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
5043            panic!(
5044                "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
5045                threshold_ratio
5046            );
5047        }
5048
5049        let data_size = data_block.data_size();
5050        if data_size == 0 {
5051            return None;
5052        }
5053
5054        let max_encoded_size = (data_size as f64 * threshold_ratio) as u64;
5055        let max_encoded_size = usize::try_from(max_encoded_size).ok()?;
5056
5057        // Avoid probing dictionary encoding on data that appears to be near-unique.
5058        if Self::sample_is_near_unique(
5059            data_block,
5060            DEFAULT_SAMPLE_SIZE,
5061            DEFAULT_SAMPLE_UNIQUE_RATIO,
5062        )? {
5063            return None;
5064        }
5065
5066        let max_dict_entries = u32::try_from(threshold_cardinality.min(i32::MAX as u64)).ok()?;
5067        Some(DictEncodingBudget {
5068            max_dict_entries,
5069            max_encoded_size,
5070        })
5071    }
5072
5073    /// Probe whether a page looks near-unique before attempting dictionary encoding.
5074    ///
5075    /// The probe uses deterministic stride sampling (not RNG sampling), which keeps
5076    /// the check cheap and reproducible across runs. The result is only a gate for
5077    /// whether we try dictionary encoding, not a cardinality statistic.
5078    fn sample_is_near_unique(
5079        data_block: &DataBlock,
5080        max_samples: usize,
5081        unique_ratio_threshold: f64,
5082    ) -> Option<bool> {
5083        use std::collections::HashSet;
5084
5085        if unique_ratio_threshold <= 0.0 || unique_ratio_threshold > 1.0 {
5086            return None;
5087        }
5088
5089        let num_values = usize::try_from(data_block.num_values()).ok()?;
5090        if num_values == 0 {
5091            return Some(false);
5092        }
5093
5094        let sample_count = num_values.min(max_samples).max(1);
5095        // Uniform stride sampling across the page.
5096        let step = (num_values / sample_count).max(1);
5097
5098        match data_block {
5099            DataBlock::FixedWidth(fixed) => match fixed.bits_per_value {
5100                64 => {
5101                    let values = fixed.data.borrow_to_typed_slice::<u64>();
5102                    let values = values.as_ref();
5103                    let mut unique: HashSet<u64> = HashSet::with_capacity(sample_count.min(1024));
5104                    for idx in (0..num_values).step_by(step).take(sample_count) {
5105                        unique.insert(values.get(idx).copied()?);
5106                    }
5107                    let ratio = unique.len() as f64 / sample_count as f64;
5108                    // Avoid overreacting to tiny pages with too few samples.
5109                    Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5110                }
5111                128 => {
5112                    let values = fixed.data.borrow_to_typed_slice::<u128>();
5113                    let values = values.as_ref();
5114                    let mut unique: HashSet<u128> = HashSet::with_capacity(sample_count.min(1024));
5115                    for idx in (0..num_values).step_by(step).take(sample_count) {
5116                        unique.insert(values.get(idx).copied()?);
5117                    }
5118                    let ratio = unique.len() as f64 / sample_count as f64;
5119                    Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5120                }
5121                _ => Some(false),
5122            },
5123            DataBlock::VariableWidth(var) => {
5124                use xxhash_rust::xxh3::xxh3_64;
5125
5126                // Hash variable-width slices instead of storing borrowed slice keys.
5127                let mut unique: HashSet<u64> = HashSet::with_capacity(sample_count.min(1024));
5128                match var.bits_per_offset {
5129                    32 => {
5130                        let offsets_ref = var.offsets.borrow_to_typed_slice::<u32>();
5131                        let offsets: &[u32] = offsets_ref.as_ref();
5132                        for i in (0..num_values).step_by(step).take(sample_count) {
5133                            let start = usize::try_from(*offsets.get(i)?).ok()?;
5134                            let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
5135                            if start > end || end > var.data.len() {
5136                                return None;
5137                            }
5138                            unique.insert(xxh3_64(&var.data[start..end]));
5139                        }
5140                    }
5141                    64 => {
5142                        let offsets_ref = var.offsets.borrow_to_typed_slice::<u64>();
5143                        let offsets: &[u64] = offsets_ref.as_ref();
5144                        for i in (0..num_values).step_by(step).take(sample_count) {
5145                            let start = usize::try_from(*offsets.get(i)?).ok()?;
5146                            let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
5147                            if start > end || end > var.data.len() {
5148                                return None;
5149                            }
5150                            unique.insert(xxh3_64(&var.data[start..end]));
5151                        }
5152                    }
5153                    _ => return Some(false),
5154                }
5155                let ratio = unique.len() as f64 / sample_count as f64;
5156                Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
5157            }
5158            _ => Some(false),
5159        }
5160    }
5161
5162    // Creates an encode task, consuming all buffered data
5163    fn do_flush(
5164        &mut self,
5165        arrays: Vec<ArrayRef>,
5166        repdefs: Vec<RepDefBuilder>,
5167        row_number: u64,
5168        num_rows: u64,
5169    ) -> Result<Vec<EncodeTask>> {
5170        let column_idx = self.column_index;
5171        let compression_strategy = self.compression_strategy.clone();
5172        let field = self.field.clone();
5173        let encoding_metadata = self.encoding_metadata.clone();
5174        let support_large_chunk = self.support_large_chunk;
5175        let version = self.version;
5176        let task = spawn_cpu(move || {
5177            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
5178            let is_simple_validity = repdefs.iter().all(|rd| rd.is_simple_validity());
5179            let has_repdef_info = repdefs.iter().any(|rd| !rd.is_empty());
5180            let repdef = RepDefBuilder::serialize(repdefs);
5181
5182            if num_values == 0 {
5183                // We should not encode empty arrays.  So if we get here that should mean that we
5184                // either have all empty lists or all null lists (or a mix).  We still need to encode
5185                // the rep/def information but we can skip the data encoding.
5186                log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows);
5187                return Self::encode_complex_all_null(
5188                    column_idx,
5189                    repdef,
5190                    row_number,
5191                    num_rows,
5192                    version,
5193                    compression_strategy.as_ref(),
5194                );
5195            }
5196
5197            let leaf_validity = Self::leaf_validity(&repdef, num_values as usize)?;
5198            let all_null = leaf_validity
5199                .as_ref()
5200                .map(|validity| validity.count_set_bits() == 0)
5201                .unwrap_or(false);
5202
5203            if all_null {
5204                return if is_simple_validity {
5205                    log::debug!(
5206                        "Encoding column {} with {} items ({} rows) using simple-null layout",
5207                        column_idx,
5208                        num_values,
5209                        num_rows
5210                    );
5211                    Self::encode_simple_all_null(column_idx, num_values, row_number)
5212                } else {
5213                    log::debug!(
5214                        "Encoding column {} with {} items ({} rows) using complex-null layout",
5215                        column_idx,
5216                        num_values,
5217                        num_rows
5218                    );
5219                    Self::encode_complex_all_null(
5220                        column_idx,
5221                        repdef,
5222                        row_number,
5223                        num_rows,
5224                        version,
5225                        compression_strategy.as_ref(),
5226                    )
5227                };
5228            }
5229
5230            if let DataType::Struct(fields) = &field.data_type()
5231                && fields.is_empty()
5232            {
5233                if has_repdef_info {
5234                    return Err(Error::invalid_input_source(format!("Empty structs with rep/def information are not yet supported.  The field {} is an empty struct that either has nulls or is in a list.", field.name).into()));
5235                }
5236                // This is maybe a little confusing but the reader should never look at this anyways and it
5237                // seems like overkill to invent a new layout just for "empty structs".
5238                return Self::encode_simple_all_null(column_idx, num_values, row_number);
5239            }
5240
5241            let data_block = DataBlock::from_arrays(&arrays, num_values);
5242
5243            if version.resolve() >= LanceFileVersion::V2_2
5244                && let Some(scalar) = Self::find_constant_scalar(&arrays, leaf_validity.as_ref())?
5245            {
5246                log::debug!(
5247                    "Encoding column {} with {} items ({} rows) using constant layout",
5248                    column_idx,
5249                    num_values,
5250                    num_rows
5251                );
5252                return constant::encode_constant_page(
5253                    column_idx,
5254                    scalar,
5255                    repdef,
5256                    row_number,
5257                    num_rows,
5258                );
5259            }
5260
5261            let requires_full_zip_packed_struct =
5262                if let DataBlock::Struct(ref struct_data_block) = data_block {
5263                    struct_data_block.has_variable_width_child()
5264                } else {
5265                    false
5266                };
5267
5268            if requires_full_zip_packed_struct {
5269                log::debug!(
5270                    "Encoding column {} with {} items using full-zip packed struct layout",
5271                    column_idx,
5272                    num_values
5273                );
5274                return Self::encode_full_zip(
5275                    column_idx,
5276                    &field,
5277                    compression_strategy.as_ref(),
5278                    data_block,
5279                    repdef,
5280                    row_number,
5281                    num_rows,
5282                );
5283            }
5284
5285            // If the rep/def levels are too sparse for miniblock (e.g. many empty
5286            // lists with very few values), fall back to fullzip to avoid exceeding
5287            // the u16 per-chunk rep/def buffer size limit.
5288            let too_sparse = Self::repdef_too_sparse_for_miniblock(&repdef, num_values);
5289
5290            if !too_sparse {
5291                if let DataBlock::Dictionary(dict) = data_block {
5292                    log::debug!("Encoding column {} with {} items using dictionary encoding (already dictionary encoded)", column_idx, num_values);
5293                    let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
5294                    // TODO: https://github.com/lancedb/lance/issues/4809
5295                    // If we compute stats on dictionary_data_block => panic.
5296                    // If we don't compute stats on indices_data_block => panic.
5297                    // This is messy.  Don't make me call compute_stat ever.
5298                    indices_data_block.compute_stat();
5299                    return Self::encode_miniblock(
5300                        column_idx,
5301                        &field,
5302                        compression_strategy.as_ref(),
5303                        indices_data_block,
5304                        repdef,
5305                        row_number,
5306                        Some(dictionary_data_block),
5307                        num_rows,
5308                        support_large_chunk,
5309                    );
5310                }
5311            } else {
5312                log::debug!(
5313                    "Encoding column {} with {} items using full-zip layout \
5314                     (rep/def too sparse for mini-block)",
5315                    column_idx,
5316                    num_values
5317                );
5318            }
5319
5320            {
5321                // Try dictionary encoding first if applicable. If encoding aborts, fall back to the
5322                // preferred structural encoding.
5323                let dict_result = if too_sparse {
5324                    None
5325                } else {
5326                    Self::should_dictionary_encode(&data_block, &field, version)
5327                        .and_then(|budget| {
5328                            log::debug!(
5329                                "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
5330                                column_idx,
5331                                num_values
5332                            );
5333                            dict::dictionary_encode(
5334                                &data_block,
5335                                budget.max_dict_entries,
5336                                budget.max_encoded_size,
5337                            )
5338                        })
5339                };
5340
5341                if let Some((indices_data_block, dictionary_data_block)) = dict_result {
5342                    Self::encode_miniblock(
5343                        column_idx,
5344                        &field,
5345                        compression_strategy.as_ref(),
5346                        indices_data_block,
5347                        repdef,
5348                        row_number,
5349                        Some(dictionary_data_block),
5350                        num_rows,
5351                        support_large_chunk,
5352                    )
5353                } else if !too_sparse && Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
5354                    log::debug!(
5355                        "Encoding column {} with {} items using mini-block layout",
5356                        column_idx,
5357                        num_values
5358                    );
5359                    Self::encode_miniblock(
5360                        column_idx,
5361                        &field,
5362                        compression_strategy.as_ref(),
5363                        data_block,
5364                        repdef,
5365                        row_number,
5366                        None,
5367                        num_rows,
5368                        support_large_chunk,
5369                    )
5370                } else if too_sparse || Self::prefers_fullzip(encoding_metadata.as_ref()) {
5371                    log::debug!(
5372                        "Encoding column {} with {} items using full-zip layout",
5373                        column_idx,
5374                        num_values
5375                    );
5376                    Self::encode_full_zip(
5377                        column_idx,
5378                        &field,
5379                        compression_strategy.as_ref(),
5380                        data_block,
5381                        repdef,
5382                        row_number,
5383                        num_rows,
5384                    )
5385                } else {
5386                    Err(Error::invalid_input_source(format!("Cannot determine structural encoding for field {}.  This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into()))
5387                }
5388            }
5389        })
5390        .boxed();
5391        Ok(vec![task])
5392    }
5393
5394    fn extract_validity_buf(
5395        array: Arc<dyn Array>,
5396        repdef: &mut RepDefBuilder,
5397        keep_original_array: bool,
5398    ) -> Result<Arc<dyn Array>> {
5399        if let Some(validity) = array.nulls() {
5400            if keep_original_array {
5401                repdef.add_validity_bitmap(validity.clone());
5402            } else {
5403                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
5404            }
5405            let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
5406            Ok(make_array(data_no_nulls))
5407        } else {
5408            repdef.add_no_null(array.len());
5409            Ok(array)
5410        }
5411    }
5412
5413    fn extract_validity(
5414        mut array: Arc<dyn Array>,
5415        repdef: &mut RepDefBuilder,
5416        keep_original_array: bool,
5417    ) -> Result<Arc<dyn Array>> {
5418        match array.data_type() {
5419            DataType::Null => {
5420                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
5421                Ok(array)
5422            }
5423            DataType::Dictionary(_, _) => {
5424                array = dict::normalize_dict_nulls(array)?;
5425                Self::extract_validity_buf(array, repdef, keep_original_array)
5426            }
5427            // Extract our validity buf but NOT any child validity bufs. (they will be encoded in
5428            // as part of the values).  Note: for FSL we do not use repdef.add_fsl because we do
5429            // NOT want to increase the repdef depth.
5430            //
5431            // This would be quite catasrophic for something like vector embeddings.  Imagine we
5432            // had thousands of vectors and some were null but no vector contained null items.  If
5433            // we treated the vectors (primitive FSL) like we treat structural FSL we would end up
5434            // with a rep/def value for every single item in the vector.
5435            _ => Self::extract_validity_buf(array, repdef, keep_original_array),
5436        }
5437    }
5438}
5439
5440impl FieldEncoder for PrimitiveStructuralEncoder {
5441    // Buffers data, if there is enough to write a page then we create an encode task
5442    fn maybe_encode(
5443        &mut self,
5444        array: ArrayRef,
5445        _external_buffers: &mut OutOfLineBuffers,
5446        mut repdef: RepDefBuilder,
5447        row_number: u64,
5448        num_rows: u64,
5449    ) -> Result<Vec<EncodeTask>> {
5450        let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
5451        self.accumulated_repdefs.push(repdef);
5452
5453        if let Some((arrays, row_number, num_rows)) =
5454            self.accumulation_queue.insert(array, row_number, num_rows)
5455        {
5456            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5457            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5458        } else {
5459            Ok(vec![])
5460        }
5461    }
5462
5463    // If there is any data left in the buffer then create an encode task from it
5464    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
5465        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
5466            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5467            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5468        } else {
5469            Ok(vec![])
5470        }
5471    }
5472
5473    fn num_columns(&self) -> u32 {
5474        1
5475    }
5476
5477    fn finish(
5478        &mut self,
5479        _external_buffers: &mut OutOfLineBuffers,
5480    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
5481        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
5482    }
5483}
5484
5485#[cfg(test)]
5486#[allow(clippy::single_range_in_vec_init)]
5487mod tests {
5488    use super::{
5489        ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
5490        FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipReadSource,
5491        FullZipRepIndexDetails, FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor,
5492        PreambleAction, StructuralPageScheduler, VariableFullZipDecoder,
5493    };
5494    use crate::buffer::LanceBuffer;
5495    use crate::compression::DefaultDecompressionStrategy;
5496    use crate::constants::{
5497        COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
5498        DICT_VALUES_COMPRESSION_META_KEY, STRUCTURAL_ENCODING_META_KEY,
5499        STRUCTURAL_ENCODING_MINIBLOCK,
5500    };
5501    use crate::data::BlockInfo;
5502    use crate::decoder::PageEncoding;
5503    use crate::encodings::logical::primitive::{
5504        ChunkDrainInstructions, PrimitiveStructuralEncoder,
5505    };
5506    use crate::format::ProtobufUtils21;
5507    use crate::format::pb21;
5508    use crate::format::pb21::compressive_encoding::Compression;
5509    use crate::testing::{TestCases, check_round_trip_encoding_of_data};
5510    use crate::version::LanceFileVersion;
5511    use arrow_array::{ArrayRef, Int8Array, StringArray};
5512    use arrow_schema::DataType;
5513    use std::collections::HashMap;
5514    use std::{collections::VecDeque, sync::Arc};
5515
5516    #[test]
5517    fn test_is_narrow() {
5518        let int8_array = Int8Array::from(vec![1, 2, 3]);
5519        let array_ref: ArrayRef = Arc::new(int8_array);
5520        let block = DataBlock::from_array(array_ref);
5521
5522        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
5523
5524        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
5525        let block = DataBlock::from_array(string_array);
5526        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
5527
5528        let string_array = StringArray::from(vec![
5529            Some("hello world".repeat(100)),
5530            Some("world".to_string()),
5531        ]);
5532        let block = DataBlock::from_array(string_array);
5533        assert!((!PrimitiveStructuralEncoder::is_narrow(&block)));
5534    }
5535
5536    #[test]
5537    fn test_map_range() {
5538        // Null in the middle
5539        // [[A, B, C], [D, E], NULL, [F, G, H]]
5540        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
5541        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
5542        let max_visible_def = 0;
5543        let total_items = 8;
5544        let max_rep = 1;
5545
5546        let check = |range, expected_item_range, expected_level_range| {
5547            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5548                range,
5549                rep.as_ref(),
5550                def.as_ref(),
5551                max_rep,
5552                max_visible_def,
5553                total_items,
5554                PreambleAction::Absent,
5555            );
5556            assert_eq!(item_range, expected_item_range);
5557            assert_eq!(level_range, expected_level_range);
5558        };
5559
5560        check(0..1, 0..3, 0..3);
5561        check(1..2, 3..5, 3..5);
5562        check(2..3, 5..5, 5..6);
5563        check(3..4, 5..8, 6..9);
5564        check(0..2, 0..5, 0..5);
5565        check(1..3, 3..5, 3..6);
5566        check(2..4, 5..8, 5..9);
5567        check(0..3, 0..5, 0..6);
5568        check(1..4, 3..8, 3..9);
5569        check(0..4, 0..8, 0..9);
5570
5571        // Null at start
5572        // [NULL, [A, B], [C]]
5573        let rep = Some(vec![1, 1, 0, 1]);
5574        let def = Some(vec![1, 0, 0, 0]);
5575        let max_visible_def = 0;
5576        let total_items = 3;
5577
5578        let check = |range, expected_item_range, expected_level_range| {
5579            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5580                range,
5581                rep.as_ref(),
5582                def.as_ref(),
5583                max_rep,
5584                max_visible_def,
5585                total_items,
5586                PreambleAction::Absent,
5587            );
5588            assert_eq!(item_range, expected_item_range);
5589            assert_eq!(level_range, expected_level_range);
5590        };
5591
5592        check(0..1, 0..0, 0..1);
5593        check(1..2, 0..2, 1..3);
5594        check(2..3, 2..3, 3..4);
5595        check(0..2, 0..2, 0..3);
5596        check(1..3, 0..3, 1..4);
5597        check(0..3, 0..3, 0..4);
5598
5599        // Null at end
5600        // [[A], [B, C], NULL]
5601        let rep = Some(vec![1, 1, 0, 1]);
5602        let def = Some(vec![0, 0, 0, 1]);
5603        let max_visible_def = 0;
5604        let total_items = 3;
5605
5606        let check = |range, expected_item_range, expected_level_range| {
5607            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5608                range,
5609                rep.as_ref(),
5610                def.as_ref(),
5611                max_rep,
5612                max_visible_def,
5613                total_items,
5614                PreambleAction::Absent,
5615            );
5616            assert_eq!(item_range, expected_item_range);
5617            assert_eq!(level_range, expected_level_range);
5618        };
5619
5620        check(0..1, 0..1, 0..1);
5621        check(1..2, 1..3, 1..3);
5622        check(2..3, 3..3, 3..4);
5623        check(0..2, 0..3, 0..3);
5624        check(1..3, 1..3, 1..4);
5625        check(0..3, 0..3, 0..4);
5626
5627        // No nulls, with repetition
5628        // [[A, B], [C, D], [E, F]]
5629        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
5630        let def: Option<&[u16]> = None;
5631        let max_visible_def = 0;
5632        let total_items = 6;
5633
5634        let check = |range, expected_item_range, expected_level_range| {
5635            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5636                range,
5637                rep.as_ref(),
5638                def.as_ref(),
5639                max_rep,
5640                max_visible_def,
5641                total_items,
5642                PreambleAction::Absent,
5643            );
5644            assert_eq!(item_range, expected_item_range);
5645            assert_eq!(level_range, expected_level_range);
5646        };
5647
5648        check(0..1, 0..2, 0..2);
5649        check(1..2, 2..4, 2..4);
5650        check(2..3, 4..6, 4..6);
5651        check(0..2, 0..4, 0..4);
5652        check(1..3, 2..6, 2..6);
5653        check(0..3, 0..6, 0..6);
5654
5655        // No repetition, with nulls (this case is trivial)
5656        // [A, B, NULL, C]
5657        let rep: Option<&[u16]> = None;
5658        let def = Some(vec![0, 0, 1, 0]);
5659        let max_visible_def = 1;
5660        let total_items = 4;
5661
5662        let check = |range, expected_item_range, expected_level_range| {
5663            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5664                range,
5665                rep.as_ref(),
5666                def.as_ref(),
5667                max_rep,
5668                max_visible_def,
5669                total_items,
5670                PreambleAction::Absent,
5671            );
5672            assert_eq!(item_range, expected_item_range);
5673            assert_eq!(level_range, expected_level_range);
5674        };
5675
5676        check(0..1, 0..1, 0..1);
5677        check(1..2, 1..2, 1..2);
5678        check(2..3, 2..3, 2..3);
5679        check(0..2, 0..2, 0..2);
5680        check(1..3, 1..3, 1..3);
5681        check(0..3, 0..3, 0..3);
5682
5683        // Tricky case, this chunk is a continuation and starts with a rep-index = 0
5684        // [[..., A] [B, C], NULL]
5685        //
5686        // What we do will depend on the preamble action
5687        let rep = Some(vec![0, 1, 0, 1]);
5688        let def = Some(vec![0, 0, 0, 1]);
5689        let max_visible_def = 0;
5690        let total_items = 3;
5691
5692        let check = |range, expected_item_range, expected_level_range| {
5693            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5694                range,
5695                rep.as_ref(),
5696                def.as_ref(),
5697                max_rep,
5698                max_visible_def,
5699                total_items,
5700                PreambleAction::Take,
5701            );
5702            assert_eq!(item_range, expected_item_range);
5703            assert_eq!(level_range, expected_level_range);
5704        };
5705
5706        // If we are taking the preamble then the range must start at 0
5707        check(0..1, 0..3, 0..3);
5708        check(0..2, 0..3, 0..4);
5709
5710        let check = |range, expected_item_range, expected_level_range| {
5711            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5712                range,
5713                rep.as_ref(),
5714                def.as_ref(),
5715                max_rep,
5716                max_visible_def,
5717                total_items,
5718                PreambleAction::Skip,
5719            );
5720            assert_eq!(item_range, expected_item_range);
5721            assert_eq!(level_range, expected_level_range);
5722        };
5723
5724        check(0..1, 1..3, 1..3);
5725        check(1..2, 3..3, 3..4);
5726        check(0..2, 1..3, 1..4);
5727
5728        // Another preamble case but now it doesn't end with a new list
5729        // [[..., A], NULL, [D, E]]
5730        //
5731        // What we do will depend on the preamble action
5732        let rep = Some(vec![0, 1, 1, 0]);
5733        let def = Some(vec![0, 1, 0, 0]);
5734        let max_visible_def = 0;
5735        let total_items = 4;
5736
5737        let check = |range, expected_item_range, expected_level_range| {
5738            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5739                range,
5740                rep.as_ref(),
5741                def.as_ref(),
5742                max_rep,
5743                max_visible_def,
5744                total_items,
5745                PreambleAction::Take,
5746            );
5747            assert_eq!(item_range, expected_item_range);
5748            assert_eq!(level_range, expected_level_range);
5749        };
5750
5751        // If we are taking the preamble then the range must start at 0
5752        check(0..1, 0..1, 0..2);
5753        check(0..2, 0..3, 0..4);
5754
5755        let check = |range, expected_item_range, expected_level_range| {
5756            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5757                range,
5758                rep.as_ref(),
5759                def.as_ref(),
5760                max_rep,
5761                max_visible_def,
5762                total_items,
5763                PreambleAction::Skip,
5764            );
5765            assert_eq!(item_range, expected_item_range);
5766            assert_eq!(level_range, expected_level_range);
5767        };
5768
5769        // If we are taking the preamble then the range must start at 0
5770        check(0..1, 1..1, 1..2);
5771        check(1..2, 1..3, 2..4);
5772        check(0..2, 1..3, 1..4);
5773
5774        // Now a preamble case without any definition levels
5775        // [[..., A] [B, C], [D]]
5776        let rep = Some(vec![0, 1, 0, 1]);
5777        let def: Option<Vec<u16>> = None;
5778        let max_visible_def = 0;
5779        let total_items = 4;
5780
5781        let check = |range, expected_item_range, expected_level_range| {
5782            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5783                range,
5784                rep.as_ref(),
5785                def.as_ref(),
5786                max_rep,
5787                max_visible_def,
5788                total_items,
5789                PreambleAction::Take,
5790            );
5791            assert_eq!(item_range, expected_item_range);
5792            assert_eq!(level_range, expected_level_range);
5793        };
5794
5795        // If we are taking the preamble then the range must start at 0
5796        check(0..1, 0..3, 0..3);
5797        check(0..2, 0..4, 0..4);
5798
5799        let check = |range, expected_item_range, expected_level_range| {
5800            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5801                range,
5802                rep.as_ref(),
5803                def.as_ref(),
5804                max_rep,
5805                max_visible_def,
5806                total_items,
5807                PreambleAction::Skip,
5808            );
5809            assert_eq!(item_range, expected_item_range);
5810            assert_eq!(level_range, expected_level_range);
5811        };
5812
5813        check(0..1, 1..3, 1..3);
5814        check(1..2, 3..4, 3..4);
5815        check(0..2, 1..4, 1..4);
5816
5817        // If we have nested lists then non-top level lists may be empty/null
5818        // and we need to make sure we still handle them as invisible items (we
5819        // failed to do this previously)
5820        let rep = Some(vec![2, 1, 2, 0, 1, 2]);
5821        let def = Some(vec![0, 1, 2, 0, 0, 0]);
5822        let max_rep = 2;
5823        let max_visible_def = 0;
5824        let total_items = 4;
5825
5826        let check = |range, expected_item_range, expected_level_range| {
5827            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5828                range,
5829                rep.as_ref(),
5830                def.as_ref(),
5831                max_rep,
5832                max_visible_def,
5833                total_items,
5834                PreambleAction::Absent,
5835            );
5836            assert_eq!(item_range, expected_item_range);
5837            assert_eq!(level_range, expected_level_range);
5838        };
5839
5840        check(0..3, 0..4, 0..6);
5841        check(0..1, 0..1, 0..2);
5842        check(1..2, 1..3, 2..5);
5843        check(2..3, 3..4, 5..6);
5844
5845        // Invisible items in a preamble that we are taking (regressing a previous failure)
5846        let rep = Some(vec![0, 0, 1, 0, 1, 1]);
5847        let def = Some(vec![0, 1, 0, 0, 0, 0]);
5848        let max_rep = 1;
5849        let max_visible_def = 0;
5850        let total_items = 5;
5851
5852        let check = |range, expected_item_range, expected_level_range| {
5853            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5854                range,
5855                rep.as_ref(),
5856                def.as_ref(),
5857                max_rep,
5858                max_visible_def,
5859                total_items,
5860                PreambleAction::Take,
5861            );
5862            assert_eq!(item_range, expected_item_range);
5863            assert_eq!(level_range, expected_level_range);
5864        };
5865
5866        check(0..0, 0..1, 0..2);
5867        check(0..1, 0..3, 0..4);
5868        check(0..2, 0..4, 0..5);
5869
5870        // Skip preamble (with invis items) and skip a few rows (with invis items)
5871        // and then take a few rows but not all the rows
5872        let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
5873        let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
5874        let max_rep = 1;
5875        let max_visible_def = 0;
5876        let total_items = 5;
5877
5878        let check = |range, expected_item_range, expected_level_range| {
5879            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5880                range,
5881                rep.as_ref(),
5882                def.as_ref(),
5883                max_rep,
5884                max_visible_def,
5885                total_items,
5886                PreambleAction::Skip,
5887            );
5888            assert_eq!(item_range, expected_item_range);
5889            assert_eq!(level_range, expected_level_range);
5890        };
5891
5892        check(2..3, 2..4, 5..7);
5893    }
5894
5895    #[test]
5896    fn test_slice_batch_data_and_rebase_offsets_u32() {
5897        let data = LanceBuffer::copy_slice(b"0123456789abcdefghij");
5898        let offsets = LanceBuffer::reinterpret_vec(vec![6_u32, 8_u32, 8_u32, 12_u32]);
5899
5900        let (sliced_data, normalized_offsets) =
5901            VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
5902                .unwrap();
5903
5904        assert_eq!(sliced_data.as_ref(), b"6789ab");
5905        let normalized = normalized_offsets.borrow_to_typed_slice::<u32>();
5906        assert_eq!(normalized.as_ref(), &[0, 2, 2, 6]);
5907    }
5908
5909    #[test]
5910    fn test_slice_batch_data_and_rebase_offsets_u64() {
5911        let data = LanceBuffer::copy_slice(b"abcdefghijklmnopqrstuvwxyz");
5912        let offsets = LanceBuffer::reinterpret_vec(vec![10_u64, 12_u64, 16_u64, 20_u64]);
5913
5914        let (sliced_data, normalized_offsets) =
5915            VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 64)
5916                .unwrap();
5917
5918        assert_eq!(sliced_data.as_ref(), b"klmnopqrst");
5919        let normalized = normalized_offsets.borrow_to_typed_slice::<u64>();
5920        assert_eq!(normalized.as_ref(), &[0, 2, 6, 10]);
5921    }
5922
5923    #[test]
5924    fn test_slice_batch_data_and_rebase_offsets_rejects_invalid_offsets() {
5925        let data = LanceBuffer::copy_slice(b"abcd");
5926        let offsets = LanceBuffer::reinterpret_vec(vec![3_u32, 2_u32]);
5927
5928        let err = VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
5929            .expect_err("offset end before start should error");
5930        assert!(err.to_string().contains("less than base"));
5931    }
5932
5933    #[test]
5934    fn test_schedule_instructions() {
5935        // Convert repetition index to bytes for testing
5936        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
5937        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5938        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5939
5940        let check = |user_ranges, expected_instructions| {
5941            let instructions =
5942                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
5943            assert_eq!(instructions, expected_instructions);
5944        };
5945
5946        // The instructions we expect if we're grabbing the whole range
5947        let expected_take_all = vec![
5948            ChunkInstructions {
5949                chunk_idx: 0,
5950                preamble: PreambleAction::Absent,
5951                rows_to_skip: 0,
5952                rows_to_take: 6,
5953                take_trailer: true,
5954            },
5955            ChunkInstructions {
5956                chunk_idx: 1,
5957                preamble: PreambleAction::Take,
5958                rows_to_skip: 0,
5959                rows_to_take: 2,
5960                take_trailer: false,
5961            },
5962            ChunkInstructions {
5963                chunk_idx: 2,
5964                preamble: PreambleAction::Absent,
5965                rows_to_skip: 0,
5966                rows_to_take: 5,
5967                take_trailer: true,
5968            },
5969            ChunkInstructions {
5970                chunk_idx: 3,
5971                preamble: PreambleAction::Take,
5972                rows_to_skip: 0,
5973                rows_to_take: 1,
5974                take_trailer: false,
5975            },
5976        ];
5977
5978        // Take all as 1 range
5979        check(&[0..14], expected_take_all.clone());
5980
5981        // Take all a individual rows
5982        check(
5983            &[
5984                0..1,
5985                1..2,
5986                2..3,
5987                3..4,
5988                4..5,
5989                5..6,
5990                6..7,
5991                7..8,
5992                8..9,
5993                9..10,
5994                10..11,
5995                11..12,
5996                12..13,
5997                13..14,
5998            ],
5999            expected_take_all,
6000        );
6001
6002        // Test some partial takes
6003
6004        // 2 rows in the same chunk but not contiguous
6005        check(
6006            &[0..1, 3..4],
6007            vec![
6008                ChunkInstructions {
6009                    chunk_idx: 0,
6010                    preamble: PreambleAction::Absent,
6011                    rows_to_skip: 0,
6012                    rows_to_take: 1,
6013                    take_trailer: false,
6014                },
6015                ChunkInstructions {
6016                    chunk_idx: 0,
6017                    preamble: PreambleAction::Absent,
6018                    rows_to_skip: 3,
6019                    rows_to_take: 1,
6020                    take_trailer: false,
6021                },
6022            ],
6023        );
6024
6025        // Taking just a trailer/preamble
6026        check(
6027            &[5..6],
6028            vec![
6029                ChunkInstructions {
6030                    chunk_idx: 0,
6031                    preamble: PreambleAction::Absent,
6032                    rows_to_skip: 5,
6033                    rows_to_take: 1,
6034                    take_trailer: true,
6035                },
6036                ChunkInstructions {
6037                    chunk_idx: 1,
6038                    preamble: PreambleAction::Take,
6039                    rows_to_skip: 0,
6040                    rows_to_take: 0,
6041                    take_trailer: false,
6042                },
6043            ],
6044        );
6045
6046        // Skipping an entire chunk
6047        check(
6048            &[7..10],
6049            vec![
6050                ChunkInstructions {
6051                    chunk_idx: 1,
6052                    preamble: PreambleAction::Skip,
6053                    rows_to_skip: 1,
6054                    rows_to_take: 1,
6055                    take_trailer: false,
6056                },
6057                ChunkInstructions {
6058                    chunk_idx: 2,
6059                    preamble: PreambleAction::Absent,
6060                    rows_to_skip: 0,
6061                    rows_to_take: 2,
6062                    take_trailer: false,
6063                },
6064            ],
6065        );
6066    }
6067
6068    #[test]
6069    fn test_drain_instructions() {
6070        fn drain_from_instructions(
6071            instructions: &mut VecDeque<ChunkInstructions>,
6072            mut rows_desired: u64,
6073            need_preamble: &mut bool,
6074            skip_in_chunk: &mut u64,
6075        ) -> Vec<ChunkDrainInstructions> {
6076            // Note: instructions.len() is an upper bound, we typically take much fewer
6077            let mut drain_instructions = Vec::with_capacity(instructions.len());
6078            while rows_desired > 0 || *need_preamble {
6079                let (next_instructions, consumed_chunk) = instructions
6080                    .front()
6081                    .unwrap()
6082                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
6083                if consumed_chunk {
6084                    instructions.pop_front();
6085                }
6086                drain_instructions.push(next_instructions);
6087            }
6088            drain_instructions
6089        }
6090
6091        // Convert repetition index to bytes for testing
6092        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
6093        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
6094        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
6095        let user_ranges = vec![1..7, 10..14];
6096
6097        // First, schedule the ranges
6098        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
6099
6100        let mut to_drain = VecDeque::from(scheduled.clone());
6101
6102        // Now we drain in batches of 4
6103
6104        let mut need_preamble = false;
6105        let mut skip_in_chunk = 0;
6106
6107        let next_batch =
6108            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
6109
6110        assert!(!need_preamble);
6111        assert_eq!(skip_in_chunk, 4);
6112        assert_eq!(
6113            next_batch,
6114            vec![ChunkDrainInstructions {
6115                chunk_instructions: scheduled[0].clone(),
6116                rows_to_take: 4,
6117                rows_to_skip: 0,
6118                preamble_action: PreambleAction::Absent,
6119            }]
6120        );
6121
6122        let next_batch =
6123            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
6124
6125        assert!(!need_preamble);
6126        assert_eq!(skip_in_chunk, 2);
6127
6128        assert_eq!(
6129            next_batch,
6130            vec![
6131                ChunkDrainInstructions {
6132                    chunk_instructions: scheduled[0].clone(),
6133                    rows_to_take: 1,
6134                    rows_to_skip: 4,
6135                    preamble_action: PreambleAction::Absent,
6136                },
6137                ChunkDrainInstructions {
6138                    chunk_instructions: scheduled[1].clone(),
6139                    rows_to_take: 1,
6140                    rows_to_skip: 0,
6141                    preamble_action: PreambleAction::Take,
6142                },
6143                ChunkDrainInstructions {
6144                    chunk_instructions: scheduled[2].clone(),
6145                    rows_to_take: 2,
6146                    rows_to_skip: 0,
6147                    preamble_action: PreambleAction::Absent,
6148                }
6149            ]
6150        );
6151
6152        let next_batch =
6153            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
6154
6155        assert!(!need_preamble);
6156        assert_eq!(skip_in_chunk, 0);
6157
6158        assert_eq!(
6159            next_batch,
6160            vec![
6161                ChunkDrainInstructions {
6162                    chunk_instructions: scheduled[2].clone(),
6163                    rows_to_take: 1,
6164                    rows_to_skip: 2,
6165                    preamble_action: PreambleAction::Absent,
6166                },
6167                ChunkDrainInstructions {
6168                    chunk_instructions: scheduled[3].clone(),
6169                    rows_to_take: 1,
6170                    rows_to_skip: 0,
6171                    preamble_action: PreambleAction::Take,
6172                },
6173            ]
6174        );
6175
6176        // Regression case.  Need a chunk with preamble, rows, and trailer (the middle chunk here)
6177        let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
6178        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
6179        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
6180        let user_ranges = vec![0..28];
6181
6182        // First, schedule the ranges
6183        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
6184
6185        let mut to_drain = VecDeque::from(scheduled.clone());
6186
6187        // Drain first chunk and some of second chunk
6188
6189        let mut need_preamble = false;
6190        let mut skip_in_chunk = 0;
6191
6192        let next_batch =
6193            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
6194
6195        assert_eq!(
6196            next_batch,
6197            vec![
6198                ChunkDrainInstructions {
6199                    chunk_instructions: scheduled[0].clone(),
6200                    rows_to_take: 6,
6201                    rows_to_skip: 0,
6202                    preamble_action: PreambleAction::Absent,
6203                },
6204                ChunkDrainInstructions {
6205                    chunk_instructions: scheduled[1].clone(),
6206                    rows_to_take: 1,
6207                    rows_to_skip: 0,
6208                    preamble_action: PreambleAction::Take,
6209                },
6210            ]
6211        );
6212
6213        assert!(!need_preamble);
6214        assert_eq!(skip_in_chunk, 1);
6215
6216        // Now, the tricky part.  We drain the second chunk, including the trailer, and need to make sure
6217        // we get a drain task to take the preamble of the third chunk (and nothing else)
6218        let next_batch =
6219            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
6220
6221        assert_eq!(
6222            next_batch,
6223            vec![
6224                ChunkDrainInstructions {
6225                    chunk_instructions: scheduled[1].clone(),
6226                    rows_to_take: 2,
6227                    rows_to_skip: 1,
6228                    preamble_action: PreambleAction::Skip,
6229                },
6230                ChunkDrainInstructions {
6231                    chunk_instructions: scheduled[2].clone(),
6232                    rows_to_take: 0,
6233                    rows_to_skip: 0,
6234                    preamble_action: PreambleAction::Take,
6235                },
6236            ]
6237        );
6238
6239        assert!(!need_preamble);
6240        assert_eq!(skip_in_chunk, 0);
6241    }
6242
6243    #[tokio::test]
6244    async fn test_fullzip_initialize_is_lazy() {
6245        use futures::{FutureExt, future::BoxFuture};
6246        use std::ops::Range;
6247        use std::sync::Mutex;
6248
6249        #[derive(Debug, Clone)]
6250        struct RecordingScheduler {
6251            data: bytes::Bytes,
6252            requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6253        }
6254
6255        impl RecordingScheduler {
6256            fn new(data: bytes::Bytes) -> Self {
6257                Self {
6258                    data,
6259                    requests: Arc::new(Mutex::new(Vec::new())),
6260                }
6261            }
6262
6263            fn requests(&self) -> Vec<Vec<Range<u64>>> {
6264                self.requests.lock().unwrap().clone()
6265            }
6266        }
6267
6268        impl crate::EncodingsIo for RecordingScheduler {
6269            fn submit_request(
6270                &self,
6271                ranges: Vec<Range<u64>>,
6272                _priority: u64,
6273            ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6274                self.requests.lock().unwrap().push(ranges.clone());
6275                let data = ranges
6276                    .into_iter()
6277                    .map(|range| self.data.slice(range.start as usize..range.end as usize))
6278                    .collect::<Vec<_>>();
6279                std::future::ready(Ok(data)).boxed()
6280            }
6281        }
6282
6283        #[derive(Debug)]
6284        struct TestFixedDecompressor;
6285
6286        impl FixedPerValueDecompressor for TestFixedDecompressor {
6287            fn decompress(
6288                &self,
6289                _data: FixedWidthDataBlock,
6290                _num_rows: u64,
6291            ) -> crate::Result<DataBlock> {
6292                unimplemented!("Test decompressor")
6293            }
6294
6295            fn bits_per_value(&self) -> u64 {
6296                32
6297            }
6298        }
6299
6300        let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(vec![
6301            0;
6302            16 * 1024
6303        ])));
6304        let mut scheduler = FullZipScheduler {
6305            data_buf_position: 0,
6306            data_buf_size: 4096,
6307            rep_index: Some(FullZipRepIndexDetails {
6308                buf_position: 1000,
6309                bytes_per_value: 4,
6310            }),
6311            priority: 0,
6312            rows_in_page: 100,
6313            bits_per_offset: 32,
6314            details: Arc::new(FullZipDecodeDetails {
6315                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6316                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6317                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6318                max_rep: 0,
6319                max_visible_def: 0,
6320            }),
6321            cached_state: None,
6322            enable_cache: false,
6323        };
6324
6325        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6326        let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6327
6328        assert!(
6329            cached_data
6330                .as_arc_any()
6331                .downcast_ref::<super::NoCachedPageData>()
6332                .is_some(),
6333            "FullZip initialize should not eagerly load repetition index data"
6334        );
6335        assert!(scheduler.cached_state.is_none());
6336        assert!(
6337            io.requests().is_empty(),
6338            "FullZip initialize should not issue any I/O"
6339        );
6340    }
6341
6342    #[tokio::test]
6343    async fn test_fullzip_read_source_slices_prefetched_page() {
6344        let page_start = 200_u64;
6345        let page_data = LanceBuffer::copy_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);
6346        let source = FullZipReadSource::PrefetchedPage {
6347            base_offset: page_start,
6348            data: page_data,
6349        };
6350        let ranges = vec![
6351            page_start..(page_start + 3),
6352            (page_start + 4)..(page_start + 8),
6353        ];
6354        let mut data = source.fetch(&ranges, 0).await.unwrap();
6355        assert_eq!(data.pop_front().unwrap().as_ref(), &[0, 1, 2]);
6356        assert_eq!(data.pop_front().unwrap().as_ref(), &[4, 5, 6, 7]);
6357    }
6358
6359    #[tokio::test]
6360    async fn test_fullzip_initialize_caches_rep_index_when_enabled() {
6361        use futures::{FutureExt, future::BoxFuture};
6362        use std::ops::Range;
6363        use std::sync::Mutex;
6364
6365        #[derive(Debug, Clone)]
6366        struct RecordingScheduler {
6367            data: bytes::Bytes,
6368            requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6369        }
6370
6371        impl RecordingScheduler {
6372            fn new(data: bytes::Bytes) -> Self {
6373                Self {
6374                    data,
6375                    requests: Arc::new(Mutex::new(Vec::new())),
6376                }
6377            }
6378
6379            fn requests(&self) -> Vec<Vec<Range<u64>>> {
6380                self.requests.lock().unwrap().clone()
6381            }
6382        }
6383
6384        impl crate::EncodingsIo for RecordingScheduler {
6385            fn submit_request(
6386                &self,
6387                ranges: Vec<Range<u64>>,
6388                _priority: u64,
6389            ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6390                self.requests.lock().unwrap().push(ranges.clone());
6391                let data = ranges
6392                    .into_iter()
6393                    .map(|range| self.data.slice(range.start as usize..range.end as usize))
6394                    .collect::<Vec<_>>();
6395                std::future::ready(Ok(data)).boxed()
6396            }
6397        }
6398
6399        #[derive(Debug)]
6400        struct TestFixedDecompressor;
6401
6402        impl FixedPerValueDecompressor for TestFixedDecompressor {
6403            fn decompress(
6404                &self,
6405                _data: FixedWidthDataBlock,
6406                _num_rows: u64,
6407            ) -> crate::Result<DataBlock> {
6408                unimplemented!("Test decompressor")
6409            }
6410
6411            fn bits_per_value(&self) -> u64 {
6412                32
6413            }
6414        }
6415
6416        let rows_in_page = 100_u64;
6417        let bytes_per_value = 4_u64;
6418        let rep_start = 1000_u64;
6419        let rep_size = ((rows_in_page + 1) * bytes_per_value) as usize;
6420        let mut data = vec![0_u8; 16 * 1024];
6421        data[rep_start as usize..rep_start as usize + rep_size].fill(7);
6422        let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(data)));
6423
6424        let mut scheduler = FullZipScheduler {
6425            data_buf_position: 0,
6426            data_buf_size: 4096,
6427            rep_index: Some(FullZipRepIndexDetails {
6428                buf_position: rep_start,
6429                bytes_per_value,
6430            }),
6431            priority: 0,
6432            rows_in_page,
6433            bits_per_offset: 32,
6434            details: Arc::new(FullZipDecodeDetails {
6435                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6436                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6437                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6438                max_rep: 0,
6439                max_visible_def: 0,
6440            }),
6441            cached_state: None,
6442            enable_cache: true,
6443        };
6444
6445        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6446        let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6447        assert!(
6448            cached_data
6449                .as_arc_any()
6450                .downcast_ref::<FullZipCacheableState>()
6451                .is_some()
6452        );
6453        assert!(scheduler.cached_state.is_some());
6454        assert_eq!(
6455            io.requests(),
6456            vec![vec![
6457                rep_start..(rep_start + (rows_in_page + 1) * bytes_per_value)
6458            ]]
6459        );
6460    }
6461
6462    #[tokio::test]
6463    async fn test_fullzip_full_page_bypasses_rep_index_io() {
6464        use futures::{FutureExt, future::BoxFuture};
6465        use std::ops::Range;
6466        use std::sync::Mutex;
6467
6468        #[derive(Debug, Clone)]
6469        struct RecordingScheduler {
6470            data: bytes::Bytes,
6471            requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6472        }
6473
6474        impl RecordingScheduler {
6475            fn new(data: bytes::Bytes) -> Self {
6476                Self {
6477                    data,
6478                    requests: Arc::new(Mutex::new(Vec::new())),
6479                }
6480            }
6481
6482            fn requests(&self) -> Vec<Vec<Range<u64>>> {
6483                self.requests.lock().unwrap().clone()
6484            }
6485        }
6486
6487        impl crate::EncodingsIo for RecordingScheduler {
6488            fn submit_request(
6489                &self,
6490                ranges: Vec<Range<u64>>,
6491                _priority: u64,
6492            ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6493                self.requests.lock().unwrap().push(ranges.clone());
6494                let data = ranges
6495                    .into_iter()
6496                    .map(|range| self.data.slice(range.start as usize..range.end as usize))
6497                    .collect::<Vec<_>>();
6498                std::future::ready(Ok(data)).boxed()
6499            }
6500        }
6501
6502        #[derive(Debug)]
6503        struct TestFixedDecompressor;
6504
6505        impl FixedPerValueDecompressor for TestFixedDecompressor {
6506            fn decompress(
6507                &self,
6508                _data: FixedWidthDataBlock,
6509                _num_rows: u64,
6510            ) -> crate::Result<DataBlock> {
6511                unimplemented!("Test decompressor")
6512            }
6513
6514            fn bits_per_value(&self) -> u64 {
6515                32
6516            }
6517        }
6518
6519        let rows_in_page = 100_u64;
6520        let data_start = 256_u64;
6521        let data_size = 500_u64;
6522        let rep_start = 4096_u64;
6523        let bytes_per_value = 4_u64;
6524
6525        let mut bytes = vec![0_u8; 16 * 1024];
6526        for i in 0..=rows_in_page {
6527            let offset = (i * 5) as u32;
6528            let pos = rep_start as usize + (i * bytes_per_value) as usize;
6529            bytes[pos..pos + 4].copy_from_slice(&offset.to_le_bytes());
6530        }
6531        let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(bytes)));
6532
6533        let scheduler = FullZipScheduler {
6534            data_buf_position: data_start,
6535            data_buf_size: data_size,
6536            rep_index: Some(FullZipRepIndexDetails {
6537                buf_position: rep_start,
6538                bytes_per_value,
6539            }),
6540            priority: 0,
6541            rows_in_page,
6542            bits_per_offset: 32,
6543            details: Arc::new(FullZipDecodeDetails {
6544                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6545                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6546                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6547                max_rep: 0,
6548                max_visible_def: 0,
6549            }),
6550            cached_state: None,
6551            enable_cache: false,
6552        };
6553
6554        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6555        let tasks = scheduler
6556            .schedule_ranges_rep(
6557                &[0..rows_in_page],
6558                &io_dyn,
6559                FullZipRepIndexDetails {
6560                    buf_position: rep_start,
6561                    bytes_per_value,
6562                },
6563            )
6564            .unwrap();
6565
6566        let requests = io.requests();
6567        assert_eq!(requests.len(), 1);
6568        assert_eq!(requests[0], vec![data_start..(data_start + data_size)]);
6569
6570        let _ = tasks.into_iter().next().unwrap().decoder_fut.await.unwrap();
6571        let requests_after_await = io.requests();
6572        assert_eq!(
6573            requests_after_await.len(),
6574            1,
6575            "full page path should not issue rep-index I/O"
6576        );
6577    }
6578
6579    /// This test is used to reproduce fuzz test https://github.com/lancedb/lance/issues/4492
6580    #[tokio::test]
6581    async fn test_fuzz_issue_4492_empty_rep_values() {
6582        use lance_datagen::{RowCount, Seed, array, gen_batch};
6583
6584        let seed = 1823859942947654717u64;
6585        let num_rows = 2741usize;
6586
6587        // Generate the exact same data that caused the failure
6588        let batch_gen = gen_batch().with_seed(Seed::from(seed));
6589        let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
6590        let list_generator = array::rand_list_any(base_generator, false);
6591
6592        let batch = batch_gen
6593            .anon_col(list_generator)
6594            .into_batch_rows(RowCount::from(num_rows as u64))
6595            .unwrap();
6596
6597        let list_array = batch.column(0).clone();
6598
6599        // Force miniblock encoding
6600        let mut metadata = HashMap::new();
6601        metadata.insert(
6602            STRUCTURAL_ENCODING_META_KEY.to_string(),
6603            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6604        );
6605
6606        let test_cases = TestCases::default()
6607            .with_min_file_version(LanceFileVersion::V2_1)
6608            .with_batch_size(100)
6609            .with_range(0..num_rows.min(500) as u64)
6610            .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);
6611
6612        check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
6613    }
6614
6615    async fn test_minichunk_size_helper(
6616        string_data: Vec<Option<String>>,
6617        minichunk_size: u64,
6618        file_version: LanceFileVersion,
6619    ) {
6620        use crate::constants::MINICHUNK_SIZE_META_KEY;
6621        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6622        use arrow_array::{ArrayRef, StringArray};
6623        use std::sync::Arc;
6624
6625        let string_array: ArrayRef = Arc::new(StringArray::from(string_data));
6626
6627        let mut metadata = HashMap::new();
6628        metadata.insert(
6629            MINICHUNK_SIZE_META_KEY.to_string(),
6630            minichunk_size.to_string(),
6631        );
6632        metadata.insert(
6633            STRUCTURAL_ENCODING_META_KEY.to_string(),
6634            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6635        );
6636
6637        let test_cases = TestCases::default()
6638            .with_min_file_version(file_version)
6639            .with_batch_size(1000);
6640
6641        check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await;
6642    }
6643
6644    #[tokio::test]
6645    async fn test_minichunk_size_roundtrip() {
6646        // Test that minichunk size can be configured and works correctly in round-trip encoding
6647        let mut string_data = Vec::new();
6648        for i in 0..100 {
6649            string_data.push(Some(format!("test_string_{}", i).repeat(50)));
6650        }
6651        // configure minichunk size to 64 bytes (smaller than the default 4kb) for Lance 2.1
6652        test_minichunk_size_helper(string_data, 64, LanceFileVersion::V2_1).await;
6653    }
6654
6655    #[tokio::test]
6656    async fn test_minichunk_size_128kb_v2_2() {
6657        // Test that minichunk size can be configured to 128KB and works correctly with Lance 2.2
6658        let mut string_data = Vec::new();
6659        // create a 500kb string array
6660        for i in 0..10000 {
6661            string_data.push(Some(format!("test_string_{}", i).repeat(50)));
6662        }
6663        test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
6664    }
6665
6666    #[tokio::test]
6667    async fn test_binary_large_minichunk_size_over_max_miniblock_values() {
6668        let mut string_data = Vec::new();
6669        // 128kb/chunk / 6 bytes (t_9999) = 21845 > max 4096 items per chunk
6670        for i in 0..10000 {
6671            string_data.push(Some(format!("t_{}", i)));
6672        }
6673        test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
6674    }
6675
6676    #[tokio::test]
6677    async fn test_large_dictionary_general_compression() {
6678        use arrow_array::{ArrayRef, StringArray};
6679        use std::collections::HashMap;
6680        use std::sync::Arc;
6681
6682        // Create large string dictionary data (>32KiB) with low cardinality
6683        // Use 100 unique strings, each 500 bytes long = 50KB dictionary
6684        let unique_values: Vec<String> = (0..100)
6685            .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
6686            .collect();
6687
6688        // Repeat these strings many times to create a large array
6689        let repeated_strings: Vec<_> = unique_values
6690            .iter()
6691            .cycle()
6692            .take(100_000)
6693            .map(|s| Some(s.as_str()))
6694            .collect();
6695
6696        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
6697
6698        // Configure test to use V2_2 and verify encoding
6699        let test_cases = TestCases::default()
6700            .with_min_file_version(LanceFileVersion::V2_2)
6701            .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
6702                assert_eq!(cols.len(), 1);
6703                let col = &cols[0];
6704
6705                // Navigate to the dictionary encoding in the page layout
6706                if let Some(PageEncoding::Structural(page_layout)) =
6707                    &col.final_pages.first().map(|p| &p.description)
6708                    && let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) =
6709                        &page_layout.layout
6710                    && let Some(dictionary_encoding) = &mini_block.dictionary
6711                {
6712                    match dictionary_encoding.compression.as_ref() {
6713                        Some(Compression::General(general)) => {
6714                            // Verify it's using LZ4 or Zstd
6715                            let compression = general.compression.as_ref().unwrap();
6716                            assert!(
6717                                compression.scheme()
6718                                    == pb21::CompressionScheme::CompressionAlgorithmLz4
6719                                    || compression.scheme()
6720                                        == pb21::CompressionScheme::CompressionAlgorithmZstd,
6721                                "Expected LZ4 or Zstd compression for large dictionary"
6722                            );
6723                        }
6724                        _ => panic!("Expected General compression for large dictionary"),
6725                    }
6726                }
6727            }));
6728
6729        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
6730    }
6731
6732    fn dictionary_encoding_from_page(
6733        page: &crate::encoder::EncodedPage,
6734    ) -> &crate::format::pb21::CompressiveEncoding {
6735        let PageEncoding::Structural(layout) = &page.description else {
6736            panic!("Expected structural page encoding");
6737        };
6738        let pb21::page_layout::Layout::MiniBlockLayout(layout) = layout.layout.as_ref().unwrap()
6739        else {
6740            panic!("Expected mini-block layout");
6741        };
6742        layout
6743            .dictionary
6744            .as_ref()
6745            .unwrap_or_else(|| panic!("Expected dictionary encoding"))
6746    }
6747
6748    async fn encode_variable_dict_page(
6749        metadata: HashMap<String, String>,
6750    ) -> crate::encoder::EncodedPage {
6751        use arrow_array::types::Int32Type;
6752        use arrow_array::{ArrayRef, DictionaryArray, Int32Array, StringArray};
6753
6754        let values = Arc::new(StringArray::from(
6755            (0..128)
6756                .map(|i| format!("value_{i:04}_{}", "x".repeat(256)))
6757                .collect::<Vec<_>>(),
6758        )) as ArrayRef;
6759        let keys = Int32Array::from_iter_values((0..20_000).map(|i| i % 128));
6760        let dict_array =
6761            Arc::new(DictionaryArray::<Int32Type>::try_new(keys, values).unwrap()) as ArrayRef;
6762
6763        let field = arrow_schema::Field::new(
6764            "dict_col",
6765            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
6766            false,
6767        )
6768        .with_metadata(metadata);
6769
6770        encode_first_page(field, dict_array, LanceFileVersion::V2_2).await
6771    }
6772
6773    async fn encode_auto_fixed_dict_page(
6774        metadata: HashMap<String, String>,
6775    ) -> crate::encoder::EncodedPage {
6776        use arrow_array::{ArrayRef, Decimal128Array};
6777
6778        // 128-bit fixed-width values with low cardinality to trigger dictionary encoding.
6779        let values = (0..20_000)
6780            .map(|i| match i % 3 {
6781                0 => 10_i128,
6782                1 => 20_i128,
6783                _ => 30_i128,
6784            })
6785            .collect::<Vec<_>>();
6786        let decimal = Decimal128Array::from_iter_values(values)
6787            .with_precision_and_scale(38, 0)
6788            .unwrap();
6789        let decimal = Arc::new(decimal) as ArrayRef;
6790
6791        let mut field_metadata = metadata;
6792        // Strongly encourage dictionary encoding for this synthetic test data.
6793        field_metadata.insert(
6794            "lance-encoding:dict-size-ratio".to_string(),
6795            "0.99".to_string(),
6796        );
6797        let field = arrow_schema::Field::new("fixed_col", DataType::Decimal128(38, 0), false)
6798            .with_metadata(field_metadata);
6799
6800        encode_first_page(field, decimal, LanceFileVersion::V2_2).await
6801    }
6802
6803    #[tokio::test]
6804    async fn test_dict_values_general_compression_default_lz4_for_variable_dict_values() {
6805        let page = encode_variable_dict_page(HashMap::new()).await;
6806        let dictionary_encoding = dictionary_encoding_from_page(&page);
6807        let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6808            panic!("Expected General compression for dictionary values");
6809        };
6810        let compression = general.compression.as_ref().unwrap();
6811        assert_eq!(
6812            compression.scheme(),
6813            pb21::CompressionScheme::CompressionAlgorithmLz4
6814        );
6815    }
6816
6817    #[tokio::test]
6818    async fn test_dict_values_general_compression_default_lz4_for_fixed_dict_values() {
6819        let page = encode_auto_fixed_dict_page(HashMap::new()).await;
6820        let dictionary_encoding = dictionary_encoding_from_page(&page);
6821        let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6822            panic!("Expected General compression for dictionary values");
6823        };
6824        let compression = general.compression.as_ref().unwrap();
6825        assert_eq!(
6826            compression.scheme(),
6827            pb21::CompressionScheme::CompressionAlgorithmLz4
6828        );
6829    }
6830
6831    #[tokio::test]
6832    async fn test_dict_values_general_compression_zstd() {
6833        let mut metadata = HashMap::new();
6834        metadata.insert(
6835            DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6836            "zstd".to_string(),
6837        );
6838        let page = encode_variable_dict_page(metadata).await;
6839        let dictionary_encoding = dictionary_encoding_from_page(&page);
6840        let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6841            panic!("Expected General compression for dictionary values");
6842        };
6843        let compression = general.compression.as_ref().unwrap();
6844        assert_eq!(
6845            compression.scheme(),
6846            pb21::CompressionScheme::CompressionAlgorithmZstd
6847        );
6848    }
6849
6850    #[tokio::test]
6851    async fn test_dict_values_general_compression_none() {
6852        let mut metadata = HashMap::new();
6853        metadata.insert(
6854            DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6855            "none".to_string(),
6856        );
6857        let page = encode_variable_dict_page(metadata).await;
6858        let dictionary_encoding = dictionary_encoding_from_page(&page);
6859        assert!(
6860            !matches!(
6861                dictionary_encoding.compression.as_ref(),
6862                Some(Compression::General(_))
6863            ),
6864            "Expected dictionary values to avoid General compression"
6865        );
6866    }
6867
6868    #[test]
6869    fn test_resolve_dict_values_compression_metadata_defaults_to_lz4() {
6870        let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
6871            &HashMap::new(),
6872            None,
6873            None,
6874        );
6875        assert_eq!(metadata.get(COMPRESSION_META_KEY), Some(&"lz4".to_string()),);
6876        assert!(!metadata.contains_key(COMPRESSION_LEVEL_META_KEY));
6877    }
6878
6879    #[test]
6880    fn test_resolve_dict_values_compression_metadata_metadata_overrides_env() {
6881        let field_metadata = HashMap::from([
6882            (
6883                DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6884                "none".to_string(),
6885            ),
6886            (
6887                DICT_VALUES_COMPRESSION_LEVEL_META_KEY.to_string(),
6888                "7".to_string(),
6889            ),
6890        ]);
6891        let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
6892            &field_metadata,
6893            Some("zstd".to_string()),
6894            Some("3".to_string()),
6895        );
6896        assert_eq!(
6897            metadata.get(COMPRESSION_META_KEY),
6898            Some(&"none".to_string()),
6899        );
6900        assert_eq!(
6901            metadata.get(COMPRESSION_LEVEL_META_KEY),
6902            Some(&"7".to_string()),
6903        );
6904    }
6905
6906    #[test]
6907    fn test_resolve_dict_values_compression_metadata_env_fallback() {
6908        let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
6909            &HashMap::new(),
6910            Some("zstd".to_string()),
6911            Some("9".to_string()),
6912        );
6913        assert_eq!(
6914            metadata.get(COMPRESSION_META_KEY),
6915            Some(&"zstd".to_string()),
6916        );
6917        assert_eq!(
6918            metadata.get(COMPRESSION_LEVEL_META_KEY),
6919            Some(&"9".to_string()),
6920        );
6921    }
6922
6923    #[tokio::test]
6924    async fn test_dictionary_encode_int64() {
6925        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
6926        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6927        use crate::version::LanceFileVersion;
6928        use arrow_array::{ArrayRef, Int64Array};
6929        use std::collections::HashMap;
6930        use std::sync::Arc;
6931
6932        // Low cardinality with poor RLE opportunity.
6933        let values = (0..1000)
6934            .map(|i| match i % 3 {
6935                0 => 10i64,
6936                1 => 20i64,
6937                _ => 30i64,
6938            })
6939            .collect::<Vec<_>>();
6940        let array = Arc::new(Int64Array::from(values)) as ArrayRef;
6941
6942        let mut metadata = HashMap::new();
6943        metadata.insert(
6944            STRUCTURAL_ENCODING_META_KEY.to_string(),
6945            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6946        );
6947        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
6948
6949        let test_cases = TestCases::default()
6950            .with_min_file_version(LanceFileVersion::V2_2)
6951            .with_batch_size(1000)
6952            .with_range(0..1000)
6953            .with_indices(vec![0, 1, 10, 999])
6954            .with_expected_encoding("dictionary");
6955
6956        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
6957    }
6958
6959    #[tokio::test]
6960    async fn test_dictionary_encode_float64() {
6961        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
6962        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6963        use crate::version::LanceFileVersion;
6964        use arrow_array::{ArrayRef, Float64Array};
6965        use std::collections::HashMap;
6966        use std::sync::Arc;
6967
6968        // Low cardinality with poor RLE opportunity.
6969        let values = (0..1000)
6970            .map(|i| match i % 3 {
6971                0 => 0.1f64,
6972                1 => 0.2f64,
6973                _ => 0.3f64,
6974            })
6975            .collect::<Vec<_>>();
6976        let array = Arc::new(Float64Array::from(values)) as ArrayRef;
6977
6978        let mut metadata = HashMap::new();
6979        metadata.insert(
6980            STRUCTURAL_ENCODING_META_KEY.to_string(),
6981            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6982        );
6983        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
6984
6985        let test_cases = TestCases::default()
6986            .with_min_file_version(LanceFileVersion::V2_2)
6987            .with_batch_size(1000)
6988            .with_range(0..1000)
6989            .with_indices(vec![0, 1, 10, 999])
6990            .with_expected_encoding("dictionary");
6991
6992        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
6993    }
6994
6995    #[test]
6996    fn test_miniblock_dictionary_out_of_line_bitpacking_decode() {
6997        let rows = 10_000;
6998        let unique_values = 2_000;
6999
7000        let dictionary_encoding =
7001            ProtobufUtils21::out_of_line_bitpacking(64, ProtobufUtils21::flat(11, None));
7002        let layout = pb21::MiniBlockLayout {
7003            rep_compression: None,
7004            def_compression: None,
7005            value_compression: Some(ProtobufUtils21::flat(64, None)),
7006            dictionary: Some(dictionary_encoding),
7007            num_dictionary_items: unique_values,
7008            layers: vec![pb21::RepDefLayer::RepdefAllValidItem as i32],
7009            num_buffers: 1,
7010            repetition_index_depth: 0,
7011            num_items: rows,
7012            has_large_chunk: false,
7013        };
7014
7015        let buffer_offsets_and_sizes = vec![(0, 0), (0, 0), (0, 0)];
7016        let scheduler = super::MiniBlockScheduler::try_new(
7017            &buffer_offsets_and_sizes,
7018            /*priority=*/ 0,
7019            /*items_in_page=*/ rows,
7020            &layout,
7021            &DefaultDecompressionStrategy::default(),
7022        )
7023        .unwrap();
7024
7025        let dictionary = scheduler.dictionary.unwrap();
7026        assert_eq!(dictionary.num_dictionary_items, unique_values);
7027        assert_eq!(
7028            dictionary.dictionary_data_alignment,
7029            crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
7030        );
7031    }
7032
7033    // Dictionary encoding decision tests
7034    fn create_test_fixed_data_block(
7035        num_values: u64,
7036        cardinality: u64,
7037        bits_per_value: u64,
7038    ) -> DataBlock {
7039        assert!(cardinality > 0);
7040        assert!(cardinality <= num_values);
7041        let block_info = BlockInfo::default();
7042
7043        assert_eq!(bits_per_value % 8, 0);
7044        let data = match bits_per_value {
7045            32 => {
7046                let values = (0..num_values)
7047                    .map(|i| (i % cardinality) as u32)
7048                    .collect::<Vec<_>>();
7049                crate::buffer::LanceBuffer::reinterpret_vec(values)
7050            }
7051            64 => {
7052                let values = (0..num_values).map(|i| i % cardinality).collect::<Vec<_>>();
7053                crate::buffer::LanceBuffer::reinterpret_vec(values)
7054            }
7055            128 => {
7056                let values = (0..num_values)
7057                    .map(|i| (i % cardinality) as u128)
7058                    .collect::<Vec<_>>();
7059                crate::buffer::LanceBuffer::reinterpret_vec(values)
7060            }
7061            _ => unreachable!(),
7062        };
7063        DataBlock::FixedWidth(FixedWidthDataBlock {
7064            bits_per_value,
7065            data,
7066            num_values,
7067            block_info,
7068        })
7069    }
7070
7071    /// Helper to create VariableWidth (string) test data block with exact cardinality
7072    fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
7073        use arrow_array::StringArray;
7074
7075        assert!(cardinality <= num_values && cardinality > 0);
7076
7077        let mut values = Vec::with_capacity(num_values as usize);
7078        for i in 0..num_values {
7079            values.push(format!("value_{:016}", i % cardinality));
7080        }
7081
7082        let array = StringArray::from(values);
7083        DataBlock::from_array(Arc::new(array) as ArrayRef)
7084    }
7085
7086    #[test]
7087    fn test_should_dictionary_encode() {
7088        use crate::constants::DICT_SIZE_RATIO_META_KEY;
7089        use lance_core::datatypes::Field as LanceField;
7090
7091        // Create data where dict encoding saves space
7092        let block = create_test_variable_width_block(1000, 10);
7093
7094        let mut metadata = HashMap::new();
7095        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
7096        let arrow_field =
7097            arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
7098        let field = LanceField::try_from(&arrow_field).unwrap();
7099
7100        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
7101            &block,
7102            &field,
7103            LanceFileVersion::V2_1,
7104        );
7105
7106        assert!(
7107            result.is_some(),
7108            "Should use dictionary encode based on size"
7109        );
7110    }
7111
7112    #[test]
7113    fn test_should_not_dictionary_encode_unsupported_bits() {
7114        use crate::constants::DICT_SIZE_RATIO_META_KEY;
7115        use lance_core::datatypes::Field as LanceField;
7116
7117        let block = create_test_fixed_data_block(1000, 1000, 32);
7118
7119        let mut metadata = HashMap::new();
7120        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
7121        let arrow_field =
7122            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
7123        let field = LanceField::try_from(&arrow_field).unwrap();
7124
7125        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
7126            &block,
7127            &field,
7128            LanceFileVersion::V2_1,
7129        );
7130
7131        assert!(
7132            result.is_none(),
7133            "Should not use dictionary encode for unsupported bit width"
7134        );
7135    }
7136
7137    #[test]
7138    fn test_should_not_dictionary_encode_near_unique_sample() {
7139        use crate::constants::DICT_SIZE_RATIO_META_KEY;
7140        use lance_core::datatypes::Field as LanceField;
7141
7142        let num_values = 5000;
7143        let block = create_test_variable_width_block(num_values, num_values);
7144
7145        let mut metadata = HashMap::new();
7146        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "1.0".to_string());
7147        let arrow_field =
7148            arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
7149        let field = LanceField::try_from(&arrow_field).unwrap();
7150
7151        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
7152            &block,
7153            &field,
7154            LanceFileVersion::V2_1,
7155        );
7156
7157        assert!(
7158            result.is_none(),
7159            "Should not probe dictionary encoding for near-unique data"
7160        );
7161    }
7162
7163    async fn encode_first_page(
7164        field: arrow_schema::Field,
7165        array: ArrayRef,
7166        version: LanceFileVersion,
7167    ) -> crate::encoder::EncodedPage {
7168        use crate::encoder::{
7169            ColumnIndexSequence, EncodingOptions, MIN_PAGE_BUFFER_ALIGNMENT, OutOfLineBuffers,
7170            default_encoding_strategy,
7171        };
7172        use crate::repdef::RepDefBuilder;
7173
7174        let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap();
7175        let encoding_strategy = default_encoding_strategy(version);
7176        let mut column_index_seq = ColumnIndexSequence::default();
7177        let encoding_options = EncodingOptions {
7178            cache_bytes_per_column: 1,
7179            max_page_bytes: 32 * 1024 * 1024,
7180            keep_original_array: true,
7181            buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT,
7182            version,
7183        };
7184
7185        let mut encoder = encoding_strategy
7186            .create_field_encoder(
7187                encoding_strategy.as_ref(),
7188                &lance_field,
7189                &mut column_index_seq,
7190                &encoding_options,
7191            )
7192            .unwrap();
7193
7194        let mut external_buffers = OutOfLineBuffers::new(0, MIN_PAGE_BUFFER_ALIGNMENT);
7195        let repdef = RepDefBuilder::default();
7196        let num_rows = array.len() as u64;
7197        let mut pages = Vec::new();
7198        for task in encoder
7199            .maybe_encode(array, &mut external_buffers, repdef, 0, num_rows)
7200            .unwrap()
7201        {
7202            pages.push(task.await.unwrap());
7203        }
7204        for task in encoder.flush(&mut external_buffers).unwrap() {
7205            pages.push(task.await.unwrap());
7206        }
7207        pages.into_iter().next().unwrap()
7208    }
7209
7210    #[tokio::test]
7211    async fn test_constant_layout_out_of_line_fixed_size_binary_v2_2() {
7212        use crate::format::pb21::page_layout::Layout;
7213
7214        let val = vec![0xABu8; 33];
7215        let arr: ArrayRef = Arc::new(
7216            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
7217                std::iter::repeat_n(Some(val.as_slice()), 256),
7218                33,
7219            )
7220            .unwrap(),
7221        );
7222        let field = arrow_schema::Field::new("c", DataType::FixedSizeBinary(33), true);
7223        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7224
7225        let PageEncoding::Structural(layout) = &page.description else {
7226            panic!("Expected structural encoding");
7227        };
7228        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7229            panic!("Expected constant layout in slot 2");
7230        };
7231        assert!(layout.inline_value.is_none());
7232        assert_eq!(page.data.len(), 1);
7233
7234        let test_cases = TestCases::default()
7235            .with_min_file_version(LanceFileVersion::V2_2)
7236            .with_max_file_version(LanceFileVersion::V2_2)
7237            .with_page_sizes(vec![4096]);
7238        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7239    }
7240
7241    #[tokio::test]
7242    async fn test_constant_layout_out_of_line_utf8_v2_2() {
7243        use crate::format::pb21::page_layout::Layout;
7244
7245        let arr: ArrayRef = Arc::new(arrow_array::StringArray::from_iter_values(
7246            std::iter::repeat_n("hello", 512),
7247        ));
7248        let field = arrow_schema::Field::new("c", DataType::Utf8, true);
7249        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7250
7251        let PageEncoding::Structural(layout) = &page.description else {
7252            panic!("Expected structural encoding");
7253        };
7254        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7255            panic!("Expected constant layout in slot 2");
7256        };
7257        assert!(layout.inline_value.is_none());
7258        assert_eq!(page.data.len(), 1);
7259
7260        let test_cases = TestCases::default()
7261            .with_min_file_version(LanceFileVersion::V2_2)
7262            .with_max_file_version(LanceFileVersion::V2_2)
7263            .with_page_sizes(vec![4096]);
7264        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7265    }
7266
7267    #[tokio::test]
7268    async fn test_constant_layout_nullable_item_v2_2() {
7269        use crate::format::pb21::page_layout::Layout;
7270
7271        let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![
7272            Some(7),
7273            None,
7274            Some(7),
7275            None,
7276            Some(7),
7277        ]));
7278        let field = arrow_schema::Field::new("c", DataType::Int32, true);
7279        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7280
7281        let PageEncoding::Structural(layout) = &page.description else {
7282            panic!("Expected structural encoding");
7283        };
7284        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7285            panic!("Expected constant layout in slot 2");
7286        };
7287        assert!(layout.inline_value.is_some());
7288        assert_eq!(page.data.len(), 2);
7289
7290        let test_cases = TestCases::default()
7291            .with_min_file_version(LanceFileVersion::V2_2)
7292            .with_max_file_version(LanceFileVersion::V2_2)
7293            .with_page_sizes(vec![4096]);
7294        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7295    }
7296
7297    #[tokio::test]
7298    async fn test_constant_layout_list_repdef_v2_2() {
7299        use crate::format::pb21::page_layout::Layout;
7300        use arrow_array::builder::{Int32Builder, ListBuilder};
7301
7302        let mut builder = ListBuilder::new(Int32Builder::new());
7303        builder.values().append_value(7);
7304        builder.values().append_null();
7305        builder.values().append_value(7);
7306        builder.append(true);
7307
7308        builder.append(true);
7309
7310        builder.values().append_value(7);
7311        builder.append(true);
7312
7313        builder.append_null();
7314
7315        let arr: ArrayRef = Arc::new(builder.finish());
7316        let field = arrow_schema::Field::new(
7317            "c",
7318            DataType::List(Arc::new(arrow_schema::Field::new(
7319                "item",
7320                DataType::Int32,
7321                true,
7322            ))),
7323            true,
7324        );
7325        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7326
7327        let PageEncoding::Structural(layout) = &page.description else {
7328            panic!("Expected structural encoding");
7329        };
7330        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7331            panic!("Expected constant layout in slot 2");
7332        };
7333        assert!(layout.inline_value.is_some());
7334        assert_eq!(page.data.len(), 2);
7335
7336        let test_cases = TestCases::default()
7337            .with_min_file_version(LanceFileVersion::V2_2)
7338            .with_max_file_version(LanceFileVersion::V2_2)
7339            .with_page_sizes(vec![4096]);
7340        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7341    }
7342
7343    #[tokio::test]
7344    async fn test_constant_layout_fixed_size_list_not_used_v2_2() {
7345        use crate::format::pb21::page_layout::Layout;
7346        use arrow_array::builder::{FixedSizeListBuilder, Int32Builder};
7347
7348        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
7349        for _ in 0..64 {
7350            builder.values().append_value(1);
7351            builder.values().append_null();
7352            builder.values().append_value(3);
7353            builder.append(true);
7354        }
7355        let arr: ArrayRef = Arc::new(builder.finish());
7356        let field = arrow_schema::Field::new(
7357            "c",
7358            DataType::FixedSizeList(
7359                Arc::new(arrow_schema::Field::new("item", DataType::Int32, true)),
7360                3,
7361            ),
7362            true,
7363        );
7364        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7365
7366        if let PageEncoding::Structural(layout) = &page.description {
7367            assert!(
7368                !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
7369                "FixedSizeList should not use constant layout yet"
7370            );
7371        }
7372
7373        let test_cases = TestCases::default()
7374            .with_min_file_version(LanceFileVersion::V2_2)
7375            .with_max_file_version(LanceFileVersion::V2_2)
7376            .with_page_sizes(vec![4096]);
7377        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7378    }
7379
7380    #[tokio::test]
7381    async fn test_constant_layout_not_written_before_v2_2() {
7382        use crate::format::pb21::page_layout::Layout;
7383
7384        let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![7; 1024]));
7385        let field = arrow_schema::Field::new("c", DataType::Int32, true);
7386        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_1).await;
7387
7388        let PageEncoding::Structural(layout) = &page.description else {
7389            return;
7390        };
7391        assert!(
7392            !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
7393            "Should not emit constant layout before v2.2"
7394        );
7395
7396        let test_cases = TestCases::default()
7397            .with_min_file_version(LanceFileVersion::V2_1)
7398            .with_max_file_version(LanceFileVersion::V2_1)
7399            .with_page_sizes(vec![4096]);
7400        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7401    }
7402
7403    #[tokio::test]
7404    async fn test_all_null_constant_layout_still_works_v2_2() {
7405        use crate::format::pb21::page_layout::Layout;
7406
7407        let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![None, None, None]));
7408        let field = arrow_schema::Field::new("c", DataType::Int32, true);
7409        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7410
7411        let PageEncoding::Structural(layout) = &page.description else {
7412            panic!("Expected structural encoding");
7413        };
7414        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7415            panic!("Expected layout in slot 2");
7416        };
7417        assert!(layout.inline_value.is_none());
7418        assert_eq!(page.data.len(), 0);
7419
7420        let test_cases = TestCases::default()
7421            .with_min_file_version(LanceFileVersion::V2_2)
7422            .with_max_file_version(LanceFileVersion::V2_2)
7423            .with_page_sizes(vec![4096]);
7424        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7425    }
7426
7427    #[test]
7428    fn test_encode_decode_complex_all_null_vals_roundtrip() {
7429        use crate::compression::{
7430            DecompressionStrategy, DefaultCompressionStrategy, DefaultDecompressionStrategy,
7431        };
7432
7433        let values: Arc<[u16]> = Arc::from((0..2048).map(|i| (i % 5) as u16).collect::<Vec<u16>>());
7434
7435        let compression_strategy = DefaultCompressionStrategy::default();
7436        let decompression_strategy = DefaultDecompressionStrategy::default();
7437
7438        let (compressed_buf, encoding) = PrimitiveStructuralEncoder::encode_complex_all_null_vals(
7439            &values,
7440            &compression_strategy,
7441        )
7442        .unwrap();
7443
7444        let decompressor = decompression_strategy
7445            .create_block_decompressor(&encoding)
7446            .unwrap();
7447        let decompressed = decompressor
7448            .decompress(compressed_buf, values.len() as u64)
7449            .unwrap();
7450        let decompressed_fixed_width = decompressed.as_fixed_width().unwrap();
7451        assert_eq!(decompressed_fixed_width.num_values, values.len() as u64);
7452        assert_eq!(decompressed_fixed_width.bits_per_value, 16);
7453        let rep_result = decompressed_fixed_width.data.borrow_to_typed_slice::<u16>();
7454        assert_eq!(rep_result.as_ref(), values.as_ref());
7455    }
7456
7457    #[tokio::test]
7458    async fn test_complex_all_null_compression_gated_by_version() {
7459        use crate::format::pb21::page_layout::Layout;
7460        use arrow_array::ListArray;
7461
7462        let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
7463            (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
7464        );
7465        let arr: ArrayRef = Arc::new(list_array);
7466        let field = arrow_schema::Field::new(
7467            "c",
7468            DataType::List(Arc::new(arrow_schema::Field::new(
7469                "item",
7470                DataType::Int32,
7471                true,
7472            ))),
7473            true,
7474        );
7475
7476        let page_v21 = encode_first_page(field.clone(), arr.clone(), LanceFileVersion::V2_1).await;
7477        let PageEncoding::Structural(layout_v21) = &page_v21.description else {
7478            panic!("Expected structural encoding");
7479        };
7480        let Layout::ConstantLayout(layout_v21) = layout_v21.layout.as_ref().unwrap() else {
7481            panic!("Expected constant layout");
7482        };
7483        assert!(layout_v21.rep_compression.is_none());
7484        assert!(layout_v21.def_compression.is_none());
7485        assert_eq!(layout_v21.num_rep_values, 0);
7486        assert_eq!(layout_v21.num_def_values, 0);
7487
7488        let page_v22 = encode_first_page(field, arr, LanceFileVersion::V2_2).await;
7489        let PageEncoding::Structural(layout_v22) = &page_v22.description else {
7490            panic!("Expected structural encoding");
7491        };
7492        let Layout::ConstantLayout(layout_v22) = layout_v22.layout.as_ref().unwrap() else {
7493            panic!("Expected constant layout");
7494        };
7495        assert!(layout_v22.def_compression.is_some());
7496        assert!(layout_v22.num_def_values > 0);
7497    }
7498
7499    #[tokio::test]
7500    async fn test_complex_all_null_round_trip() {
7501        use arrow_array::ListArray;
7502
7503        let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
7504            (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
7505        );
7506
7507        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_2);
7508        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
7509            .await;
7510    }
7511
7512    // https://github.com/lance-format/lance/issues/6681
7513    #[tokio::test]
7514    async fn test_sparse_boolean_list_roundtrip() {
7515        use arrow_array::builder::{BooleanBuilder, ListBuilder};
7516
7517        let mut list_builder = ListBuilder::new(BooleanBuilder::new());
7518        for i in 0..1000i32 {
7519            if i % 64 == 0 {
7520                // Alternate true/false so the array is not constant (constant path avoids the bug).
7521                list_builder.values().append_value(i % 128 == 0);
7522                list_builder.append(true);
7523            } else {
7524                list_builder.append(false);
7525            }
7526        }
7527        let list_array = Arc::new(list_builder.finish());
7528
7529        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
7530        check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
7531    }
7532}