Skip to main content

lance_encoding/encodings/logical/
primitive.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    collections::{HashMap, VecDeque},
7    env,
8    fmt::Debug,
9    iter,
10    ops::Range,
11    sync::Arc,
12    vec,
13};
14
15use crate::{
16    constants::{
17        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
18    },
19    data::DictionaryDataBlock,
20    encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
21    format::{
22        ProtobufUtils21,
23        pb21::{self, CompressiveEncoding, PageLayout, compressive_encoding::Compression},
24    },
25};
26use arrow_array::{Array, ArrayRef, PrimitiveArray, cast::AsArray, make_array, types::UInt64Type};
27use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
28use arrow_schema::{DataType, Field as ArrowField};
29use bytes::Bytes;
30use futures::{FutureExt, TryStreamExt, future::BoxFuture, stream::FuturesOrdered};
31use itertools::Itertools;
32use lance_arrow::DataTypeExt;
33use lance_arrow::deepcopy::deep_copy_nulls;
34use lance_core::{
35    cache::{CacheKey, Context, DeepSizeOf},
36    error::{Error, LanceOptionExt},
37    utils::bit::pad_bytes,
38};
39use log::trace;
40
41use crate::{
42    compression::{
43        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
44    },
45    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
46    utils::bytepack::BytepackedIntegerEncoder,
47};
48use crate::{
49    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
50    encodings::logical::primitive::fullzip::PerValueDataBlock,
51};
52use crate::{
53    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
54};
55use crate::{
56    encodings::logical::primitive::miniblock::MiniBlockCompressed,
57    statistics::{ComputeStat, GetStat, Stat},
58};
59use crate::{
60    repdef::{
61        CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser, DefinitionInterpretation,
62        RepDefSlicer, build_control_word_iterator,
63    },
64    utils::accumulation::AccumulationQueue,
65};
66use lance_core::{Result, datatypes::Field, utils::tokio::spawn_cpu};
67
68use crate::constants::{
69    COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_DIVISOR_META_KEY,
70    DICT_SIZE_RATIO_META_KEY, DICT_VALUES_COMPRESSION_ENV_VAR,
71    DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
72    DICT_VALUES_COMPRESSION_META_KEY,
73};
74use crate::version::LanceFileVersion;
75use crate::{
76    EncodingsIo,
77    buffer::LanceBuffer,
78    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
79    decoder::{
80        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
81        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
82        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
83        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
84    },
85    encoder::{
86        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
87    },
88    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
89};
90
91pub mod blob;
92pub mod constant;
93pub mod dict;
94pub mod fullzip;
95pub mod miniblock;
96
97const FILL_BYTE: u8 = 0xFE;
98const DEFAULT_DICT_DIVISOR: u64 = 2;
99const DEFAULT_DICT_MAX_CARDINALITY: u64 = 100_000;
100const DEFAULT_DICT_SIZE_RATIO: f64 = 0.8;
101const DEFAULT_DICT_VALUES_COMPRESSION: &str = "lz4";
102
103struct PageLoadTask {
104    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
105    num_rows: u64,
106}
107
108/// A trait for figuring out how to schedule the data within
109/// a single page.
110trait StructuralPageScheduler: std::fmt::Debug + Send {
111    /// Fetches any metadata required for the page
112    fn initialize<'a>(
113        &'a mut self,
114        io: &Arc<dyn EncodingsIo>,
115    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
116    /// Loads metadata from a previous initialize call
117    fn load(&mut self, data: &Arc<dyn CachedPageData>);
118    /// Schedules the read of the given ranges in the page
119    ///
120    /// The read may be split into multiple "shards" if the page is extremely large.
121    /// Each shard maps to one or more rows and can be decoded independently.
122    ///
123    /// Note: this sharding is for splitting up very large pages into smaller reads to
124    /// avoid buffering too much data in memory.  It is not related to the batch size or
125    /// compute units in any way.
126    fn schedule_ranges(
127        &self,
128        ranges: &[Range<u64>],
129        io: &Arc<dyn EncodingsIo>,
130    ) -> Result<Vec<PageLoadTask>>;
131}
132
133/// Metadata describing the decoded size of a mini-block
134#[derive(Debug)]
135struct ChunkMeta {
136    num_values: u64,
137    chunk_size_bytes: u64,
138    offset_bytes: u64,
139}
140
141/// A mini-block chunk that has been decoded and decompressed
142#[derive(Debug, Clone)]
143struct DecodedMiniBlockChunk {
144    rep: Option<ScalarBuffer<u16>>,
145    def: Option<ScalarBuffer<u16>>,
146    values: DataBlock,
147}
148
149/// A task to decode a one or more mini-blocks of data into an output batch
150///
151/// Note: Two batches might share the same mini-block of data.  When this happens
152/// then each batch gets a copy of the block and each batch decodes the block independently.
153///
154/// This means we have duplicated work but it is necessary to avoid having to synchronize
155/// the decoding of the block. (TODO: test this theory)
156#[derive(Debug)]
157struct DecodeMiniBlockTask {
158    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
159    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
160    value_decompressor: Arc<dyn MiniBlockDecompressor>,
161    dictionary_data: Option<Arc<DataBlock>>,
162    def_meaning: Arc<[DefinitionInterpretation]>,
163    num_buffers: u64,
164    max_visible_level: u16,
165    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
166    has_large_chunk: bool,
167}
168
169impl DecodeMiniBlockTask {
170    fn decode_levels(
171        rep_decompressor: &dyn BlockDecompressor,
172        levels: LanceBuffer,
173        num_levels: u16,
174    ) -> Result<ScalarBuffer<u16>> {
175        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
176        let rep = rep.as_fixed_width().unwrap();
177        debug_assert_eq!(rep.num_values, num_levels as u64);
178        debug_assert_eq!(rep.bits_per_value, 16);
179        Ok(rep.data.borrow_to_typed_slice::<u16>())
180    }
181
182    // We are building a LevelBuffer (levels) and want to copy into it `total_len`
183    // values from `level_buf` starting at `offset`.
184    //
185    // We need to handle both the case where `levels` is None (no nulls encountered
186    // yet) and the case where `level_buf` is None (the input we are copying from has
187    // no nulls)
188    fn extend_levels(
189        range: Range<u64>,
190        levels: &mut Option<LevelBuffer>,
191        level_buf: &Option<impl AsRef<[u16]>>,
192        dest_offset: usize,
193    ) {
194        if let Some(level_buf) = level_buf {
195            if levels.is_none() {
196                // This is the first non-empty def buf we've hit, fill in the past
197                // with 0 (valid)
198                let mut new_levels_vec =
199                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
200                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
201                *levels = Some(new_levels_vec);
202            }
203            levels.as_mut().unwrap().extend(
204                level_buf.as_ref()[range.start as usize..range.end as usize]
205                    .iter()
206                    .copied(),
207            );
208        } else if let Some(levels) = levels {
209            let num_values = (range.end - range.start) as usize;
210            // This is an all-valid level_buf but we had nulls earlier and so we
211            // need to materialize it
212            levels.extend(iter::repeat_n(0, num_values));
213        }
214    }
215
216    /// Maps a range of rows to a range of items and a range of levels
217    ///
218    /// If there is no repetition information this just returns the range as-is.
219    ///
220    /// If there is repetition information then we need to do some work to figure out what
221    /// range of items corresponds to the requested range of rows.
222    ///
223    /// For example, if the data is [[1, 2, 3], [4, 5], [6, 7]] and the range is 1..2 (i.e. just row
224    /// 1) then the user actually wants items 3..5.  In the above case the rep levels would be:
225    ///
226    /// Idx: 0 1 2 3 4 5 6
227    /// Rep: 1 0 0 1 0 1 0
228    ///
229    /// So the start (1) maps to the second 1 (idx=3) and the end (2) maps to the third 1 (idx=5)
230    ///
231    /// If there are invisible items then we don't count them when calculating the range of items we
232    /// are interested in but we do count them when calculating the range of levels we are interested
233    /// in.  As a result we have to return both the item range (first return value) and the level range
234    /// (second return value).
235    ///
236    /// For example, if the data is [[1, 2, 3], [4, 5], NULL, [6, 7, 8]] and the range is 2..4 then the
237    /// user wants items 5..8 but they want levels 5..9.  In the above case the rep/def levels would be:
238    ///
239    /// Idx: 0 1 2 3 4 5 6 7 8
240    /// Rep: 1 0 0 1 0 1 1 0 0
241    /// Def: 0 0 0 0 0 1 0 0 0
242    /// Itm: 1 2 3 4 5 6 7 8
243    ///
244    /// Finally, we have to contend with the fact that chunks may or may not start with a "preamble" of
245    /// trailing values that finish up a list from the previous chunk.  In this case the first item does
246    /// not start at max_rep because it is a continuation of the previous chunk.  For our purposes we do
247    /// not consider this a "row" and so the range 0..1 will refer to the first row AFTER the preamble.
248    ///
249    /// We have a separate parameter (`preamble_action`) to control whether we want the preamble or not.
250    ///
251    /// Note that the "trailer" is considered a "row" and if we want it we should include it in the range.
252    fn map_range(
253        range: Range<u64>,
254        rep: Option<&impl AsRef<[u16]>>,
255        def: Option<&impl AsRef<[u16]>>,
256        max_rep: u16,
257        max_visible_def: u16,
258        // The total number of items (not rows) in the chunk.  This is not quite the same as
259        // rep.len() / def.len() because it doesn't count invisible items
260        total_items: u64,
261        preamble_action: PreambleAction,
262    ) -> (Range<u64>, Range<u64>) {
263        if let Some(rep) = rep {
264            let mut rep = rep.as_ref();
265            // If there is a preamble and we need to skip it then do that first.  The work is the same
266            // whether there is def information or not
267            let mut items_in_preamble = 0_u64;
268            let first_row_start = match preamble_action {
269                PreambleAction::Skip | PreambleAction::Take => {
270                    let first_row_start = if let Some(def) = def.as_ref() {
271                        let mut first_row_start = None;
272                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
273                            if *rep == max_rep {
274                                first_row_start = Some(idx as u64);
275                                break;
276                            }
277                            if *def <= max_visible_def {
278                                items_in_preamble += 1;
279                            }
280                        }
281                        first_row_start
282                    } else {
283                        let first_row_start =
284                            rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
285                        items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
286                        first_row_start
287                    };
288                    // It is possible for a chunk to be entirely partial values but if it is then it
289                    // should never show up as a preamble to skip
290                    if first_row_start.is_none() {
291                        assert!(preamble_action == PreambleAction::Take);
292                        return (0..total_items, 0..rep.len() as u64);
293                    }
294                    let first_row_start = first_row_start.unwrap();
295                    rep = &rep[first_row_start as usize..];
296                    first_row_start
297                }
298                PreambleAction::Absent => {
299                    debug_assert!(rep[0] == max_rep);
300                    0
301                }
302            };
303
304            // We hit this case when all we needed was the preamble
305            if range.start == range.end {
306                debug_assert!(preamble_action == PreambleAction::Take);
307                debug_assert!(items_in_preamble <= total_items);
308                return (0..items_in_preamble, 0..first_row_start);
309            }
310            assert!(range.start < range.end);
311
312            let mut rows_seen = 0;
313            let mut new_start = 0;
314            let mut new_levels_start = 0;
315
316            if let Some(def) = def {
317                let def = &def.as_ref()[first_row_start as usize..];
318
319                // range.start == 0 always maps to 0 (even with invis items), otherwise we need to walk
320                let mut lead_invis_seen = 0;
321
322                if range.start > 0 {
323                    if def[0] > max_visible_def {
324                        lead_invis_seen += 1;
325                    }
326                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
327                        if *rep == max_rep {
328                            rows_seen += 1;
329                            if rows_seen == range.start {
330                                new_start = idx as u64 + 1 - lead_invis_seen;
331                                new_levels_start = idx as u64 + 1;
332                                break;
333                            }
334                        }
335                        if *def > max_visible_def {
336                            lead_invis_seen += 1;
337                        }
338                    }
339                }
340
341                rows_seen += 1;
342
343                let mut new_end = u64::MAX;
344                let mut new_levels_end = rep.len() as u64;
345                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
346                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
347                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
348                    .iter()
349                    .zip(&def[(new_levels_start + 1) as usize..])
350                    .enumerate()
351                {
352                    if *rep == max_rep {
353                        rows_seen += 1;
354                        if rows_seen == range.end + 1 {
355                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
356                            new_levels_end = idx as u64 + new_levels_start + 1;
357                            break;
358                        }
359                    }
360                    if *def > max_visible_def {
361                        tail_invis_seen += 1;
362                    }
363                }
364
365                if new_end == u64::MAX {
366                    new_levels_end = rep.len() as u64;
367                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
368                    new_end = rep.len() as u64 - total_invis_seen;
369                }
370
371                assert_ne!(new_end, u64::MAX);
372
373                // Adjust for any skipped preamble
374                if preamble_action == PreambleAction::Skip {
375                    new_start += items_in_preamble;
376                    new_end += items_in_preamble;
377                    new_levels_start += first_row_start;
378                    new_levels_end += first_row_start;
379                } else if preamble_action == PreambleAction::Take {
380                    debug_assert_eq!(new_start, 0);
381                    debug_assert_eq!(new_levels_start, 0);
382                    new_end += items_in_preamble;
383                    new_levels_end += first_row_start;
384                }
385
386                debug_assert!(new_end <= total_items);
387                (new_start..new_end, new_levels_start..new_levels_end)
388            } else {
389                // Easy case, there are no invisible items, so we don't need to check for them
390                // The items range and levels range will be the same.  We do still need to walk
391                // the rep levels to find the row boundaries
392
393                // range.start == 0 always maps to 0, otherwise we need to walk
394                if range.start > 0 {
395                    for (idx, rep) in rep.iter().skip(1).enumerate() {
396                        if *rep == max_rep {
397                            rows_seen += 1;
398                            if rows_seen == range.start {
399                                new_start = idx as u64 + 1;
400                                break;
401                            }
402                        }
403                    }
404                }
405                let mut new_end = rep.len() as u64;
406                // range.end == max_items always maps to rep.len(), otherwise we need to walk
407                if range.end < total_items {
408                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
409                        if *rep == max_rep {
410                            rows_seen += 1;
411                            if rows_seen == range.end {
412                                new_end = idx as u64 + new_start + 1;
413                                break;
414                            }
415                        }
416                    }
417                }
418
419                // Adjust for any skipped preamble
420                if preamble_action == PreambleAction::Skip {
421                    new_start += first_row_start;
422                    new_end += first_row_start;
423                } else if preamble_action == PreambleAction::Take {
424                    debug_assert_eq!(new_start, 0);
425                    new_end += first_row_start;
426                }
427
428                debug_assert!(new_end <= total_items);
429                (new_start..new_end, new_start..new_end)
430            }
431        } else {
432            // No repetition info, easy case, just use the range as-is and the item
433            // and level ranges are the same
434            (range.clone(), range)
435        }
436    }
437
438    // read `num_buffers` buffer sizes from `buf` starting at `offset`
439    fn read_buffer_sizes<const LARGE: bool>(
440        buf: &[u8],
441        offset: &mut usize,
442        num_buffers: u64,
443    ) -> Vec<u32> {
444        let read_size = if LARGE { 4 } else { 2 };
445        (0..num_buffers)
446            .map(|_| {
447                let bytes = &buf[*offset..*offset + read_size];
448                let size = if LARGE {
449                    u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
450                } else {
451                    // the buffer size is read from u16 but is stored as u32 after decoding for consistency
452                    u16::from_le_bytes([bytes[0], bytes[1]]) as u32
453                };
454                *offset += read_size;
455                size
456            })
457            .collect()
458    }
459
460    // Unserialize a miniblock into a collection of vectors
461    fn decode_miniblock_chunk(
462        &self,
463        buf: &LanceBuffer,
464        items_in_chunk: u64,
465    ) -> Result<DecodedMiniBlockChunk> {
466        let mut offset = 0;
467        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
468        offset += 2;
469
470        let rep_size = if self.rep_decompressor.is_some() {
471            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
472            offset += 2;
473            Some(rep_size)
474        } else {
475            None
476        };
477        let def_size = if self.def_decompressor.is_some() {
478            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
479            offset += 2;
480            Some(def_size)
481        } else {
482            None
483        };
484
485        let buffer_sizes = if self.has_large_chunk {
486            Self::read_buffer_sizes::<true>(buf, &mut offset, self.num_buffers)
487        } else {
488            Self::read_buffer_sizes::<false>(buf, &mut offset, self.num_buffers)
489        };
490
491        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
492
493        let rep = rep_size.map(|rep_size| {
494            let rep = buf.slice_with_length(offset, rep_size as usize);
495            offset += rep_size as usize;
496            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
497            rep
498        });
499
500        let def = def_size.map(|def_size| {
501            let def = buf.slice_with_length(offset, def_size as usize);
502            offset += def_size as usize;
503            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
504            def
505        });
506
507        let buffers = buffer_sizes
508            .into_iter()
509            .map(|buf_size| {
510                let buf = buf.slice_with_length(offset, buf_size as usize);
511                offset += buf_size as usize;
512                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
513                buf
514            })
515            .collect::<Vec<_>>();
516
517        let values = self
518            .value_decompressor
519            .decompress(buffers, items_in_chunk)?;
520
521        let rep = rep
522            .map(|rep| {
523                Self::decode_levels(
524                    self.rep_decompressor.as_ref().unwrap().as_ref(),
525                    rep,
526                    num_levels,
527                )
528            })
529            .transpose()?;
530        let def = def
531            .map(|def| {
532                Self::decode_levels(
533                    self.def_decompressor.as_ref().unwrap().as_ref(),
534                    def,
535                    num_levels,
536                )
537            })
538            .transpose()?;
539
540        Ok(DecodedMiniBlockChunk { rep, def, values })
541    }
542}
543
544impl DecodePageTask for DecodeMiniBlockTask {
545    fn decode(self: Box<Self>) -> Result<DecodedPage> {
546        // First, we create output buffers for the rep and def and data
547        let mut repbuf: Option<LevelBuffer> = None;
548        let mut defbuf: Option<LevelBuffer> = None;
549
550        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
551
552        // This is probably an over-estimate but it's quick and easy to calculate
553        let estimated_size_bytes = self
554            .instructions
555            .iter()
556            .map(|(_, chunk)| chunk.data.len())
557            .sum::<usize>()
558            * 2;
559        let mut data_builder =
560            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
561
562        // We need to keep track of the offset into repbuf/defbuf that we are building up
563        let mut level_offset = 0;
564
565        // Pre-compute caching needs for each chunk by checking if the next chunk is the same
566        let needs_caching: Vec<bool> = self
567            .instructions
568            .windows(2)
569            .map(|w| w[0].1.chunk_idx == w[1].1.chunk_idx)
570            .chain(std::iter::once(false)) // the last one never needs caching
571            .collect();
572
573        // Cache for storing decoded chunks when beneficial
574        let mut chunk_cache: Option<(usize, DecodedMiniBlockChunk)> = None;
575
576        // Now we iterate through each instruction and process it
577        for (idx, (instructions, chunk)) in self.instructions.iter().enumerate() {
578            let should_cache_this_chunk = needs_caching[idx];
579
580            let decoded_chunk = match &chunk_cache {
581                Some((cached_chunk_idx, cached_chunk)) if *cached_chunk_idx == chunk.chunk_idx => {
582                    // Clone only when we have a cache hit (much cheaper than decoding)
583                    cached_chunk.clone()
584                }
585                _ => {
586                    // Cache miss, need to decode
587                    let decoded = self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;
588
589                    // Only update cache if this chunk will benefit the next access
590                    if should_cache_this_chunk {
591                        chunk_cache = Some((chunk.chunk_idx, decoded.clone()));
592                    }
593                    decoded
594                }
595            };
596
597            let DecodedMiniBlockChunk { rep, def, values } = decoded_chunk;
598
599            // Our instructions tell us which rows we want to take from this chunk
600            let row_range_start =
601                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
602            let row_range_end = row_range_start + instructions.rows_to_take;
603
604            // We use the rep info to map the row range to an item range / levels range
605            let (item_range, level_range) = Self::map_range(
606                row_range_start..row_range_end,
607                rep.as_ref(),
608                def.as_ref(),
609                max_rep,
610                self.max_visible_level,
611                chunk.items_in_chunk,
612                instructions.preamble_action,
613            );
614            if item_range.end - item_range.start > chunk.items_in_chunk {
615                return Err(lance_core::Error::internal(format!(
616                    "Item range {:?} is greater than chunk items in chunk {:?}",
617                    item_range, chunk.items_in_chunk
618                )));
619            }
620
621            // Now we append the data to the output buffers
622            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
623            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
624            level_offset += (level_range.end - level_range.start) as usize;
625            data_builder.append(&values, item_range);
626        }
627
628        let mut data = data_builder.finish();
629
630        let unraveler =
631            RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());
632
633        if let Some(dictionary) = &self.dictionary_data {
634            // Don't decode here, that happens later (if needed)
635            let DataBlock::FixedWidth(indices) = data else {
636                return Err(lance_core::Error::internal(format!(
637                    "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
638                    data
639                )));
640            };
641            data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
642                indices,
643                dictionary.as_ref().clone(),
644            ));
645        }
646
647        Ok(DecodedPage {
648            data,
649            repdef: unraveler,
650        })
651    }
652}
653
654/// A chunk that has been loaded by the miniblock scheduler (but not
655/// yet decoded)
656#[derive(Debug)]
657struct LoadedChunk {
658    data: LanceBuffer,
659    items_in_chunk: u64,
660    byte_range: Range<u64>,
661    chunk_idx: usize,
662}
663
664impl Clone for LoadedChunk {
665    fn clone(&self) -> Self {
666        Self {
667            // Safe as we always create borrowed buffers here
668            data: self.data.clone(),
669            items_in_chunk: self.items_in_chunk,
670            byte_range: self.byte_range.clone(),
671            chunk_idx: self.chunk_idx,
672        }
673    }
674}
675
676/// Decodes mini-block formatted data.  See [`PrimitiveStructuralEncoder`] for more
677/// details on the different layouts.
678#[derive(Debug)]
679struct MiniBlockDecoder {
680    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
681    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
682    value_decompressor: Arc<dyn MiniBlockDecompressor>,
683    def_meaning: Arc<[DefinitionInterpretation]>,
684    loaded_chunks: VecDeque<LoadedChunk>,
685    instructions: VecDeque<ChunkInstructions>,
686    offset_in_current_chunk: u64,
687    num_rows: u64,
688    num_buffers: u64,
689    dictionary: Option<Arc<DataBlock>>,
690    has_large_chunk: bool,
691}
692
693/// See [`MiniBlockScheduler`] for more details on the scheduling and decoding
694/// process for miniblock encoded data.
695impl StructuralPageDecoder for MiniBlockDecoder {
696    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
697        let mut items_desired = num_rows;
698        let mut need_preamble = false;
699        let mut skip_in_chunk = self.offset_in_current_chunk;
700        let mut drain_instructions = Vec::new();
701        while items_desired > 0 || need_preamble {
702            let (instructions, consumed) = self
703                .instructions
704                .front()
705                .unwrap()
706                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);
707
708            while self.loaded_chunks.front().unwrap().chunk_idx
709                != instructions.chunk_instructions.chunk_idx
710            {
711                self.loaded_chunks.pop_front();
712            }
713            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
714            if consumed {
715                self.instructions.pop_front();
716            }
717        }
718        // We can throw away need_preamble here because it must be false.  If it were true it would mean
719        // we were still in the middle of loading rows.  We do need to latch skip_in_chunk though.
720        self.offset_in_current_chunk = skip_in_chunk;
721
722        let max_visible_level = self
723            .def_meaning
724            .iter()
725            .take_while(|l| !l.is_list())
726            .map(|l| l.num_def_levels())
727            .sum::<u16>();
728
729        Ok(Box::new(DecodeMiniBlockTask {
730            instructions: drain_instructions,
731            def_decompressor: self.def_decompressor.clone(),
732            rep_decompressor: self.rep_decompressor.clone(),
733            value_decompressor: self.value_decompressor.clone(),
734            dictionary_data: self.dictionary.clone(),
735            def_meaning: self.def_meaning.clone(),
736            num_buffers: self.num_buffers,
737            max_visible_level,
738            has_large_chunk: self.has_large_chunk,
739        }))
740    }
741
742    fn num_rows(&self) -> u64 {
743        self.num_rows
744    }
745}
746
747#[derive(Debug)]
748struct CachedComplexAllNullState {
749    rep: Option<ScalarBuffer<u16>>,
750    def: Option<ScalarBuffer<u16>>,
751}
752
753impl DeepSizeOf for CachedComplexAllNullState {
754    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
755        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
756            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
757    }
758}
759
760impl CachedPageData for CachedComplexAllNullState {
761    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
762        self
763    }
764}
765
766/// A scheduler for all-null data that has repetition and definition levels
767///
768/// We still need to do some I/O in this case because we need to figure out what kind of null we
769/// are dealing with (null list, null struct, what level null struct, etc.)
770///
771/// TODO: Right now we just load the entire rep/def at initialization time and cache it.  This is a touch
772/// RAM aggressive and maybe we want something more lazy in the future.  On the other hand, it's simple
773/// and fast so...maybe not :)
774#[derive(Debug)]
775pub struct ComplexAllNullScheduler {
776    // Set from protobuf
777    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
778    def_meaning: Arc<[DefinitionInterpretation]>,
779    repdef: Option<Arc<CachedComplexAllNullState>>,
780    max_visible_level: u16,
781    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
782    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
783    num_rep_values: u64,
784    num_def_values: u64,
785}
786
787impl ComplexAllNullScheduler {
788    pub fn new(
789        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
790        def_meaning: Arc<[DefinitionInterpretation]>,
791        rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
792        def_decompressor: Option<Arc<dyn BlockDecompressor>>,
793        num_rep_values: u64,
794        num_def_values: u64,
795    ) -> Self {
796        let max_visible_level = def_meaning
797            .iter()
798            .take_while(|l| !l.is_list())
799            .map(|l| l.num_def_levels())
800            .sum::<u16>();
801        Self {
802            buffer_offsets_and_sizes,
803            def_meaning,
804            repdef: None,
805            max_visible_level,
806            rep_decompressor,
807            def_decompressor,
808            num_rep_values,
809            num_def_values,
810        }
811    }
812}
813
814impl StructuralPageScheduler for ComplexAllNullScheduler {
815    fn initialize<'a>(
816        &'a mut self,
817        io: &Arc<dyn EncodingsIo>,
818    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
819        // Fully load the rep & def buffers, as needed
820        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
821        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
822        let has_rep = rep_size > 0;
823        let has_def = def_size > 0;
824
825        let mut reads = Vec::with_capacity(2);
826        if has_rep {
827            reads.push(rep_pos..rep_pos + rep_size);
828        }
829        if has_def {
830            reads.push(def_pos..def_pos + def_size);
831        }
832
833        let data = io.submit_request(reads, 0);
834        let rep_decompressor = self.rep_decompressor.clone();
835        let def_decompressor = self.def_decompressor.clone();
836        let num_rep_values = self.num_rep_values;
837        let num_def_values = self.num_def_values;
838
839        async move {
840            let data = data.await?;
841            let mut data_iter = data.into_iter();
842
843            let decompress_levels = |compressed_bytes: Bytes,
844                                     decompressor: &Arc<dyn BlockDecompressor>,
845                                     num_values: u64,
846                                     level_type: &str|
847             -> Result<ScalarBuffer<u16>> {
848                let compressed_buffer = LanceBuffer::from_bytes(compressed_bytes, 1);
849                let decompressed = decompressor.decompress(compressed_buffer, num_values)?;
850                match decompressed {
851                    DataBlock::FixedWidth(block) => {
852                        if block.num_values != num_values {
853                            return Err(Error::invalid_input_source(format!(
854                                "Unexpected {} level count after decompression: expected {}, got {}",
855                                level_type, num_values, block.num_values
856                            )
857                            .into()));
858                        }
859                        if block.bits_per_value != 16 {
860                            return Err(Error::invalid_input_source(format!(
861                                "Unexpected {} level bit width after decompression: expected 16, got {}",
862                                level_type, block.bits_per_value
863                            )
864                            .into()));
865                        }
866                        Ok(block.data.borrow_to_typed_slice::<u16>())
867                    }
868                    _ => Err(Error::invalid_input_source(format!(
869                        "Expected fixed-width data block for {} levels",
870                        level_type
871                    )
872                    .into())),
873                }
874            };
875
876            let rep = if has_rep {
877                let rep = data_iter.next().unwrap();
878                if let Some(rep_decompressor) = rep_decompressor.as_ref() {
879                    Some(decompress_levels(
880                        rep,
881                        rep_decompressor,
882                        num_rep_values,
883                        "repetition",
884                    )?)
885                } else {
886                    let rep = LanceBuffer::from_bytes(rep, 2);
887                    let rep = rep.borrow_to_typed_slice::<u16>();
888                    Some(rep)
889                }
890            } else {
891                None
892            };
893
894            let def = if has_def {
895                let def = data_iter.next().unwrap();
896                if let Some(def_decompressor) = def_decompressor.as_ref() {
897                    Some(decompress_levels(
898                        def,
899                        def_decompressor,
900                        num_def_values,
901                        "definition",
902                    )?)
903                } else {
904                    let def = LanceBuffer::from_bytes(def, 2);
905                    let def = def.borrow_to_typed_slice::<u16>();
906                    Some(def)
907                }
908            } else {
909                None
910            };
911
912            let repdef = Arc::new(CachedComplexAllNullState { rep, def });
913
914            self.repdef = Some(repdef.clone());
915
916            Ok(repdef as Arc<dyn CachedPageData>)
917        }
918        .boxed()
919    }
920
921    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
922        self.repdef = Some(
923            data.clone()
924                .as_arc_any()
925                .downcast::<CachedComplexAllNullState>()
926                .unwrap(),
927        );
928    }
929
930    fn schedule_ranges(
931        &self,
932        ranges: &[Range<u64>],
933        _io: &Arc<dyn EncodingsIo>,
934    ) -> Result<Vec<PageLoadTask>> {
935        let ranges = VecDeque::from_iter(ranges.iter().cloned());
936        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
937        let decoder = Box::new(ComplexAllNullPageDecoder {
938            ranges,
939            rep: self.repdef.as_ref().unwrap().rep.clone(),
940            def: self.repdef.as_ref().unwrap().def.clone(),
941            num_rows,
942            def_meaning: self.def_meaning.clone(),
943            max_visible_level: self.max_visible_level,
944        }) as Box<dyn StructuralPageDecoder>;
945        let page_load_task = PageLoadTask {
946            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
947            num_rows,
948        };
949        Ok(vec![page_load_task])
950    }
951}
952
953#[derive(Debug)]
954pub struct ComplexAllNullPageDecoder {
955    ranges: VecDeque<Range<u64>>,
956    rep: Option<ScalarBuffer<u16>>,
957    def: Option<ScalarBuffer<u16>>,
958    num_rows: u64,
959    def_meaning: Arc<[DefinitionInterpretation]>,
960    max_visible_level: u16,
961}
962
963impl ComplexAllNullPageDecoder {
964    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
965        let mut rows_desired = num_rows;
966        let mut ranges = Vec::with_capacity(self.ranges.len());
967        while rows_desired > 0 {
968            let front = self.ranges.front_mut().unwrap();
969            let avail = front.end - front.start;
970            if avail > rows_desired {
971                ranges.push(front.start..front.start + rows_desired);
972                front.start += rows_desired;
973                rows_desired = 0;
974            } else {
975                ranges.push(self.ranges.pop_front().unwrap());
976                rows_desired -= avail;
977            }
978        }
979        ranges
980    }
981}
982
983impl StructuralPageDecoder for ComplexAllNullPageDecoder {
984    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
985        let drained_ranges = self.drain_ranges(num_rows);
986        Ok(Box::new(DecodeComplexAllNullTask {
987            ranges: drained_ranges,
988            rep: self.rep.clone(),
989            def: self.def.clone(),
990            def_meaning: self.def_meaning.clone(),
991            max_visible_level: self.max_visible_level,
992        }))
993    }
994
995    fn num_rows(&self) -> u64 {
996        self.num_rows
997    }
998}
999
1000/// We use `ranges` to slice into `rep` and `def` and create rep/def buffers
1001/// for the null data.
1002#[derive(Debug)]
1003pub struct DecodeComplexAllNullTask {
1004    ranges: Vec<Range<u64>>,
1005    rep: Option<ScalarBuffer<u16>>,
1006    def: Option<ScalarBuffer<u16>>,
1007    def_meaning: Arc<[DefinitionInterpretation]>,
1008    max_visible_level: u16,
1009}
1010
1011impl DecodeComplexAllNullTask {
1012    fn decode_level(
1013        &self,
1014        levels: &Option<ScalarBuffer<u16>>,
1015        num_values: u64,
1016    ) -> Option<Vec<u16>> {
1017        levels.as_ref().map(|levels| {
1018            let mut referenced_levels = Vec::with_capacity(num_values as usize);
1019            for range in &self.ranges {
1020                referenced_levels.extend(
1021                    levels[range.start as usize..range.end as usize]
1022                        .iter()
1023                        .copied(),
1024                );
1025            }
1026            referenced_levels
1027        })
1028    }
1029}
1030
1031impl DecodePageTask for DecodeComplexAllNullTask {
1032    fn decode(self: Box<Self>) -> Result<DecodedPage> {
1033        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1034        let rep = self.decode_level(&self.rep, num_values);
1035        let def = self.decode_level(&self.def, num_values);
1036
1037        // If there are definition levels there may be empty / null lists which are not visible
1038        // in the items array.  We need to account for that here to figure out how many values
1039        // should be in the items array.
1040        let num_values = if let Some(def) = &def {
1041            def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
1042        } else {
1043            num_values
1044        };
1045
1046        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
1047        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
1048        Ok(DecodedPage {
1049            data,
1050            repdef: unraveler,
1051        })
1052    }
1053}
1054
1055/// A scheduler for simple all-null data
1056///
1057/// "simple" all-null data is data that is all null and only has a single level of definition and
1058/// no repetition.  We don't need to read any data at all in this case.
1059#[derive(Debug, Default)]
1060pub struct SimpleAllNullScheduler {}
1061
1062impl StructuralPageScheduler for SimpleAllNullScheduler {
1063    fn initialize<'a>(
1064        &'a mut self,
1065        _io: &Arc<dyn EncodingsIo>,
1066    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1067        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
1068    }
1069
1070    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
1071
1072    fn schedule_ranges(
1073        &self,
1074        ranges: &[Range<u64>],
1075        _io: &Arc<dyn EncodingsIo>,
1076    ) -> Result<Vec<PageLoadTask>> {
1077        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1078        let decoder =
1079            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
1080        let page_load_task = PageLoadTask {
1081            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
1082            num_rows,
1083        };
1084        Ok(vec![page_load_task])
1085    }
1086}
1087
1088/// A page decode task for all-null data without any
1089/// repetition and only a single level of definition
1090#[derive(Debug)]
1091struct SimpleAllNullDecodePageTask {
1092    num_values: u64,
1093}
1094impl DecodePageTask for SimpleAllNullDecodePageTask {
1095    fn decode(self: Box<Self>) -> Result<DecodedPage> {
1096        let unraveler = RepDefUnraveler::new(
1097            None,
1098            Some(vec![1; self.num_values as usize]),
1099            Arc::new([DefinitionInterpretation::NullableItem]),
1100            self.num_values,
1101        );
1102        Ok(DecodedPage {
1103            data: DataBlock::AllNull(AllNullDataBlock {
1104                num_values: self.num_values,
1105            }),
1106            repdef: unraveler,
1107        })
1108    }
1109}
1110
1111#[derive(Debug)]
1112pub struct SimpleAllNullPageDecoder {
1113    num_rows: u64,
1114}
1115
1116impl StructuralPageDecoder for SimpleAllNullPageDecoder {
1117    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1118        Ok(Box::new(SimpleAllNullDecodePageTask {
1119            num_values: num_rows,
1120        }))
1121    }
1122
1123    fn num_rows(&self) -> u64 {
1124        self.num_rows
1125    }
1126}
1127
1128#[derive(Debug, Clone)]
1129struct MiniBlockSchedulerDictionary {
1130    // These come from the protobuf
1131    dictionary_decompressor: Arc<dyn BlockDecompressor>,
1132    dictionary_buf_position_and_size: (u64, u64),
1133    dictionary_data_alignment: u64,
1134    num_dictionary_items: u64,
1135}
1136
1137/// Individual block metadata within a MiniBlock repetition index.
1138#[derive(Debug)]
1139struct MiniBlockRepIndexBlock {
1140    // The index of the first row that starts after the beginning of this block.  If the block
1141    // has a preamble this will be the row after the preamble.  If the block is entirely preamble
1142    // then this will be a row that starts in some future block.
1143    first_row: u64,
1144    // The number of rows in the block, including the trailer but not the preamble.
1145    // Can be 0 if the block is entirely preamble
1146    starts_including_trailer: u64,
1147    // Whether the block has a preamble
1148    has_preamble: bool,
1149    // Whether the block has a trailer
1150    has_trailer: bool,
1151}
1152
1153impl DeepSizeOf for MiniBlockRepIndexBlock {
1154    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
1155        0
1156    }
1157}
1158
1159/// Repetition index for MiniBlock encoding.
1160///
1161/// Stores block-level offset information to enable efficient random
1162/// access to nested data structures within mini-blocks.
1163#[derive(Debug)]
1164struct MiniBlockRepIndex {
1165    blocks: Vec<MiniBlockRepIndexBlock>,
1166}
1167
1168impl DeepSizeOf for MiniBlockRepIndex {
1169    fn deep_size_of_children(&self, context: &mut Context) -> usize {
1170        self.blocks.deep_size_of_children(context)
1171    }
1172}
1173
1174impl MiniBlockRepIndex {
1175    /// Decode repetition index from chunk metadata using default values.
1176    ///
1177    /// This creates a repetition index where each chunk has no partial values
1178    /// and no trailers, suitable for simple sequential data layouts.
1179    pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
1180        let mut blocks = Vec::with_capacity(chunks.len());
1181        let mut offset: u64 = 0;
1182
1183        for c in chunks {
1184            blocks.push(MiniBlockRepIndexBlock {
1185                first_row: offset,
1186                starts_including_trailer: c.num_values,
1187                has_preamble: false,
1188                has_trailer: false,
1189            });
1190
1191            offset += c.num_values;
1192        }
1193
1194        Self { blocks }
1195    }
1196
1197    /// Decode repetition index from raw bytes in little-endian format.
1198    ///
1199    /// The bytes should contain u64 values arranged in groups of `stride` elements,
1200    /// where the first two values of each group represent ends_count and partial_count.
1201    /// Returns an empty index if no bytes are provided.
1202    pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
1203        // Convert bytes to u64 slice, handling alignment automatically
1204        let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
1205        let u64_slice = buffer.borrow_to_typed_slice::<u64>();
1206        let n = u64_slice.len() / stride;
1207
1208        let mut blocks = Vec::with_capacity(n);
1209        let mut chunk_has_preamble = false;
1210        let mut offset: u64 = 0;
1211
1212        // Extract first two values from each block: ends_count and partial_count
1213        for i in 0..n {
1214            let base_idx = i * stride;
1215            let ends = u64_slice[base_idx];
1216            let partial = u64_slice[base_idx + 1];
1217
1218            let has_trailer = partial > 0;
1219            // Convert branches to arithmetic for better compiler optimization
1220            let starts_including_trailer =
1221                ends + (has_trailer as u64) - (chunk_has_preamble as u64);
1222
1223            blocks.push(MiniBlockRepIndexBlock {
1224                first_row: offset,
1225                starts_including_trailer,
1226                has_preamble: chunk_has_preamble,
1227                has_trailer,
1228            });
1229
1230            chunk_has_preamble = has_trailer;
1231            offset += starts_including_trailer;
1232        }
1233
1234        Self { blocks }
1235    }
1236}
1237
1238/// State that is loaded once and cached for future lookups
1239#[derive(Debug)]
1240struct MiniBlockCacheableState {
1241    /// Metadata that describes each chunk in the page
1242    chunk_meta: Vec<ChunkMeta>,
1243    /// The decoded repetition index
1244    rep_index: MiniBlockRepIndex,
1245    /// The dictionary for the page, if any
1246    dictionary: Option<Arc<DataBlock>>,
1247}
1248
1249impl DeepSizeOf for MiniBlockCacheableState {
1250    fn deep_size_of_children(&self, context: &mut Context) -> usize {
1251        self.rep_index.deep_size_of_children(context)
1252            + self
1253                .dictionary
1254                .as_ref()
1255                .map(|dict| dict.data_size() as usize)
1256                .unwrap_or(0)
1257    }
1258}
1259
1260impl CachedPageData for MiniBlockCacheableState {
1261    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
1262        self
1263    }
1264}
1265
1266/// A scheduler for a page that has been encoded with the mini-block layout
1267///
1268/// Scheduling mini-block encoded data is simple in concept and somewhat complex
1269/// in practice.
1270///
1271/// First, during initialization, we load the chunk metadata, the repetition index,
1272/// and the dictionary (these last two may not be present)
1273///
1274/// Then, during scheduling, we use the user's requested row ranges and the repetition
1275/// index to determine which chunks we need and which rows we need from those chunks.
1276///
1277/// For example, if the repetition index is: [50, 3], [50, 0], [10, 0] and the range
1278/// from the user is 40..60 then we need to:
1279///
1280///  - Read the first chunk and skip the first 40 rows, then read 10 full rows, and
1281///    then read 3 items for the 11th row of our range.
1282///  - Read the second chunk and read the remaining items in our 11th row and then read
1283///    the remaining 9 full rows.
1284///
1285/// Then, if we are going to decode that in batches of 5, we need to make decode tasks.
1286/// The first two decode tasks will just need the first chunk.  The third decode task will
1287/// need the first chunk (for the trailer which has the 11th row in our range) and the second
1288/// chunk.  The final decode task will just need the second chunk.
1289///
1290/// The above prose descriptions are what are represented by `ChunkInstructions` and
1291/// `ChunkDrainInstructions`.
1292#[derive(Debug)]
1293pub struct MiniBlockScheduler {
1294    // These come from the protobuf
1295    buffer_offsets_and_sizes: Vec<(u64, u64)>,
1296    priority: u64,
1297    items_in_page: u64,
1298    repetition_index_depth: u16,
1299    num_buffers: u64,
1300    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
1301    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
1302    value_decompressor: Arc<dyn MiniBlockDecompressor>,
1303    def_meaning: Arc<[DefinitionInterpretation]>,
1304    dictionary: Option<MiniBlockSchedulerDictionary>,
1305    // This is set after initialization
1306    page_meta: Option<Arc<MiniBlockCacheableState>>,
1307    has_large_chunk: bool,
1308}
1309
1310impl MiniBlockScheduler {
1311    fn try_new(
1312        buffer_offsets_and_sizes: &[(u64, u64)],
1313        priority: u64,
1314        items_in_page: u64,
1315        layout: &pb21::MiniBlockLayout,
1316        decompressors: &dyn DecompressionStrategy,
1317    ) -> Result<Self> {
1318        let rep_decompressor = layout
1319            .rep_compression
1320            .as_ref()
1321            .map(|rep_compression| {
1322                decompressors
1323                    .create_block_decompressor(rep_compression)
1324                    .map(Arc::from)
1325            })
1326            .transpose()?;
1327        let def_decompressor = layout
1328            .def_compression
1329            .as_ref()
1330            .map(|def_compression| {
1331                decompressors
1332                    .create_block_decompressor(def_compression)
1333                    .map(Arc::from)
1334            })
1335            .transpose()?;
1336        let def_meaning = layout
1337            .layers
1338            .iter()
1339            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
1340            .collect::<Vec<_>>();
1341        let value_decompressor = decompressors.create_miniblock_decompressor(
1342            layout.value_compression.as_ref().unwrap(),
1343            decompressors,
1344        )?;
1345
1346        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
1347            let num_dictionary_items = layout.num_dictionary_items;
1348            let dictionary_decompressor = decompressors
1349                .create_block_decompressor(dictionary_encoding)?
1350                .into();
1351            let dictionary_data_alignment = match dictionary_encoding.compression.as_ref().unwrap()
1352            {
1353                Compression::Variable(_) => 4,
1354                Compression::Flat(_) => 16,
1355                Compression::General(_) => 1,
1356                Compression::InlineBitpacking(_) | Compression::OutOfLineBitpacking(_) => {
1357                    crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
1358                }
1359                _ => {
1360                    return Err(Error::invalid_input_source(
1361                        format!(
1362                            "Unsupported mini-block dictionary encoding: {:?}",
1363                            dictionary_encoding.compression.as_ref().unwrap()
1364                        )
1365                        .into(),
1366                    ));
1367                }
1368            };
1369            Some(MiniBlockSchedulerDictionary {
1370                dictionary_decompressor,
1371                dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1372                dictionary_data_alignment,
1373                num_dictionary_items,
1374            })
1375        } else {
1376            None
1377        };
1378
1379        Ok(Self {
1380            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
1381            rep_decompressor,
1382            def_decompressor,
1383            value_decompressor: value_decompressor.into(),
1384            repetition_index_depth: layout.repetition_index_depth as u16,
1385            num_buffers: layout.num_buffers,
1386            priority,
1387            items_in_page,
1388            dictionary,
1389            def_meaning: def_meaning.into(),
1390            page_meta: None,
1391            has_large_chunk: layout.has_large_chunk,
1392        })
1393    }
1394
1395    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
1396        let page_meta = self.page_meta.as_ref().unwrap();
1397        chunk_indices
1398            .iter()
1399            .map(|&chunk_idx| {
1400                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
1401                let bytes_start = chunk_meta.offset_bytes;
1402                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
1403                LoadedChunk {
1404                    byte_range: bytes_start..bytes_end,
1405                    items_in_chunk: chunk_meta.num_values,
1406                    chunk_idx,
1407                    data: LanceBuffer::empty(),
1408                }
1409            })
1410            .collect()
1411    }
1412}
1413
1414#[derive(Debug, PartialEq, Eq, Clone, Copy)]
1415enum PreambleAction {
1416    Take,
1417    Skip,
1418    Absent,
1419}
1420
1421// When we schedule a chunk we use the repetition index (or, if none exists, just the # of items
1422// in each chunk) to map a user requested range into a set of ChunkInstruction objects which tell
1423// us how exactly to read from the chunk.
1424//
1425// Examples:
1426//
1427// | Chunk 0     | Chunk 1   | Chunk 2   | Chunk 3 |
1428// | xxxxyyyyzzz | zzzzzzzzz | zzzzzzzzz | aaabbcc |
1429//
1430// Full read (0..6)
1431//
1432// Chunk 0: (several rows, ends with trailer)
1433//   preamble: absent
1434//   rows_to_skip: 0
1435//   rows_to_take: 3 (x, y, z)
1436//   take_trailer: true
1437//
1438// Chunk 1: (all preamble, ends with trailer)
1439//   preamble: take
1440//   rows_to_skip: 0
1441//   rows_to_take: 0
1442//   take_trailer: true
1443//
1444// Chunk 2: (all preamble, no trailer)
1445//   preamble: take
1446//   rows_to_skip: 0
1447//   rows_to_take: 0
1448//   take_trailer: false
1449//
1450// Chunk 3: (several rows, no trailer or preamble)
1451//   preamble: absent
1452//   rows_to_skip: 0
1453//   rows_to_take: 3 (a, b, c)
1454//   take_trailer: false
1455#[derive(Clone, Debug, PartialEq, Eq)]
1456struct ChunkInstructions {
1457    // The index of the chunk to read
1458    chunk_idx: usize,
1459    // A "preamble" is when a chunk begins with a continuation of the previous chunk's list.  If there
1460    // is no repetition index there is never a preamble.
1461    //
1462    // It's possible for a chunk to be entirely premable.  For example, if there is a really large list
1463    // that spans several chunks.
1464    preamble: PreambleAction,
1465    // How many complete rows (not including the preamble or trailer) to skip
1466    //
1467    // If this is non-zero then premable must not be Take
1468    rows_to_skip: u64,
1469    // How many rows to take.  If a row splits across chunks then we will count the row in the first
1470    // chunk that contains the row.
1471    rows_to_take: u64,
1472    // A "trailer" is when a chunk ends with a partial list.  If there is no repetition index there is
1473    // never a trailer.
1474    //
1475    // A chunk that is all preamble may or may not have a trailer.
1476    //
1477    // If this is true then we want to include the trailer
1478    take_trailer: bool,
1479}
1480
1481// First, we schedule a bunch of [`ChunkInstructions`] based on the users ranges.  Then we
1482// start decoding them, based on a batch size, which might not align with what we scheduled.
1483//
1484// This results in `ChunkDrainInstructions` which targets a contiguous slice of a `ChunkInstructions`
1485//
1486// So if `ChunkInstructions` is "skip preamble, skip 10, take 50, take trailer" and we are decoding in
1487// batches of size 10 we might have a `ChunkDrainInstructions` that targets that chunk and has its own
1488// skip of 17 and take of 10.  This would mean we decode the chunk, skip the preamble and 27 rows, and
1489// then take 10 rows.
1490//
1491// One very confusing bit is that `rows_to_take` includes the trailer.  So if we have two chunks:
1492//  -no preamble, skip 5, take 10, take trailer
1493//  -take preamble, skip 0, take 50, no trailer
1494//
1495// and we are draining 20 rows then the drain instructions for the first batch will be:
1496//  - no preamble, skip 0 (from chunk 0), take 11 (from chunk 0)
1497//  - take preamble (from chunk 1), skip 0 (from chunk 1), take 9 (from chunk 1)
1498#[derive(Debug, PartialEq, Eq)]
1499struct ChunkDrainInstructions {
1500    chunk_instructions: ChunkInstructions,
1501    rows_to_skip: u64,
1502    rows_to_take: u64,
1503    preamble_action: PreambleAction,
1504}
1505
1506impl ChunkInstructions {
1507    // Given a repetition index and a set of user ranges we need to figure out how to read from the chunks
1508    //
1509    // We assume that `user_ranges` are in sorted order and non-overlapping
1510    //
1511    // The output will be a set of `ChunkInstructions` which tell us how to read from the chunks
1512    fn schedule_instructions(
1513        rep_index: &MiniBlockRepIndex,
1514        user_ranges: &[Range<u64>],
1515    ) -> Vec<Self> {
1516        // This is an in-exact capacity guess but pretty good.  The actual capacity can be
1517        // smaller if instructions are merged.  It can be larger if there are multiple instructions
1518        // per row which can happen with lists.
1519        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1520
1521        for user_range in user_ranges {
1522            let mut rows_needed = user_range.end - user_range.start;
1523            let mut need_preamble = false;
1524
1525            // Need to find the first chunk with a first row >= user_range.start.  If there are
1526            // multiple chunks with the same first row we need to take the first one.
1527            let mut block_index = match rep_index
1528                .blocks
1529                .binary_search_by_key(&user_range.start, |block| block.first_row)
1530            {
1531                Ok(idx) => {
1532                    // Slightly tricky case, we may need to walk backwards a bit to make sure we
1533                    // are grabbing first eligible chunk
1534                    let mut idx = idx;
1535                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1536                        idx -= 1;
1537                    }
1538                    idx
1539                }
1540                // Easy case.  idx is greater, and idx - 1 is smaller, so idx - 1 contains the start
1541                Err(idx) => idx - 1,
1542            };
1543
1544            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1545
1546            while rows_needed > 0 || need_preamble {
1547                // Check if we've gone past the last block (should not happen)
1548                if block_index >= rep_index.blocks.len() {
1549                    log::warn!(
1550                        "schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early"
1551                    );
1552                    break;
1553                }
1554
1555                let chunk = &rep_index.blocks[block_index];
1556                let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);
1557
1558                // Handle blocks that are entirely preamble (rows_avail = 0)
1559                // These blocks have no rows to take but may have a preamble we need
1560                // We only look for preamble if to_skip == 0 (we're not skipping rows)
1561                if rows_avail == 0 && to_skip == 0 {
1562                    // Only process if this chunk has a preamble we need
1563                    if chunk.has_preamble && need_preamble {
1564                        chunk_instructions.push(Self {
1565                            chunk_idx: block_index,
1566                            preamble: PreambleAction::Take,
1567                            rows_to_skip: 0,
1568                            rows_to_take: 0,
1569                            // We still need to look at has_trailer to distinguish between "all preamble
1570                            // and row ends at end of chunk" and "all preamble and row bleeds into next
1571                            // chunk".  Both cases will have 0 rows available.
1572                            take_trailer: chunk.has_trailer,
1573                        });
1574                        // Only set need_preamble = false if the chunk has at least one row,
1575                        // Or we are reaching the last block,
1576                        // Otherwise, the chunk is entirely preamble and we need the next chunk's preamble too
1577                        if chunk.starts_including_trailer > 0
1578                            || block_index == rep_index.blocks.len() - 1
1579                        {
1580                            need_preamble = false;
1581                        }
1582                    }
1583                    // Move to next block
1584                    block_index += 1;
1585                    continue;
1586                }
1587
1588                // Edge case: if rows_avail == 0 but to_skip > 0
1589                // This theoretically shouldn't happen (binary search should avoid it)
1590                // but handle it for safety
1591                if rows_avail == 0 && to_skip > 0 {
1592                    // This block doesn't have enough rows to skip, move to next block
1593                    // Adjust to_skip by the number of rows in this block
1594                    to_skip -= chunk.starts_including_trailer;
1595                    block_index += 1;
1596                    continue;
1597                }
1598
1599                let rows_to_take = rows_avail.min(rows_needed);
1600                rows_needed -= rows_to_take;
1601
1602                let mut take_trailer = false;
1603                let preamble = if chunk.has_preamble {
1604                    if need_preamble {
1605                        PreambleAction::Take
1606                    } else {
1607                        PreambleAction::Skip
1608                    }
1609                } else {
1610                    PreambleAction::Absent
1611                };
1612
1613                // Are we taking the trailer?  If so, make sure we mark that we need the preamble
1614                if rows_to_take == rows_avail && chunk.has_trailer {
1615                    take_trailer = true;
1616                    need_preamble = true;
1617                } else {
1618                    need_preamble = false;
1619                };
1620
1621                chunk_instructions.push(Self {
1622                    preamble,
1623                    chunk_idx: block_index,
1624                    rows_to_skip: to_skip,
1625                    rows_to_take,
1626                    take_trailer,
1627                });
1628
1629                to_skip = 0;
1630                block_index += 1;
1631            }
1632        }
1633
1634        // If there were multiple ranges we may have multiple instructions for a single chunk.  Merge them now if they
1635        // are _adjacent_ (i.e. don't merge "take first row of chunk 0" and "take third row of chunk 0" into "take 2
1636        // rows of chunk 0 starting at 0")
1637        if user_ranges.len() > 1 {
1638            // TODO: Could probably optimize this allocation away
1639            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1640            let mut instructions_iter = chunk_instructions.into_iter();
1641            merged_instructions.push(instructions_iter.next().unwrap());
1642            for instruction in instructions_iter {
1643                let last = merged_instructions.last_mut().unwrap();
1644                if last.chunk_idx == instruction.chunk_idx
1645                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1646                {
1647                    last.rows_to_take += instruction.rows_to_take;
1648                    last.take_trailer |= instruction.take_trailer;
1649                } else {
1650                    merged_instructions.push(instruction);
1651                }
1652            }
1653            merged_instructions
1654        } else {
1655            chunk_instructions
1656        }
1657    }
1658
1659    fn drain_from_instruction(
1660        &self,
1661        rows_desired: &mut u64,
1662        need_preamble: &mut bool,
1663        skip_in_chunk: &mut u64,
1664    ) -> (ChunkDrainInstructions, bool) {
1665        // If we need the premable then we shouldn't be skipping anything
1666        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1667        let rows_avail = self.rows_to_take - *skip_in_chunk;
1668        let has_preamble = self.preamble != PreambleAction::Absent;
1669        let preamble_action = match (*need_preamble, has_preamble) {
1670            (true, true) => PreambleAction::Take,
1671            (true, false) => panic!("Need preamble but there isn't one"),
1672            (false, true) => PreambleAction::Skip,
1673            (false, false) => PreambleAction::Absent,
1674        };
1675
1676        // How many rows are we actually taking in this take step (including the preamble
1677        // and trailer both as individual rows)
1678        let rows_taking = if *rows_desired >= rows_avail {
1679            // We want all the rows.  If there is a trailer we are grabbing it and will need
1680            // the preamble of the next chunk
1681            // If there is a trailer and we are taking all the rows then we need the preamble
1682            // of the next chunk.
1683            //
1684            // Also, if this chunk is entirely preamble (rows_avail == 0 && !take_trailer) then we
1685            // need the preamble of the next chunk.
1686            *need_preamble = self.take_trailer;
1687            rows_avail
1688        } else {
1689            // We aren't taking all the rows.  Even if there is a trailer we aren't taking
1690            // it so we will not need the preamble
1691            *need_preamble = false;
1692            *rows_desired
1693        };
1694        let rows_skipped = *skip_in_chunk;
1695
1696        // Update the state for the next iteration
1697        let consumed_chunk = if *rows_desired >= rows_avail {
1698            *rows_desired -= rows_avail;
1699            *skip_in_chunk = 0;
1700            true
1701        } else {
1702            *skip_in_chunk += *rows_desired;
1703            *rows_desired = 0;
1704            false
1705        };
1706
1707        (
1708            ChunkDrainInstructions {
1709                chunk_instructions: self.clone(),
1710                rows_to_skip: rows_skipped,
1711                rows_to_take: rows_taking,
1712                preamble_action,
1713            },
1714            consumed_chunk,
1715        )
1716    }
1717}
1718
1719enum Words {
1720    U16(ScalarBuffer<u16>),
1721    U32(ScalarBuffer<u32>),
1722}
1723
1724struct WordsIter<'a> {
1725    iter: Box<dyn Iterator<Item = u32> + 'a>,
1726}
1727
1728impl Words {
1729    pub fn len(&self) -> usize {
1730        match self {
1731            Self::U16(b) => b.len(),
1732            Self::U32(b) => b.len(),
1733        }
1734    }
1735
1736    pub fn iter(&self) -> WordsIter<'_> {
1737        match self {
1738            Self::U16(buf) => WordsIter {
1739                iter: Box::new(buf.iter().map(|&x| x as u32)),
1740            },
1741            Self::U32(buf) => WordsIter {
1742                iter: Box::new(buf.iter().copied()),
1743            },
1744        }
1745    }
1746
1747    pub fn from_bytes(bytes: Bytes, has_large_chunk: bool) -> Result<Self> {
1748        let bytes_per_value = if has_large_chunk { 4 } else { 2 };
1749        assert_eq!(bytes.len() % bytes_per_value, 0);
1750        let buffer = LanceBuffer::from_bytes(bytes, bytes_per_value as u64);
1751        if has_large_chunk {
1752            Ok(Self::U32(buffer.borrow_to_typed_slice::<u32>()))
1753        } else {
1754            Ok(Self::U16(buffer.borrow_to_typed_slice::<u16>()))
1755        }
1756    }
1757}
1758
1759impl<'a> Iterator for WordsIter<'a> {
1760    type Item = u32;
1761
1762    fn next(&mut self) -> Option<Self::Item> {
1763        self.iter.next()
1764    }
1765}
1766
1767impl StructuralPageScheduler for MiniBlockScheduler {
1768    fn initialize<'a>(
1769        &'a mut self,
1770        io: &Arc<dyn EncodingsIo>,
1771    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1772        // We always need to fetch chunk metadata.  We may also need to fetch a dictionary and
1773        // we may also need to fetch the repetition index.  Here, we gather what buffers we
1774        // need.
1775        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
1776        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
1777        let mut bufs_needed = 1;
1778        if self.dictionary.is_some() {
1779            bufs_needed += 1;
1780        }
1781        if self.repetition_index_depth > 0 {
1782            bufs_needed += 1;
1783        }
1784        let mut required_ranges = Vec::with_capacity(bufs_needed);
1785        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
1786        if let Some(ref dictionary) = self.dictionary {
1787            required_ranges.push(
1788                dictionary.dictionary_buf_position_and_size.0
1789                    ..dictionary.dictionary_buf_position_and_size.0
1790                        + dictionary.dictionary_buf_position_and_size.1,
1791            );
1792        }
1793        if self.repetition_index_depth > 0 {
1794            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
1795            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
1796        }
1797        let io_req = io.submit_request(required_ranges, 0);
1798
1799        async move {
1800            let mut buffers = io_req.await?.into_iter().fuse();
1801            let meta_bytes = buffers.next().unwrap();
1802            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
1803            let rep_index_bytes = buffers.next();
1804
1805            // Parse the metadata and build the chunk meta
1806            let words = Words::from_bytes(meta_bytes, self.has_large_chunk)?;
1807            let mut chunk_meta = Vec::with_capacity(words.len());
1808
1809            let mut rows_counter = 0;
1810            let mut offset_bytes = value_buf_position;
1811            for (word_idx, word) in words.iter().enumerate() {
1812                let log_num_values = word & 0x0F;
1813                let divided_bytes = word >> 4;
1814                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
1815                debug_assert!(num_bytes > 0);
1816                let num_values = if word_idx < words.len() - 1 {
1817                    debug_assert!(log_num_values > 0);
1818                    1 << log_num_values
1819                } else {
1820                    debug_assert!(
1821                        log_num_values == 0
1822                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
1823                    );
1824                    self.items_in_page - rows_counter
1825                };
1826                rows_counter += num_values;
1827
1828                chunk_meta.push(ChunkMeta {
1829                    num_values,
1830                    chunk_size_bytes: num_bytes as u64,
1831                    offset_bytes,
1832                });
1833                offset_bytes += num_bytes as u64;
1834            }
1835
1836            // Build the repetition index
1837            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
1838                assert!(rep_index_data.len() % 8 == 0);
1839                let stride = self.repetition_index_depth as usize + 1;
1840                MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
1841            } else {
1842                MiniBlockRepIndex::default_from_chunks(&chunk_meta)
1843            };
1844
1845            let mut page_meta = MiniBlockCacheableState {
1846                chunk_meta,
1847                rep_index,
1848                dictionary: None,
1849            };
1850
1851            // decode dictionary
1852            if let Some(ref mut dictionary) = self.dictionary {
1853                let dictionary_data = dictionary_bytes.unwrap();
1854                page_meta.dictionary =
1855                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
1856                        LanceBuffer::from_bytes(
1857                            dictionary_data,
1858                            dictionary.dictionary_data_alignment,
1859                        ),
1860                        dictionary.num_dictionary_items,
1861                    )?));
1862            };
1863            let page_meta = Arc::new(page_meta);
1864            self.page_meta = Some(page_meta.clone());
1865            Ok(page_meta as Arc<dyn CachedPageData>)
1866        }
1867        .boxed()
1868    }
1869
1870    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1871        self.page_meta = Some(
1872            data.clone()
1873                .as_arc_any()
1874                .downcast::<MiniBlockCacheableState>()
1875                .unwrap(),
1876        );
1877    }
1878
1879    fn schedule_ranges(
1880        &self,
1881        ranges: &[Range<u64>],
1882        io: &Arc<dyn EncodingsIo>,
1883    ) -> Result<Vec<PageLoadTask>> {
1884        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1885
1886        let page_meta = self.page_meta.as_ref().unwrap();
1887
1888        let chunk_instructions =
1889            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
1890
1891        debug_assert_eq!(
1892            num_rows,
1893            chunk_instructions
1894                .iter()
1895                .map(|ci| ci.rows_to_take)
1896                .sum::<u64>()
1897        );
1898
1899        let chunks_needed = chunk_instructions
1900            .iter()
1901            .map(|ci| ci.chunk_idx)
1902            .unique()
1903            .collect::<Vec<_>>();
1904
1905        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1906        let chunk_ranges = loaded_chunks
1907            .iter()
1908            .map(|c| c.byte_range.clone())
1909            .collect::<Vec<_>>();
1910        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1911
1912        let rep_decompressor = self.rep_decompressor.clone();
1913        let def_decompressor = self.def_decompressor.clone();
1914        let value_decompressor = self.value_decompressor.clone();
1915        let num_buffers = self.num_buffers;
1916        let has_large_chunk = self.has_large_chunk;
1917        let dictionary = page_meta
1918            .dictionary
1919            .as_ref()
1920            .map(|dictionary| dictionary.clone());
1921        let def_meaning = self.def_meaning.clone();
1922
1923        let res = async move {
1924            let loaded_chunk_data = loaded_chunk_data.await?;
1925            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1926                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
1927            }
1928
1929            Ok(Box::new(MiniBlockDecoder {
1930                rep_decompressor,
1931                def_decompressor,
1932                value_decompressor,
1933                def_meaning,
1934                loaded_chunks: VecDeque::from_iter(loaded_chunks),
1935                instructions: VecDeque::from(chunk_instructions),
1936                offset_in_current_chunk: 0,
1937                dictionary,
1938                num_rows,
1939                num_buffers,
1940                has_large_chunk,
1941            }) as Box<dyn StructuralPageDecoder>)
1942        }
1943        .boxed();
1944        let page_load_task = PageLoadTask {
1945            decoder_fut: res,
1946            num_rows,
1947        };
1948        Ok(vec![page_load_task])
1949    }
1950}
1951
1952#[derive(Debug, Clone, Copy)]
1953struct FullZipRepIndexDetails {
1954    buf_position: u64,
1955    bytes_per_value: u64, // Will be 1, 2, 4, or 8
1956}
1957
1958#[derive(Debug)]
1959enum PerValueDecompressor {
1960    Fixed(Arc<dyn FixedPerValueDecompressor>),
1961    Variable(Arc<dyn VariablePerValueDecompressor>),
1962}
1963
1964#[derive(Debug)]
1965struct FullZipDecodeDetails {
1966    value_decompressor: PerValueDecompressor,
1967    def_meaning: Arc<[DefinitionInterpretation]>,
1968    ctrl_word_parser: ControlWordParser,
1969    max_rep: u16,
1970    max_visible_def: u16,
1971}
1972
1973/// Describes where FullZip byte ranges should be read from.
1974///
1975/// FullZip decoding always needs a list of byte ranges, but those bytes can come
1976/// from two different places:
1977/// - Remote I/O (normal path): ranges are fetched from the underlying `EncodingsIo`.
1978/// - A prefetched full page (full scan fast path): the entire page has already been
1979///   loaded once and ranges should be sliced from memory.
1980///
1981/// This abstraction keeps scheduling code focused on "which ranges are needed"
1982/// instead of "how bytes are fetched", and it lets full-page scans avoid the
1983/// two-stage rep-index -> data I/O pipeline.
1984#[derive(Debug, Clone)]
1985enum FullZipReadSource {
1986    /// Fetch ranges from the storage backend through the encoding I/O interface.
1987    Remote(Arc<dyn EncodingsIo>),
1988    /// Slice ranges from an already-loaded FullZip page buffer.
1989    PrefetchedPage { base_offset: u64, data: LanceBuffer },
1990}
1991
1992impl FullZipReadSource {
1993    /// Materialize the requested ranges as decode-ready `LanceBuffer`s.
1994    ///
1995    /// The returned buffers preserve the input range order.
1996    fn fetch(
1997        &self,
1998        ranges: &[Range<u64>],
1999        priority: u64,
2000    ) -> BoxFuture<'static, Result<VecDeque<LanceBuffer>>> {
2001        match self {
2002            Self::Remote(io) => {
2003                let io = io.clone();
2004                let ranges = ranges.to_vec();
2005                async move {
2006                    let data = io.submit_request(ranges, priority).await?;
2007                    Ok(data
2008                        .into_iter()
2009                        .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
2010                        .collect::<VecDeque<_>>())
2011                }
2012                .boxed()
2013            }
2014            Self::PrefetchedPage { base_offset, data } => {
2015                let base_offset = *base_offset;
2016                let data = data.clone();
2017                let page_end = base_offset + data.len() as u64;
2018                std::future::ready(
2019                    ranges
2020                        .iter()
2021                        .map(|range| {
2022                            if range.start > range.end
2023                                || range.start < base_offset
2024                                || range.end > page_end
2025                            {
2026                                return Err(Error::internal(format!(
2027                                    "Requested range {:?} is outside page range {}..{}",
2028                                    range, base_offset, page_end
2029                                )));
2030                            }
2031                            let start = (range.start - base_offset) as usize;
2032                            let len = (range.end - range.start) as usize;
2033                            Ok(data.slice_with_length(start, len))
2034                        })
2035                        .collect::<Result<VecDeque<_>>>(),
2036                )
2037                .boxed()
2038            }
2039        }
2040    }
2041}
2042
2043/// A scheduler for full-zip encoded data
2044///
2045/// When the data type has a fixed-width then we simply need to map from
2046/// row ranges to byte ranges using the fixed-width of the data type.
2047///
2048/// When the data type is variable-width or has any repetition then a
2049/// repetition index is required.
2050#[derive(Debug)]
2051pub struct FullZipScheduler {
2052    data_buf_position: u64,
2053    data_buf_size: u64,
2054    rep_index: Option<FullZipRepIndexDetails>,
2055    priority: u64,
2056    rows_in_page: u64,
2057    bits_per_offset: u8,
2058    details: Arc<FullZipDecodeDetails>,
2059    /// Cached state containing the decoded repetition index
2060    cached_state: Option<Arc<FullZipCacheableState>>,
2061    /// Whether repetition index metadata should be cached during initialize.
2062    enable_cache: bool,
2063}
2064
2065impl FullZipScheduler {
2066    fn try_new(
2067        buffer_offsets_and_sizes: &[(u64, u64)],
2068        priority: u64,
2069        rows_in_page: u64,
2070        layout: &pb21::FullZipLayout,
2071        decompressors: &dyn DecompressionStrategy,
2072    ) -> Result<Self> {
2073        let (data_buf_position, data_buf_size) = buffer_offsets_and_sizes[0];
2074        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
2075            let num_reps = rows_in_page + 1;
2076            let bytes_per_rep = len / num_reps;
2077            debug_assert_eq!(len % num_reps, 0);
2078            debug_assert!(
2079                bytes_per_rep == 1
2080                    || bytes_per_rep == 2
2081                    || bytes_per_rep == 4
2082                    || bytes_per_rep == 8
2083            );
2084            FullZipRepIndexDetails {
2085                buf_position: *pos,
2086                bytes_per_value: bytes_per_rep,
2087            }
2088        });
2089
2090        let value_decompressor = match layout.details {
2091            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
2092                let decompressor = decompressors.create_fixed_per_value_decompressor(
2093                    layout.value_compression.as_ref().unwrap(),
2094                )?;
2095                PerValueDecompressor::Fixed(decompressor.into())
2096            }
2097            Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
2098                let decompressor = decompressors.create_variable_per_value_decompressor(
2099                    layout.value_compression.as_ref().unwrap(),
2100                )?;
2101                PerValueDecompressor::Variable(decompressor.into())
2102            }
2103            None => {
2104                panic!("Full-zip layout must have a `details` field");
2105            }
2106        };
2107        let ctrl_word_parser = ControlWordParser::new(
2108            layout.bits_rep.try_into().unwrap(),
2109            layout.bits_def.try_into().unwrap(),
2110        );
2111        let def_meaning = layout
2112            .layers
2113            .iter()
2114            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
2115            .collect::<Vec<_>>();
2116
2117        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
2118        let max_visible_def = def_meaning
2119            .iter()
2120            .filter(|d| !d.is_list())
2121            .map(|d| d.num_def_levels())
2122            .sum();
2123
2124        let bits_per_offset = match layout.details {
2125            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
2126            Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
2127                bits_per_offset as u8
2128            }
2129            None => panic!("Full-zip layout must have a `details` field"),
2130        };
2131
2132        let details = Arc::new(FullZipDecodeDetails {
2133            value_decompressor,
2134            def_meaning: def_meaning.into(),
2135            ctrl_word_parser,
2136            max_rep,
2137            max_visible_def,
2138        });
2139        Ok(Self {
2140            data_buf_position,
2141            data_buf_size,
2142            rep_index,
2143            details,
2144            priority,
2145            rows_in_page,
2146            bits_per_offset,
2147            cached_state: None,
2148            enable_cache: false,
2149        })
2150    }
2151
2152    fn covers_entire_page(ranges: &[Range<u64>], rows_in_page: u64) -> bool {
2153        if ranges.is_empty() {
2154            return false;
2155        }
2156        let mut expected_start = 0;
2157        for range in ranges {
2158            if range.start != expected_start || range.end > rows_in_page || range.end < range.start
2159            {
2160                return false;
2161            }
2162            expected_start = range.end;
2163        }
2164        expected_start == rows_in_page
2165    }
2166
2167    fn create_page_load_task(
2168        read_source: FullZipReadSource,
2169        byte_ranges: Vec<Range<u64>>,
2170        priority: u64,
2171        num_rows: u64,
2172        details: Arc<FullZipDecodeDetails>,
2173        bits_per_offset: u8,
2174    ) -> PageLoadTask {
2175        let load_task = async move {
2176            let data = read_source.fetch(&byte_ranges, priority).await?;
2177            Self::create_decoder(details, data, num_rows, bits_per_offset)
2178        }
2179        .boxed();
2180        PageLoadTask {
2181            decoder_fut: load_task,
2182            num_rows,
2183        }
2184    }
2185
2186    /// Creates a decoder from the loaded data
2187    fn create_decoder(
2188        details: Arc<FullZipDecodeDetails>,
2189        data: VecDeque<LanceBuffer>,
2190        num_rows: u64,
2191        bits_per_offset: u8,
2192    ) -> Result<Box<dyn StructuralPageDecoder>> {
2193        match &details.value_decompressor {
2194            PerValueDecompressor::Fixed(decompressor) => {
2195                let bits_per_value = decompressor.bits_per_value();
2196                if bits_per_value % 8 != 0 {
2197                    return Err(lance_core::Error::not_supported_source("Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into()));
2198                }
2199                let bytes_per_value = bits_per_value / 8;
2200                let total_bytes_per_value =
2201                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
2202                if total_bytes_per_value == 0 {
2203                    return Err(lance_core::Error::internal(
2204                        "Invalid encoding: per-row byte width must be greater than 0",
2205                    ));
2206                }
2207                Ok(Box::new(FixedFullZipDecoder {
2208                    details,
2209                    data,
2210                    num_rows,
2211                    offset_in_current: 0,
2212                    bytes_per_value: bytes_per_value as usize,
2213                    total_bytes_per_value,
2214                }) as Box<dyn StructuralPageDecoder>)
2215            }
2216            PerValueDecompressor::Variable(_decompressor) => {
2217                Ok(Box::new(VariableFullZipDecoder::new(
2218                    details,
2219                    data,
2220                    num_rows,
2221                    bits_per_offset,
2222                    bits_per_offset,
2223                )))
2224            }
2225        }
2226    }
2227
2228    /// Extracts byte ranges from a repetition index buffer
2229    /// The buffer contains pairs of (start, end) values for each range
2230    fn extract_byte_ranges_from_pairs(
2231        buffer: LanceBuffer,
2232        bytes_per_value: u64,
2233        data_buf_position: u64,
2234    ) -> Vec<Range<u64>> {
2235        ByteUnpacker::new(buffer, bytes_per_value as usize)
2236            .chunks(2)
2237            .into_iter()
2238            .map(|mut c| {
2239                let start = c.next().unwrap() + data_buf_position;
2240                let end = c.next().unwrap() + data_buf_position;
2241                start..end
2242            })
2243            .collect::<Vec<_>>()
2244    }
2245
2246    /// Extracts byte ranges from a cached repetition index buffer
2247    /// The buffer contains all values and we need to extract specific ranges
2248    fn extract_byte_ranges_from_cached(
2249        buffer: &LanceBuffer,
2250        ranges: &[Range<u64>],
2251        bytes_per_value: u64,
2252        data_buf_position: u64,
2253    ) -> Vec<Range<u64>> {
2254        ranges
2255            .iter()
2256            .map(|r| {
2257                let start_offset = (r.start * bytes_per_value) as usize;
2258                let end_offset = (r.end * bytes_per_value) as usize;
2259
2260                let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
2261                let start_val =
2262                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
2263                        .next()
2264                        .unwrap();
2265
2266                let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
2267                let end_val =
2268                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
2269                        .next()
2270                        .unwrap();
2271
2272                (data_buf_position + start_val)..(data_buf_position + end_val)
2273            })
2274            .collect()
2275    }
2276
2277    /// Computes the ranges in the repetition index that need to be loaded
2278    fn compute_rep_index_ranges(
2279        ranges: &[Range<u64>],
2280        rep_index: &FullZipRepIndexDetails,
2281    ) -> Vec<Range<u64>> {
2282        ranges
2283            .iter()
2284            .flat_map(|r| {
2285                let first_val_start =
2286                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
2287                let first_val_end = first_val_start + rep_index.bytes_per_value;
2288                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
2289                let last_val_end = last_val_start + rep_index.bytes_per_value;
2290                [first_val_start..first_val_end, last_val_start..last_val_end]
2291            })
2292            .collect()
2293    }
2294
2295    /// Schedules ranges in the presence of a repetition index
2296    fn schedule_ranges_rep(
2297        &self,
2298        ranges: &[Range<u64>],
2299        io: &Arc<dyn EncodingsIo>,
2300        rep_index: FullZipRepIndexDetails,
2301    ) -> Result<Vec<PageLoadTask>> {
2302        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2303        let data_buf_position = self.data_buf_position;
2304        let priority = self.priority;
2305        let details = self.details.clone();
2306        let bits_per_offset = self.bits_per_offset;
2307
2308        if Self::covers_entire_page(ranges, self.rows_in_page) {
2309            let full_range = self.data_buf_position..(self.data_buf_position + self.data_buf_size);
2310            let page_data = io.submit_single(full_range.clone(), priority);
2311            let load_task = async move {
2312                let page_data = page_data.await?;
2313                let source = FullZipReadSource::PrefetchedPage {
2314                    base_offset: full_range.start,
2315                    data: LanceBuffer::from_bytes(page_data, 1),
2316                };
2317                let read_ranges = vec![full_range];
2318                let data = source.fetch(&read_ranges, priority).await?;
2319                Self::create_decoder(details, data, num_rows, bits_per_offset)
2320            }
2321            .boxed();
2322            let page_load_task = PageLoadTask {
2323                decoder_fut: load_task,
2324                num_rows,
2325            };
2326            return Ok(vec![page_load_task]);
2327        }
2328
2329        if let Some(cached_state) = &self.cached_state {
2330            let byte_ranges = Self::extract_byte_ranges_from_cached(
2331                &cached_state.rep_index_buffer,
2332                ranges,
2333                rep_index.bytes_per_value,
2334                data_buf_position,
2335            );
2336            let page_load_task = Self::create_page_load_task(
2337                FullZipReadSource::Remote(io.clone()),
2338                byte_ranges,
2339                priority,
2340                num_rows,
2341                details,
2342                bits_per_offset,
2343            );
2344            return Ok(vec![page_load_task]);
2345        }
2346
2347        let rep_ranges = Self::compute_rep_index_ranges(ranges, &rep_index);
2348        let rep_data = io.submit_request(rep_ranges, priority);
2349        let io_clone = io.clone();
2350        let load_task = async move {
2351            let rep_data = rep_data.await?;
2352            let rep_buffer = LanceBuffer::concat(
2353                &rep_data
2354                    .into_iter()
2355                    .map(|d| LanceBuffer::from_bytes(d, 1))
2356                    .collect::<Vec<_>>(),
2357            );
2358            let byte_ranges = Self::extract_byte_ranges_from_pairs(
2359                rep_buffer,
2360                rep_index.bytes_per_value,
2361                data_buf_position,
2362            );
2363            let source = FullZipReadSource::Remote(io_clone);
2364            let data = source.fetch(&byte_ranges, priority).await?;
2365            Self::create_decoder(details, data, num_rows, bits_per_offset)
2366        }
2367        .boxed();
2368        let page_load_task = PageLoadTask {
2369            decoder_fut: load_task,
2370            num_rows,
2371        };
2372        Ok(vec![page_load_task])
2373    }
2374
2375    // In the simple case there is no repetition and we just have large fixed-width
2376    // rows of data.  We can just map row ranges to byte ranges directly using the
2377    // fixed-width of the data type.
2378    fn schedule_ranges_simple(
2379        &self,
2380        ranges: &[Range<u64>],
2381        io: &Arc<dyn EncodingsIo>,
2382    ) -> Result<Vec<PageLoadTask>> {
2383        // Convert row ranges to item ranges (i.e. multiply by items per row)
2384        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2385
2386        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
2387            unreachable!()
2388        };
2389
2390        // Convert item ranges to byte ranges (i.e. multiply by bytes per item)
2391        let bits_per_value = decompressor.bits_per_value();
2392        assert_eq!(bits_per_value % 8, 0);
2393        let bytes_per_value = bits_per_value / 8;
2394        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
2395        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
2396        let byte_ranges = ranges
2397            .iter()
2398            .map(|r| {
2399                debug_assert!(r.end <= self.rows_in_page);
2400                let start = self.data_buf_position + r.start * total_bytes_per_value;
2401                let end = self.data_buf_position + r.end * total_bytes_per_value;
2402                start..end
2403            })
2404            .collect::<Vec<_>>();
2405
2406        let page_load_task = Self::create_page_load_task(
2407            FullZipReadSource::Remote(io.clone()),
2408            byte_ranges,
2409            self.priority,
2410            num_rows,
2411            self.details.clone(),
2412            self.bits_per_offset,
2413        );
2414        Ok(vec![page_load_task])
2415    }
2416}
2417
2418/// Cacheable state for FullZip encoding, storing the decoded repetition index
2419#[derive(Debug)]
2420struct FullZipCacheableState {
2421    /// The raw repetition index buffer for future decoding
2422    rep_index_buffer: LanceBuffer,
2423}
2424
2425impl DeepSizeOf for FullZipCacheableState {
2426    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
2427        self.rep_index_buffer.len()
2428    }
2429}
2430
2431impl CachedPageData for FullZipCacheableState {
2432    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2433        self
2434    }
2435}
2436
2437impl StructuralPageScheduler for FullZipScheduler {
2438    fn initialize<'a>(
2439        &'a mut self,
2440        io: &Arc<dyn EncodingsIo>,
2441    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2442        if self.enable_cache
2443            && let Some(rep_index) = self.rep_index
2444        {
2445            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2446            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2447            let io_clone = io.clone();
2448            return async move {
2449                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2450                let state = Arc::new(FullZipCacheableState {
2451                    rep_index_buffer: LanceBuffer::from_bytes(rep_index_data[0].clone(), 1),
2452                });
2453                self.cached_state = Some(state.clone());
2454                Ok(state as Arc<dyn CachedPageData>)
2455            }
2456            .boxed();
2457        }
2458        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2459    }
2460
2461    /// Loads previously cached repetition index data from the cache system.
2462    /// This method is called when a scheduler instance needs to use cached data
2463    /// that was initialized by another instance or in a previous operation.
2464    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2465        // Try to downcast to our specific cache type
2466        if let Ok(cached_state) = cache
2467            .clone()
2468            .as_arc_any()
2469            .downcast::<FullZipCacheableState>()
2470        {
2471            // Store the cached state for use in schedule_ranges
2472            self.cached_state = Some(cached_state);
2473        }
2474    }
2475
2476    fn schedule_ranges(
2477        &self,
2478        ranges: &[Range<u64>],
2479        io: &Arc<dyn EncodingsIo>,
2480    ) -> Result<Vec<PageLoadTask>> {
2481        if let Some(rep_index) = self.rep_index {
2482            self.schedule_ranges_rep(ranges, io, rep_index)
2483        } else {
2484            self.schedule_ranges_simple(ranges, io)
2485        }
2486    }
2487}
2488
2489/// A decoder for full-zip encoded data when the data has a fixed-width
2490///
2491/// Here we need to unzip the control words from the values themselves and
2492/// then decompress the requested values.
2493///
2494/// We use a PerValueDecompressor because we will only be decompressing the
2495/// requested data.  This decoder / scheduler does not do any read amplification.
2496#[derive(Debug)]
2497struct FixedFullZipDecoder {
2498    details: Arc<FullZipDecodeDetails>,
2499    data: VecDeque<LanceBuffer>,
2500    offset_in_current: usize,
2501    bytes_per_value: usize,
2502    total_bytes_per_value: usize,
2503    num_rows: u64,
2504}
2505
2506impl FixedFullZipDecoder {
2507    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2508        debug_assert!(num_rows > 0);
2509        let cur_buf = self.data.front_mut().unwrap();
2510        let start = self.offset_in_current;
2511        if self.details.ctrl_word_parser.has_rep() {
2512            // This is a slightly slower path.  In order to figure out where to split we need to
2513            // examine the rep index so we can convert num_lists to num_rows
2514            let mut rows_started = 0;
2515            // We always need at least one value.  Now loop through until we have passed num_rows
2516            // values
2517            let mut num_items = 0;
2518            while self.offset_in_current < cur_buf.len() {
2519                let control = self.details.ctrl_word_parser.parse_desc(
2520                    &cur_buf[self.offset_in_current..],
2521                    self.details.max_rep,
2522                    self.details.max_visible_def,
2523                );
2524                if control.is_new_row {
2525                    if rows_started == num_rows {
2526                        break;
2527                    }
2528                    rows_started += 1;
2529                }
2530                num_items += 1;
2531                if control.is_visible {
2532                    self.offset_in_current += self.total_bytes_per_value;
2533                } else {
2534                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2535                }
2536            }
2537
2538            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2539            if self.offset_in_current == cur_buf.len() {
2540                self.data.pop_front();
2541                self.offset_in_current = 0;
2542            }
2543
2544            FullZipDecodeTaskItem {
2545                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2546                    data: task_slice,
2547                    bits_per_value: self.bytes_per_value as u64 * 8,
2548                    num_values: num_items,
2549                    block_info: BlockInfo::new(),
2550                }),
2551                rows_in_buf: rows_started,
2552            }
2553        } else {
2554            // If there's no repetition we can calculate the slicing point by just multiplying
2555            // the number of rows by the total bytes per value
2556            let cur_buf = self.data.front_mut().unwrap();
2557            let bytes_avail = cur_buf.len() - self.offset_in_current;
2558            let offset_in_cur = self.offset_in_current;
2559
2560            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2561            let mut rows_taken = num_rows;
2562            let task_slice = if bytes_needed >= bytes_avail {
2563                self.offset_in_current = 0;
2564                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2565                self.data
2566                    .pop_front()
2567                    .unwrap()
2568                    .slice_with_length(offset_in_cur, bytes_avail)
2569            } else {
2570                self.offset_in_current += bytes_needed;
2571                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2572            };
2573            FullZipDecodeTaskItem {
2574                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2575                    data: task_slice,
2576                    bits_per_value: self.bytes_per_value as u64 * 8,
2577                    num_values: rows_taken,
2578                    block_info: BlockInfo::new(),
2579                }),
2580                rows_in_buf: rows_taken,
2581            }
2582        }
2583    }
2584}
2585
2586impl StructuralPageDecoder for FixedFullZipDecoder {
2587    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2588        let mut task_data = Vec::with_capacity(self.data.len());
2589        let mut remaining = num_rows;
2590        while remaining > 0 {
2591            let task_item = self.slice_next_task(remaining);
2592            remaining -= task_item.rows_in_buf;
2593            task_data.push(task_item);
2594        }
2595        Ok(Box::new(FixedFullZipDecodeTask {
2596            details: self.details.clone(),
2597            data: task_data,
2598            bytes_per_value: self.bytes_per_value,
2599            num_rows: num_rows as usize,
2600        }))
2601    }
2602
2603    fn num_rows(&self) -> u64 {
2604        self.num_rows
2605    }
2606}
2607
2608/// A decoder for full-zip encoded data when the data has a variable-width
2609///
2610/// Here we need to unzip the control words AND lengths from the values and
2611/// then decompress the requested values.
2612#[derive(Debug)]
2613struct VariableFullZipDecoder {
2614    details: Arc<FullZipDecodeDetails>,
2615    decompressor: Arc<dyn VariablePerValueDecompressor>,
2616    data: LanceBuffer,
2617    offsets: LanceBuffer,
2618    rep: ScalarBuffer<u16>,
2619    def: ScalarBuffer<u16>,
2620    repdef_starts: Vec<usize>,
2621    data_starts: Vec<usize>,
2622    offset_starts: Vec<usize>,
2623    visible_item_counts: Vec<u64>,
2624    bits_per_offset: u8,
2625    current_idx: usize,
2626    num_rows: u64,
2627}
2628
2629impl VariableFullZipDecoder {
2630    fn new(
2631        details: Arc<FullZipDecodeDetails>,
2632        data: VecDeque<LanceBuffer>,
2633        num_rows: u64,
2634        in_bits_per_length: u8,
2635        out_bits_per_offset: u8,
2636    ) -> Self {
2637        let decompressor = match details.value_decompressor {
2638            PerValueDecompressor::Variable(ref d) => d.clone(),
2639            _ => unreachable!(),
2640        };
2641
2642        assert_eq!(in_bits_per_length % 8, 0);
2643        assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2644
2645        let mut decoder = Self {
2646            details,
2647            decompressor,
2648            data: LanceBuffer::empty(),
2649            offsets: LanceBuffer::empty(),
2650            rep: LanceBuffer::empty().borrow_to_typed_slice(),
2651            def: LanceBuffer::empty().borrow_to_typed_slice(),
2652            bits_per_offset: out_bits_per_offset,
2653            repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2654            data_starts: Vec::with_capacity(num_rows as usize + 1),
2655            offset_starts: Vec::with_capacity(num_rows as usize + 1),
2656            visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2657            current_idx: 0,
2658            num_rows,
2659        };
2660
2661        // There's no great time to do this and this is the least worst time.  If we don't unzip then
2662        // we can't slice the data during the decode phase.  This is because we need the offsets to be
2663        // unpacked to know where the values start and end.
2664        //
2665        // We don't want to unzip on the decode thread because that is a single-threaded path
2666        // We don't want to unzip on the scheduling thread because that is a single-threaded path
2667        //
2668        // Fortunately, we know variable length data will always be read indirectly and so we can do it
2669        // here, which should be on the indirect thread.  The primary disadvantage to doing it here is that
2670        // we load all the data into memory and then throw it away only to load it all into memory again during
2671        // the decode.
2672        //
2673        // There are some alternatives to investigate:
2674        //   - Instead of just reading the beginning and end of the rep index we could read the entire
2675        //     range in between.  This will give us the break points that we need for slicing and won't increase
2676        //     the number of IOPs but it will mean we are doing more total I/O and we need to load the rep index
2677        //     even when doing a full scan.
2678        //   - We could force each decode task to do a full unzip of all the data.  Each decode task now
2679        //     has to do more work but the work is all fused.
2680        //   - We could just try doing this work on the decode thread and see if it is a problem.
2681        decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2682
2683        decoder
2684    }
2685
2686    fn slice_batch_data_and_rebase_offsets_typed<T>(
2687        data: &LanceBuffer,
2688        offsets: &LanceBuffer,
2689    ) -> Result<(LanceBuffer, LanceBuffer)>
2690    where
2691        T: arrow_buffer::ArrowNativeType
2692            + Copy
2693            + PartialOrd
2694            + std::ops::Sub<Output = T>
2695            + std::fmt::Display
2696            + TryInto<usize>,
2697    {
2698        let offsets_slice = offsets.borrow_to_typed_slice::<T>();
2699        let offsets_slice = offsets_slice.as_ref();
2700        if offsets_slice.is_empty() {
2701            return Err(Error::internal(
2702                "Variable offsets cannot be empty".to_string(),
2703            ));
2704        }
2705
2706        let base = offsets_slice[0];
2707        let end = *offsets_slice.last().unwrap();
2708        if end < base {
2709            return Err(Error::internal(format!(
2710                "Invalid variable offsets: end ({end}) is less than base ({base})"
2711            )));
2712        }
2713
2714        let data_start = base.try_into().map_err(|_| {
2715            Error::internal(format!("Variable offset ({base}) does not fit into usize"))
2716        })?;
2717        let data_end = end.try_into().map_err(|_| {
2718            Error::internal(format!("Variable offset ({end}) does not fit into usize"))
2719        })?;
2720        if data_end > data.len() {
2721            return Err(Error::internal(format!(
2722                "Invalid variable offsets: end ({data_end}) exceeds data len ({})",
2723                data.len()
2724            )));
2725        }
2726
2727        let mut rebased_offsets = Vec::with_capacity(offsets_slice.len());
2728        for &offset in offsets_slice {
2729            if offset < base {
2730                return Err(Error::internal(format!(
2731                    "Invalid variable offsets: offset ({offset}) is less than base ({base})"
2732                )));
2733            }
2734            rebased_offsets.push(offset - base);
2735        }
2736
2737        let sliced_data = data.slice_with_length(data_start, data_end - data_start);
2738        // Copy into a compact buffer so each output batch owns only what it references.
2739        let sliced_data = LanceBuffer::copy_slice(&sliced_data);
2740        let rebased_offsets = LanceBuffer::reinterpret_vec(rebased_offsets);
2741        Ok((sliced_data, rebased_offsets))
2742    }
2743
2744    fn slice_batch_data_and_rebase_offsets(
2745        data: &LanceBuffer,
2746        offsets: &LanceBuffer,
2747        bits_per_offset: u8,
2748    ) -> Result<(LanceBuffer, LanceBuffer)> {
2749        match bits_per_offset {
2750            32 => Self::slice_batch_data_and_rebase_offsets_typed::<u32>(data, offsets),
2751            64 => Self::slice_batch_data_and_rebase_offsets_typed::<u64>(data, offsets),
2752            _ => Err(Error::internal(format!(
2753                "Unsupported bits_per_offset={bits_per_offset}"
2754            ))),
2755        }
2756    }
2757
2758    unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2759        match bits_per_offset {
2760            8 => *data.get_unchecked(0) as u64,
2761            16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2762            32 => u32::from_le_bytes([
2763                *data.get_unchecked(0),
2764                *data.get_unchecked(1),
2765                *data.get_unchecked(2),
2766                *data.get_unchecked(3),
2767            ]) as u64,
2768            64 => u64::from_le_bytes([
2769                *data.get_unchecked(0),
2770                *data.get_unchecked(1),
2771                *data.get_unchecked(2),
2772                *data.get_unchecked(3),
2773                *data.get_unchecked(4),
2774                *data.get_unchecked(5),
2775                *data.get_unchecked(6),
2776                *data.get_unchecked(7),
2777            ]),
2778            _ => unreachable!(),
2779        }
2780    }
2781
2782    fn unzip(
2783        &mut self,
2784        data: VecDeque<LanceBuffer>,
2785        in_bits_per_length: u8,
2786        out_bits_per_offset: u8,
2787        num_rows: u64,
2788    ) {
2789        // This undercounts if there are lists but, at this point, we don't really know how many items we have
2790        let mut rep = Vec::with_capacity(num_rows as usize);
2791        let mut def = Vec::with_capacity(num_rows as usize);
2792        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2793
2794        // This undercounts if there are lists
2795        // It can also overcount if there are invisible items
2796        let bytes_per_offset = out_bits_per_offset as usize / 8;
2797        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2798        let mut offsets_data = Vec::with_capacity(bytes_offsets);
2799
2800        let bytes_per_length = in_bits_per_length as usize / 8;
2801        let bytes_lengths = bytes_per_length * num_rows as usize;
2802
2803        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2804        // This overcounts since bytes_lengths and bytes_cw are undercounts
2805        // It can also undercount if there are invisible items (hence the saturating_sub)
2806        let mut unzipped_data =
2807            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2808
2809        let mut current_offset = 0_u64;
2810        let mut visible_item_count = 0_u64;
2811        for databuf in data.into_iter() {
2812            let mut databuf = databuf.as_ref();
2813            while !databuf.is_empty() {
2814                let data_start = unzipped_data.len();
2815                let offset_start = offsets_data.len();
2816                // We might have only-rep or only-def, neither, or both.  They move at the same
2817                // speed though so we only need one index into it
2818                let repdef_start = rep.len().max(def.len());
2819                // TODO: Kind of inefficient we parse the control word twice here
2820                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2821                    databuf,
2822                    self.details.max_rep,
2823                    self.details.max_visible_def,
2824                );
2825                self.details
2826                    .ctrl_word_parser
2827                    .parse(databuf, &mut rep, &mut def);
2828                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2829
2830                if ctrl_desc.is_new_row {
2831                    self.repdef_starts.push(repdef_start);
2832                    self.data_starts.push(data_start);
2833                    self.offset_starts.push(offset_start);
2834                    self.visible_item_counts.push(visible_item_count);
2835                }
2836                if ctrl_desc.is_visible {
2837                    visible_item_count += 1;
2838                    if ctrl_desc.is_valid_item {
2839                        // Safety: Data should have at least bytes_per_length bytes remaining
2840                        debug_assert!(databuf.len() >= bytes_per_length);
2841                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2842                        match out_bits_per_offset {
2843                            32 => offsets_data
2844                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2845                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2846                            _ => unreachable!(),
2847                        };
2848                        databuf = &databuf[bytes_per_offset..];
2849                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
2850                        databuf = &databuf[length as usize..];
2851                        current_offset += length;
2852                    } else {
2853                        // Null items still get an offset
2854                        match out_bits_per_offset {
2855                            32 => offsets_data
2856                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2857                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2858                            _ => unreachable!(),
2859                        }
2860                    }
2861                }
2862            }
2863        }
2864        self.repdef_starts.push(rep.len().max(def.len()));
2865        self.data_starts.push(unzipped_data.len());
2866        self.offset_starts.push(offsets_data.len());
2867        self.visible_item_counts.push(visible_item_count);
2868        match out_bits_per_offset {
2869            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2870            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2871            _ => unreachable!(),
2872        };
2873        self.rep = ScalarBuffer::from(rep);
2874        self.def = ScalarBuffer::from(def);
2875        self.data = LanceBuffer::from(unzipped_data);
2876        self.offsets = LanceBuffer::from(offsets_data);
2877    }
2878}
2879
2880impl StructuralPageDecoder for VariableFullZipDecoder {
2881    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2882        let start = self.current_idx;
2883        let end = start + num_rows as usize;
2884
2885        let offset_start = self.offset_starts[start];
2886        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2887        let offsets = self
2888            .offsets
2889            .slice_with_length(offset_start, offset_end - offset_start);
2890        // Keep each batch's variable data buffer bounded to the selected rows.
2891        let (data, offsets) =
2892            Self::slice_batch_data_and_rebase_offsets(&self.data, &offsets, self.bits_per_offset)?;
2893
2894        let repdef_start = self.repdef_starts[start];
2895        let repdef_end = self.repdef_starts[end];
2896        let rep = if self.rep.is_empty() {
2897            self.rep.clone()
2898        } else {
2899            self.rep.slice(repdef_start, repdef_end - repdef_start)
2900        };
2901        let def = if self.def.is_empty() {
2902            self.def.clone()
2903        } else {
2904            self.def.slice(repdef_start, repdef_end - repdef_start)
2905        };
2906
2907        let visible_item_counts_start = self.visible_item_counts[start];
2908        let visible_item_counts_end = self.visible_item_counts[end];
2909        let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2910
2911        self.current_idx += num_rows as usize;
2912
2913        Ok(Box::new(VariableFullZipDecodeTask {
2914            details: self.details.clone(),
2915            decompressor: self.decompressor.clone(),
2916            data,
2917            offsets,
2918            bits_per_offset: self.bits_per_offset,
2919            num_visible_items,
2920            rep,
2921            def,
2922        }))
2923    }
2924
2925    fn num_rows(&self) -> u64 {
2926        self.num_rows
2927    }
2928}
2929
2930#[derive(Debug)]
2931struct VariableFullZipDecodeTask {
2932    details: Arc<FullZipDecodeDetails>,
2933    decompressor: Arc<dyn VariablePerValueDecompressor>,
2934    data: LanceBuffer,
2935    offsets: LanceBuffer,
2936    bits_per_offset: u8,
2937    num_visible_items: u64,
2938    rep: ScalarBuffer<u16>,
2939    def: ScalarBuffer<u16>,
2940}
2941
2942impl DecodePageTask for VariableFullZipDecodeTask {
2943    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2944        let block = VariableWidthBlock {
2945            data: self.data,
2946            offsets: self.offsets,
2947            bits_per_offset: self.bits_per_offset,
2948            num_values: self.num_visible_items,
2949            block_info: BlockInfo::new(),
2950        };
2951        let decomopressed = self.decompressor.decompress(block)?;
2952        let rep = if self.rep.is_empty() {
2953            None
2954        } else {
2955            Some(self.rep.to_vec())
2956        };
2957        let def = if self.def.is_empty() {
2958            None
2959        } else {
2960            Some(self.def.to_vec())
2961        };
2962        let unraveler = RepDefUnraveler::new(
2963            rep,
2964            def,
2965            self.details.def_meaning.clone(),
2966            self.num_visible_items,
2967        );
2968        Ok(DecodedPage {
2969            data: decomopressed,
2970            repdef: unraveler,
2971        })
2972    }
2973}
2974
2975#[derive(Debug)]
2976struct FullZipDecodeTaskItem {
2977    data: PerValueDataBlock,
2978    rows_in_buf: u64,
2979}
2980
2981/// A task to unzip and decompress full-zip encoded data when that data
2982/// has a fixed-width.
2983#[derive(Debug)]
2984struct FixedFullZipDecodeTask {
2985    details: Arc<FullZipDecodeDetails>,
2986    data: Vec<FullZipDecodeTaskItem>,
2987    num_rows: usize,
2988    bytes_per_value: usize,
2989}
2990
2991impl DecodePageTask for FixedFullZipDecodeTask {
2992    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2993        // Multiply by 2 to make a stab at the size of the output buffer (which will be decompressed and thus bigger)
2994        let estimated_size_bytes = self
2995            .data
2996            .iter()
2997            .map(|task_item| task_item.data.data_size() as usize)
2998            .sum::<usize>()
2999            * 2;
3000        let mut data_builder =
3001            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
3002
3003        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
3004            // Fast path, no need to unzip because there is no rep/def
3005            //
3006            // We decompress each buffer and add it to our output buffer
3007            for task_item in self.data.into_iter() {
3008                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
3009                    unreachable!()
3010                };
3011                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
3012                else {
3013                    unreachable!()
3014                };
3015                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
3016                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
3017                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
3018            }
3019
3020            let unraveler = RepDefUnraveler::new(
3021                None,
3022                None,
3023                self.details.def_meaning.clone(),
3024                self.num_rows as u64,
3025            );
3026
3027            Ok(DecodedPage {
3028                data: data_builder.finish(),
3029                repdef: unraveler,
3030            })
3031        } else {
3032            // Slow path, unzipping needed
3033            let mut rep = Vec::with_capacity(self.num_rows);
3034            let mut def = Vec::with_capacity(self.num_rows);
3035
3036            for task_item in self.data.into_iter() {
3037                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
3038                    unreachable!()
3039                };
3040                let mut buf_slice = fixed_data.data.as_ref();
3041                let num_values = fixed_data.num_values as usize;
3042                // We will be unzipping repdef in to `rep` and `def` and the
3043                // values into `values` (which contains the compressed values)
3044                let mut values = Vec::with_capacity(
3045                    fixed_data.data.len()
3046                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
3047                );
3048                let mut visible_items = 0;
3049                for _ in 0..num_values {
3050                    // Extract rep/def
3051                    self.details
3052                        .ctrl_word_parser
3053                        .parse(buf_slice, &mut rep, &mut def);
3054                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
3055
3056                    let is_visible = def
3057                        .last()
3058                        .map(|d| *d <= self.details.max_visible_def)
3059                        .unwrap_or(true);
3060                    if is_visible {
3061                        // Extract value
3062                        values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
3063                        buf_slice = &buf_slice[self.bytes_per_value..];
3064                        visible_items += 1;
3065                    }
3066                }
3067
3068                // Finally, we decompress the values and add them to our output buffer
3069                let values_buf = LanceBuffer::from(values);
3070                let fixed_data = FixedWidthDataBlock {
3071                    bits_per_value: self.bytes_per_value as u64 * 8,
3072                    block_info: BlockInfo::new(),
3073                    data: values_buf,
3074                    num_values: visible_items,
3075                };
3076                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
3077                else {
3078                    unreachable!()
3079                };
3080                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
3081                data_builder.append(&decompressed, 0..visible_items);
3082            }
3083
3084            let repetition = if rep.is_empty() { None } else { Some(rep) };
3085            let definition = if def.is_empty() { None } else { Some(def) };
3086
3087            let unraveler = RepDefUnraveler::new(
3088                repetition,
3089                definition,
3090                self.details.def_meaning.clone(),
3091                self.num_rows as u64,
3092            );
3093            let data = data_builder.finish();
3094
3095            Ok(DecodedPage {
3096                data,
3097                repdef: unraveler,
3098            })
3099        }
3100    }
3101}
3102
3103#[derive(Debug)]
3104struct StructuralPrimitiveFieldSchedulingJob<'a> {
3105    scheduler: &'a StructuralPrimitiveFieldScheduler,
3106    ranges: Vec<Range<u64>>,
3107    page_idx: usize,
3108    range_idx: usize,
3109    global_row_offset: u64,
3110}
3111
3112impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
3113    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
3114        Self {
3115            scheduler,
3116            ranges,
3117            page_idx: 0,
3118            range_idx: 0,
3119            global_row_offset: 0,
3120        }
3121    }
3122}
3123
3124impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
3125    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
3126        if self.range_idx >= self.ranges.len() {
3127            return Ok(Vec::new());
3128        }
3129        // Get our current range
3130        let mut range = self.ranges[self.range_idx].clone();
3131        let priority = range.start;
3132
3133        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
3134        trace!(
3135            "Current range is {:?} and current page has {} rows",
3136            range, cur_page.num_rows
3137        );
3138        // Skip entire pages until we have some overlap with our next range
3139        while cur_page.num_rows + self.global_row_offset <= range.start {
3140            self.global_row_offset += cur_page.num_rows;
3141            self.page_idx += 1;
3142            trace!("Skipping entire page of {} rows", cur_page.num_rows);
3143            cur_page = &self.scheduler.page_schedulers[self.page_idx];
3144        }
3145
3146        // Now the cur_page has overlap with range.  Continue looping through ranges
3147        // until we find a range that exceeds the current page
3148
3149        let mut ranges_in_page = Vec::new();
3150        while cur_page.num_rows + self.global_row_offset > range.start {
3151            range.start = range.start.max(self.global_row_offset);
3152            let start_in_page = range.start - self.global_row_offset;
3153            let end_in_page = start_in_page + (range.end - range.start);
3154            let end_in_page = end_in_page.min(cur_page.num_rows);
3155            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
3156
3157            ranges_in_page.push(start_in_page..end_in_page);
3158            if last_in_range {
3159                self.range_idx += 1;
3160                if self.range_idx == self.ranges.len() {
3161                    break;
3162                }
3163                range = self.ranges[self.range_idx].clone();
3164            } else {
3165                break;
3166            }
3167        }
3168
3169        trace!(
3170            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
3171            ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
3172            ranges_in_page.len(),
3173            cur_page.num_rows,
3174            priority,
3175            self.scheduler.column_index,
3176            cur_page.page_index,
3177        );
3178
3179        self.global_row_offset += cur_page.num_rows;
3180        self.page_idx += 1;
3181
3182        let page_decoders = cur_page
3183            .scheduler
3184            .schedule_ranges(&ranges_in_page, context.io())?;
3185
3186        let cur_path = context.current_path();
3187        page_decoders
3188            .into_iter()
3189            .map(|page_load_task| {
3190                let cur_path = cur_path.clone();
3191                let page_decoder = page_load_task.decoder_fut;
3192                let unloaded_page = async move {
3193                    let page_decoder = page_decoder.await?;
3194                    Ok(LoadedPageShard {
3195                        decoder: page_decoder,
3196                        path: cur_path,
3197                    })
3198                }
3199                .boxed();
3200                Ok(ScheduledScanLine {
3201                    decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
3202                    rows_scheduled: page_load_task.num_rows,
3203                })
3204            })
3205            .collect::<Result<Vec<_>>>()
3206    }
3207}
3208
3209#[derive(Debug)]
3210struct PageInfoAndScheduler {
3211    page_index: usize,
3212    num_rows: u64,
3213    scheduler: Box<dyn StructuralPageScheduler>,
3214}
3215
3216/// A scheduler for a leaf node
3217///
3218/// Here we look at the layout of the various pages and delegate scheduling to a scheduler
3219/// appropriate for the layout of the page.
3220#[derive(Debug)]
3221pub struct StructuralPrimitiveFieldScheduler {
3222    page_schedulers: Vec<PageInfoAndScheduler>,
3223    column_index: u32,
3224}
3225
3226impl StructuralPrimitiveFieldScheduler {
3227    pub fn try_new(
3228        column_info: &ColumnInfo,
3229        decompressors: &dyn DecompressionStrategy,
3230        cache_repetition_index: bool,
3231        target_field: &Field,
3232    ) -> Result<Self> {
3233        let page_schedulers = column_info
3234            .page_infos
3235            .iter()
3236            .enumerate()
3237            .map(|(page_index, page_info)| {
3238                Self::page_info_to_scheduler(
3239                    page_info,
3240                    page_index,
3241                    decompressors,
3242                    cache_repetition_index,
3243                    target_field,
3244                )
3245            })
3246            .collect::<Result<Vec<_>>>()?;
3247        Ok(Self {
3248            page_schedulers,
3249            column_index: column_info.index,
3250        })
3251    }
3252
3253    fn page_layout_to_scheduler(
3254        page_info: &PageInfo,
3255        page_layout: &PageLayout,
3256        decompressors: &dyn DecompressionStrategy,
3257        cache_repetition_index: bool,
3258        target_field: &Field,
3259    ) -> Result<Box<dyn StructuralPageScheduler>> {
3260        use pb21::page_layout::Layout;
3261        Ok(match page_layout.layout.as_ref().expect_ok()? {
3262            Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
3263                &page_info.buffer_offsets_and_sizes,
3264                page_info.priority,
3265                mini_block.num_items,
3266                mini_block,
3267                decompressors,
3268            )?),
3269            Layout::FullZipLayout(full_zip) => {
3270                let mut scheduler = FullZipScheduler::try_new(
3271                    &page_info.buffer_offsets_and_sizes,
3272                    page_info.priority,
3273                    page_info.num_rows,
3274                    full_zip,
3275                    decompressors,
3276                )?;
3277                scheduler.enable_cache = cache_repetition_index;
3278                Box::new(scheduler)
3279            }
3280            Layout::ConstantLayout(constant_layout) => {
3281                let def_meaning = constant_layout
3282                    .layers
3283                    .iter()
3284                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3285                    .collect::<Vec<_>>();
3286                let has_scalar_value = constant_layout.inline_value.is_some()
3287                    || page_info.buffer_offsets_and_sizes.len() == 1
3288                    || page_info.buffer_offsets_and_sizes.len() == 3;
3289                if has_scalar_value {
3290                    Box::new(constant::ConstantPageScheduler::try_new(
3291                        page_info.buffer_offsets_and_sizes.clone(),
3292                        constant_layout.inline_value.clone(),
3293                        target_field.data_type(),
3294                        def_meaning.into(),
3295                    )?) as Box<dyn StructuralPageScheduler>
3296                } else if def_meaning.len() == 1
3297                    && def_meaning[0] == DefinitionInterpretation::NullableItem
3298                {
3299                    Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
3300                } else {
3301                    let rep_decompressor = constant_layout
3302                        .rep_compression
3303                        .as_ref()
3304                        .map(|encoding| decompressors.create_block_decompressor(encoding))
3305                        .transpose()?
3306                        .map(Arc::from);
3307
3308                    let def_decompressor = constant_layout
3309                        .def_compression
3310                        .as_ref()
3311                        .map(|encoding| decompressors.create_block_decompressor(encoding))
3312                        .transpose()?
3313                        .map(Arc::from);
3314
3315                    Box::new(ComplexAllNullScheduler::new(
3316                        page_info.buffer_offsets_and_sizes.clone(),
3317                        def_meaning.into(),
3318                        rep_decompressor,
3319                        def_decompressor,
3320                        constant_layout.num_rep_values,
3321                        constant_layout.num_def_values,
3322                    )) as Box<dyn StructuralPageScheduler>
3323                }
3324            }
3325            Layout::BlobLayout(blob) => {
3326                let inner_scheduler = Self::page_layout_to_scheduler(
3327                    page_info,
3328                    blob.inner_layout.as_ref().expect_ok()?.as_ref(),
3329                    decompressors,
3330                    cache_repetition_index,
3331                    target_field,
3332                )?;
3333                let def_meaning = blob
3334                    .layers
3335                    .iter()
3336                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3337                    .collect::<Vec<_>>();
3338                if matches!(target_field.data_type(), DataType::Struct(_)) {
3339                    // User wants to decode blob into struct
3340                    Box::new(BlobDescriptionPageScheduler::new(
3341                        inner_scheduler,
3342                        def_meaning.into(),
3343                    ))
3344                } else {
3345                    // User wants to decode blob into binary data
3346                    Box::new(BlobPageScheduler::new(
3347                        inner_scheduler,
3348                        page_info.priority,
3349                        page_info.num_rows,
3350                        def_meaning.into(),
3351                    ))
3352                }
3353            }
3354        })
3355    }
3356
3357    fn page_info_to_scheduler(
3358        page_info: &PageInfo,
3359        page_index: usize,
3360        decompressors: &dyn DecompressionStrategy,
3361        cache_repetition_index: bool,
3362        target_field: &Field,
3363    ) -> Result<PageInfoAndScheduler> {
3364        let page_layout = page_info.encoding.as_structural();
3365        let scheduler = Self::page_layout_to_scheduler(
3366            page_info,
3367            page_layout,
3368            decompressors,
3369            cache_repetition_index,
3370            target_field,
3371        )?;
3372        Ok(PageInfoAndScheduler {
3373            page_index,
3374            num_rows: page_info.num_rows,
3375            scheduler,
3376        })
3377    }
3378}
3379
3380pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
3381    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
3382}
3383
3384pub struct NoCachedPageData;
3385
3386impl DeepSizeOf for NoCachedPageData {
3387    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
3388        0
3389    }
3390}
3391impl CachedPageData for NoCachedPageData {
3392    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
3393        self
3394    }
3395}
3396
3397pub struct CachedFieldData {
3398    pages: Vec<Arc<dyn CachedPageData>>,
3399}
3400
3401impl DeepSizeOf for CachedFieldData {
3402    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
3403        self.pages.deep_size_of_children(ctx)
3404    }
3405}
3406
3407// Cache key for field data
3408#[derive(Debug, Clone)]
3409pub struct FieldDataCacheKey {
3410    pub column_index: u32,
3411}
3412
3413impl CacheKey for FieldDataCacheKey {
3414    type ValueType = CachedFieldData;
3415
3416    fn key(&self) -> std::borrow::Cow<'_, str> {
3417        self.column_index.to_string().into()
3418    }
3419}
3420
3421impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
3422    fn initialize<'a>(
3423        &'a mut self,
3424        _filter: &'a FilterExpression,
3425        context: &'a SchedulerContext,
3426    ) -> BoxFuture<'a, Result<()>> {
3427        let cache_key = FieldDataCacheKey {
3428            column_index: self.column_index,
3429        };
3430        let cache = context.cache().clone();
3431
3432        async move {
3433            if let Some(cached_data) = cache.get_with_key(&cache_key).await {
3434                self.page_schedulers
3435                    .iter_mut()
3436                    .zip(cached_data.pages.iter())
3437                    .for_each(|(page_scheduler, cached_data)| {
3438                        page_scheduler.scheduler.load(cached_data);
3439                    });
3440                return Ok(());
3441            }
3442
3443            let page_data = self
3444                .page_schedulers
3445                .iter_mut()
3446                .map(|s| s.scheduler.initialize(context.io()))
3447                .collect::<FuturesOrdered<_>>();
3448
3449            let page_data = page_data.try_collect::<Vec<_>>().await?;
3450            let cached_data = Arc::new(CachedFieldData { pages: page_data });
3451            cache.insert_with_key(&cache_key, cached_data).await;
3452            Ok(())
3453        }
3454        .boxed()
3455    }
3456
3457    fn schedule_ranges<'a>(
3458        &'a self,
3459        ranges: &[Range<u64>],
3460        _filter: &FilterExpression,
3461    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
3462        let ranges = ranges.to_vec();
3463        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
3464            self, ranges,
3465        )))
3466    }
3467}
3468
3469/// Takes the output from several pages decoders and
3470/// concatenates them.
3471#[derive(Debug)]
3472pub struct StructuralCompositeDecodeArrayTask {
3473    tasks: Vec<Box<dyn DecodePageTask>>,
3474    should_validate: bool,
3475    data_type: DataType,
3476}
3477
3478impl StructuralCompositeDecodeArrayTask {
3479    fn restore_validity(
3480        array: Arc<dyn Array>,
3481        unraveler: &mut CompositeRepDefUnraveler,
3482    ) -> Arc<dyn Array> {
3483        let validity = unraveler.unravel_validity(array.len());
3484        let Some(validity) = validity else {
3485            return array;
3486        };
3487        if array.data_type() == &DataType::Null {
3488            // We unravel from a null array but we don't add the null buffer because arrow-rs doesn't like it
3489            return array;
3490        }
3491        assert_eq!(validity.len(), array.len());
3492        // SAFETY: We've should have already asserted the buffers are all valid, we are just
3493        // adding null buffers to the array here
3494        make_array(unsafe {
3495            array
3496                .to_data()
3497                .into_builder()
3498                .nulls(Some(validity))
3499                .build_unchecked()
3500        })
3501    }
3502}
3503
3504impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3505    fn decode(self: Box<Self>) -> Result<DecodedArray> {
3506        let mut arrays = Vec::with_capacity(self.tasks.len());
3507        let mut unravelers = Vec::with_capacity(self.tasks.len());
3508        for task in self.tasks {
3509            let decoded = task.decode()?;
3510            unravelers.push(decoded.repdef);
3511
3512            let array = make_array(
3513                decoded
3514                    .data
3515                    .into_arrow(self.data_type.clone(), self.should_validate)?,
3516            );
3517
3518            arrays.push(array);
3519        }
3520        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3521        let array = arrow_select::concat::concat(&array_refs)?;
3522        let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3523
3524        let array = Self::restore_validity(array, &mut repdef);
3525
3526        Ok(DecodedArray { array, repdef })
3527    }
3528}
3529
3530#[derive(Debug)]
3531pub struct StructuralPrimitiveFieldDecoder {
3532    field: Arc<ArrowField>,
3533    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
3534    should_validate: bool,
3535    rows_drained_in_current: u64,
3536}
3537
3538impl StructuralPrimitiveFieldDecoder {
3539    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3540        Self {
3541            field: field.clone(),
3542            page_decoders: VecDeque::new(),
3543            should_validate,
3544            rows_drained_in_current: 0,
3545        }
3546    }
3547}
3548
3549impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3550    fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
3551        assert!(child.path.is_empty());
3552        self.page_decoders.push_back(child.decoder);
3553        Ok(())
3554    }
3555
3556    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3557        let mut remaining = num_rows;
3558        let mut tasks = Vec::new();
3559        while remaining > 0 {
3560            let cur_page = self.page_decoders.front_mut().unwrap();
3561            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3562            let to_take = num_in_page.min(remaining);
3563
3564            let task = cur_page.drain(to_take)?;
3565            tasks.push(task);
3566
3567            if to_take == num_in_page {
3568                self.page_decoders.pop_front();
3569                self.rows_drained_in_current = 0;
3570            } else {
3571                self.rows_drained_in_current += to_take;
3572            }
3573
3574            remaining -= to_take;
3575        }
3576        Ok(Box::new(StructuralCompositeDecodeArrayTask {
3577            tasks,
3578            should_validate: self.should_validate,
3579            data_type: self.field.data_type().clone(),
3580        }))
3581    }
3582
3583    fn data_type(&self) -> &DataType {
3584        self.field.data_type()
3585    }
3586}
3587
3588/// The serialized representation of full-zip data
3589struct SerializedFullZip {
3590    /// The zipped values buffer
3591    values: LanceBuffer,
3592    /// The repetition index (only present if there is repetition)
3593    repetition_index: Option<LanceBuffer>,
3594}
3595
3596// We align and pad mini-blocks to 8 byte boundaries for two reasons.  First,
3597// to allow us to store a chunk size in 12 bits.
3598//
3599// If we directly record the size in bytes with 12 bits we would be limited to
3600// 4KiB which is too small.  Since we know each mini-block consists of 8 byte
3601// words we can store the # of words instead which gives us 32KiB.  We want
3602// at least 24KiB so we can handle even the worst case of
3603// - 4Ki values compressed into an 8186 byte buffer
3604// - 4 bytes to describe rep & def lengths
3605// - 16KiB of rep & def buffer (this will almost never happen but life is easier if we
3606//   plan for it)
3607//
3608// Second, each chunk in a mini-block is aligned to 8 bytes.  This allows multi-byte
3609// values like offsets to be stored in a mini-block and safely read back out.  It also
3610// helps ensure zero-copy reads in cases where zero-copy is possible (e.g. no decoding
3611// needed).
3612//
3613// Note: by "aligned to 8 bytes" we mean BOTH "aligned to 8 bytes from the start of
3614// the page" and "aligned to 8 bytes from the start of the file."
3615const MINIBLOCK_ALIGNMENT: usize = 8;
3616
3617/// An encoder for primitive (leaf) arrays
3618///
3619/// This encoder is fairly complicated and follows a number of paths depending
3620/// on the data.
3621///
3622/// First, we convert the validity & offsets information into repetition and
3623/// definition levels.  Then we compress the data itself into a single buffer.
3624///
3625/// If the data is narrow then we encode the data in small chunks (each chunk
3626/// should be a few disk sectors and contains a buffer of repetition, a buffer
3627/// of definition, and a buffer of value data).  This approach is called
3628/// "mini-block".  These mini-blocks are stored into a single data buffer.
3629///
3630/// If the data is wide then we zip together the repetition and definition value
3631/// with the value data into a single buffer.  This approach is called "zipped".
3632///
3633/// If there is any repetition information then we create a repetition index
3634///
3635/// In addition, the compression process may create zero or more metadata buffers.
3636/// For example, a dictionary compression will create dictionary metadata.  Any
3637/// mini-block approach has a metadata buffer of block sizes.  This metadata is
3638/// stored in a separate buffer on disk and read at initialization time.
3639///
3640/// TODO: We should concatenate metadata buffers from all pages into a single buffer
3641/// at (roughly) the end of the file so there is, at most, one read per column of
3642/// metadata per file.
3643pub struct PrimitiveStructuralEncoder {
3644    // Accumulates arrays until we have enough data to justify a disk page
3645    accumulation_queue: AccumulationQueue,
3646
3647    keep_original_array: bool,
3648    support_large_chunk: bool,
3649    accumulated_repdefs: Vec<RepDefBuilder>,
3650    // The compression strategy we will use to compress the data
3651    compression_strategy: Arc<dyn CompressionStrategy>,
3652    column_index: u32,
3653    field: Field,
3654    encoding_metadata: Arc<HashMap<String, String>>,
3655    version: LanceFileVersion,
3656}
3657
3658struct CompressedLevelsChunk {
3659    data: LanceBuffer,
3660    num_levels: u16,
3661}
3662
3663struct CompressedLevels {
3664    data: Vec<CompressedLevelsChunk>,
3665    compression: CompressiveEncoding,
3666    rep_index: Option<LanceBuffer>,
3667}
3668
3669struct SerializedMiniBlockPage {
3670    num_buffers: u64,
3671    data: LanceBuffer,
3672    metadata: LanceBuffer,
3673}
3674
3675#[derive(Debug, Clone, Copy)]
3676struct DictEncodingBudget {
3677    max_dict_entries: u32,
3678    max_encoded_size: usize,
3679}
3680
3681impl PrimitiveStructuralEncoder {
3682    pub fn try_new(
3683        options: &EncodingOptions,
3684        compression_strategy: Arc<dyn CompressionStrategy>,
3685        column_index: u32,
3686        field: Field,
3687        encoding_metadata: Arc<HashMap<String, String>>,
3688    ) -> Result<Self> {
3689        Ok(Self {
3690            accumulation_queue: AccumulationQueue::new(
3691                options.cache_bytes_per_column,
3692                column_index,
3693                options.keep_original_array,
3694            ),
3695            support_large_chunk: options.support_large_chunk(),
3696            keep_original_array: options.keep_original_array,
3697            accumulated_repdefs: Vec::new(),
3698            column_index,
3699            compression_strategy,
3700            field,
3701            encoding_metadata,
3702            version: options.version,
3703        })
3704    }
3705
3706    // TODO: This is a heuristic we may need to tune at some point
3707    //
3708    // As data gets narrow then the "zipping" process gets too expensive
3709    //   and we prefer mini-block
3710    // As data gets wide then the # of values per block shrinks (very wide)
3711    //   data doesn't even fit in a mini-block and the block overhead gets
3712    //   too large and we prefer zipped.
3713    fn is_narrow(data_block: &DataBlock) -> bool {
3714        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3715
3716        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3717            let max_len_array = max_len_array
3718                .as_any()
3719                .downcast_ref::<PrimitiveArray<UInt64Type>>()
3720                .unwrap();
3721            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3722                return true;
3723            }
3724        }
3725        false
3726    }
3727
3728    fn prefers_miniblock(
3729        data_block: &DataBlock,
3730        encoding_metadata: &HashMap<String, String>,
3731    ) -> bool {
3732        // If the user specifically requested miniblock then use it
3733        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3734            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3735        }
3736        // Otherwise only use miniblock if it is narrow
3737        Self::is_narrow(data_block)
3738    }
3739
3740    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3741        // Fullzip is the backup option so the only reason we wouldn't use it is if the
3742        // user specifically requested not to use it (in which case we're probably going
3743        // to emit an error)
3744        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3745            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3746        }
3747        true
3748    }
3749
3750    // Converts value data, repetition levels, and definition levels into a single
3751    // buffer of mini-blocks.  In addition, creates a buffer of mini-block metadata
3752    // which tells us the size of each block.  Finally, if repetition is present then
3753    // we also create a buffer for the repetition index.
3754    //
3755    // Each chunk is serialized as:
3756    // | num_bufs (1 byte) | buf_lens (2 bytes per buffer) | P | buf0 | P | buf1 | ... | bufN | P |
3757    //
3758    // P - Padding inserted to ensure each buffer is 8-byte aligned and the buffer size is a multiple
3759    //     of 8 bytes (so that the next chunk is 8-byte aligned).
3760    //
3761    // Each block has a u16 word of metadata.  The upper 12 bits contain the
3762    // # of 8-byte words in the block (if the block does not fill the final word
3763    // then up to 7 bytes of padding are added).  The lower 4 bits describe the log_2
3764    // number of values (e.g. if there are 1024 then the lower 4 bits will be
3765    // 0xA)  All blocks except the last must have power-of-two number of values.
3766    // This not only makes metadata smaller but it makes decoding easier since
3767    // batch sizes are typically a power of 2.  4 bits would allow us to express
3768    // up to 16Ki values but we restrict this further to 4Ki values.
3769    //
3770    // This means blocks can have 1 to 4Ki values and 8 - 32Ki bytes.
3771    //
3772    // All metadata words are serialized (as little endian) into a single buffer
3773    // of metadata values.
3774    //
3775    // If there is repetition then we also create a repetition index.  This is a
3776    // single buffer of integer vectors (stored in row major order).  There is one
3777    // entry for each chunk.  The size of the vector is based on the depth of random
3778    // access we want to support.
3779    //
3780    // A vector of size 2 is the minimum and will support row-based random access (e.g.
3781    // "take the 57th row").  A vector of size 3 will support 1 level of nested access
3782    // (e.g. "take the 3rd item in the 57th row").  A vector of size 4 will support 2
3783    // levels of nested access and so on.
3784    //
3785    // The first number in the vector is the number of top-level rows that complete in
3786    // the chunk.  The second number is the number of second-level rows that complete
3787    // after the final top-level row completed (or beginning of the chunk if no top-level
3788    // row completes in the chunk).  And so on.  The final number in the vector is always
3789    // the number of leftover items not covered by earlier entries in the vector.
3790    //
3791    // Currently we are limited to 0 levels of nested access but that will change in the
3792    // future.
3793    //
3794    // The repetition index and the chunk metadata are read at initialization time and
3795    // cached in memory.
3796    fn serialize_miniblocks(
3797        miniblocks: MiniBlockCompressed,
3798        rep: Option<Vec<CompressedLevelsChunk>>,
3799        def: Option<Vec<CompressedLevelsChunk>>,
3800        support_large_chunk: bool,
3801    ) -> SerializedMiniBlockPage {
3802        let bytes_rep = rep
3803            .as_ref()
3804            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3805            .unwrap_or(0);
3806        let bytes_def = def
3807            .as_ref()
3808            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3809            .unwrap_or(0);
3810        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3811        let mut num_buffers = miniblocks.data.len();
3812        if rep.is_some() {
3813            num_buffers += 1;
3814        }
3815        if def.is_some() {
3816            num_buffers += 1;
3817        }
3818        // 2 bytes for the length of each buffer and up to 7 bytes of padding per buffer
3819        let max_extra = 9 * num_buffers;
3820        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3821        let chunk_size_bytes = if support_large_chunk { 4 } else { 2 };
3822        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * chunk_size_bytes);
3823
3824        let mut rep_iter = rep.map(|r| r.into_iter());
3825        let mut def_iter = def.map(|d| d.into_iter());
3826
3827        let mut buffer_offsets = vec![0; miniblocks.data.len()];
3828        for chunk in miniblocks.chunks {
3829            let start_pos = data_buffer.len();
3830            // Start of chunk should be aligned
3831            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3832
3833            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3834            let def = def_iter.as_mut().map(|d| d.next().unwrap());
3835
3836            // Write the number of levels, or 0 if there is no rep/def
3837            let num_levels = rep
3838                .as_ref()
3839                .map(|r| r.num_levels)
3840                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3841            data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3842
3843            // Write the buffer lengths
3844            if let Some(rep) = rep.as_ref() {
3845                let bytes_rep = u16::try_from(rep.data.len()).unwrap();
3846                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3847            }
3848            if let Some(def) = def.as_ref() {
3849                let bytes_def = u16::try_from(def.data.len()).unwrap();
3850                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3851            }
3852
3853            if support_large_chunk {
3854                for &buffer_size in &chunk.buffer_sizes {
3855                    data_buffer.extend_from_slice(&buffer_size.to_le_bytes());
3856                }
3857            } else {
3858                for &buffer_size in &chunk.buffer_sizes {
3859                    data_buffer.extend_from_slice(&(buffer_size as u16).to_le_bytes());
3860                }
3861            }
3862
3863            // Pad
3864            let add_padding = |data_buffer: &mut Vec<u8>| {
3865                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3866                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
3867            };
3868            add_padding(&mut data_buffer);
3869
3870            // Write the buffers themselves
3871            if let Some(rep) = rep.as_ref() {
3872                data_buffer.extend_from_slice(&rep.data);
3873                add_padding(&mut data_buffer);
3874            }
3875            if let Some(def) = def.as_ref() {
3876                data_buffer.extend_from_slice(&def.data);
3877                add_padding(&mut data_buffer);
3878            }
3879            for (buffer_size, (buffer, buffer_offset)) in chunk
3880                .buffer_sizes
3881                .iter()
3882                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
3883            {
3884                let start = *buffer_offset;
3885                let end = start + *buffer_size as usize;
3886                *buffer_offset += *buffer_size as usize;
3887                data_buffer.extend_from_slice(&buffer[start..end]);
3888                add_padding(&mut data_buffer);
3889            }
3890
3891            let chunk_bytes = data_buffer.len() - start_pos;
3892            let max_chunk_size = if support_large_chunk {
3893                4 * 1024 * 1024 * 1024 // 4GB limit with u32 metadata
3894            } else {
3895                32 * 1024 // 32KiB limit with u16 metadata
3896            };
3897            assert!(chunk_bytes <= max_chunk_size);
3898            assert!(chunk_bytes > 0);
3899            assert_eq!(chunk_bytes % 8, 0);
3900            // 4Ki values max
3901            assert!(chunk.log_num_values <= 12);
3902            // We subtract 1 here from chunk_bytes because we want to be able to express
3903            // a size of 32KiB and not (32Ki - 8)B which is what we'd get otherwise with
3904            // 0xFFF
3905            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3906            let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3907
3908            let metadata = (divided_bytes_minus_one << 4) | chunk.log_num_values as u64;
3909            if support_large_chunk {
3910                meta_buffer.extend_from_slice(&(metadata as u32).to_le_bytes());
3911            } else {
3912                meta_buffer.extend_from_slice(&(metadata as u16).to_le_bytes());
3913            }
3914        }
3915
3916        let data_buffer = LanceBuffer::from(data_buffer);
3917        let metadata_buffer = LanceBuffer::from(meta_buffer);
3918
3919        SerializedMiniBlockPage {
3920            num_buffers: miniblocks.data.len() as u64,
3921            data: data_buffer,
3922            metadata: metadata_buffer,
3923        }
3924    }
3925
3926    /// Compresses a buffer of levels into chunks
3927    ///
3928    /// If these are repetition levels then we also calculate the repetition index here (that
3929    /// is the third return value)
3930    fn compress_levels(
3931        mut levels: RepDefSlicer<'_>,
3932        num_elements: u64,
3933        compression_strategy: &dyn CompressionStrategy,
3934        chunks: &[MiniBlockChunk],
3935        // This will be 0 if we are compressing def levels
3936        max_rep: u16,
3937    ) -> Result<CompressedLevels> {
3938        let mut rep_index = if max_rep > 0 {
3939            Vec::with_capacity(chunks.len())
3940        } else {
3941            vec![]
3942        };
3943        // Make the levels into a FixedWidth data block
3944        let num_levels = levels.num_levels() as u64;
3945        let levels_buf = levels.all_levels().clone();
3946
3947        let mut fixed_width_block = FixedWidthDataBlock {
3948            data: levels_buf,
3949            bits_per_value: 16,
3950            num_values: num_levels,
3951            block_info: BlockInfo::new(),
3952        };
3953        // Compute statistics to enable optimal compression for rep/def levels
3954        fixed_width_block.compute_stat();
3955
3956        let levels_block = DataBlock::FixedWidth(fixed_width_block);
3957        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
3958        // Pick a block compressor
3959        let (compressor, compressor_desc) =
3960            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
3961        // Compress blocks of levels (sized according to the chunks)
3962        let mut level_chunks = Vec::with_capacity(chunks.len());
3963        let mut values_counter = 0;
3964        for (chunk_idx, chunk) in chunks.iter().enumerate() {
3965            let chunk_num_values = chunk.num_values(values_counter, num_elements);
3966            debug_assert!(chunk_num_values > 0);
3967            values_counter += chunk_num_values;
3968            let chunk_levels = if chunk_idx < chunks.len() - 1 {
3969                levels.slice_next(chunk_num_values as usize)
3970            } else {
3971                levels.slice_rest()
3972            };
3973            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
3974            if max_rep > 0 {
3975                // If max_rep > 0 then we are working with rep levels and we need
3976                // to calculate the repetition index.  The repetition index for a
3977                // chunk is currently 2 values (in the future it may be more).
3978                //
3979                // The first value is the number of rows that _finish_ in the
3980                // chunk.
3981                //
3982                // The second value is the number of "leftovers" after the last
3983                // finished row in the chunk.
3984                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
3985                let rep_values = rep_values.as_ref();
3986
3987                // We skip 1 here because a max_rep at spot 0 doesn't count as a finished list (we
3988                // will count it in the previous chunk)
3989                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
3990                let num_leftovers = if chunk_idx < chunks.len() - 1 {
3991                    rep_values
3992                        .iter()
3993                        .rev()
3994                        .position(|v| *v == max_rep)
3995                        // # of leftovers includes the max_rep spot
3996                        .map(|pos| pos + 1)
3997                        .unwrap_or(rep_values.len())
3998                } else {
3999                    // Last chunk can't have leftovers
4000                    0
4001                };
4002
4003                if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
4004                    // This chunk starts with a new row and so, if we thought we had leftovers
4005                    // in the previous chunk, we were mistaken
4006                    // TODO: Can use unchecked here
4007                    let rep_len = rep_index.len();
4008                    if rep_index[rep_len - 1] != 0 {
4009                        // We thought we had leftovers but that was actually a full row
4010                        rep_index[rep_len - 2] += 1;
4011                        rep_index[rep_len - 1] = 0;
4012                    }
4013                }
4014
4015                if chunk_idx == chunks.len() - 1 {
4016                    // The final list
4017                    num_rows += 1;
4018                }
4019                rep_index.push(num_rows as u64);
4020                rep_index.push(num_leftovers as u64);
4021            }
4022            let mut chunk_fixed_width = FixedWidthDataBlock {
4023                data: chunk_levels,
4024                bits_per_value: 16,
4025                num_values: num_chunk_levels,
4026                block_info: BlockInfo::new(),
4027            };
4028            chunk_fixed_width.compute_stat();
4029            let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
4030            let compressed_levels = compressor.compress(chunk_levels_block)?;
4031            level_chunks.push(CompressedLevelsChunk {
4032                data: compressed_levels,
4033                num_levels: num_chunk_levels as u16,
4034            });
4035        }
4036        debug_assert_eq!(levels.num_levels_remaining(), 0);
4037        let rep_index = if rep_index.is_empty() {
4038            None
4039        } else {
4040            Some(LanceBuffer::reinterpret_vec(rep_index))
4041        };
4042        Ok(CompressedLevels {
4043            data: level_chunks,
4044            compression: compressor_desc,
4045            rep_index,
4046        })
4047    }
4048
4049    fn encode_simple_all_null(
4050        column_idx: u32,
4051        num_rows: u64,
4052        row_number: u64,
4053    ) -> Result<EncodedPage> {
4054        let description =
4055            ProtobufUtils21::constant_layout(&[DefinitionInterpretation::NullableItem], None);
4056        Ok(EncodedPage {
4057            column_idx,
4058            data: vec![],
4059            description: PageEncoding::Structural(description),
4060            num_rows,
4061            row_number,
4062        })
4063    }
4064
4065    fn encode_complex_all_null_vals(
4066        data: &Arc<[u16]>,
4067        compression_strategy: &dyn CompressionStrategy,
4068    ) -> Result<(LanceBuffer, pb21::CompressiveEncoding)> {
4069        let buffer = LanceBuffer::reinterpret_slice(data.clone());
4070        let mut fixed_width_block = FixedWidthDataBlock {
4071            data: buffer,
4072            bits_per_value: 16,
4073            num_values: data.len() as u64,
4074            block_info: BlockInfo::new(),
4075        };
4076        fixed_width_block.compute_stat();
4077
4078        let levels_block = DataBlock::FixedWidth(fixed_width_block);
4079        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
4080        let (compressor, encoding) =
4081            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
4082        let compressed_buffer = compressor.compress(levels_block)?;
4083        Ok((compressed_buffer, encoding))
4084    }
4085
4086    // Encodes a page where all values are null but we have rep/def
4087    // information that we need to store (e.g. to distinguish between
4088    // different kinds of null)
4089    fn encode_complex_all_null(
4090        column_idx: u32,
4091        repdef: crate::repdef::SerializedRepDefs,
4092        row_number: u64,
4093        num_rows: u64,
4094        version: LanceFileVersion,
4095        compression_strategy: &dyn CompressionStrategy,
4096    ) -> Result<EncodedPage> {
4097        if version.resolve() < LanceFileVersion::V2_2 {
4098            let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
4099                LanceBuffer::reinterpret_slice(rep.clone())
4100            } else {
4101                LanceBuffer::empty()
4102            };
4103
4104            let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
4105                LanceBuffer::reinterpret_slice(def.clone())
4106            } else {
4107                LanceBuffer::empty()
4108            };
4109
4110            let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None);
4111            return Ok(EncodedPage {
4112                column_idx,
4113                data: vec![rep_bytes, def_bytes],
4114                description: PageEncoding::Structural(description),
4115                num_rows,
4116                row_number,
4117            });
4118        }
4119
4120        let (rep_bytes, rep_encoding, num_rep_values) = if let Some(rep) =
4121            repdef.repetition_levels.as_ref()
4122        {
4123            let num_values = rep.len() as u64;
4124            let (buffer, encoding) = Self::encode_complex_all_null_vals(rep, compression_strategy)?;
4125            (buffer, Some(encoding), num_values)
4126        } else {
4127            (LanceBuffer::empty(), None, 0)
4128        };
4129
4130        let (def_bytes, def_encoding, num_def_values) = if let Some(def) =
4131            repdef.definition_levels.as_ref()
4132        {
4133            let num_values = def.len() as u64;
4134            let (buffer, encoding) = Self::encode_complex_all_null_vals(def, compression_strategy)?;
4135            (buffer, Some(encoding), num_values)
4136        } else {
4137            (LanceBuffer::empty(), None, 0)
4138        };
4139
4140        let description = ProtobufUtils21::compressed_all_null_constant_layout(
4141            &repdef.def_meaning,
4142            rep_encoding,
4143            def_encoding,
4144            num_rep_values,
4145            num_def_values,
4146        );
4147        Ok(EncodedPage {
4148            column_idx,
4149            data: vec![rep_bytes, def_bytes],
4150            description: PageEncoding::Structural(description),
4151            num_rows,
4152            row_number,
4153        })
4154    }
4155
4156    fn leaf_validity(
4157        repdef: &crate::repdef::SerializedRepDefs,
4158        num_values: usize,
4159    ) -> Result<Option<BooleanBuffer>> {
4160        let rep = repdef
4161            .repetition_levels
4162            .as_ref()
4163            .map(|rep| rep.as_ref().to_vec());
4164        let def = repdef
4165            .definition_levels
4166            .as_ref()
4167            .map(|def| def.as_ref().to_vec());
4168        let mut unraveler = RepDefUnraveler::new(
4169            rep,
4170            def,
4171            repdef.def_meaning.clone().into(),
4172            num_values as u64,
4173        );
4174        if unraveler.is_all_valid() {
4175            return Ok(None);
4176        }
4177        let mut validity = BooleanBufferBuilder::new(num_values);
4178        unraveler.unravel_validity(&mut validity);
4179        Ok(Some(validity.finish()))
4180    }
4181
4182    fn is_constant_values(
4183        arrays: &[ArrayRef],
4184        scalar: &ArrayRef,
4185        validity: Option<&BooleanBuffer>,
4186    ) -> Result<bool> {
4187        debug_assert_eq!(scalar.len(), 1);
4188        debug_assert_eq!(scalar.null_count(), 0);
4189
4190        match scalar.data_type() {
4191            DataType::Boolean => {
4192                let mut global_idx = 0usize;
4193                let scalar_val = scalar.as_boolean().value(0);
4194                for arr in arrays {
4195                    let bool_arr = arr.as_boolean();
4196                    for i in 0..arr.len() {
4197                        let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4198                        global_idx += 1;
4199                        if !is_valid {
4200                            continue;
4201                        }
4202                        if bool_arr.value(i) != scalar_val {
4203                            return Ok(false);
4204                        }
4205                    }
4206                }
4207                Ok(true)
4208            }
4209            DataType::Utf8 => Self::is_constant_utf8::<i32>(arrays, scalar, validity),
4210            DataType::LargeUtf8 => Self::is_constant_utf8::<i64>(arrays, scalar, validity),
4211            DataType::Binary => Self::is_constant_binary::<i32>(arrays, scalar, validity),
4212            DataType::LargeBinary => Self::is_constant_binary::<i64>(arrays, scalar, validity),
4213            data_type => {
4214                let mut global_idx = 0usize;
4215                let Some(byte_width) = data_type.byte_width_opt() else {
4216                    return Ok(false);
4217                };
4218                let scalar_data = scalar.to_data();
4219                if scalar_data.buffers().len() != 1 || !scalar_data.child_data().is_empty() {
4220                    return Ok(false);
4221                }
4222                let scalar_bytes = scalar_data.buffers()[0].as_slice();
4223                if scalar_bytes.len() != byte_width {
4224                    return Ok(false);
4225                }
4226
4227                for arr in arrays {
4228                    let data = arr.to_data();
4229                    if data.buffers().is_empty() {
4230                        return Ok(false);
4231                    }
4232                    let buf = data.buffers()[0].as_slice();
4233                    let base = data.offset();
4234                    for i in 0..arr.len() {
4235                        let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4236                        global_idx += 1;
4237                        if !is_valid {
4238                            continue;
4239                        }
4240                        let start = (base + i) * byte_width;
4241                        if buf[start..start + byte_width] != scalar_bytes[..] {
4242                            return Ok(false);
4243                        }
4244                    }
4245                }
4246                Ok(true)
4247            }
4248        }
4249    }
4250
4251    fn is_constant_utf8<O: arrow_array::OffsetSizeTrait>(
4252        arrays: &[ArrayRef],
4253        scalar: &ArrayRef,
4254        validity: Option<&BooleanBuffer>,
4255    ) -> Result<bool> {
4256        debug_assert_eq!(scalar.len(), 1);
4257        let scalar_val = scalar.as_string::<O>().value(0).as_bytes();
4258        let mut global_idx = 0usize;
4259        for arr in arrays {
4260            let str_arr = arr.as_string::<O>();
4261            for i in 0..arr.len() {
4262                let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4263                global_idx += 1;
4264                if !is_valid {
4265                    continue;
4266                }
4267                if str_arr.value(i).as_bytes() != scalar_val {
4268                    return Ok(false);
4269                }
4270            }
4271        }
4272        Ok(true)
4273    }
4274
4275    fn is_constant_binary<O: arrow_array::OffsetSizeTrait>(
4276        arrays: &[ArrayRef],
4277        scalar: &ArrayRef,
4278        validity: Option<&BooleanBuffer>,
4279    ) -> Result<bool> {
4280        debug_assert_eq!(scalar.len(), 1);
4281        let scalar_val = scalar.as_binary::<O>().value(0);
4282        let mut global_idx = 0usize;
4283        for arr in arrays {
4284            let bin_arr = arr.as_binary::<O>();
4285            for i in 0..arr.len() {
4286                let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4287                global_idx += 1;
4288                if !is_valid {
4289                    continue;
4290                }
4291                if bin_arr.value(i) != scalar_val {
4292                    return Ok(false);
4293                }
4294            }
4295        }
4296        Ok(true)
4297    }
4298
4299    fn find_constant_scalar(
4300        arrays: &[ArrayRef],
4301        validity: Option<&BooleanBuffer>,
4302    ) -> Result<Option<ArrayRef>> {
4303        if arrays.is_empty() {
4304            return Ok(None);
4305        }
4306
4307        let global_scalar_idx = if let Some(validity) = validity {
4308            let Some(idx) = (0..validity.len()).find(|&i| validity.value(i)) else {
4309                return Ok(None);
4310            };
4311            idx
4312        } else {
4313            0
4314        };
4315
4316        let mut idx_remaining = global_scalar_idx;
4317        let mut scalar_arr_idx = 0usize;
4318        while scalar_arr_idx < arrays.len() {
4319            let len = arrays[scalar_arr_idx].len();
4320            if idx_remaining < len {
4321                break;
4322            }
4323            idx_remaining -= len;
4324            scalar_arr_idx += 1;
4325        }
4326
4327        if scalar_arr_idx >= arrays.len() {
4328            return Ok(None);
4329        }
4330
4331        let scalar =
4332            lance_arrow::scalar::extract_scalar_value(&arrays[scalar_arr_idx], idx_remaining)?;
4333        if scalar.null_count() != 0 {
4334            return Ok(None);
4335        }
4336        if !Self::is_constant_values(arrays, &scalar, validity)? {
4337            return Ok(None);
4338        }
4339        Ok(Some(scalar))
4340    }
4341
4342    fn resolve_dict_values_compression_metadata(
4343        field_metadata: &HashMap<String, String>,
4344        env_compression: Option<String>,
4345        env_compression_level: Option<String>,
4346    ) -> HashMap<String, String> {
4347        let mut metadata = HashMap::new();
4348
4349        let compression = field_metadata
4350            .get(DICT_VALUES_COMPRESSION_META_KEY)
4351            .cloned()
4352            .or(env_compression)
4353            .unwrap_or_else(|| DEFAULT_DICT_VALUES_COMPRESSION.to_string());
4354        metadata.insert(COMPRESSION_META_KEY.to_string(), compression);
4355
4356        if let Some(compression_level) = field_metadata
4357            .get(DICT_VALUES_COMPRESSION_LEVEL_META_KEY)
4358            .cloned()
4359            .or(env_compression_level)
4360        {
4361            metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), compression_level);
4362        }
4363
4364        metadata
4365    }
4366
4367    fn build_dict_values_compressor_field(field: &Field) -> Result<Field> {
4368        // This is an internal synthetic field used only to feed metadata into
4369        // `create_block_compressor` for dictionary values. The concrete type/name here
4370        // are not semantically meaningful; we rely on explicit metadata below to control
4371        // general compression selection for dictionary values.
4372        let mut dict_values_field = Field::new_arrow("", DataType::UInt16, false)?;
4373        dict_values_field.metadata = Self::resolve_dict_values_compression_metadata(
4374            &field.metadata,
4375            env::var(DICT_VALUES_COMPRESSION_ENV_VAR).ok(),
4376            env::var(DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR).ok(),
4377        );
4378        Ok(dict_values_field)
4379    }
4380
4381    #[allow(clippy::too_many_arguments)]
4382    fn encode_miniblock(
4383        column_idx: u32,
4384        field: &Field,
4385        compression_strategy: &dyn CompressionStrategy,
4386        data: DataBlock,
4387        repdef: crate::repdef::SerializedRepDefs,
4388        row_number: u64,
4389        dictionary_data: Option<DataBlock>,
4390        num_rows: u64,
4391        support_large_chunk: bool,
4392    ) -> Result<EncodedPage> {
4393        if let DataBlock::AllNull(_null_block) = data {
4394            // We should not be using mini-block for all-null.  There are other structural
4395            // encodings for that.
4396            unreachable!()
4397        }
4398
4399        let num_items = data.num_values();
4400
4401        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
4402        let (compressed_data, value_encoding) = compressor.compress(data)?;
4403
4404        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
4405
4406        let mut compressed_rep = repdef
4407            .rep_slicer()
4408            .map(|rep_slicer| {
4409                Self::compress_levels(
4410                    rep_slicer,
4411                    num_items,
4412                    compression_strategy,
4413                    &compressed_data.chunks,
4414                    max_rep,
4415                )
4416            })
4417            .transpose()?;
4418
4419        let (rep_index, rep_index_depth) =
4420            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
4421                Some(rep_index) => (Some(rep_index.clone()), 1),
4422                None => (None, 0),
4423            };
4424
4425        let mut compressed_def = repdef
4426            .def_slicer()
4427            .map(|def_slicer| {
4428                Self::compress_levels(
4429                    def_slicer,
4430                    num_items,
4431                    compression_strategy,
4432                    &compressed_data.chunks,
4433                    /*max_rep=*/ 0,
4434                )
4435            })
4436            .transpose()?;
4437
4438        // TODO: Parquet sparsely encodes values here.  We could do the same but
4439        // then we won't have log2 values per chunk.  This means more metadata
4440        // and potentially more decoder asymmetry.  However, it may be worth
4441        // investigating at some point
4442
4443        let rep_data = compressed_rep
4444            .as_mut()
4445            .map(|cr| std::mem::take(&mut cr.data));
4446        let def_data = compressed_def
4447            .as_mut()
4448            .map(|cd| std::mem::take(&mut cd.data));
4449
4450        let serialized =
4451            Self::serialize_miniblocks(compressed_data, rep_data, def_data, support_large_chunk);
4452
4453        // Metadata, Data, Dictionary, (maybe) Repetition Index
4454        let mut data = Vec::with_capacity(4);
4455        data.push(serialized.metadata);
4456        data.push(serialized.data);
4457
4458        if let Some(dictionary_data) = dictionary_data {
4459            let num_dictionary_items = dictionary_data.num_values();
4460            let dict_values_field = Self::build_dict_values_compressor_field(field)?;
4461
4462            let (compressor, dictionary_encoding) = compression_strategy
4463                .create_block_compressor(&dict_values_field, &dictionary_data)?;
4464            let dictionary_buffer = compressor.compress(dictionary_data)?;
4465
4466            data.push(dictionary_buffer);
4467            if let Some(rep_index) = rep_index {
4468                data.push(rep_index);
4469            }
4470
4471            let description = ProtobufUtils21::miniblock_layout(
4472                compressed_rep.map(|cr| cr.compression),
4473                compressed_def.map(|cd| cd.compression),
4474                value_encoding,
4475                rep_index_depth,
4476                serialized.num_buffers,
4477                Some((dictionary_encoding, num_dictionary_items)),
4478                &repdef.def_meaning,
4479                num_items,
4480                support_large_chunk,
4481            );
4482            Ok(EncodedPage {
4483                num_rows,
4484                column_idx,
4485                data,
4486                description: PageEncoding::Structural(description),
4487                row_number,
4488            })
4489        } else {
4490            let description = ProtobufUtils21::miniblock_layout(
4491                compressed_rep.map(|cr| cr.compression),
4492                compressed_def.map(|cd| cd.compression),
4493                value_encoding,
4494                rep_index_depth,
4495                serialized.num_buffers,
4496                None,
4497                &repdef.def_meaning,
4498                num_items,
4499                support_large_chunk,
4500            );
4501
4502            if let Some(rep_index) = rep_index {
4503                let view = rep_index.borrow_to_typed_slice::<u64>();
4504                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
4505                debug_assert_eq!(total, num_rows);
4506
4507                data.push(rep_index);
4508            }
4509
4510            Ok(EncodedPage {
4511                num_rows,
4512                column_idx,
4513                data,
4514                description: PageEncoding::Structural(description),
4515                row_number,
4516            })
4517        }
4518    }
4519
4520    // For fixed-size data we encode < control word | data > for each value
4521    fn serialize_full_zip_fixed(
4522        fixed: FixedWidthDataBlock,
4523        mut repdef: ControlWordIterator,
4524        num_values: u64,
4525    ) -> SerializedFullZip {
4526        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
4527        let mut zipped_data = Vec::with_capacity(len);
4528
4529        let max_rep_index_val = if repdef.has_repetition() {
4530            len as u64
4531        } else {
4532            // Setting this to 0 means we won't write a repetition index
4533            0
4534        };
4535        let mut rep_index_builder =
4536            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
4537
4538        // I suppose we can just pad to the nearest byte but I'm not sure we need to worry about this anytime soon
4539        // because it is unlikely compression of large values is going to yield a result that is not byte aligned
4540        assert_eq!(
4541            fixed.bits_per_value % 8,
4542            0,
4543            "Non-byte aligned full-zip compression not yet supported"
4544        );
4545
4546        let bytes_per_value = fixed.bits_per_value as usize / 8;
4547        let mut offset = 0;
4548
4549        if bytes_per_value == 0 {
4550            // No data, just dump the repdef into the buffer
4551            while let Some(control) = repdef.append_next(&mut zipped_data) {
4552                if control.is_new_row {
4553                    // We have finished a row
4554                    debug_assert!(offset <= len);
4555                    // SAFETY: We know that `start <= len`
4556                    unsafe { rep_index_builder.append(offset as u64) };
4557                }
4558                offset = zipped_data.len();
4559            }
4560        } else {
4561            // We have data, zip it with the repdef
4562            let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
4563            while let Some(control) = repdef.append_next(&mut zipped_data) {
4564                if control.is_new_row {
4565                    // We have finished a row
4566                    debug_assert!(offset <= len);
4567                    // SAFETY: We know that `start <= len`
4568                    unsafe { rep_index_builder.append(offset as u64) };
4569                }
4570                if control.is_visible {
4571                    let value = data_iter.next().unwrap();
4572                    zipped_data.extend_from_slice(value);
4573                }
4574                offset = zipped_data.len();
4575            }
4576        }
4577
4578        debug_assert_eq!(zipped_data.len(), len);
4579        // Put the final value in the rep index
4580        // SAFETY: `zipped_data.len() == len`
4581        unsafe {
4582            rep_index_builder.append(zipped_data.len() as u64);
4583        }
4584
4585        let zipped_data = LanceBuffer::from(zipped_data);
4586        let rep_index = rep_index_builder.into_data();
4587        let rep_index = if rep_index.is_empty() {
4588            None
4589        } else {
4590            Some(LanceBuffer::from(rep_index))
4591        };
4592        SerializedFullZip {
4593            values: zipped_data,
4594            repetition_index: rep_index,
4595        }
4596    }
4597
4598    // For variable-size data we encode < control word | length | data > for each value
4599    //
4600    // In addition, we create a second buffer, the repetition index
4601    fn serialize_full_zip_variable(
4602        variable: VariableWidthBlock,
4603        mut repdef: ControlWordIterator,
4604        num_items: u64,
4605    ) -> SerializedFullZip {
4606        let bytes_per_offset = variable.bits_per_offset as usize / 8;
4607        assert_eq!(
4608            variable.bits_per_offset % 8,
4609            0,
4610            "Only byte-aligned offsets supported"
4611        );
4612        let len = variable.data.len()
4613            + repdef.bytes_per_word() * num_items as usize
4614            + bytes_per_offset * variable.num_values as usize;
4615        let mut buf = Vec::with_capacity(len);
4616
4617        let max_rep_index_val = len as u64;
4618        let mut rep_index_builder =
4619            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4620
4621        // TODO: byte pack the item lengths with varint encoding
4622        match bytes_per_offset {
4623            4 => {
4624                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
4625                let mut rep_offset = 0;
4626                let mut windows_iter = offs.as_ref().windows(2);
4627                while let Some(control) = repdef.append_next(&mut buf) {
4628                    if control.is_new_row {
4629                        // We have finished a row
4630                        debug_assert!(rep_offset <= len);
4631                        // SAFETY: We know that `buf.len() <= len`
4632                        unsafe { rep_index_builder.append(rep_offset as u64) };
4633                    }
4634                    if control.is_visible {
4635                        let window = windows_iter.next().unwrap();
4636                        if control.is_valid_item {
4637                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4638                            buf.extend_from_slice(
4639                                &variable.data[window[0] as usize..window[1] as usize],
4640                            );
4641                        }
4642                    }
4643                    rep_offset = buf.len();
4644                }
4645            }
4646            8 => {
4647                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4648                let mut rep_offset = 0;
4649                let mut windows_iter = offs.as_ref().windows(2);
4650                while let Some(control) = repdef.append_next(&mut buf) {
4651                    if control.is_new_row {
4652                        // We have finished a row
4653                        debug_assert!(rep_offset <= len);
4654                        // SAFETY: We know that `buf.len() <= len`
4655                        unsafe { rep_index_builder.append(rep_offset as u64) };
4656                    }
4657                    if control.is_visible {
4658                        let window = windows_iter.next().unwrap();
4659                        if control.is_valid_item {
4660                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4661                            buf.extend_from_slice(
4662                                &variable.data[window[0] as usize..window[1] as usize],
4663                            );
4664                        }
4665                    }
4666                    rep_offset = buf.len();
4667                }
4668            }
4669            _ => panic!("Unsupported offset size"),
4670        }
4671
4672        // We might have saved a few bytes by not copying lengths when the length was zero.  However,
4673        // if we are over `len` then we have a bug.
4674        debug_assert!(buf.len() <= len);
4675        // Put the final value in the rep index
4676        // SAFETY: `zipped_data.len() == len`
4677        unsafe {
4678            rep_index_builder.append(buf.len() as u64);
4679        }
4680
4681        let zipped_data = LanceBuffer::from(buf);
4682        let rep_index = rep_index_builder.into_data();
4683        debug_assert!(!rep_index.is_empty());
4684        let rep_index = Some(LanceBuffer::from(rep_index));
4685        SerializedFullZip {
4686            values: zipped_data,
4687            repetition_index: rep_index,
4688        }
4689    }
4690
4691    /// Serializes data into a single buffer according to the full-zip format which zips
4692    /// together the repetition, definition, and value data into a single buffer.
4693    fn serialize_full_zip(
4694        compressed_data: PerValueDataBlock,
4695        repdef: ControlWordIterator,
4696        num_items: u64,
4697    ) -> SerializedFullZip {
4698        match compressed_data {
4699            PerValueDataBlock::Fixed(fixed) => {
4700                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4701            }
4702            PerValueDataBlock::Variable(var) => {
4703                Self::serialize_full_zip_variable(var, repdef, num_items)
4704            }
4705        }
4706    }
4707
4708    fn encode_full_zip(
4709        column_idx: u32,
4710        field: &Field,
4711        compression_strategy: &dyn CompressionStrategy,
4712        data: DataBlock,
4713        repdef: crate::repdef::SerializedRepDefs,
4714        row_number: u64,
4715        num_lists: u64,
4716    ) -> Result<EncodedPage> {
4717        let max_rep = repdef
4718            .repetition_levels
4719            .as_ref()
4720            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4721        let max_def = repdef
4722            .definition_levels
4723            .as_ref()
4724            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4725
4726        // To handle FSL we just flatten
4727        // let data = data.flatten();
4728
4729        let (num_items, num_visible_items) =
4730            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4731                // If there are rep levels there may be "invisible" items and we need to encode
4732                // rep_levels.len() things which might be larger than data.num_values()
4733                (rep_levels.len() as u64, data.num_values())
4734            } else {
4735                // If there are no rep levels then we encode data.num_values() things
4736                (data.num_values(), data.num_values())
4737            };
4738
4739        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4740
4741        let repdef_iter = build_control_word_iterator(
4742            repdef.repetition_levels.as_deref(),
4743            max_rep,
4744            repdef.definition_levels.as_deref(),
4745            max_def,
4746            max_visible_def,
4747            num_items as usize,
4748        );
4749        let bits_rep = repdef_iter.bits_rep();
4750        let bits_def = repdef_iter.bits_def();
4751
4752        let compressor = compression_strategy.create_per_value(field, &data)?;
4753        let (compressed_data, value_encoding) = compressor.compress(data)?;
4754
4755        let description = match &compressed_data {
4756            PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
4757                bits_rep,
4758                bits_def,
4759                fixed.bits_per_value as u32,
4760                value_encoding,
4761                &repdef.def_meaning,
4762                num_items as u32,
4763                num_visible_items as u32,
4764            ),
4765            PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
4766                bits_rep,
4767                bits_def,
4768                variable.bits_per_offset as u32,
4769                value_encoding,
4770                &repdef.def_meaning,
4771                num_items as u32,
4772                num_visible_items as u32,
4773            ),
4774        };
4775
4776        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4777
4778        let data = if let Some(repindex) = zipped.repetition_index {
4779            vec![zipped.values, repindex]
4780        } else {
4781            vec![zipped.values]
4782        };
4783
4784        Ok(EncodedPage {
4785            num_rows: num_lists,
4786            column_idx,
4787            data,
4788            description: PageEncoding::Structural(description),
4789            row_number,
4790        })
4791    }
4792
4793    fn should_dictionary_encode(
4794        data_block: &DataBlock,
4795        field: &Field,
4796        version: LanceFileVersion,
4797    ) -> Option<DictEncodingBudget> {
4798        const DEFAULT_SAMPLE_SIZE: usize = 4096;
4799        const DEFAULT_SAMPLE_UNIQUE_RATIO: f64 = 0.98;
4800
4801        // Since we only dictionary encode FixedWidth and VariableWidth blocks for now, we skip
4802        // estimating the size for other types.
4803        match data_block {
4804            DataBlock::FixedWidth(fixed) => {
4805                if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 {
4806                    return None;
4807                }
4808                if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 {
4809                    return None;
4810                }
4811                if fixed.bits_per_value % 8 != 0 {
4812                    return None;
4813                }
4814            }
4815            DataBlock::VariableWidth(var) => {
4816                if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
4817                    return None;
4818                }
4819            }
4820            _ => return None,
4821        }
4822
4823        // Don't dictionary encode tiny arrays.
4824        let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
4825            .ok()
4826            .and_then(|val| val.parse().ok())
4827            .unwrap_or(100);
4828        if data_block.num_values() < too_small {
4829            return None;
4830        }
4831
4832        let num_values = data_block.num_values();
4833
4834        // Apply divisor threshold and cap. This is intentionally conservative: the goal is to
4835        // avoid spending too much CPU trying to estimate very high cardinalities.
4836        let divisor: u64 = field
4837            .metadata
4838            .get(DICT_DIVISOR_META_KEY)
4839            .and_then(|val| val.parse().ok())
4840            .or_else(|| {
4841                env::var("LANCE_ENCODING_DICT_DIVISOR")
4842                    .ok()
4843                    .and_then(|val| val.parse().ok())
4844            })
4845            .unwrap_or(DEFAULT_DICT_DIVISOR);
4846
4847        let max_cardinality: u64 = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY")
4848            .ok()
4849            .and_then(|val| val.parse().ok())
4850            .unwrap_or(DEFAULT_DICT_MAX_CARDINALITY);
4851
4852        let threshold_cardinality = num_values
4853            .checked_div(divisor.max(1))
4854            .unwrap_or(0)
4855            .min(max_cardinality);
4856        if threshold_cardinality == 0 {
4857            return None;
4858        }
4859
4860        // Get size ratio from metadata or env var.
4861        let threshold_ratio = field
4862            .metadata
4863            .get(DICT_SIZE_RATIO_META_KEY)
4864            .and_then(|val| val.parse::<f64>().ok())
4865            .or_else(|| {
4866                env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
4867                    .ok()
4868                    .and_then(|val| val.parse().ok())
4869            })
4870            .unwrap_or(DEFAULT_DICT_SIZE_RATIO);
4871
4872        if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
4873            panic!(
4874                "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
4875                threshold_ratio
4876            );
4877        }
4878
4879        let data_size = data_block.data_size();
4880        if data_size == 0 {
4881            return None;
4882        }
4883
4884        let max_encoded_size = (data_size as f64 * threshold_ratio) as u64;
4885        let max_encoded_size = usize::try_from(max_encoded_size).ok()?;
4886
4887        // Avoid probing dictionary encoding on data that appears to be near-unique.
4888        if Self::sample_is_near_unique(
4889            data_block,
4890            DEFAULT_SAMPLE_SIZE,
4891            DEFAULT_SAMPLE_UNIQUE_RATIO,
4892        )? {
4893            return None;
4894        }
4895
4896        let max_dict_entries = u32::try_from(threshold_cardinality.min(i32::MAX as u64)).ok()?;
4897        Some(DictEncodingBudget {
4898            max_dict_entries,
4899            max_encoded_size,
4900        })
4901    }
4902
4903    /// Probe whether a page looks near-unique before attempting dictionary encoding.
4904    ///
4905    /// The probe uses deterministic stride sampling (not RNG sampling), which keeps
4906    /// the check cheap and reproducible across runs. The result is only a gate for
4907    /// whether we try dictionary encoding, not a cardinality statistic.
4908    fn sample_is_near_unique(
4909        data_block: &DataBlock,
4910        max_samples: usize,
4911        unique_ratio_threshold: f64,
4912    ) -> Option<bool> {
4913        use std::collections::HashSet;
4914
4915        if unique_ratio_threshold <= 0.0 || unique_ratio_threshold > 1.0 {
4916            return None;
4917        }
4918
4919        let num_values = usize::try_from(data_block.num_values()).ok()?;
4920        if num_values == 0 {
4921            return Some(false);
4922        }
4923
4924        let sample_count = num_values.min(max_samples).max(1);
4925        // Uniform stride sampling across the page.
4926        let step = (num_values / sample_count).max(1);
4927
4928        match data_block {
4929            DataBlock::FixedWidth(fixed) => match fixed.bits_per_value {
4930                64 => {
4931                    let values = fixed.data.borrow_to_typed_slice::<u64>();
4932                    let values = values.as_ref();
4933                    let mut unique: HashSet<u64> = HashSet::with_capacity(sample_count.min(1024));
4934                    for idx in (0..num_values).step_by(step).take(sample_count) {
4935                        unique.insert(values.get(idx).copied()?);
4936                    }
4937                    let ratio = unique.len() as f64 / sample_count as f64;
4938                    // Avoid overreacting to tiny pages with too few samples.
4939                    Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
4940                }
4941                128 => {
4942                    let values = fixed.data.borrow_to_typed_slice::<u128>();
4943                    let values = values.as_ref();
4944                    let mut unique: HashSet<u128> = HashSet::with_capacity(sample_count.min(1024));
4945                    for idx in (0..num_values).step_by(step).take(sample_count) {
4946                        unique.insert(values.get(idx).copied()?);
4947                    }
4948                    let ratio = unique.len() as f64 / sample_count as f64;
4949                    Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
4950                }
4951                _ => Some(false),
4952            },
4953            DataBlock::VariableWidth(var) => {
4954                use xxhash_rust::xxh3::xxh3_64;
4955
4956                // Hash variable-width slices instead of storing borrowed slice keys.
4957                let mut unique: HashSet<u64> = HashSet::with_capacity(sample_count.min(1024));
4958                match var.bits_per_offset {
4959                    32 => {
4960                        let offsets_ref = var.offsets.borrow_to_typed_slice::<u32>();
4961                        let offsets: &[u32] = offsets_ref.as_ref();
4962                        for i in (0..num_values).step_by(step).take(sample_count) {
4963                            let start = usize::try_from(*offsets.get(i)?).ok()?;
4964                            let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
4965                            if start > end || end > var.data.len() {
4966                                return None;
4967                            }
4968                            unique.insert(xxh3_64(&var.data[start..end]));
4969                        }
4970                    }
4971                    64 => {
4972                        let offsets_ref = var.offsets.borrow_to_typed_slice::<u64>();
4973                        let offsets: &[u64] = offsets_ref.as_ref();
4974                        for i in (0..num_values).step_by(step).take(sample_count) {
4975                            let start = usize::try_from(*offsets.get(i)?).ok()?;
4976                            let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
4977                            if start > end || end > var.data.len() {
4978                                return None;
4979                            }
4980                            unique.insert(xxh3_64(&var.data[start..end]));
4981                        }
4982                    }
4983                    _ => return Some(false),
4984                }
4985                let ratio = unique.len() as f64 / sample_count as f64;
4986                Some(sample_count >= 1024 && ratio >= unique_ratio_threshold)
4987            }
4988            _ => Some(false),
4989        }
4990    }
4991
4992    // Creates an encode task, consuming all buffered data
4993    fn do_flush(
4994        &mut self,
4995        arrays: Vec<ArrayRef>,
4996        repdefs: Vec<RepDefBuilder>,
4997        row_number: u64,
4998        num_rows: u64,
4999    ) -> Result<Vec<EncodeTask>> {
5000        let column_idx = self.column_index;
5001        let compression_strategy = self.compression_strategy.clone();
5002        let field = self.field.clone();
5003        let encoding_metadata = self.encoding_metadata.clone();
5004        let support_large_chunk = self.support_large_chunk;
5005        let version = self.version;
5006        let task = spawn_cpu(move || {
5007            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
5008            let is_simple_validity = repdefs.iter().all(|rd| rd.is_simple_validity());
5009            let has_repdef_info = repdefs.iter().any(|rd| !rd.is_empty());
5010            let repdef = RepDefBuilder::serialize(repdefs);
5011
5012            if num_values == 0 {
5013                // We should not encode empty arrays.  So if we get here that should mean that we
5014                // either have all empty lists or all null lists (or a mix).  We still need to encode
5015                // the rep/def information but we can skip the data encoding.
5016                log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows);
5017                return Self::encode_complex_all_null(
5018                    column_idx,
5019                    repdef,
5020                    row_number,
5021                    num_rows,
5022                    version,
5023                    compression_strategy.as_ref(),
5024                );
5025            }
5026
5027            let leaf_validity = Self::leaf_validity(&repdef, num_values as usize)?;
5028            let all_null = leaf_validity
5029                .as_ref()
5030                .map(|validity| validity.count_set_bits() == 0)
5031                .unwrap_or(false);
5032
5033            if all_null {
5034                return if is_simple_validity {
5035                    log::debug!(
5036                        "Encoding column {} with {} items ({} rows) using simple-null layout",
5037                        column_idx,
5038                        num_values,
5039                        num_rows
5040                    );
5041                    Self::encode_simple_all_null(column_idx, num_values, row_number)
5042                } else {
5043                    log::debug!(
5044                        "Encoding column {} with {} items ({} rows) using complex-null layout",
5045                        column_idx,
5046                        num_values,
5047                        num_rows
5048                    );
5049                    Self::encode_complex_all_null(
5050                        column_idx,
5051                        repdef,
5052                        row_number,
5053                        num_rows,
5054                        version,
5055                        compression_strategy.as_ref(),
5056                    )
5057                };
5058            }
5059
5060            if let DataType::Struct(fields) = &field.data_type()
5061                && fields.is_empty()
5062            {
5063                if has_repdef_info {
5064                    return Err(Error::invalid_input_source(format!("Empty structs with rep/def information are not yet supported.  The field {} is an empty struct that either has nulls or is in a list.", field.name).into()));
5065                }
5066                // This is maybe a little confusing but the reader should never look at this anyways and it
5067                // seems like overkill to invent a new layout just for "empty structs".
5068                return Self::encode_simple_all_null(column_idx, num_values, row_number);
5069            }
5070
5071            let data_block = DataBlock::from_arrays(&arrays, num_values);
5072
5073            if version.resolve() >= LanceFileVersion::V2_2
5074                && let Some(scalar) = Self::find_constant_scalar(&arrays, leaf_validity.as_ref())?
5075            {
5076                log::debug!(
5077                    "Encoding column {} with {} items ({} rows) using constant layout",
5078                    column_idx,
5079                    num_values,
5080                    num_rows
5081                );
5082                return constant::encode_constant_page(
5083                    column_idx,
5084                    scalar,
5085                    repdef,
5086                    row_number,
5087                    num_rows,
5088                );
5089            }
5090
5091            let requires_full_zip_packed_struct =
5092                if let DataBlock::Struct(ref struct_data_block) = data_block {
5093                    struct_data_block.has_variable_width_child()
5094                } else {
5095                    false
5096                };
5097
5098            if requires_full_zip_packed_struct {
5099                log::debug!(
5100                    "Encoding column {} with {} items using full-zip packed struct layout",
5101                    column_idx,
5102                    num_values
5103                );
5104                return Self::encode_full_zip(
5105                    column_idx,
5106                    &field,
5107                    compression_strategy.as_ref(),
5108                    data_block,
5109                    repdef,
5110                    row_number,
5111                    num_rows,
5112                );
5113            }
5114
5115            if let DataBlock::Dictionary(dict) = data_block {
5116                log::debug!("Encoding column {} with {} items using dictionary encoding (already dictionary encoded)", column_idx, num_values);
5117                let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
5118                // TODO: https://github.com/lancedb/lance/issues/4809
5119                // If we compute stats on dictionary_data_block => panic.
5120                // If we don't compute stats on indices_data_block => panic.
5121                // This is messy.  Don't make me call compute_stat ever.
5122                indices_data_block.compute_stat();
5123                Self::encode_miniblock(
5124                    column_idx,
5125                    &field,
5126                    compression_strategy.as_ref(),
5127                    indices_data_block,
5128                    repdef,
5129                    row_number,
5130                    Some(dictionary_data_block),
5131                    num_rows,
5132                    support_large_chunk,
5133                )
5134            } else {
5135                // Try dictionary encoding first if applicable. If encoding aborts, fall back to the
5136                // preferred structural encoding.
5137                let dict_result = Self::should_dictionary_encode(&data_block, &field, version)
5138                    .and_then(|budget| {
5139                        log::debug!(
5140                            "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
5141                            column_idx,
5142                            num_values
5143                        );
5144                        dict::dictionary_encode(
5145                            &data_block,
5146                            budget.max_dict_entries,
5147                            budget.max_encoded_size,
5148                        )
5149                    });
5150
5151                if let Some((indices_data_block, dictionary_data_block)) = dict_result {
5152                    Self::encode_miniblock(
5153                        column_idx,
5154                        &field,
5155                        compression_strategy.as_ref(),
5156                        indices_data_block,
5157                        repdef,
5158                        row_number,
5159                        Some(dictionary_data_block),
5160                        num_rows,
5161                        support_large_chunk,
5162                    )
5163                } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
5164                    log::debug!(
5165                        "Encoding column {} with {} items using mini-block layout",
5166                        column_idx,
5167                        num_values
5168                    );
5169                    Self::encode_miniblock(
5170                        column_idx,
5171                        &field,
5172                        compression_strategy.as_ref(),
5173                        data_block,
5174                        repdef,
5175                        row_number,
5176                        None,
5177                        num_rows,
5178                        support_large_chunk,
5179                    )
5180                } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
5181                    log::debug!(
5182                        "Encoding column {} with {} items using full-zip layout",
5183                        column_idx,
5184                        num_values
5185                    );
5186                    Self::encode_full_zip(
5187                        column_idx,
5188                        &field,
5189                        compression_strategy.as_ref(),
5190                        data_block,
5191                        repdef,
5192                        row_number,
5193                        num_rows,
5194                    )
5195                } else {
5196                    Err(Error::invalid_input_source(format!("Cannot determine structural encoding for field {}.  This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into()))
5197                }
5198            }
5199        })
5200        .boxed();
5201        Ok(vec![task])
5202    }
5203
5204    fn extract_validity_buf(
5205        array: Arc<dyn Array>,
5206        repdef: &mut RepDefBuilder,
5207        keep_original_array: bool,
5208    ) -> Result<Arc<dyn Array>> {
5209        if let Some(validity) = array.nulls() {
5210            if keep_original_array {
5211                repdef.add_validity_bitmap(validity.clone());
5212            } else {
5213                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
5214            }
5215            let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
5216            Ok(make_array(data_no_nulls))
5217        } else {
5218            repdef.add_no_null(array.len());
5219            Ok(array)
5220        }
5221    }
5222
5223    fn extract_validity(
5224        mut array: Arc<dyn Array>,
5225        repdef: &mut RepDefBuilder,
5226        keep_original_array: bool,
5227    ) -> Result<Arc<dyn Array>> {
5228        match array.data_type() {
5229            DataType::Null => {
5230                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
5231                Ok(array)
5232            }
5233            DataType::Dictionary(_, _) => {
5234                array = dict::normalize_dict_nulls(array)?;
5235                Self::extract_validity_buf(array, repdef, keep_original_array)
5236            }
5237            // Extract our validity buf but NOT any child validity bufs. (they will be encoded in
5238            // as part of the values).  Note: for FSL we do not use repdef.add_fsl because we do
5239            // NOT want to increase the repdef depth.
5240            //
5241            // This would be quite catasrophic for something like vector embeddings.  Imagine we
5242            // had thousands of vectors and some were null but no vector contained null items.  If
5243            // we treated the vectors (primitive FSL) like we treat structural FSL we would end up
5244            // with a rep/def value for every single item in the vector.
5245            _ => Self::extract_validity_buf(array, repdef, keep_original_array),
5246        }
5247    }
5248}
5249
5250impl FieldEncoder for PrimitiveStructuralEncoder {
5251    // Buffers data, if there is enough to write a page then we create an encode task
5252    fn maybe_encode(
5253        &mut self,
5254        array: ArrayRef,
5255        _external_buffers: &mut OutOfLineBuffers,
5256        mut repdef: RepDefBuilder,
5257        row_number: u64,
5258        num_rows: u64,
5259    ) -> Result<Vec<EncodeTask>> {
5260        let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
5261        self.accumulated_repdefs.push(repdef);
5262
5263        if let Some((arrays, row_number, num_rows)) =
5264            self.accumulation_queue.insert(array, row_number, num_rows)
5265        {
5266            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5267            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5268        } else {
5269            Ok(vec![])
5270        }
5271    }
5272
5273    // If there is any data left in the buffer then create an encode task from it
5274    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
5275        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
5276            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5277            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5278        } else {
5279            Ok(vec![])
5280        }
5281    }
5282
5283    fn num_columns(&self) -> u32 {
5284        1
5285    }
5286
5287    fn finish(
5288        &mut self,
5289        _external_buffers: &mut OutOfLineBuffers,
5290    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
5291        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
5292    }
5293}
5294
5295#[cfg(test)]
5296#[allow(clippy::single_range_in_vec_init)]
5297mod tests {
5298    use super::{
5299        ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
5300        FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipReadSource,
5301        FullZipRepIndexDetails, FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor,
5302        PreambleAction, StructuralPageScheduler, VariableFullZipDecoder,
5303    };
5304    use crate::buffer::LanceBuffer;
5305    use crate::compression::DefaultDecompressionStrategy;
5306    use crate::constants::{
5307        COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
5308        DICT_VALUES_COMPRESSION_META_KEY, STRUCTURAL_ENCODING_META_KEY,
5309        STRUCTURAL_ENCODING_MINIBLOCK,
5310    };
5311    use crate::data::BlockInfo;
5312    use crate::decoder::PageEncoding;
5313    use crate::encodings::logical::primitive::{
5314        ChunkDrainInstructions, PrimitiveStructuralEncoder,
5315    };
5316    use crate::format::ProtobufUtils21;
5317    use crate::format::pb21;
5318    use crate::format::pb21::compressive_encoding::Compression;
5319    use crate::testing::{TestCases, check_round_trip_encoding_of_data};
5320    use crate::version::LanceFileVersion;
5321    use arrow_array::{ArrayRef, Int8Array, StringArray};
5322    use arrow_schema::DataType;
5323    use std::collections::HashMap;
5324    use std::{collections::VecDeque, sync::Arc};
5325
5326    #[test]
5327    fn test_is_narrow() {
5328        let int8_array = Int8Array::from(vec![1, 2, 3]);
5329        let array_ref: ArrayRef = Arc::new(int8_array);
5330        let block = DataBlock::from_array(array_ref);
5331
5332        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
5333
5334        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
5335        let block = DataBlock::from_array(string_array);
5336        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
5337
5338        let string_array = StringArray::from(vec![
5339            Some("hello world".repeat(100)),
5340            Some("world".to_string()),
5341        ]);
5342        let block = DataBlock::from_array(string_array);
5343        assert!((!PrimitiveStructuralEncoder::is_narrow(&block)));
5344    }
5345
5346    #[test]
5347    fn test_map_range() {
5348        // Null in the middle
5349        // [[A, B, C], [D, E], NULL, [F, G, H]]
5350        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
5351        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
5352        let max_visible_def = 0;
5353        let total_items = 8;
5354        let max_rep = 1;
5355
5356        let check = |range, expected_item_range, expected_level_range| {
5357            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5358                range,
5359                rep.as_ref(),
5360                def.as_ref(),
5361                max_rep,
5362                max_visible_def,
5363                total_items,
5364                PreambleAction::Absent,
5365            );
5366            assert_eq!(item_range, expected_item_range);
5367            assert_eq!(level_range, expected_level_range);
5368        };
5369
5370        check(0..1, 0..3, 0..3);
5371        check(1..2, 3..5, 3..5);
5372        check(2..3, 5..5, 5..6);
5373        check(3..4, 5..8, 6..9);
5374        check(0..2, 0..5, 0..5);
5375        check(1..3, 3..5, 3..6);
5376        check(2..4, 5..8, 5..9);
5377        check(0..3, 0..5, 0..6);
5378        check(1..4, 3..8, 3..9);
5379        check(0..4, 0..8, 0..9);
5380
5381        // Null at start
5382        // [NULL, [A, B], [C]]
5383        let rep = Some(vec![1, 1, 0, 1]);
5384        let def = Some(vec![1, 0, 0, 0]);
5385        let max_visible_def = 0;
5386        let total_items = 3;
5387
5388        let check = |range, expected_item_range, expected_level_range| {
5389            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5390                range,
5391                rep.as_ref(),
5392                def.as_ref(),
5393                max_rep,
5394                max_visible_def,
5395                total_items,
5396                PreambleAction::Absent,
5397            );
5398            assert_eq!(item_range, expected_item_range);
5399            assert_eq!(level_range, expected_level_range);
5400        };
5401
5402        check(0..1, 0..0, 0..1);
5403        check(1..2, 0..2, 1..3);
5404        check(2..3, 2..3, 3..4);
5405        check(0..2, 0..2, 0..3);
5406        check(1..3, 0..3, 1..4);
5407        check(0..3, 0..3, 0..4);
5408
5409        // Null at end
5410        // [[A], [B, C], NULL]
5411        let rep = Some(vec![1, 1, 0, 1]);
5412        let def = Some(vec![0, 0, 0, 1]);
5413        let max_visible_def = 0;
5414        let total_items = 3;
5415
5416        let check = |range, expected_item_range, expected_level_range| {
5417            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5418                range,
5419                rep.as_ref(),
5420                def.as_ref(),
5421                max_rep,
5422                max_visible_def,
5423                total_items,
5424                PreambleAction::Absent,
5425            );
5426            assert_eq!(item_range, expected_item_range);
5427            assert_eq!(level_range, expected_level_range);
5428        };
5429
5430        check(0..1, 0..1, 0..1);
5431        check(1..2, 1..3, 1..3);
5432        check(2..3, 3..3, 3..4);
5433        check(0..2, 0..3, 0..3);
5434        check(1..3, 1..3, 1..4);
5435        check(0..3, 0..3, 0..4);
5436
5437        // No nulls, with repetition
5438        // [[A, B], [C, D], [E, F]]
5439        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
5440        let def: Option<&[u16]> = None;
5441        let max_visible_def = 0;
5442        let total_items = 6;
5443
5444        let check = |range, expected_item_range, expected_level_range| {
5445            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5446                range,
5447                rep.as_ref(),
5448                def.as_ref(),
5449                max_rep,
5450                max_visible_def,
5451                total_items,
5452                PreambleAction::Absent,
5453            );
5454            assert_eq!(item_range, expected_item_range);
5455            assert_eq!(level_range, expected_level_range);
5456        };
5457
5458        check(0..1, 0..2, 0..2);
5459        check(1..2, 2..4, 2..4);
5460        check(2..3, 4..6, 4..6);
5461        check(0..2, 0..4, 0..4);
5462        check(1..3, 2..6, 2..6);
5463        check(0..3, 0..6, 0..6);
5464
5465        // No repetition, with nulls (this case is trivial)
5466        // [A, B, NULL, C]
5467        let rep: Option<&[u16]> = None;
5468        let def = Some(vec![0, 0, 1, 0]);
5469        let max_visible_def = 1;
5470        let total_items = 4;
5471
5472        let check = |range, expected_item_range, expected_level_range| {
5473            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5474                range,
5475                rep.as_ref(),
5476                def.as_ref(),
5477                max_rep,
5478                max_visible_def,
5479                total_items,
5480                PreambleAction::Absent,
5481            );
5482            assert_eq!(item_range, expected_item_range);
5483            assert_eq!(level_range, expected_level_range);
5484        };
5485
5486        check(0..1, 0..1, 0..1);
5487        check(1..2, 1..2, 1..2);
5488        check(2..3, 2..3, 2..3);
5489        check(0..2, 0..2, 0..2);
5490        check(1..3, 1..3, 1..3);
5491        check(0..3, 0..3, 0..3);
5492
5493        // Tricky case, this chunk is a continuation and starts with a rep-index = 0
5494        // [[..., A] [B, C], NULL]
5495        //
5496        // What we do will depend on the preamble action
5497        let rep = Some(vec![0, 1, 0, 1]);
5498        let def = Some(vec![0, 0, 0, 1]);
5499        let max_visible_def = 0;
5500        let total_items = 3;
5501
5502        let check = |range, expected_item_range, expected_level_range| {
5503            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5504                range,
5505                rep.as_ref(),
5506                def.as_ref(),
5507                max_rep,
5508                max_visible_def,
5509                total_items,
5510                PreambleAction::Take,
5511            );
5512            assert_eq!(item_range, expected_item_range);
5513            assert_eq!(level_range, expected_level_range);
5514        };
5515
5516        // If we are taking the preamble then the range must start at 0
5517        check(0..1, 0..3, 0..3);
5518        check(0..2, 0..3, 0..4);
5519
5520        let check = |range, expected_item_range, expected_level_range| {
5521            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5522                range,
5523                rep.as_ref(),
5524                def.as_ref(),
5525                max_rep,
5526                max_visible_def,
5527                total_items,
5528                PreambleAction::Skip,
5529            );
5530            assert_eq!(item_range, expected_item_range);
5531            assert_eq!(level_range, expected_level_range);
5532        };
5533
5534        check(0..1, 1..3, 1..3);
5535        check(1..2, 3..3, 3..4);
5536        check(0..2, 1..3, 1..4);
5537
5538        // Another preamble case but now it doesn't end with a new list
5539        // [[..., A], NULL, [D, E]]
5540        //
5541        // What we do will depend on the preamble action
5542        let rep = Some(vec![0, 1, 1, 0]);
5543        let def = Some(vec![0, 1, 0, 0]);
5544        let max_visible_def = 0;
5545        let total_items = 4;
5546
5547        let check = |range, expected_item_range, expected_level_range| {
5548            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5549                range,
5550                rep.as_ref(),
5551                def.as_ref(),
5552                max_rep,
5553                max_visible_def,
5554                total_items,
5555                PreambleAction::Take,
5556            );
5557            assert_eq!(item_range, expected_item_range);
5558            assert_eq!(level_range, expected_level_range);
5559        };
5560
5561        // If we are taking the preamble then the range must start at 0
5562        check(0..1, 0..1, 0..2);
5563        check(0..2, 0..3, 0..4);
5564
5565        let check = |range, expected_item_range, expected_level_range| {
5566            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5567                range,
5568                rep.as_ref(),
5569                def.as_ref(),
5570                max_rep,
5571                max_visible_def,
5572                total_items,
5573                PreambleAction::Skip,
5574            );
5575            assert_eq!(item_range, expected_item_range);
5576            assert_eq!(level_range, expected_level_range);
5577        };
5578
5579        // If we are taking the preamble then the range must start at 0
5580        check(0..1, 1..1, 1..2);
5581        check(1..2, 1..3, 2..4);
5582        check(0..2, 1..3, 1..4);
5583
5584        // Now a preamble case without any definition levels
5585        // [[..., A] [B, C], [D]]
5586        let rep = Some(vec![0, 1, 0, 1]);
5587        let def: Option<Vec<u16>> = None;
5588        let max_visible_def = 0;
5589        let total_items = 4;
5590
5591        let check = |range, expected_item_range, expected_level_range| {
5592            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5593                range,
5594                rep.as_ref(),
5595                def.as_ref(),
5596                max_rep,
5597                max_visible_def,
5598                total_items,
5599                PreambleAction::Take,
5600            );
5601            assert_eq!(item_range, expected_item_range);
5602            assert_eq!(level_range, expected_level_range);
5603        };
5604
5605        // If we are taking the preamble then the range must start at 0
5606        check(0..1, 0..3, 0..3);
5607        check(0..2, 0..4, 0..4);
5608
5609        let check = |range, expected_item_range, expected_level_range| {
5610            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5611                range,
5612                rep.as_ref(),
5613                def.as_ref(),
5614                max_rep,
5615                max_visible_def,
5616                total_items,
5617                PreambleAction::Skip,
5618            );
5619            assert_eq!(item_range, expected_item_range);
5620            assert_eq!(level_range, expected_level_range);
5621        };
5622
5623        check(0..1, 1..3, 1..3);
5624        check(1..2, 3..4, 3..4);
5625        check(0..2, 1..4, 1..4);
5626
5627        // If we have nested lists then non-top level lists may be empty/null
5628        // and we need to make sure we still handle them as invisible items (we
5629        // failed to do this previously)
5630        let rep = Some(vec![2, 1, 2, 0, 1, 2]);
5631        let def = Some(vec![0, 1, 2, 0, 0, 0]);
5632        let max_rep = 2;
5633        let max_visible_def = 0;
5634        let total_items = 4;
5635
5636        let check = |range, expected_item_range, expected_level_range| {
5637            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5638                range,
5639                rep.as_ref(),
5640                def.as_ref(),
5641                max_rep,
5642                max_visible_def,
5643                total_items,
5644                PreambleAction::Absent,
5645            );
5646            assert_eq!(item_range, expected_item_range);
5647            assert_eq!(level_range, expected_level_range);
5648        };
5649
5650        check(0..3, 0..4, 0..6);
5651        check(0..1, 0..1, 0..2);
5652        check(1..2, 1..3, 2..5);
5653        check(2..3, 3..4, 5..6);
5654
5655        // Invisible items in a preamble that we are taking (regressing a previous failure)
5656        let rep = Some(vec![0, 0, 1, 0, 1, 1]);
5657        let def = Some(vec![0, 1, 0, 0, 0, 0]);
5658        let max_rep = 1;
5659        let max_visible_def = 0;
5660        let total_items = 5;
5661
5662        let check = |range, expected_item_range, expected_level_range| {
5663            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5664                range,
5665                rep.as_ref(),
5666                def.as_ref(),
5667                max_rep,
5668                max_visible_def,
5669                total_items,
5670                PreambleAction::Take,
5671            );
5672            assert_eq!(item_range, expected_item_range);
5673            assert_eq!(level_range, expected_level_range);
5674        };
5675
5676        check(0..0, 0..1, 0..2);
5677        check(0..1, 0..3, 0..4);
5678        check(0..2, 0..4, 0..5);
5679
5680        // Skip preamble (with invis items) and skip a few rows (with invis items)
5681        // and then take a few rows but not all the rows
5682        let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
5683        let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
5684        let max_rep = 1;
5685        let max_visible_def = 0;
5686        let total_items = 5;
5687
5688        let check = |range, expected_item_range, expected_level_range| {
5689            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5690                range,
5691                rep.as_ref(),
5692                def.as_ref(),
5693                max_rep,
5694                max_visible_def,
5695                total_items,
5696                PreambleAction::Skip,
5697            );
5698            assert_eq!(item_range, expected_item_range);
5699            assert_eq!(level_range, expected_level_range);
5700        };
5701
5702        check(2..3, 2..4, 5..7);
5703    }
5704
5705    #[test]
5706    fn test_slice_batch_data_and_rebase_offsets_u32() {
5707        let data = LanceBuffer::copy_slice(b"0123456789abcdefghij");
5708        let offsets = LanceBuffer::reinterpret_vec(vec![6_u32, 8_u32, 8_u32, 12_u32]);
5709
5710        let (sliced_data, normalized_offsets) =
5711            VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
5712                .unwrap();
5713
5714        assert_eq!(sliced_data.as_ref(), b"6789ab");
5715        let normalized = normalized_offsets.borrow_to_typed_slice::<u32>();
5716        assert_eq!(normalized.as_ref(), &[0, 2, 2, 6]);
5717    }
5718
5719    #[test]
5720    fn test_slice_batch_data_and_rebase_offsets_u64() {
5721        let data = LanceBuffer::copy_slice(b"abcdefghijklmnopqrstuvwxyz");
5722        let offsets = LanceBuffer::reinterpret_vec(vec![10_u64, 12_u64, 16_u64, 20_u64]);
5723
5724        let (sliced_data, normalized_offsets) =
5725            VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 64)
5726                .unwrap();
5727
5728        assert_eq!(sliced_data.as_ref(), b"klmnopqrst");
5729        let normalized = normalized_offsets.borrow_to_typed_slice::<u64>();
5730        assert_eq!(normalized.as_ref(), &[0, 2, 6, 10]);
5731    }
5732
5733    #[test]
5734    fn test_slice_batch_data_and_rebase_offsets_rejects_invalid_offsets() {
5735        let data = LanceBuffer::copy_slice(b"abcd");
5736        let offsets = LanceBuffer::reinterpret_vec(vec![3_u32, 2_u32]);
5737
5738        let err = VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
5739            .expect_err("offset end before start should error");
5740        assert!(err.to_string().contains("less than base"));
5741    }
5742
5743    #[test]
5744    fn test_schedule_instructions() {
5745        // Convert repetition index to bytes for testing
5746        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
5747        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5748        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5749
5750        let check = |user_ranges, expected_instructions| {
5751            let instructions =
5752                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
5753            assert_eq!(instructions, expected_instructions);
5754        };
5755
5756        // The instructions we expect if we're grabbing the whole range
5757        let expected_take_all = vec![
5758            ChunkInstructions {
5759                chunk_idx: 0,
5760                preamble: PreambleAction::Absent,
5761                rows_to_skip: 0,
5762                rows_to_take: 6,
5763                take_trailer: true,
5764            },
5765            ChunkInstructions {
5766                chunk_idx: 1,
5767                preamble: PreambleAction::Take,
5768                rows_to_skip: 0,
5769                rows_to_take: 2,
5770                take_trailer: false,
5771            },
5772            ChunkInstructions {
5773                chunk_idx: 2,
5774                preamble: PreambleAction::Absent,
5775                rows_to_skip: 0,
5776                rows_to_take: 5,
5777                take_trailer: true,
5778            },
5779            ChunkInstructions {
5780                chunk_idx: 3,
5781                preamble: PreambleAction::Take,
5782                rows_to_skip: 0,
5783                rows_to_take: 1,
5784                take_trailer: false,
5785            },
5786        ];
5787
5788        // Take all as 1 range
5789        check(&[0..14], expected_take_all.clone());
5790
5791        // Take all a individual rows
5792        check(
5793            &[
5794                0..1,
5795                1..2,
5796                2..3,
5797                3..4,
5798                4..5,
5799                5..6,
5800                6..7,
5801                7..8,
5802                8..9,
5803                9..10,
5804                10..11,
5805                11..12,
5806                12..13,
5807                13..14,
5808            ],
5809            expected_take_all,
5810        );
5811
5812        // Test some partial takes
5813
5814        // 2 rows in the same chunk but not contiguous
5815        check(
5816            &[0..1, 3..4],
5817            vec![
5818                ChunkInstructions {
5819                    chunk_idx: 0,
5820                    preamble: PreambleAction::Absent,
5821                    rows_to_skip: 0,
5822                    rows_to_take: 1,
5823                    take_trailer: false,
5824                },
5825                ChunkInstructions {
5826                    chunk_idx: 0,
5827                    preamble: PreambleAction::Absent,
5828                    rows_to_skip: 3,
5829                    rows_to_take: 1,
5830                    take_trailer: false,
5831                },
5832            ],
5833        );
5834
5835        // Taking just a trailer/preamble
5836        check(
5837            &[5..6],
5838            vec![
5839                ChunkInstructions {
5840                    chunk_idx: 0,
5841                    preamble: PreambleAction::Absent,
5842                    rows_to_skip: 5,
5843                    rows_to_take: 1,
5844                    take_trailer: true,
5845                },
5846                ChunkInstructions {
5847                    chunk_idx: 1,
5848                    preamble: PreambleAction::Take,
5849                    rows_to_skip: 0,
5850                    rows_to_take: 0,
5851                    take_trailer: false,
5852                },
5853            ],
5854        );
5855
5856        // Skipping an entire chunk
5857        check(
5858            &[7..10],
5859            vec![
5860                ChunkInstructions {
5861                    chunk_idx: 1,
5862                    preamble: PreambleAction::Skip,
5863                    rows_to_skip: 1,
5864                    rows_to_take: 1,
5865                    take_trailer: false,
5866                },
5867                ChunkInstructions {
5868                    chunk_idx: 2,
5869                    preamble: PreambleAction::Absent,
5870                    rows_to_skip: 0,
5871                    rows_to_take: 2,
5872                    take_trailer: false,
5873                },
5874            ],
5875        );
5876    }
5877
5878    #[test]
5879    fn test_drain_instructions() {
5880        fn drain_from_instructions(
5881            instructions: &mut VecDeque<ChunkInstructions>,
5882            mut rows_desired: u64,
5883            need_preamble: &mut bool,
5884            skip_in_chunk: &mut u64,
5885        ) -> Vec<ChunkDrainInstructions> {
5886            // Note: instructions.len() is an upper bound, we typically take much fewer
5887            let mut drain_instructions = Vec::with_capacity(instructions.len());
5888            while rows_desired > 0 || *need_preamble {
5889                let (next_instructions, consumed_chunk) = instructions
5890                    .front()
5891                    .unwrap()
5892                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
5893                if consumed_chunk {
5894                    instructions.pop_front();
5895                }
5896                drain_instructions.push(next_instructions);
5897            }
5898            drain_instructions
5899        }
5900
5901        // Convert repetition index to bytes for testing
5902        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
5903        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5904        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5905        let user_ranges = vec![1..7, 10..14];
5906
5907        // First, schedule the ranges
5908        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
5909
5910        let mut to_drain = VecDeque::from(scheduled.clone());
5911
5912        // Now we drain in batches of 4
5913
5914        let mut need_preamble = false;
5915        let mut skip_in_chunk = 0;
5916
5917        let next_batch =
5918            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
5919
5920        assert!(!need_preamble);
5921        assert_eq!(skip_in_chunk, 4);
5922        assert_eq!(
5923            next_batch,
5924            vec![ChunkDrainInstructions {
5925                chunk_instructions: scheduled[0].clone(),
5926                rows_to_take: 4,
5927                rows_to_skip: 0,
5928                preamble_action: PreambleAction::Absent,
5929            }]
5930        );
5931
5932        let next_batch =
5933            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
5934
5935        assert!(!need_preamble);
5936        assert_eq!(skip_in_chunk, 2);
5937
5938        assert_eq!(
5939            next_batch,
5940            vec![
5941                ChunkDrainInstructions {
5942                    chunk_instructions: scheduled[0].clone(),
5943                    rows_to_take: 1,
5944                    rows_to_skip: 4,
5945                    preamble_action: PreambleAction::Absent,
5946                },
5947                ChunkDrainInstructions {
5948                    chunk_instructions: scheduled[1].clone(),
5949                    rows_to_take: 1,
5950                    rows_to_skip: 0,
5951                    preamble_action: PreambleAction::Take,
5952                },
5953                ChunkDrainInstructions {
5954                    chunk_instructions: scheduled[2].clone(),
5955                    rows_to_take: 2,
5956                    rows_to_skip: 0,
5957                    preamble_action: PreambleAction::Absent,
5958                }
5959            ]
5960        );
5961
5962        let next_batch =
5963            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
5964
5965        assert!(!need_preamble);
5966        assert_eq!(skip_in_chunk, 0);
5967
5968        assert_eq!(
5969            next_batch,
5970            vec![
5971                ChunkDrainInstructions {
5972                    chunk_instructions: scheduled[2].clone(),
5973                    rows_to_take: 1,
5974                    rows_to_skip: 2,
5975                    preamble_action: PreambleAction::Absent,
5976                },
5977                ChunkDrainInstructions {
5978                    chunk_instructions: scheduled[3].clone(),
5979                    rows_to_take: 1,
5980                    rows_to_skip: 0,
5981                    preamble_action: PreambleAction::Take,
5982                },
5983            ]
5984        );
5985
5986        // Regression case.  Need a chunk with preamble, rows, and trailer (the middle chunk here)
5987        let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
5988        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5989        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5990        let user_ranges = vec![0..28];
5991
5992        // First, schedule the ranges
5993        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
5994
5995        let mut to_drain = VecDeque::from(scheduled.clone());
5996
5997        // Drain first chunk and some of second chunk
5998
5999        let mut need_preamble = false;
6000        let mut skip_in_chunk = 0;
6001
6002        let next_batch =
6003            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
6004
6005        assert_eq!(
6006            next_batch,
6007            vec![
6008                ChunkDrainInstructions {
6009                    chunk_instructions: scheduled[0].clone(),
6010                    rows_to_take: 6,
6011                    rows_to_skip: 0,
6012                    preamble_action: PreambleAction::Absent,
6013                },
6014                ChunkDrainInstructions {
6015                    chunk_instructions: scheduled[1].clone(),
6016                    rows_to_take: 1,
6017                    rows_to_skip: 0,
6018                    preamble_action: PreambleAction::Take,
6019                },
6020            ]
6021        );
6022
6023        assert!(!need_preamble);
6024        assert_eq!(skip_in_chunk, 1);
6025
6026        // Now, the tricky part.  We drain the second chunk, including the trailer, and need to make sure
6027        // we get a drain task to take the preamble of the third chunk (and nothing else)
6028        let next_batch =
6029            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
6030
6031        assert_eq!(
6032            next_batch,
6033            vec![
6034                ChunkDrainInstructions {
6035                    chunk_instructions: scheduled[1].clone(),
6036                    rows_to_take: 2,
6037                    rows_to_skip: 1,
6038                    preamble_action: PreambleAction::Skip,
6039                },
6040                ChunkDrainInstructions {
6041                    chunk_instructions: scheduled[2].clone(),
6042                    rows_to_take: 0,
6043                    rows_to_skip: 0,
6044                    preamble_action: PreambleAction::Take,
6045                },
6046            ]
6047        );
6048
6049        assert!(!need_preamble);
6050        assert_eq!(skip_in_chunk, 0);
6051    }
6052
6053    #[tokio::test]
6054    async fn test_fullzip_initialize_is_lazy() {
6055        use futures::{FutureExt, future::BoxFuture};
6056        use std::ops::Range;
6057        use std::sync::Mutex;
6058
6059        #[derive(Debug, Clone)]
6060        struct RecordingScheduler {
6061            data: bytes::Bytes,
6062            requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6063        }
6064
6065        impl RecordingScheduler {
6066            fn new(data: bytes::Bytes) -> Self {
6067                Self {
6068                    data,
6069                    requests: Arc::new(Mutex::new(Vec::new())),
6070                }
6071            }
6072
6073            fn requests(&self) -> Vec<Vec<Range<u64>>> {
6074                self.requests.lock().unwrap().clone()
6075            }
6076        }
6077
6078        impl crate::EncodingsIo for RecordingScheduler {
6079            fn submit_request(
6080                &self,
6081                ranges: Vec<Range<u64>>,
6082                _priority: u64,
6083            ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6084                self.requests.lock().unwrap().push(ranges.clone());
6085                let data = ranges
6086                    .into_iter()
6087                    .map(|range| self.data.slice(range.start as usize..range.end as usize))
6088                    .collect::<Vec<_>>();
6089                std::future::ready(Ok(data)).boxed()
6090            }
6091        }
6092
6093        #[derive(Debug)]
6094        struct TestFixedDecompressor;
6095
6096        impl FixedPerValueDecompressor for TestFixedDecompressor {
6097            fn decompress(
6098                &self,
6099                _data: FixedWidthDataBlock,
6100                _num_rows: u64,
6101            ) -> crate::Result<DataBlock> {
6102                unimplemented!("Test decompressor")
6103            }
6104
6105            fn bits_per_value(&self) -> u64 {
6106                32
6107            }
6108        }
6109
6110        let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(vec![
6111            0;
6112            16 * 1024
6113        ])));
6114        let mut scheduler = FullZipScheduler {
6115            data_buf_position: 0,
6116            data_buf_size: 4096,
6117            rep_index: Some(FullZipRepIndexDetails {
6118                buf_position: 1000,
6119                bytes_per_value: 4,
6120            }),
6121            priority: 0,
6122            rows_in_page: 100,
6123            bits_per_offset: 32,
6124            details: Arc::new(FullZipDecodeDetails {
6125                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6126                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6127                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6128                max_rep: 0,
6129                max_visible_def: 0,
6130            }),
6131            cached_state: None,
6132            enable_cache: false,
6133        };
6134
6135        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6136        let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6137
6138        assert!(
6139            cached_data
6140                .as_arc_any()
6141                .downcast_ref::<super::NoCachedPageData>()
6142                .is_some(),
6143            "FullZip initialize should not eagerly load repetition index data"
6144        );
6145        assert!(scheduler.cached_state.is_none());
6146        assert!(
6147            io.requests().is_empty(),
6148            "FullZip initialize should not issue any I/O"
6149        );
6150    }
6151
6152    #[tokio::test]
6153    async fn test_fullzip_read_source_slices_prefetched_page() {
6154        let page_start = 200_u64;
6155        let page_data = LanceBuffer::copy_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);
6156        let source = FullZipReadSource::PrefetchedPage {
6157            base_offset: page_start,
6158            data: page_data,
6159        };
6160        let ranges = vec![
6161            page_start..(page_start + 3),
6162            (page_start + 4)..(page_start + 8),
6163        ];
6164        let mut data = source.fetch(&ranges, 0).await.unwrap();
6165        assert_eq!(data.pop_front().unwrap().as_ref(), &[0, 1, 2]);
6166        assert_eq!(data.pop_front().unwrap().as_ref(), &[4, 5, 6, 7]);
6167    }
6168
6169    #[tokio::test]
6170    async fn test_fullzip_initialize_caches_rep_index_when_enabled() {
6171        use futures::{FutureExt, future::BoxFuture};
6172        use std::ops::Range;
6173        use std::sync::Mutex;
6174
6175        #[derive(Debug, Clone)]
6176        struct RecordingScheduler {
6177            data: bytes::Bytes,
6178            requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6179        }
6180
6181        impl RecordingScheduler {
6182            fn new(data: bytes::Bytes) -> Self {
6183                Self {
6184                    data,
6185                    requests: Arc::new(Mutex::new(Vec::new())),
6186                }
6187            }
6188
6189            fn requests(&self) -> Vec<Vec<Range<u64>>> {
6190                self.requests.lock().unwrap().clone()
6191            }
6192        }
6193
6194        impl crate::EncodingsIo for RecordingScheduler {
6195            fn submit_request(
6196                &self,
6197                ranges: Vec<Range<u64>>,
6198                _priority: u64,
6199            ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6200                self.requests.lock().unwrap().push(ranges.clone());
6201                let data = ranges
6202                    .into_iter()
6203                    .map(|range| self.data.slice(range.start as usize..range.end as usize))
6204                    .collect::<Vec<_>>();
6205                std::future::ready(Ok(data)).boxed()
6206            }
6207        }
6208
6209        #[derive(Debug)]
6210        struct TestFixedDecompressor;
6211
6212        impl FixedPerValueDecompressor for TestFixedDecompressor {
6213            fn decompress(
6214                &self,
6215                _data: FixedWidthDataBlock,
6216                _num_rows: u64,
6217            ) -> crate::Result<DataBlock> {
6218                unimplemented!("Test decompressor")
6219            }
6220
6221            fn bits_per_value(&self) -> u64 {
6222                32
6223            }
6224        }
6225
6226        let rows_in_page = 100_u64;
6227        let bytes_per_value = 4_u64;
6228        let rep_start = 1000_u64;
6229        let rep_size = ((rows_in_page + 1) * bytes_per_value) as usize;
6230        let mut data = vec![0_u8; 16 * 1024];
6231        data[rep_start as usize..rep_start as usize + rep_size].fill(7);
6232        let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(data)));
6233
6234        let mut scheduler = FullZipScheduler {
6235            data_buf_position: 0,
6236            data_buf_size: 4096,
6237            rep_index: Some(FullZipRepIndexDetails {
6238                buf_position: rep_start,
6239                bytes_per_value,
6240            }),
6241            priority: 0,
6242            rows_in_page,
6243            bits_per_offset: 32,
6244            details: Arc::new(FullZipDecodeDetails {
6245                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6246                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6247                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6248                max_rep: 0,
6249                max_visible_def: 0,
6250            }),
6251            cached_state: None,
6252            enable_cache: true,
6253        };
6254
6255        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6256        let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6257        assert!(
6258            cached_data
6259                .as_arc_any()
6260                .downcast_ref::<FullZipCacheableState>()
6261                .is_some()
6262        );
6263        assert!(scheduler.cached_state.is_some());
6264        assert_eq!(
6265            io.requests(),
6266            vec![vec![
6267                rep_start..(rep_start + (rows_in_page + 1) * bytes_per_value)
6268            ]]
6269        );
6270    }
6271
6272    #[tokio::test]
6273    async fn test_fullzip_full_page_bypasses_rep_index_io() {
6274        use futures::{FutureExt, future::BoxFuture};
6275        use std::ops::Range;
6276        use std::sync::Mutex;
6277
6278        #[derive(Debug, Clone)]
6279        struct RecordingScheduler {
6280            data: bytes::Bytes,
6281            requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6282        }
6283
6284        impl RecordingScheduler {
6285            fn new(data: bytes::Bytes) -> Self {
6286                Self {
6287                    data,
6288                    requests: Arc::new(Mutex::new(Vec::new())),
6289                }
6290            }
6291
6292            fn requests(&self) -> Vec<Vec<Range<u64>>> {
6293                self.requests.lock().unwrap().clone()
6294            }
6295        }
6296
6297        impl crate::EncodingsIo for RecordingScheduler {
6298            fn submit_request(
6299                &self,
6300                ranges: Vec<Range<u64>>,
6301                _priority: u64,
6302            ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6303                self.requests.lock().unwrap().push(ranges.clone());
6304                let data = ranges
6305                    .into_iter()
6306                    .map(|range| self.data.slice(range.start as usize..range.end as usize))
6307                    .collect::<Vec<_>>();
6308                std::future::ready(Ok(data)).boxed()
6309            }
6310        }
6311
6312        #[derive(Debug)]
6313        struct TestFixedDecompressor;
6314
6315        impl FixedPerValueDecompressor for TestFixedDecompressor {
6316            fn decompress(
6317                &self,
6318                _data: FixedWidthDataBlock,
6319                _num_rows: u64,
6320            ) -> crate::Result<DataBlock> {
6321                unimplemented!("Test decompressor")
6322            }
6323
6324            fn bits_per_value(&self) -> u64 {
6325                32
6326            }
6327        }
6328
6329        let rows_in_page = 100_u64;
6330        let data_start = 256_u64;
6331        let data_size = 500_u64;
6332        let rep_start = 4096_u64;
6333        let bytes_per_value = 4_u64;
6334
6335        let mut bytes = vec![0_u8; 16 * 1024];
6336        for i in 0..=rows_in_page {
6337            let offset = (i * 5) as u32;
6338            let pos = rep_start as usize + (i * bytes_per_value) as usize;
6339            bytes[pos..pos + 4].copy_from_slice(&offset.to_le_bytes());
6340        }
6341        let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(bytes)));
6342
6343        let scheduler = FullZipScheduler {
6344            data_buf_position: data_start,
6345            data_buf_size: data_size,
6346            rep_index: Some(FullZipRepIndexDetails {
6347                buf_position: rep_start,
6348                bytes_per_value,
6349            }),
6350            priority: 0,
6351            rows_in_page,
6352            bits_per_offset: 32,
6353            details: Arc::new(FullZipDecodeDetails {
6354                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6355                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6356                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6357                max_rep: 0,
6358                max_visible_def: 0,
6359            }),
6360            cached_state: None,
6361            enable_cache: false,
6362        };
6363
6364        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6365        let tasks = scheduler
6366            .schedule_ranges_rep(
6367                &[0..rows_in_page],
6368                &io_dyn,
6369                FullZipRepIndexDetails {
6370                    buf_position: rep_start,
6371                    bytes_per_value,
6372                },
6373            )
6374            .unwrap();
6375
6376        let requests = io.requests();
6377        assert_eq!(requests.len(), 1);
6378        assert_eq!(requests[0], vec![data_start..(data_start + data_size)]);
6379
6380        let _ = tasks.into_iter().next().unwrap().decoder_fut.await.unwrap();
6381        let requests_after_await = io.requests();
6382        assert_eq!(
6383            requests_after_await.len(),
6384            1,
6385            "full page path should not issue rep-index I/O"
6386        );
6387    }
6388
6389    /// This test is used to reproduce fuzz test https://github.com/lancedb/lance/issues/4492
6390    #[tokio::test]
6391    async fn test_fuzz_issue_4492_empty_rep_values() {
6392        use lance_datagen::{RowCount, Seed, array, gen_batch};
6393
6394        let seed = 1823859942947654717u64;
6395        let num_rows = 2741usize;
6396
6397        // Generate the exact same data that caused the failure
6398        let batch_gen = gen_batch().with_seed(Seed::from(seed));
6399        let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
6400        let list_generator = array::rand_list_any(base_generator, false);
6401
6402        let batch = batch_gen
6403            .anon_col(list_generator)
6404            .into_batch_rows(RowCount::from(num_rows as u64))
6405            .unwrap();
6406
6407        let list_array = batch.column(0).clone();
6408
6409        // Force miniblock encoding
6410        let mut metadata = HashMap::new();
6411        metadata.insert(
6412            STRUCTURAL_ENCODING_META_KEY.to_string(),
6413            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6414        );
6415
6416        let test_cases = TestCases::default()
6417            .with_min_file_version(LanceFileVersion::V2_1)
6418            .with_batch_size(100)
6419            .with_range(0..num_rows.min(500) as u64)
6420            .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);
6421
6422        check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
6423    }
6424
6425    async fn test_minichunk_size_helper(
6426        string_data: Vec<Option<String>>,
6427        minichunk_size: u64,
6428        file_version: LanceFileVersion,
6429    ) {
6430        use crate::constants::MINICHUNK_SIZE_META_KEY;
6431        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6432        use arrow_array::{ArrayRef, StringArray};
6433        use std::sync::Arc;
6434
6435        let string_array: ArrayRef = Arc::new(StringArray::from(string_data));
6436
6437        let mut metadata = HashMap::new();
6438        metadata.insert(
6439            MINICHUNK_SIZE_META_KEY.to_string(),
6440            minichunk_size.to_string(),
6441        );
6442        metadata.insert(
6443            STRUCTURAL_ENCODING_META_KEY.to_string(),
6444            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6445        );
6446
6447        let test_cases = TestCases::default()
6448            .with_min_file_version(file_version)
6449            .with_batch_size(1000);
6450
6451        check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await;
6452    }
6453
6454    #[tokio::test]
6455    async fn test_minichunk_size_roundtrip() {
6456        // Test that minichunk size can be configured and works correctly in round-trip encoding
6457        let mut string_data = Vec::new();
6458        for i in 0..100 {
6459            string_data.push(Some(format!("test_string_{}", i).repeat(50)));
6460        }
6461        // configure minichunk size to 64 bytes (smaller than the default 4kb) for Lance 2.1
6462        test_minichunk_size_helper(string_data, 64, LanceFileVersion::V2_1).await;
6463    }
6464
6465    #[tokio::test]
6466    async fn test_minichunk_size_128kb_v2_2() {
6467        // Test that minichunk size can be configured to 128KB and works correctly with Lance 2.2
6468        let mut string_data = Vec::new();
6469        // create a 500kb string array
6470        for i in 0..10000 {
6471            string_data.push(Some(format!("test_string_{}", i).repeat(50)));
6472        }
6473        test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
6474    }
6475
6476    #[tokio::test]
6477    async fn test_binary_large_minichunk_size_over_max_miniblock_values() {
6478        let mut string_data = Vec::new();
6479        // 128kb/chunk / 6 bytes (t_9999) = 21845 > max 4096 items per chunk
6480        for i in 0..10000 {
6481            string_data.push(Some(format!("t_{}", i)));
6482        }
6483        test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
6484    }
6485
6486    #[tokio::test]
6487    async fn test_large_dictionary_general_compression() {
6488        use arrow_array::{ArrayRef, StringArray};
6489        use std::collections::HashMap;
6490        use std::sync::Arc;
6491
6492        // Create large string dictionary data (>32KiB) with low cardinality
6493        // Use 100 unique strings, each 500 bytes long = 50KB dictionary
6494        let unique_values: Vec<String> = (0..100)
6495            .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
6496            .collect();
6497
6498        // Repeat these strings many times to create a large array
6499        let repeated_strings: Vec<_> = unique_values
6500            .iter()
6501            .cycle()
6502            .take(100_000)
6503            .map(|s| Some(s.as_str()))
6504            .collect();
6505
6506        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
6507
6508        // Configure test to use V2_2 and verify encoding
6509        let test_cases = TestCases::default()
6510            .with_min_file_version(LanceFileVersion::V2_2)
6511            .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
6512                assert_eq!(cols.len(), 1);
6513                let col = &cols[0];
6514
6515                // Navigate to the dictionary encoding in the page layout
6516                if let Some(PageEncoding::Structural(page_layout)) =
6517                    &col.final_pages.first().map(|p| &p.description)
6518                    && let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) =
6519                        &page_layout.layout
6520                    && let Some(dictionary_encoding) = &mini_block.dictionary
6521                {
6522                    match dictionary_encoding.compression.as_ref() {
6523                        Some(Compression::General(general)) => {
6524                            // Verify it's using LZ4 or Zstd
6525                            let compression = general.compression.as_ref().unwrap();
6526                            assert!(
6527                                compression.scheme()
6528                                    == pb21::CompressionScheme::CompressionAlgorithmLz4
6529                                    || compression.scheme()
6530                                        == pb21::CompressionScheme::CompressionAlgorithmZstd,
6531                                "Expected LZ4 or Zstd compression for large dictionary"
6532                            );
6533                        }
6534                        _ => panic!("Expected General compression for large dictionary"),
6535                    }
6536                }
6537            }));
6538
6539        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
6540    }
6541
6542    fn dictionary_encoding_from_page(
6543        page: &crate::encoder::EncodedPage,
6544    ) -> &crate::format::pb21::CompressiveEncoding {
6545        let PageEncoding::Structural(layout) = &page.description else {
6546            panic!("Expected structural page encoding");
6547        };
6548        let pb21::page_layout::Layout::MiniBlockLayout(layout) = layout.layout.as_ref().unwrap()
6549        else {
6550            panic!("Expected mini-block layout");
6551        };
6552        layout
6553            .dictionary
6554            .as_ref()
6555            .unwrap_or_else(|| panic!("Expected dictionary encoding"))
6556    }
6557
6558    async fn encode_variable_dict_page(
6559        metadata: HashMap<String, String>,
6560    ) -> crate::encoder::EncodedPage {
6561        use arrow_array::types::Int32Type;
6562        use arrow_array::{ArrayRef, DictionaryArray, Int32Array, StringArray};
6563
6564        let values = Arc::new(StringArray::from(
6565            (0..128)
6566                .map(|i| format!("value_{i:04}_{}", "x".repeat(256)))
6567                .collect::<Vec<_>>(),
6568        )) as ArrayRef;
6569        let keys = Int32Array::from_iter_values((0..20_000).map(|i| i % 128));
6570        let dict_array =
6571            Arc::new(DictionaryArray::<Int32Type>::try_new(keys, values).unwrap()) as ArrayRef;
6572
6573        let field = arrow_schema::Field::new(
6574            "dict_col",
6575            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
6576            false,
6577        )
6578        .with_metadata(metadata);
6579
6580        encode_first_page(field, dict_array, LanceFileVersion::V2_2).await
6581    }
6582
6583    async fn encode_auto_fixed_dict_page(
6584        metadata: HashMap<String, String>,
6585    ) -> crate::encoder::EncodedPage {
6586        use arrow_array::{ArrayRef, Decimal128Array};
6587
6588        // 128-bit fixed-width values with low cardinality to trigger dictionary encoding.
6589        let values = (0..20_000)
6590            .map(|i| match i % 3 {
6591                0 => 10_i128,
6592                1 => 20_i128,
6593                _ => 30_i128,
6594            })
6595            .collect::<Vec<_>>();
6596        let decimal = Decimal128Array::from_iter_values(values)
6597            .with_precision_and_scale(38, 0)
6598            .unwrap();
6599        let decimal = Arc::new(decimal) as ArrayRef;
6600
6601        let mut field_metadata = metadata;
6602        // Strongly encourage dictionary encoding for this synthetic test data.
6603        field_metadata.insert(
6604            "lance-encoding:dict-size-ratio".to_string(),
6605            "0.99".to_string(),
6606        );
6607        let field = arrow_schema::Field::new("fixed_col", DataType::Decimal128(38, 0), false)
6608            .with_metadata(field_metadata);
6609
6610        encode_first_page(field, decimal, LanceFileVersion::V2_2).await
6611    }
6612
6613    #[tokio::test]
6614    async fn test_dict_values_general_compression_default_lz4_for_variable_dict_values() {
6615        let page = encode_variable_dict_page(HashMap::new()).await;
6616        let dictionary_encoding = dictionary_encoding_from_page(&page);
6617        let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6618            panic!("Expected General compression for dictionary values");
6619        };
6620        let compression = general.compression.as_ref().unwrap();
6621        assert_eq!(
6622            compression.scheme(),
6623            pb21::CompressionScheme::CompressionAlgorithmLz4
6624        );
6625    }
6626
6627    #[tokio::test]
6628    async fn test_dict_values_general_compression_default_lz4_for_fixed_dict_values() {
6629        let page = encode_auto_fixed_dict_page(HashMap::new()).await;
6630        let dictionary_encoding = dictionary_encoding_from_page(&page);
6631        let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6632            panic!("Expected General compression for dictionary values");
6633        };
6634        let compression = general.compression.as_ref().unwrap();
6635        assert_eq!(
6636            compression.scheme(),
6637            pb21::CompressionScheme::CompressionAlgorithmLz4
6638        );
6639    }
6640
6641    #[tokio::test]
6642    async fn test_dict_values_general_compression_zstd() {
6643        let mut metadata = HashMap::new();
6644        metadata.insert(
6645            DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6646            "zstd".to_string(),
6647        );
6648        let page = encode_variable_dict_page(metadata).await;
6649        let dictionary_encoding = dictionary_encoding_from_page(&page);
6650        let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
6651            panic!("Expected General compression for dictionary values");
6652        };
6653        let compression = general.compression.as_ref().unwrap();
6654        assert_eq!(
6655            compression.scheme(),
6656            pb21::CompressionScheme::CompressionAlgorithmZstd
6657        );
6658    }
6659
6660    #[tokio::test]
6661    async fn test_dict_values_general_compression_none() {
6662        let mut metadata = HashMap::new();
6663        metadata.insert(
6664            DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6665            "none".to_string(),
6666        );
6667        let page = encode_variable_dict_page(metadata).await;
6668        let dictionary_encoding = dictionary_encoding_from_page(&page);
6669        assert!(
6670            !matches!(
6671                dictionary_encoding.compression.as_ref(),
6672                Some(Compression::General(_))
6673            ),
6674            "Expected dictionary values to avoid General compression"
6675        );
6676    }
6677
6678    #[test]
6679    fn test_resolve_dict_values_compression_metadata_defaults_to_lz4() {
6680        let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
6681            &HashMap::new(),
6682            None,
6683            None,
6684        );
6685        assert_eq!(metadata.get(COMPRESSION_META_KEY), Some(&"lz4".to_string()),);
6686        assert!(!metadata.contains_key(COMPRESSION_LEVEL_META_KEY));
6687    }
6688
6689    #[test]
6690    fn test_resolve_dict_values_compression_metadata_metadata_overrides_env() {
6691        let field_metadata = HashMap::from([
6692            (
6693                DICT_VALUES_COMPRESSION_META_KEY.to_string(),
6694                "none".to_string(),
6695            ),
6696            (
6697                DICT_VALUES_COMPRESSION_LEVEL_META_KEY.to_string(),
6698                "7".to_string(),
6699            ),
6700        ]);
6701        let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
6702            &field_metadata,
6703            Some("zstd".to_string()),
6704            Some("3".to_string()),
6705        );
6706        assert_eq!(
6707            metadata.get(COMPRESSION_META_KEY),
6708            Some(&"none".to_string()),
6709        );
6710        assert_eq!(
6711            metadata.get(COMPRESSION_LEVEL_META_KEY),
6712            Some(&"7".to_string()),
6713        );
6714    }
6715
6716    #[test]
6717    fn test_resolve_dict_values_compression_metadata_env_fallback() {
6718        let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
6719            &HashMap::new(),
6720            Some("zstd".to_string()),
6721            Some("9".to_string()),
6722        );
6723        assert_eq!(
6724            metadata.get(COMPRESSION_META_KEY),
6725            Some(&"zstd".to_string()),
6726        );
6727        assert_eq!(
6728            metadata.get(COMPRESSION_LEVEL_META_KEY),
6729            Some(&"9".to_string()),
6730        );
6731    }
6732
6733    #[tokio::test]
6734    async fn test_dictionary_encode_int64() {
6735        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
6736        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6737        use crate::version::LanceFileVersion;
6738        use arrow_array::{ArrayRef, Int64Array};
6739        use std::collections::HashMap;
6740        use std::sync::Arc;
6741
6742        // Low cardinality with poor RLE opportunity.
6743        let values = (0..1000)
6744            .map(|i| match i % 3 {
6745                0 => 10i64,
6746                1 => 20i64,
6747                _ => 30i64,
6748            })
6749            .collect::<Vec<_>>();
6750        let array = Arc::new(Int64Array::from(values)) as ArrayRef;
6751
6752        let mut metadata = HashMap::new();
6753        metadata.insert(
6754            STRUCTURAL_ENCODING_META_KEY.to_string(),
6755            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6756        );
6757        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
6758
6759        let test_cases = TestCases::default()
6760            .with_min_file_version(LanceFileVersion::V2_2)
6761            .with_batch_size(1000)
6762            .with_range(0..1000)
6763            .with_indices(vec![0, 1, 10, 999])
6764            .with_expected_encoding("dictionary");
6765
6766        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
6767    }
6768
6769    #[tokio::test]
6770    async fn test_dictionary_encode_float64() {
6771        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
6772        use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6773        use crate::version::LanceFileVersion;
6774        use arrow_array::{ArrayRef, Float64Array};
6775        use std::collections::HashMap;
6776        use std::sync::Arc;
6777
6778        // Low cardinality with poor RLE opportunity.
6779        let values = (0..1000)
6780            .map(|i| match i % 3 {
6781                0 => 0.1f64,
6782                1 => 0.2f64,
6783                _ => 0.3f64,
6784            })
6785            .collect::<Vec<_>>();
6786        let array = Arc::new(Float64Array::from(values)) as ArrayRef;
6787
6788        let mut metadata = HashMap::new();
6789        metadata.insert(
6790            STRUCTURAL_ENCODING_META_KEY.to_string(),
6791            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6792        );
6793        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
6794
6795        let test_cases = TestCases::default()
6796            .with_min_file_version(LanceFileVersion::V2_2)
6797            .with_batch_size(1000)
6798            .with_range(0..1000)
6799            .with_indices(vec![0, 1, 10, 999])
6800            .with_expected_encoding("dictionary");
6801
6802        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
6803    }
6804
6805    #[test]
6806    fn test_miniblock_dictionary_out_of_line_bitpacking_decode() {
6807        let rows = 10_000;
6808        let unique_values = 2_000;
6809
6810        let dictionary_encoding =
6811            ProtobufUtils21::out_of_line_bitpacking(64, ProtobufUtils21::flat(11, None));
6812        let layout = pb21::MiniBlockLayout {
6813            rep_compression: None,
6814            def_compression: None,
6815            value_compression: Some(ProtobufUtils21::flat(64, None)),
6816            dictionary: Some(dictionary_encoding),
6817            num_dictionary_items: unique_values,
6818            layers: vec![pb21::RepDefLayer::RepdefAllValidItem as i32],
6819            num_buffers: 1,
6820            repetition_index_depth: 0,
6821            num_items: rows,
6822            has_large_chunk: false,
6823        };
6824
6825        let buffer_offsets_and_sizes = vec![(0, 0), (0, 0), (0, 0)];
6826        let scheduler = super::MiniBlockScheduler::try_new(
6827            &buffer_offsets_and_sizes,
6828            /*priority=*/ 0,
6829            /*items_in_page=*/ rows,
6830            &layout,
6831            &DefaultDecompressionStrategy::default(),
6832        )
6833        .unwrap();
6834
6835        let dictionary = scheduler.dictionary.unwrap();
6836        assert_eq!(dictionary.num_dictionary_items, unique_values);
6837        assert_eq!(
6838            dictionary.dictionary_data_alignment,
6839            crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
6840        );
6841    }
6842
6843    // Dictionary encoding decision tests
6844    fn create_test_fixed_data_block(
6845        num_values: u64,
6846        cardinality: u64,
6847        bits_per_value: u64,
6848    ) -> DataBlock {
6849        assert!(cardinality > 0);
6850        assert!(cardinality <= num_values);
6851        let block_info = BlockInfo::default();
6852
6853        assert_eq!(bits_per_value % 8, 0);
6854        let data = match bits_per_value {
6855            32 => {
6856                let values = (0..num_values)
6857                    .map(|i| (i % cardinality) as u32)
6858                    .collect::<Vec<_>>();
6859                crate::buffer::LanceBuffer::reinterpret_vec(values)
6860            }
6861            64 => {
6862                let values = (0..num_values).map(|i| i % cardinality).collect::<Vec<_>>();
6863                crate::buffer::LanceBuffer::reinterpret_vec(values)
6864            }
6865            128 => {
6866                let values = (0..num_values)
6867                    .map(|i| (i % cardinality) as u128)
6868                    .collect::<Vec<_>>();
6869                crate::buffer::LanceBuffer::reinterpret_vec(values)
6870            }
6871            _ => unreachable!(),
6872        };
6873        DataBlock::FixedWidth(FixedWidthDataBlock {
6874            bits_per_value,
6875            data,
6876            num_values,
6877            block_info,
6878        })
6879    }
6880
6881    /// Helper to create VariableWidth (string) test data block with exact cardinality
6882    fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
6883        use arrow_array::StringArray;
6884
6885        assert!(cardinality <= num_values && cardinality > 0);
6886
6887        let mut values = Vec::with_capacity(num_values as usize);
6888        for i in 0..num_values {
6889            values.push(format!("value_{:016}", i % cardinality));
6890        }
6891
6892        let array = StringArray::from(values);
6893        DataBlock::from_array(Arc::new(array) as ArrayRef)
6894    }
6895
6896    #[test]
6897    fn test_should_dictionary_encode() {
6898        use crate::constants::DICT_SIZE_RATIO_META_KEY;
6899        use lance_core::datatypes::Field as LanceField;
6900
6901        // Create data where dict encoding saves space
6902        let block = create_test_variable_width_block(1000, 10);
6903
6904        let mut metadata = HashMap::new();
6905        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
6906        let arrow_field =
6907            arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
6908        let field = LanceField::try_from(&arrow_field).unwrap();
6909
6910        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
6911            &block,
6912            &field,
6913            LanceFileVersion::V2_1,
6914        );
6915
6916        assert!(
6917            result.is_some(),
6918            "Should use dictionary encode based on size"
6919        );
6920    }
6921
6922    #[test]
6923    fn test_should_not_dictionary_encode_unsupported_bits() {
6924        use crate::constants::DICT_SIZE_RATIO_META_KEY;
6925        use lance_core::datatypes::Field as LanceField;
6926
6927        let block = create_test_fixed_data_block(1000, 1000, 32);
6928
6929        let mut metadata = HashMap::new();
6930        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
6931        let arrow_field =
6932            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
6933        let field = LanceField::try_from(&arrow_field).unwrap();
6934
6935        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
6936            &block,
6937            &field,
6938            LanceFileVersion::V2_1,
6939        );
6940
6941        assert!(
6942            result.is_none(),
6943            "Should not use dictionary encode for unsupported bit width"
6944        );
6945    }
6946
6947    #[test]
6948    fn test_should_not_dictionary_encode_near_unique_sample() {
6949        use crate::constants::DICT_SIZE_RATIO_META_KEY;
6950        use lance_core::datatypes::Field as LanceField;
6951
6952        let num_values = 5000;
6953        let block = create_test_variable_width_block(num_values, num_values);
6954
6955        let mut metadata = HashMap::new();
6956        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "1.0".to_string());
6957        let arrow_field =
6958            arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
6959        let field = LanceField::try_from(&arrow_field).unwrap();
6960
6961        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
6962            &block,
6963            &field,
6964            LanceFileVersion::V2_1,
6965        );
6966
6967        assert!(
6968            result.is_none(),
6969            "Should not probe dictionary encoding for near-unique data"
6970        );
6971    }
6972
6973    async fn encode_first_page(
6974        field: arrow_schema::Field,
6975        array: ArrayRef,
6976        version: LanceFileVersion,
6977    ) -> crate::encoder::EncodedPage {
6978        use crate::encoder::{
6979            ColumnIndexSequence, EncodingOptions, MIN_PAGE_BUFFER_ALIGNMENT, OutOfLineBuffers,
6980            default_encoding_strategy,
6981        };
6982        use crate::repdef::RepDefBuilder;
6983
6984        let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap();
6985        let encoding_strategy = default_encoding_strategy(version);
6986        let mut column_index_seq = ColumnIndexSequence::default();
6987        let encoding_options = EncodingOptions {
6988            cache_bytes_per_column: 1,
6989            max_page_bytes: 32 * 1024 * 1024,
6990            keep_original_array: true,
6991            buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT,
6992            version,
6993        };
6994
6995        let mut encoder = encoding_strategy
6996            .create_field_encoder(
6997                encoding_strategy.as_ref(),
6998                &lance_field,
6999                &mut column_index_seq,
7000                &encoding_options,
7001            )
7002            .unwrap();
7003
7004        let mut external_buffers = OutOfLineBuffers::new(0, MIN_PAGE_BUFFER_ALIGNMENT);
7005        let repdef = RepDefBuilder::default();
7006        let num_rows = array.len() as u64;
7007        let mut pages = Vec::new();
7008        for task in encoder
7009            .maybe_encode(array, &mut external_buffers, repdef, 0, num_rows)
7010            .unwrap()
7011        {
7012            pages.push(task.await.unwrap());
7013        }
7014        for task in encoder.flush(&mut external_buffers).unwrap() {
7015            pages.push(task.await.unwrap());
7016        }
7017        pages.into_iter().next().unwrap()
7018    }
7019
7020    #[tokio::test]
7021    async fn test_constant_layout_out_of_line_fixed_size_binary_v2_2() {
7022        use crate::format::pb21::page_layout::Layout;
7023
7024        let val = vec![0xABu8; 33];
7025        let arr: ArrayRef = Arc::new(
7026            arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
7027                std::iter::repeat_n(Some(val.as_slice()), 256),
7028                33,
7029            )
7030            .unwrap(),
7031        );
7032        let field = arrow_schema::Field::new("c", DataType::FixedSizeBinary(33), true);
7033        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7034
7035        let PageEncoding::Structural(layout) = &page.description else {
7036            panic!("Expected structural encoding");
7037        };
7038        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7039            panic!("Expected constant layout in slot 2");
7040        };
7041        assert!(layout.inline_value.is_none());
7042        assert_eq!(page.data.len(), 1);
7043
7044        let test_cases = TestCases::default()
7045            .with_min_file_version(LanceFileVersion::V2_2)
7046            .with_max_file_version(LanceFileVersion::V2_2)
7047            .with_page_sizes(vec![4096]);
7048        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7049    }
7050
7051    #[tokio::test]
7052    async fn test_constant_layout_out_of_line_utf8_v2_2() {
7053        use crate::format::pb21::page_layout::Layout;
7054
7055        let arr: ArrayRef = Arc::new(arrow_array::StringArray::from_iter_values(
7056            std::iter::repeat_n("hello", 512),
7057        ));
7058        let field = arrow_schema::Field::new("c", DataType::Utf8, true);
7059        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7060
7061        let PageEncoding::Structural(layout) = &page.description else {
7062            panic!("Expected structural encoding");
7063        };
7064        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7065            panic!("Expected constant layout in slot 2");
7066        };
7067        assert!(layout.inline_value.is_none());
7068        assert_eq!(page.data.len(), 1);
7069
7070        let test_cases = TestCases::default()
7071            .with_min_file_version(LanceFileVersion::V2_2)
7072            .with_max_file_version(LanceFileVersion::V2_2)
7073            .with_page_sizes(vec![4096]);
7074        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7075    }
7076
7077    #[tokio::test]
7078    async fn test_constant_layout_nullable_item_v2_2() {
7079        use crate::format::pb21::page_layout::Layout;
7080
7081        let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![
7082            Some(7),
7083            None,
7084            Some(7),
7085            None,
7086            Some(7),
7087        ]));
7088        let field = arrow_schema::Field::new("c", DataType::Int32, true);
7089        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7090
7091        let PageEncoding::Structural(layout) = &page.description else {
7092            panic!("Expected structural encoding");
7093        };
7094        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7095            panic!("Expected constant layout in slot 2");
7096        };
7097        assert!(layout.inline_value.is_some());
7098        assert_eq!(page.data.len(), 2);
7099
7100        let test_cases = TestCases::default()
7101            .with_min_file_version(LanceFileVersion::V2_2)
7102            .with_max_file_version(LanceFileVersion::V2_2)
7103            .with_page_sizes(vec![4096]);
7104        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7105    }
7106
7107    #[tokio::test]
7108    async fn test_constant_layout_list_repdef_v2_2() {
7109        use crate::format::pb21::page_layout::Layout;
7110        use arrow_array::builder::{Int32Builder, ListBuilder};
7111
7112        let mut builder = ListBuilder::new(Int32Builder::new());
7113        builder.values().append_value(7);
7114        builder.values().append_null();
7115        builder.values().append_value(7);
7116        builder.append(true);
7117
7118        builder.append(true);
7119
7120        builder.values().append_value(7);
7121        builder.append(true);
7122
7123        builder.append_null();
7124
7125        let arr: ArrayRef = Arc::new(builder.finish());
7126        let field = arrow_schema::Field::new(
7127            "c",
7128            DataType::List(Arc::new(arrow_schema::Field::new(
7129                "item",
7130                DataType::Int32,
7131                true,
7132            ))),
7133            true,
7134        );
7135        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7136
7137        let PageEncoding::Structural(layout) = &page.description else {
7138            panic!("Expected structural encoding");
7139        };
7140        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7141            panic!("Expected constant layout in slot 2");
7142        };
7143        assert!(layout.inline_value.is_some());
7144        assert_eq!(page.data.len(), 2);
7145
7146        let test_cases = TestCases::default()
7147            .with_min_file_version(LanceFileVersion::V2_2)
7148            .with_max_file_version(LanceFileVersion::V2_2)
7149            .with_page_sizes(vec![4096]);
7150        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7151    }
7152
7153    #[tokio::test]
7154    async fn test_constant_layout_fixed_size_list_not_used_v2_2() {
7155        use crate::format::pb21::page_layout::Layout;
7156        use arrow_array::builder::{FixedSizeListBuilder, Int32Builder};
7157
7158        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
7159        for _ in 0..64 {
7160            builder.values().append_value(1);
7161            builder.values().append_null();
7162            builder.values().append_value(3);
7163            builder.append(true);
7164        }
7165        let arr: ArrayRef = Arc::new(builder.finish());
7166        let field = arrow_schema::Field::new(
7167            "c",
7168            DataType::FixedSizeList(
7169                Arc::new(arrow_schema::Field::new("item", DataType::Int32, true)),
7170                3,
7171            ),
7172            true,
7173        );
7174        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7175
7176        if let PageEncoding::Structural(layout) = &page.description {
7177            assert!(
7178                !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
7179                "FixedSizeList should not use constant layout yet"
7180            );
7181        }
7182
7183        let test_cases = TestCases::default()
7184            .with_min_file_version(LanceFileVersion::V2_2)
7185            .with_max_file_version(LanceFileVersion::V2_2)
7186            .with_page_sizes(vec![4096]);
7187        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7188    }
7189
7190    #[tokio::test]
7191    async fn test_constant_layout_not_written_before_v2_2() {
7192        use crate::format::pb21::page_layout::Layout;
7193
7194        let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![7; 1024]));
7195        let field = arrow_schema::Field::new("c", DataType::Int32, true);
7196        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_1).await;
7197
7198        let PageEncoding::Structural(layout) = &page.description else {
7199            return;
7200        };
7201        assert!(
7202            !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
7203            "Should not emit constant layout before v2.2"
7204        );
7205
7206        let test_cases = TestCases::default()
7207            .with_min_file_version(LanceFileVersion::V2_1)
7208            .with_max_file_version(LanceFileVersion::V2_1)
7209            .with_page_sizes(vec![4096]);
7210        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7211    }
7212
7213    #[tokio::test]
7214    async fn test_all_null_constant_layout_still_works_v2_2() {
7215        use crate::format::pb21::page_layout::Layout;
7216
7217        let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![None, None, None]));
7218        let field = arrow_schema::Field::new("c", DataType::Int32, true);
7219        let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7220
7221        let PageEncoding::Structural(layout) = &page.description else {
7222            panic!("Expected structural encoding");
7223        };
7224        let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7225            panic!("Expected layout in slot 2");
7226        };
7227        assert!(layout.inline_value.is_none());
7228        assert_eq!(page.data.len(), 0);
7229
7230        let test_cases = TestCases::default()
7231            .with_min_file_version(LanceFileVersion::V2_2)
7232            .with_max_file_version(LanceFileVersion::V2_2)
7233            .with_page_sizes(vec![4096]);
7234        check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7235    }
7236
7237    #[test]
7238    fn test_encode_decode_complex_all_null_vals_roundtrip() {
7239        use crate::compression::{
7240            DecompressionStrategy, DefaultCompressionStrategy, DefaultDecompressionStrategy,
7241        };
7242
7243        let values: Arc<[u16]> = Arc::from((0..2048).map(|i| (i % 5) as u16).collect::<Vec<u16>>());
7244
7245        let compression_strategy = DefaultCompressionStrategy::default();
7246        let decompression_strategy = DefaultDecompressionStrategy::default();
7247
7248        let (compressed_buf, encoding) = PrimitiveStructuralEncoder::encode_complex_all_null_vals(
7249            &values,
7250            &compression_strategy,
7251        )
7252        .unwrap();
7253
7254        let decompressor = decompression_strategy
7255            .create_block_decompressor(&encoding)
7256            .unwrap();
7257        let decompressed = decompressor
7258            .decompress(compressed_buf, values.len() as u64)
7259            .unwrap();
7260        let decompressed_fixed_width = decompressed.as_fixed_width().unwrap();
7261        assert_eq!(decompressed_fixed_width.num_values, values.len() as u64);
7262        assert_eq!(decompressed_fixed_width.bits_per_value, 16);
7263        let rep_result = decompressed_fixed_width.data.borrow_to_typed_slice::<u16>();
7264        assert_eq!(rep_result.as_ref(), values.as_ref());
7265    }
7266
7267    #[tokio::test]
7268    async fn test_complex_all_null_compression_gated_by_version() {
7269        use crate::format::pb21::page_layout::Layout;
7270        use arrow_array::ListArray;
7271
7272        let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
7273            (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
7274        );
7275        let arr: ArrayRef = Arc::new(list_array);
7276        let field = arrow_schema::Field::new(
7277            "c",
7278            DataType::List(Arc::new(arrow_schema::Field::new(
7279                "item",
7280                DataType::Int32,
7281                true,
7282            ))),
7283            true,
7284        );
7285
7286        let page_v21 = encode_first_page(field.clone(), arr.clone(), LanceFileVersion::V2_1).await;
7287        let PageEncoding::Structural(layout_v21) = &page_v21.description else {
7288            panic!("Expected structural encoding");
7289        };
7290        let Layout::ConstantLayout(layout_v21) = layout_v21.layout.as_ref().unwrap() else {
7291            panic!("Expected constant layout");
7292        };
7293        assert!(layout_v21.rep_compression.is_none());
7294        assert!(layout_v21.def_compression.is_none());
7295        assert_eq!(layout_v21.num_rep_values, 0);
7296        assert_eq!(layout_v21.num_def_values, 0);
7297
7298        let page_v22 = encode_first_page(field, arr, LanceFileVersion::V2_2).await;
7299        let PageEncoding::Structural(layout_v22) = &page_v22.description else {
7300            panic!("Expected structural encoding");
7301        };
7302        let Layout::ConstantLayout(layout_v22) = layout_v22.layout.as_ref().unwrap() else {
7303            panic!("Expected constant layout");
7304        };
7305        assert!(layout_v22.def_compression.is_some());
7306        assert!(layout_v22.num_def_values > 0);
7307    }
7308
7309    #[tokio::test]
7310    async fn test_complex_all_null_round_trip() {
7311        use arrow_array::ListArray;
7312
7313        let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
7314            (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
7315        );
7316
7317        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_2);
7318        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
7319            .await;
7320    }
7321}