
lance_encoding/encodings/logical/primitive.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    collections::{HashMap, VecDeque},
7    env,
8    fmt::Debug,
9    iter,
10    ops::Range,
11    sync::Arc,
12    vec,
13};
14
15use crate::{
16    constants::{
17        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
18    },
19    data::DictionaryDataBlock,
20    encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
21    format::{
22        pb21::{self, compressive_encoding::Compression, CompressiveEncoding, PageLayout},
23        ProtobufUtils21,
24    },
25};
26use arrow_array::{cast::AsArray, make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
27use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};
28use arrow_schema::{DataType, Field as ArrowField};
29use bytes::Bytes;
30use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
31use itertools::Itertools;
32use lance_arrow::deepcopy::deep_copy_nulls;
33use lance_core::{
34    cache::{CacheKey, Context, DeepSizeOf},
35    error::{Error, LanceOptionExt},
36    utils::bit::pad_bytes,
37};
38use log::trace;
39use snafu::location;
40
41use crate::{
42    compression::{
43        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
44    },
45    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
46    utils::bytepack::BytepackedIntegerEncoder,
47};
48use crate::{
49    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
50    encodings::logical::primitive::fullzip::PerValueDataBlock,
51};
52use crate::{
53    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
54};
55use crate::{
56    encodings::logical::primitive::miniblock::MiniBlockCompressed,
57    statistics::{ComputeStat, GetStat, Stat},
58};
59use crate::{
60    repdef::{
61        build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator,
62        ControlWordParser, DefinitionInterpretation, RepDefSlicer,
63    },
64    utils::accumulation::AccumulationQueue,
65};
66use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};
67
68use crate::constants::DICT_SIZE_RATIO_META_KEY;
69use crate::encodings::logical::primitive::dict::DICT_INDICES_BITS_PER_VALUE;
70use crate::version::LanceFileVersion;
71use crate::{
72    buffer::LanceBuffer,
73    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
74    decoder::{
75        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
76        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
77        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
78        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
79    },
80    encoder::{
81        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
82    },
83    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
84    EncodingsIo,
85};
86
87pub mod blob;
88pub mod dict;
89pub mod fullzip;
90pub mod miniblock;
91
92const FILL_BYTE: u8 = 0xFE;
93
94struct PageLoadTask {
95    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
96    num_rows: u64,
97}
98
99/// A trait for figuring out how to schedule the data within
100/// a single page.
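///
/// A rough sketch of how a caller might drive this trait; the `scheduler`, `io`, and
/// `ranges` names below are illustrative assumptions rather than part of this API:
///
/// ```ignore
/// let cached = scheduler.initialize(&io).await?;            // fetch and cache page metadata
/// let load_tasks = scheduler.schedule_ranges(&ranges, &io)?; // one PageLoadTask per shard
/// for load_task in load_tasks {
///     let mut decoder = load_task.decoder_fut.await?;       // wait for the shard's I/O
///     let decode_task = decoder.drain(load_task.num_rows)?;  // claim rows to decode
///     let page = decode_task.decode()?;                      // CPU-side decompression
/// }
/// ```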
101trait StructuralPageScheduler: std::fmt::Debug + Send {
102    /// Fetches any metadata required for the page
103    fn initialize<'a>(
104        &'a mut self,
105        io: &Arc<dyn EncodingsIo>,
106    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
107    /// Loads metadata from a previous initialize call
108    fn load(&mut self, data: &Arc<dyn CachedPageData>);
109    /// Schedules the read of the given ranges in the page
110    ///
111    /// The read may be split into multiple "shards" if the page is extremely large.
112    /// Each shard maps to one or more rows and can be decoded independently.
113    ///
114    /// Note: this sharding is for splitting up very large pages into smaller reads to
115    /// avoid buffering too much data in memory.  It is not related to the batch size or
116    /// compute units in any way.
117    fn schedule_ranges(
118        &self,
119        ranges: &[Range<u64>],
120        io: &Arc<dyn EncodingsIo>,
121    ) -> Result<Vec<PageLoadTask>>;
122}
123
124/// Metadata describing the decoded size of a mini-block
125#[derive(Debug)]
126struct ChunkMeta {
127    num_values: u64,
128    chunk_size_bytes: u64,
129    offset_bytes: u64,
130}
131
132/// A mini-block chunk that has been decoded and decompressed
133#[derive(Debug, Clone)]
134struct DecodedMiniBlockChunk {
135    rep: Option<ScalarBuffer<u16>>,
136    def: Option<ScalarBuffer<u16>>,
137    values: DataBlock,
138}
139
140/// A task to decode one or more mini-blocks of data into an output batch
141///
142/// Note: Two batches might share the same mini-block of data.  When this happens
143/// then each batch gets a copy of the block and each batch decodes the block independently.
144///
145/// This means we have duplicated work but it is necessary to avoid having to synchronize
146/// the decoding of the block. (TODO: test this theory)
147#[derive(Debug)]
148struct DecodeMiniBlockTask {
149    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
150    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
151    value_decompressor: Arc<dyn MiniBlockDecompressor>,
152    dictionary_data: Option<Arc<DataBlock>>,
153    def_meaning: Arc<[DefinitionInterpretation]>,
154    num_buffers: u64,
155    max_visible_level: u16,
156    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
157    has_large_chunk: bool,
158}
159
160impl DecodeMiniBlockTask {
161    fn decode_levels(
162        rep_decompressor: &dyn BlockDecompressor,
163        levels: LanceBuffer,
164        num_levels: u16,
165    ) -> Result<ScalarBuffer<u16>> {
166        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
167        let rep = rep.as_fixed_width().unwrap();
168        debug_assert_eq!(rep.num_values, num_levels as u64);
169        debug_assert_eq!(rep.bits_per_value, 16);
170        Ok(rep.data.borrow_to_typed_slice::<u16>())
171    }
172
173    // We are building a LevelBuffer (`levels`) and want to copy into it the values
174    // in `range` from `level_buf`, appending at `dest_offset`.
175    //
176    // We need to handle both the case where `levels` is None (no nulls encountered
177    // yet) and the case where `level_buf` is None (the input we are copying from has
178    // no nulls)
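    //
    // For example (illustrative values): copying range 1..3 from a `level_buf` of
    // [0, 1, 0, 2] with dest_offset = 4 appends [1, 0]; if `levels` was still None it
    // is first materialized with four leading zeros.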
179    fn extend_levels(
180        range: Range<u64>,
181        levels: &mut Option<LevelBuffer>,
182        level_buf: &Option<impl AsRef<[u16]>>,
183        dest_offset: usize,
184    ) {
185        if let Some(level_buf) = level_buf {
186            if levels.is_none() {
187                // This is the first non-empty def buf we've hit, fill in the past
188                // with 0 (valid)
189                let mut new_levels_vec =
190                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
191                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
192                *levels = Some(new_levels_vec);
193            }
194            levels.as_mut().unwrap().extend(
195                level_buf.as_ref()[range.start as usize..range.end as usize]
196                    .iter()
197                    .copied(),
198            );
199        } else if let Some(levels) = levels {
200            let num_values = (range.end - range.start) as usize;
201            // This is an all-valid level_buf but we had nulls earlier and so we
202            // need to materialize it
203            levels.extend(iter::repeat_n(0, num_values));
204        }
205    }
206
207    /// Maps a range of rows to a range of items and a range of levels
208    ///
209    /// If there is no repetition information this just returns the range as-is.
210    ///
211    /// If there is repetition information then we need to do some work to figure out what
212    /// range of items corresponds to the requested range of rows.
213    ///
214    /// For example, if the data is [[1, 2, 3], [4, 5], [6, 7]] and the range is 1..2 (i.e. just row
215    /// 1) then the user actually wants items 3..5.  In the above case the rep levels would be:
216    ///
217    /// Idx: 0 1 2 3 4 5 6
218    /// Rep: 1 0 0 1 0 1 0
219    ///
220    /// So the start (1) maps to the second 1 (idx=3) and the end (2) maps to the third 1 (idx=5)
221    ///
222    /// If there are invisible items then we don't count them when calculating the range of items we
223    /// are interested in but we do count them when calculating the range of levels we are interested
224    /// in.  As a result we have to return both the item range (first return value) and the level range
225    /// (second return value).
226    ///
227    /// For example, if the data is [[1, 2, 3], [4, 5], NULL, [6, 7, 8]] and the range is 2..4 then the
228    /// user wants items 5..8 but they want levels 5..9.  In the above case the rep/def levels would be:
229    ///
230    /// Idx: 0 1 2 3 4 5 6 7 8
231    /// Rep: 1 0 0 1 0 1 1 0 0
232    /// Def: 0 0 0 0 0 1 0 0 0
233    /// Itm: 1 2 3 4 5 6 7 8
234    ///
235    /// Finally, we have to contend with the fact that chunks may or may not start with a "preamble" of
236    /// trailing values that finish up a list from the previous chunk.  In this case the first item does
237    /// not start at max_rep because it is a continuation of the previous chunk.  For our purposes we do
238    /// not consider this a "row" and so the range 0..1 will refer to the first row AFTER the preamble.
239    ///
240    /// We have a separate parameter (`preamble_action`) to control whether we want the preamble or not.
241    ///
242    /// Note that the "trailer" is considered a "row" and if we want it we should include it in the range.
243    fn map_range(
244        range: Range<u64>,
245        rep: Option<&impl AsRef<[u16]>>,
246        def: Option<&impl AsRef<[u16]>>,
247        max_rep: u16,
248        max_visible_def: u16,
249        // The total number of items (not rows) in the chunk.  This is not quite the same as
250        // rep.len() / def.len() because it doesn't count invisible items
251        total_items: u64,
252        preamble_action: PreambleAction,
253    ) -> (Range<u64>, Range<u64>) {
254        if let Some(rep) = rep {
255            let mut rep = rep.as_ref();
256            // If there is a preamble and we need to skip it then do that first.  The work is the same
257            // whether there is def information or not
258            let mut items_in_preamble = 0_u64;
259            let first_row_start = match preamble_action {
260                PreambleAction::Skip | PreambleAction::Take => {
261                    let first_row_start = if let Some(def) = def.as_ref() {
262                        let mut first_row_start = None;
263                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
264                            if *rep == max_rep {
265                                first_row_start = Some(idx as u64);
266                                break;
267                            }
268                            if *def <= max_visible_def {
269                                items_in_preamble += 1;
270                            }
271                        }
272                        first_row_start
273                    } else {
274                        let first_row_start =
275                            rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
276                        items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
277                        first_row_start
278                    };
279                    // It is possible for a chunk to be entirely partial values but if it is then it
280                    // should never show up as a preamble to skip
281                    if first_row_start.is_none() {
282                        assert!(preamble_action == PreambleAction::Take);
283                        return (0..total_items, 0..rep.len() as u64);
284                    }
285                    let first_row_start = first_row_start.unwrap();
286                    rep = &rep[first_row_start as usize..];
287                    first_row_start
288                }
289                PreambleAction::Absent => {
290                    debug_assert!(rep[0] == max_rep);
291                    0
292                }
293            };
294
295            // We hit this case when all we needed was the preamble
296            if range.start == range.end {
297                debug_assert!(preamble_action == PreambleAction::Take);
298                debug_assert!(items_in_preamble <= total_items);
299                return (0..items_in_preamble, 0..first_row_start);
300            }
301            assert!(range.start < range.end);
302
303            let mut rows_seen = 0;
304            let mut new_start = 0;
305            let mut new_levels_start = 0;
306
307            if let Some(def) = def {
308                let def = &def.as_ref()[first_row_start as usize..];
309
310                // range.start == 0 always maps to 0 (even with invis items), otherwise we need to walk
311                let mut lead_invis_seen = 0;
312
313                if range.start > 0 {
314                    if def[0] > max_visible_def {
315                        lead_invis_seen += 1;
316                    }
317                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
318                        if *rep == max_rep {
319                            rows_seen += 1;
320                            if rows_seen == range.start {
321                                new_start = idx as u64 + 1 - lead_invis_seen;
322                                new_levels_start = idx as u64 + 1;
323                                break;
324                            }
325                        }
326                        if *def > max_visible_def {
327                            lead_invis_seen += 1;
328                        }
329                    }
330                }
331
332                rows_seen += 1;
333
334                let mut new_end = u64::MAX;
335                let mut new_levels_end = rep.len() as u64;
336                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
337                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
338                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
339                    .iter()
340                    .zip(&def[(new_levels_start + 1) as usize..])
341                    .enumerate()
342                {
343                    if *rep == max_rep {
344                        rows_seen += 1;
345                        if rows_seen == range.end + 1 {
346                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
347                            new_levels_end = idx as u64 + new_levels_start + 1;
348                            break;
349                        }
350                    }
351                    if *def > max_visible_def {
352                        tail_invis_seen += 1;
353                    }
354                }
355
356                if new_end == u64::MAX {
357                    new_levels_end = rep.len() as u64;
358                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
359                    new_end = rep.len() as u64 - total_invis_seen;
360                }
361
362                assert_ne!(new_end, u64::MAX);
363
364                // Adjust for any skipped preamble
365                if preamble_action == PreambleAction::Skip {
366                    new_start += items_in_preamble;
367                    new_end += items_in_preamble;
368                    new_levels_start += first_row_start;
369                    new_levels_end += first_row_start;
370                } else if preamble_action == PreambleAction::Take {
371                    debug_assert_eq!(new_start, 0);
372                    debug_assert_eq!(new_levels_start, 0);
373                    new_end += items_in_preamble;
374                    new_levels_end += first_row_start;
375                }
376
377                debug_assert!(new_end <= total_items);
378                (new_start..new_end, new_levels_start..new_levels_end)
379            } else {
380                // Easy case, there are no invisible items, so we don't need to check for them
381                // The items range and levels range will be the same.  We do still need to walk
382                // the rep levels to find the row boundaries
383
384                // range.start == 0 always maps to 0, otherwise we need to walk
385                if range.start > 0 {
386                    for (idx, rep) in rep.iter().skip(1).enumerate() {
387                        if *rep == max_rep {
388                            rows_seen += 1;
389                            if rows_seen == range.start {
390                                new_start = idx as u64 + 1;
391                                break;
392                            }
393                        }
394                    }
395                }
396                let mut new_end = rep.len() as u64;
397                // range.end == total_items always maps to rep.len(), otherwise we need to walk
398                if range.end < total_items {
399                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
400                        if *rep == max_rep {
401                            rows_seen += 1;
402                            if rows_seen == range.end {
403                                new_end = idx as u64 + new_start + 1;
404                                break;
405                            }
406                        }
407                    }
408                }
409
410                // Adjust for any skipped preamble
411                if preamble_action == PreambleAction::Skip {
412                    new_start += first_row_start;
413                    new_end += first_row_start;
414                } else if preamble_action == PreambleAction::Take {
415                    debug_assert_eq!(new_start, 0);
416                    new_end += first_row_start;
417                }
418
419                debug_assert!(new_end <= total_items);
420                (new_start..new_end, new_start..new_end)
421            }
422        } else {
423            // No repetition info, easy case, just use the range as-is and the item
424            // and level ranges are the same
425            (range.clone(), range)
426        }
427    }
428
429    // read `num_buffers` buffer sizes from `buf` starting at `offset`
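    //
    // For example (illustrative bytes): with LARGE = false, num_buffers = 2, and
    // buf = [0x04, 0x00, 0x08, 0x00] this returns [4, 8] and advances `offset` by 4;
    // with LARGE = true each size would instead occupy 4 bytes.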
430    fn read_buffer_sizes<const LARGE: bool>(
431        buf: &[u8],
432        offset: &mut usize,
433        num_buffers: u64,
434    ) -> Vec<u32> {
435        let read_size = if LARGE { 4 } else { 2 };
436        (0..num_buffers)
437            .map(|_| {
438                let bytes = &buf[*offset..*offset + read_size];
439                let size = if LARGE {
440                    u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
441                } else {
442                    // the buffer size is stored on disk as a u16 but widened to u32 after decoding for consistency
443                    u16::from_le_bytes([bytes[0], bytes[1]]) as u32
444                };
445                *offset += read_size;
446                size
447            })
448            .collect()
449    }
450
451    // Deserialize a mini-block chunk into its rep levels, def levels, and decompressed values
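    //
    // The chunk bytes are laid out as follows (inferred from the reads below):
    //
    //   num_levels: u16
    //   rep_size:   u16                  (present only when rep levels exist)
    //   def_size:   u16                  (present only when def levels exist)
    //   buffer sizes (num_buffers entries, u16 each, or u32 when has_large_chunk)
    //   <pad to MINIBLOCK_ALIGNMENT>
    //   rep level bytes  <pad>
    //   def level bytes  <pad>
    //   value buffer 0   <pad>
    //   value buffer 1   <pad>
    //   ...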
452    fn decode_miniblock_chunk(
453        &self,
454        buf: &LanceBuffer,
455        items_in_chunk: u64,
456    ) -> Result<DecodedMiniBlockChunk> {
457        let mut offset = 0;
458        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
459        offset += 2;
460
461        let rep_size = if self.rep_decompressor.is_some() {
462            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
463            offset += 2;
464            Some(rep_size)
465        } else {
466            None
467        };
468        let def_size = if self.def_decompressor.is_some() {
469            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
470            offset += 2;
471            Some(def_size)
472        } else {
473            None
474        };
475
476        let buffer_sizes = if self.has_large_chunk {
477            Self::read_buffer_sizes::<true>(buf, &mut offset, self.num_buffers)
478        } else {
479            Self::read_buffer_sizes::<false>(buf, &mut offset, self.num_buffers)
480        };
481
482        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
483
484        let rep = rep_size.map(|rep_size| {
485            let rep = buf.slice_with_length(offset, rep_size as usize);
486            offset += rep_size as usize;
487            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
488            rep
489        });
490
491        let def = def_size.map(|def_size| {
492            let def = buf.slice_with_length(offset, def_size as usize);
493            offset += def_size as usize;
494            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
495            def
496        });
497
498        let buffers = buffer_sizes
499            .into_iter()
500            .map(|buf_size| {
501                let buf = buf.slice_with_length(offset, buf_size as usize);
502                offset += buf_size as usize;
503                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
504                buf
505            })
506            .collect::<Vec<_>>();
507
508        let values = self
509            .value_decompressor
510            .decompress(buffers, items_in_chunk)?;
511
512        let rep = rep
513            .map(|rep| {
514                Self::decode_levels(
515                    self.rep_decompressor.as_ref().unwrap().as_ref(),
516                    rep,
517                    num_levels,
518                )
519            })
520            .transpose()?;
521        let def = def
522            .map(|def| {
523                Self::decode_levels(
524                    self.def_decompressor.as_ref().unwrap().as_ref(),
525                    def,
526                    num_levels,
527                )
528            })
529            .transpose()?;
530
531        Ok(DecodedMiniBlockChunk { rep, def, values })
532    }
533}
534
535impl DecodePageTask for DecodeMiniBlockTask {
536    fn decode(self: Box<Self>) -> Result<DecodedPage> {
537        // First, we create output buffers for the rep, def, and data
538        let mut repbuf: Option<LevelBuffer> = None;
539        let mut defbuf: Option<LevelBuffer> = None;
540
541        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
542
543        // This is probably an over-estimate but it's quick and easy to calculate
544        let estimated_size_bytes = self
545            .instructions
546            .iter()
547            .map(|(_, chunk)| chunk.data.len())
548            .sum::<usize>()
549            * 2;
550        let mut data_builder =
551            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
552
553        // We need to keep track of the offset into repbuf/defbuf that we are building up
554        let mut level_offset = 0;
555
556        // Pre-compute caching needs for each chunk by checking if the next chunk is the same
557        let needs_caching: Vec<bool> = self
558            .instructions
559            .windows(2)
560            .map(|w| w[0].1.chunk_idx == w[1].1.chunk_idx)
561            .chain(std::iter::once(false)) // the last one never needs caching
562            .collect();
563
564        // Cache for storing decoded chunks when beneficial
565        let mut chunk_cache: Option<(usize, DecodedMiniBlockChunk)> = None;
566
567        // Now we iterate through each instruction and process it
568        for (idx, (instructions, chunk)) in self.instructions.iter().enumerate() {
569            let should_cache_this_chunk = needs_caching[idx];
570
571            let decoded_chunk = match &chunk_cache {
572                Some((cached_chunk_idx, ref cached_chunk))
573                    if *cached_chunk_idx == chunk.chunk_idx =>
574                {
575                    // Clone only when we have a cache hit (much cheaper than decoding)
576                    cached_chunk.clone()
577                }
578                _ => {
579                    // Cache miss, need to decode
580                    let decoded = self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;
581
582                    // Only update cache if this chunk will benefit the next access
583                    if should_cache_this_chunk {
584                        chunk_cache = Some((chunk.chunk_idx, decoded.clone()));
585                    }
586                    decoded
587                }
588            };
589
590            let DecodedMiniBlockChunk { rep, def, values } = decoded_chunk;
591
592            // Our instructions tell us which rows we want to take from this chunk
593            let row_range_start =
594                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
595            let row_range_end = row_range_start + instructions.rows_to_take;
596
597            // We use the rep info to map the row range to an item range / levels range
598            let (item_range, level_range) = Self::map_range(
599                row_range_start..row_range_end,
600                rep.as_ref(),
601                def.as_ref(),
602                max_rep,
603                self.max_visible_level,
604                chunk.items_in_chunk,
605                instructions.preamble_action,
606            );
607            if item_range.end - item_range.start > chunk.items_in_chunk {
608                return Err(lance_core::Error::Internal {
609                    message: format!(
610                        "Item range {:?} is greater than chunk items in chunk {:?}",
611                        item_range, chunk.items_in_chunk
612                    ),
613                    location: location!(),
614                });
615            }
616
617            // Now we append the data to the output buffers
618            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
619            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
620            level_offset += (level_range.end - level_range.start) as usize;
621            data_builder.append(&values, item_range);
622        }
623
624        let mut data = data_builder.finish();
625
626        let unraveler =
627            RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());
628
629        if let Some(dictionary) = &self.dictionary_data {
630            // Don't decode here, that happens later (if needed)
631            let DataBlock::FixedWidth(indices) = data else {
632                return Err(lance_core::Error::Internal {
633                    message: format!(
634                        "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
635                        data
636                    ),
637                    location: location!(),
638                });
639            };
640            data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
641                indices,
642                dictionary.as_ref().clone(),
643            ));
644        }
645
646        Ok(DecodedPage {
647            data,
648            repdef: unraveler,
649        })
650    }
651}
652
653/// A chunk that has been loaded by the miniblock scheduler (but not
654/// yet decoded)
655#[derive(Debug)]
656struct LoadedChunk {
657    data: LanceBuffer,
658    items_in_chunk: u64,
659    byte_range: Range<u64>,
660    chunk_idx: usize,
661}
662
663impl Clone for LoadedChunk {
664    fn clone(&self) -> Self {
665        Self {
666            // Safe as we always create borrowed buffers here
667            data: self.data.clone(),
668            items_in_chunk: self.items_in_chunk,
669            byte_range: self.byte_range.clone(),
670            chunk_idx: self.chunk_idx,
671        }
672    }
673}
674
675/// Decodes mini-block formatted data.  See [`PrimitiveStructuralEncoder`] for more
676/// details on the different layouts.
677#[derive(Debug)]
678struct MiniBlockDecoder {
679    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
680    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
681    value_decompressor: Arc<dyn MiniBlockDecompressor>,
682    def_meaning: Arc<[DefinitionInterpretation]>,
683    loaded_chunks: VecDeque<LoadedChunk>,
684    instructions: VecDeque<ChunkInstructions>,
685    offset_in_current_chunk: u64,
686    num_rows: u64,
687    num_buffers: u64,
688    dictionary: Option<Arc<DataBlock>>,
689    has_large_chunk: bool,
690}
691
692/// See [`MiniBlockScheduler`] for more details on the scheduling and decoding
693/// process for miniblock encoded data.
694impl StructuralPageDecoder for MiniBlockDecoder {
695    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
696        let mut items_desired = num_rows;
697        let mut need_preamble = false;
698        let mut skip_in_chunk = self.offset_in_current_chunk;
699        let mut drain_instructions = Vec::new();
700        while items_desired > 0 || need_preamble {
701            let (instructions, consumed) = self
702                .instructions
703                .front()
704                .unwrap()
705                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);
706
707            while self.loaded_chunks.front().unwrap().chunk_idx
708                != instructions.chunk_instructions.chunk_idx
709            {
710                self.loaded_chunks.pop_front();
711            }
712            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
713            if consumed {
714                self.instructions.pop_front();
715            }
716        }
717        // We can throw away need_preamble here because it must be false.  If it were true it would mean
718        // we were still in the middle of loading rows.  We do need to latch skip_in_chunk though.
719        self.offset_in_current_chunk = skip_in_chunk;
720
721        let max_visible_level = self
722            .def_meaning
723            .iter()
724            .take_while(|l| !l.is_list())
725            .map(|l| l.num_def_levels())
726            .sum::<u16>();
727
728        Ok(Box::new(DecodeMiniBlockTask {
729            instructions: drain_instructions,
730            def_decompressor: self.def_decompressor.clone(),
731            rep_decompressor: self.rep_decompressor.clone(),
732            value_decompressor: self.value_decompressor.clone(),
733            dictionary_data: self.dictionary.clone(),
734            def_meaning: self.def_meaning.clone(),
735            num_buffers: self.num_buffers,
736            max_visible_level,
737            has_large_chunk: self.has_large_chunk,
738        }))
739    }
740
741    fn num_rows(&self) -> u64 {
742        self.num_rows
743    }
744}
745
746#[derive(Debug)]
747struct CachedComplexAllNullState {
748    rep: Option<ScalarBuffer<u16>>,
749    def: Option<ScalarBuffer<u16>>,
750}
751
752impl DeepSizeOf for CachedComplexAllNullState {
753    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
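        // each cached level is a u16, so 2 bytes per entry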
754        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
755            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
756    }
757}
758
759impl CachedPageData for CachedComplexAllNullState {
760    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
761        self
762    }
763}
764
765/// A scheduler for all-null data that has repetition and definition levels
766///
767/// We still need to do some I/O in this case because we need to figure out what kind of null we
768/// are dealing with (null list, null struct, what level null struct, etc.)
769///
770/// TODO: Right now we just load the entire rep/def at initialization time and cache it.  This is a touch
771/// RAM aggressive and maybe we want something more lazy in the future.  On the other hand, it's simple
772/// and fast so...maybe not :)
773#[derive(Debug)]
774pub struct ComplexAllNullScheduler {
775    // Set from protobuf
776    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
777    def_meaning: Arc<[DefinitionInterpretation]>,
778    repdef: Option<Arc<CachedComplexAllNullState>>,
779    max_visible_level: u16,
780}
781
782impl ComplexAllNullScheduler {
783    pub fn new(
784        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
785        def_meaning: Arc<[DefinitionInterpretation]>,
786    ) -> Self {
787        let max_visible_level = def_meaning
788            .iter()
789            .take_while(|l| !l.is_list())
790            .map(|l| l.num_def_levels())
791            .sum::<u16>();
792        Self {
793            buffer_offsets_and_sizes,
794            def_meaning,
795            repdef: None,
796            max_visible_level,
797        }
798    }
799}
800
801impl StructuralPageScheduler for ComplexAllNullScheduler {
802    fn initialize<'a>(
803        &'a mut self,
804        io: &Arc<dyn EncodingsIo>,
805    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
806        // Fully load the rep & def buffers, as needed
807        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
808        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
809        let has_rep = rep_size > 0;
810        let has_def = def_size > 0;
811
812        let mut reads = Vec::with_capacity(2);
813        if has_rep {
814            reads.push(rep_pos..rep_pos + rep_size);
815        }
816        if has_def {
817            reads.push(def_pos..def_pos + def_size);
818        }
819
820        let data = io.submit_request(reads, 0);
821
822        async move {
823            let data = data.await?;
824            let mut data_iter = data.into_iter();
825
826            let rep = if has_rep {
827                let rep = data_iter.next().unwrap();
828                let rep = LanceBuffer::from_bytes(rep, 2);
829                let rep = rep.borrow_to_typed_slice::<u16>();
830                Some(rep)
831            } else {
832                None
833            };
834
835            let def = if has_def {
836                let def = data_iter.next().unwrap();
837                let def = LanceBuffer::from_bytes(def, 2);
838                let def = def.borrow_to_typed_slice::<u16>();
839                Some(def)
840            } else {
841                None
842            };
843
844            let repdef = Arc::new(CachedComplexAllNullState { rep, def });
845
846            self.repdef = Some(repdef.clone());
847
848            Ok(repdef as Arc<dyn CachedPageData>)
849        }
850        .boxed()
851    }
852
853    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
854        self.repdef = Some(
855            data.clone()
856                .as_arc_any()
857                .downcast::<CachedComplexAllNullState>()
858                .unwrap(),
859        );
860    }
861
862    fn schedule_ranges(
863        &self,
864        ranges: &[Range<u64>],
865        _io: &Arc<dyn EncodingsIo>,
866    ) -> Result<Vec<PageLoadTask>> {
867        let ranges = VecDeque::from_iter(ranges.iter().cloned());
868        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
869        let decoder = Box::new(ComplexAllNullPageDecoder {
870            ranges,
871            rep: self.repdef.as_ref().unwrap().rep.clone(),
872            def: self.repdef.as_ref().unwrap().def.clone(),
873            num_rows,
874            def_meaning: self.def_meaning.clone(),
875            max_visible_level: self.max_visible_level,
876        }) as Box<dyn StructuralPageDecoder>;
877        let page_load_task = PageLoadTask {
878            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
879            num_rows,
880        };
881        Ok(vec![page_load_task])
882    }
883}
884
885#[derive(Debug)]
886pub struct ComplexAllNullPageDecoder {
887    ranges: VecDeque<Range<u64>>,
888    rep: Option<ScalarBuffer<u16>>,
889    def: Option<ScalarBuffer<u16>>,
890    num_rows: u64,
891    def_meaning: Arc<[DefinitionInterpretation]>,
892    max_visible_level: u16,
893}
894
895impl ComplexAllNullPageDecoder {
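    // Pops whole ranges (and possibly a prefix of the last one) off the front of the
    // queue until `num_rows` rows have been claimed.  For example (illustrative values):
    // with queued ranges [0..10, 20..30], draining 15 rows yields [0..10, 20..25] and
    // leaves 25..30 queued.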
896    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
897        let mut rows_desired = num_rows;
898        let mut ranges = Vec::with_capacity(self.ranges.len());
899        while rows_desired > 0 {
900            let front = self.ranges.front_mut().unwrap();
901            let avail = front.end - front.start;
902            if avail > rows_desired {
903                ranges.push(front.start..front.start + rows_desired);
904                front.start += rows_desired;
905                rows_desired = 0;
906            } else {
907                ranges.push(self.ranges.pop_front().unwrap());
908                rows_desired -= avail;
909            }
910        }
911        ranges
912    }
913}
914
915impl StructuralPageDecoder for ComplexAllNullPageDecoder {
916    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
917        let drained_ranges = self.drain_ranges(num_rows);
918        Ok(Box::new(DecodeComplexAllNullTask {
919            ranges: drained_ranges,
920            rep: self.rep.clone(),
921            def: self.def.clone(),
922            def_meaning: self.def_meaning.clone(),
923            max_visible_level: self.max_visible_level,
924        }))
925    }
926
927    fn num_rows(&self) -> u64 {
928        self.num_rows
929    }
930}
931
932/// We use `ranges` to slice into `rep` and `def` and create rep/def buffers
933/// for the null data.
934#[derive(Debug)]
935pub struct DecodeComplexAllNullTask {
936    ranges: Vec<Range<u64>>,
937    rep: Option<ScalarBuffer<u16>>,
938    def: Option<ScalarBuffer<u16>>,
939    def_meaning: Arc<[DefinitionInterpretation]>,
940    max_visible_level: u16,
941}
942
943impl DecodeComplexAllNullTask {
944    fn decode_level(
945        &self,
946        levels: &Option<ScalarBuffer<u16>>,
947        num_values: u64,
948    ) -> Option<Vec<u16>> {
949        levels.as_ref().map(|levels| {
950            let mut referenced_levels = Vec::with_capacity(num_values as usize);
951            for range in &self.ranges {
952                referenced_levels.extend(
953                    levels[range.start as usize..range.end as usize]
954                        .iter()
955                        .copied(),
956                );
957            }
958            referenced_levels
959        })
960    }
961}
962
963impl DecodePageTask for DecodeComplexAllNullTask {
964    fn decode(self: Box<Self>) -> Result<DecodedPage> {
965        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
966        let rep = self.decode_level(&self.rep, num_values);
967        let def = self.decode_level(&self.def, num_values);
968
969        // If there are definition levels there may be empty / null lists which are not visible
970        // in the items array.  We need to account for that here to figure out how many values
971        // should be in the items array.
972        let num_values = if let Some(def) = &def {
973            def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
974        } else {
975            num_values
976        };
977
978        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
979        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
980        Ok(DecodedPage {
981            data,
982            repdef: unraveler,
983        })
984    }
985}
986
987/// A scheduler for simple all-null data
988///
989/// "simple" all-null data is data that is all null and only has a single level of definition and
990/// no repetition.  We don't need to read any data at all in this case.
991#[derive(Debug, Default)]
992pub struct SimpleAllNullScheduler {}
993
994impl StructuralPageScheduler for SimpleAllNullScheduler {
995    fn initialize<'a>(
996        &'a mut self,
997        _io: &Arc<dyn EncodingsIo>,
998    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
999        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
1000    }
1001
1002    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
1003
1004    fn schedule_ranges(
1005        &self,
1006        ranges: &[Range<u64>],
1007        _io: &Arc<dyn EncodingsIo>,
1008    ) -> Result<Vec<PageLoadTask>> {
1009        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1010        let decoder =
1011            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
1012        let page_load_task = PageLoadTask {
1013            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
1014            num_rows,
1015        };
1016        Ok(vec![page_load_task])
1017    }
1018}
1019
1020/// A page decode task for all-null data without any
1021/// repetition and only a single level of definition
1022#[derive(Debug)]
1023struct SimpleAllNullDecodePageTask {
1024    num_values: u64,
1025}
1026impl DecodePageTask for SimpleAllNullDecodePageTask {
1027    fn decode(self: Box<Self>) -> Result<DecodedPage> {
1028        let unraveler = RepDefUnraveler::new(
1029            None,
1030            Some(vec![1; self.num_values as usize]),
1031            Arc::new([DefinitionInterpretation::NullableItem]),
1032            self.num_values,
1033        );
1034        Ok(DecodedPage {
1035            data: DataBlock::AllNull(AllNullDataBlock {
1036                num_values: self.num_values,
1037            }),
1038            repdef: unraveler,
1039        })
1040    }
1041}
1042
1043#[derive(Debug)]
1044pub struct SimpleAllNullPageDecoder {
1045    num_rows: u64,
1046}
1047
1048impl StructuralPageDecoder for SimpleAllNullPageDecoder {
1049    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1050        Ok(Box::new(SimpleAllNullDecodePageTask {
1051            num_values: num_rows,
1052        }))
1053    }
1054
1055    fn num_rows(&self) -> u64 {
1056        self.num_rows
1057    }
1058}
1059
1060#[derive(Debug, Clone)]
1061struct MiniBlockSchedulerDictionary {
1062    // These come from the protobuf
1063    dictionary_decompressor: Arc<dyn BlockDecompressor>,
1064    dictionary_buf_position_and_size: (u64, u64),
1065    dictionary_data_alignment: u64,
1066    num_dictionary_items: u64,
1067}
1068
1069/// Individual block metadata within a MiniBlock repetition index.
1070#[derive(Debug)]
1071struct MiniBlockRepIndexBlock {
1072    // The index of the first row that starts after the beginning of this block.  If the block
1073    // has a preamble this will be the row after the preamble.  If the block is entirely preamble
1074    // then this will be a row that starts in some future block.
1075    first_row: u64,
1076    // The number of rows in the block, including the trailer but not the preamble.
1077    // Can be 0 if the block is entirely preamble
1078    starts_including_trailer: u64,
1079    // Whether the block has a preamble
1080    has_preamble: bool,
1081    // Whether the block has a trailer
1082    has_trailer: bool,
1083}
1084
1085impl DeepSizeOf for MiniBlockRepIndexBlock {
1086    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
1087        0
1088    }
1089}
1090
1091/// Repetition index for MiniBlock encoding.
1092///
1093/// Stores block-level offset information to enable efficient random
1094/// access to nested data structures within mini-blocks.
1095#[derive(Debug)]
1096struct MiniBlockRepIndex {
1097    blocks: Vec<MiniBlockRepIndexBlock>,
1098}
1099
1100impl DeepSizeOf for MiniBlockRepIndex {
1101    fn deep_size_of_children(&self, context: &mut Context) -> usize {
1102        self.blocks.deep_size_of_children(context)
1103    }
1104}
1105
1106impl MiniBlockRepIndex {
1107    /// Decode repetition index from chunk metadata using default values.
1108    ///
1109    /// This creates a repetition index where each chunk has no partial values
1110    /// and no trailers, suitable for simple sequential data layouts.
1111    pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
1112        let mut blocks = Vec::with_capacity(chunks.len());
1113        let mut offset: u64 = 0;
1114
1115        for c in chunks {
1116            blocks.push(MiniBlockRepIndexBlock {
1117                first_row: offset,
1118                starts_including_trailer: c.num_values,
1119                has_preamble: false,
1120                has_trailer: false,
1121            });
1122
1123            offset += c.num_values;
1124        }
1125
1126        Self { blocks }
1127    }
1128
1129    /// Decode repetition index from raw bytes in little-endian format.
1130    ///
1131    /// The bytes should contain u64 values arranged in groups of `stride` elements,
1132    /// where the first two values of each group represent ends_count and partial_count.
1133    /// Returns an empty index if no bytes are provided.
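    ///
    /// For example (illustrative values), with `stride = 2` and decoded u64 values
    /// `[3, 1, 2, 0]` the loop below produces:
    ///
    /// ```text
    /// block 0: ends = 3, partial = 1 -> first_row 0, 4 row starts, has trailer
    /// block 1: ends = 2, partial = 0 -> first_row 4, 1 row start,  has preamble
    /// ```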
1134    pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
1135        // Convert bytes to u64 slice, handling alignment automatically
1136        let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
1137        let u64_slice = buffer.borrow_to_typed_slice::<u64>();
1138        let n = u64_slice.len() / stride;
1139
1140        let mut blocks = Vec::with_capacity(n);
1141        let mut chunk_has_preamble = false;
1142        let mut offset: u64 = 0;
1143
1144        // Extract first two values from each block: ends_count and partial_count
1145        for i in 0..n {
1146            let base_idx = i * stride;
1147            let ends = u64_slice[base_idx];
1148            let partial = u64_slice[base_idx + 1];
1149
1150            let has_trailer = partial > 0;
1151            // Convert branches to arithmetic for better compiler optimization
1152            let starts_including_trailer =
1153                ends + (has_trailer as u64) - (chunk_has_preamble as u64);
1154
1155            blocks.push(MiniBlockRepIndexBlock {
1156                first_row: offset,
1157                starts_including_trailer,
1158                has_preamble: chunk_has_preamble,
1159                has_trailer,
1160            });
1161
1162            chunk_has_preamble = has_trailer;
1163            offset += starts_including_trailer;
1164        }
1165
1166        Self { blocks }
1167    }
1168}
1169
1170/// State that is loaded once and cached for future lookups
1171#[derive(Debug)]
1172struct MiniBlockCacheableState {
1173    /// Metadata that describes each chunk in the page
1174    chunk_meta: Vec<ChunkMeta>,
1175    /// The decoded repetition index
1176    rep_index: MiniBlockRepIndex,
1177    /// The dictionary for the page, if any
1178    dictionary: Option<Arc<DataBlock>>,
1179}
1180
1181impl DeepSizeOf for MiniBlockCacheableState {
1182    fn deep_size_of_children(&self, context: &mut Context) -> usize {
1183        self.rep_index.deep_size_of_children(context)
1184            + self
1185                .dictionary
1186                .as_ref()
1187                .map(|dict| dict.data_size() as usize)
1188                .unwrap_or(0)
1189    }
1190}
1191
1192impl CachedPageData for MiniBlockCacheableState {
1193    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
1194        self
1195    }
1196}
1197
1198/// A scheduler for a page that has been encoded with the mini-block layout
1199///
1200/// Scheduling mini-block encoded data is simple in concept and somewhat complex
1201/// in practice.
1202///
1203/// First, during initialization, we load the chunk metadata, the repetition index,
1204/// and the dictionary (these last two may not be present)
1205///
1206/// Then, during scheduling, we use the user's requested row ranges and the repetition
1207/// index to determine which chunks we need and which rows we need from those chunks.
1208///
1209/// For example, if the repetition index is: [50, 3], [50, 0], [10, 0] and the range
1210/// from the user is 40..60 then we need to:
1211///
1212///  - Read the first chunk and skip the first 40 rows, then read 10 full rows, and
1213///    then read 3 items for the 11th row of our range.
1214///  - Read the second chunk and read the remaining items in our 11th row and then read
1215///    the remaining 9 full rows.
1216///
1217/// Then, if we are going to decode that in batches of 5, we need to make decode tasks.
1218/// The first two decode tasks will just need the first chunk.  The third decode task will
1219/// need the first chunk (for the trailer which has the 11th row in our range) and the second
1220/// chunk.  The final decode task will just need the second chunk.
1221///
1222/// The above prose descriptions are what are represented by `ChunkInstructions` and
1223/// `ChunkDrainInstructions`.
1224#[derive(Debug)]
1225pub struct MiniBlockScheduler {
1226    // These come from the protobuf
1227    buffer_offsets_and_sizes: Vec<(u64, u64)>,
1228    priority: u64,
1229    items_in_page: u64,
1230    repetition_index_depth: u16,
1231    num_buffers: u64,
1232    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
1233    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
1234    value_decompressor: Arc<dyn MiniBlockDecompressor>,
1235    def_meaning: Arc<[DefinitionInterpretation]>,
1236    dictionary: Option<MiniBlockSchedulerDictionary>,
1237    // This is set after initialization
1238    page_meta: Option<Arc<MiniBlockCacheableState>>,
1239    has_large_chunk: bool,
1240}
1241
1242impl MiniBlockScheduler {
1243    fn try_new(
1244        buffer_offsets_and_sizes: &[(u64, u64)],
1245        priority: u64,
1246        items_in_page: u64,
1247        layout: &pb21::MiniBlockLayout,
1248        decompressors: &dyn DecompressionStrategy,
1249    ) -> Result<Self> {
1250        let rep_decompressor = layout
1251            .rep_compression
1252            .as_ref()
1253            .map(|rep_compression| {
1254                decompressors
1255                    .create_block_decompressor(rep_compression)
1256                    .map(Arc::from)
1257            })
1258            .transpose()?;
1259        let def_decompressor = layout
1260            .def_compression
1261            .as_ref()
1262            .map(|def_compression| {
1263                decompressors
1264                    .create_block_decompressor(def_compression)
1265                    .map(Arc::from)
1266            })
1267            .transpose()?;
1268        let def_meaning = layout
1269            .layers
1270            .iter()
1271            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
1272            .collect::<Vec<_>>();
1273        let value_decompressor = decompressors.create_miniblock_decompressor(
1274            layout.value_compression.as_ref().unwrap(),
1275            decompressors,
1276        )?;
1277
1278        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
1279            let num_dictionary_items = layout.num_dictionary_items;
1280            match dictionary_encoding.compression.as_ref().unwrap() {
1281                Compression::Variable(_) => Some(MiniBlockSchedulerDictionary {
1282                    dictionary_decompressor: decompressors
1283                        .create_block_decompressor(dictionary_encoding)?
1284                        .into(),
1285                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1286                    dictionary_data_alignment: 4,
1287                    num_dictionary_items,
1288                }),
1289                Compression::Flat(_) => Some(MiniBlockSchedulerDictionary {
1290                    dictionary_decompressor: decompressors
1291                        .create_block_decompressor(dictionary_encoding)?
1292                        .into(),
1293                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1294                    dictionary_data_alignment: 16,
1295                    num_dictionary_items,
1296                }),
1297                Compression::General(_) => Some(MiniBlockSchedulerDictionary {
1298                    dictionary_decompressor: decompressors
1299                        .create_block_decompressor(dictionary_encoding)?
1300                        .into(),
1301                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1302                    dictionary_data_alignment: 1,
1303                    num_dictionary_items,
1304                }),
1305                _ => unreachable!(
1306                    "Mini-block dictionary encoding must use Variable, Flat, or General compression"
1307                ),
1308            }
1309        } else {
1310            None
1311        };
1312
1313        Ok(Self {
1314            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
1315            rep_decompressor,
1316            def_decompressor,
1317            value_decompressor: value_decompressor.into(),
1318            repetition_index_depth: layout.repetition_index_depth as u16,
1319            num_buffers: layout.num_buffers,
1320            priority,
1321            items_in_page,
1322            dictionary,
1323            def_meaning: def_meaning.into(),
1324            page_meta: None,
1325            has_large_chunk: layout.has_large_chunk,
1326        })
1327    }
1328
1329    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
1330        let page_meta = self.page_meta.as_ref().unwrap();
1331        chunk_indices
1332            .iter()
1333            .map(|&chunk_idx| {
1334                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
1335                let bytes_start = chunk_meta.offset_bytes;
1336                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
1337                LoadedChunk {
1338                    byte_range: bytes_start..bytes_end,
1339                    items_in_chunk: chunk_meta.num_values,
1340                    chunk_idx,
1341                    data: LanceBuffer::empty(),
1342                }
1343            })
1344            .collect()
1345    }
1346}
1347
1348#[derive(Debug, PartialEq, Eq, Clone, Copy)]
1349enum PreambleAction {
1350    Take,
1351    Skip,
1352    Absent,
1353}
1354
1355// When we schedule a chunk we use the repetition index (or, if none exists, just the # of items
1356// in each chunk) to map a user requested range into a set of ChunkInstruction objects which tell
1357// us how exactly to read from the chunk.
1358//
1359// Examples:
1360//
1361// | Chunk 0     | Chunk 1   | Chunk 2   | Chunk 3 |
1362// | xxxxyyyyzzz | zzzzzzzzz | zzzzzzzzz | aaabbcc |
1363//
1364// Full read (0..6)
1365//
1366// Chunk 0: (several rows, ends with trailer)
1367//   preamble: absent
1368//   rows_to_skip: 0
1369//   rows_to_take: 3 (x, y, z)
1370//   take_trailer: true
1371//
1372// Chunk 1: (all preamble, ends with trailer)
1373//   preamble: take
1374//   rows_to_skip: 0
1375//   rows_to_take: 0
1376//   take_trailer: true
1377//
1378// Chunk 2: (all preamble, no trailer)
1379//   preamble: take
1380//   rows_to_skip: 0
1381//   rows_to_take: 0
1382//   take_trailer: false
1383//
1384// Chunk 3: (several rows, no trailer or preamble)
1385//   preamble: absent
1386//   rows_to_skip: 0
1387//   rows_to_take: 3 (a, b, c)
1388//   take_trailer: false
1389#[derive(Clone, Debug, PartialEq, Eq)]
1390struct ChunkInstructions {
1391    // The index of the chunk to read
1392    chunk_idx: usize,
1393    // A "preamble" is when a chunk begins with a continuation of the previous chunk's list.  If there
1394    // is no repetition index there is never a preamble.
1395    //
1396    // It's possible for a chunk to be entirely preamble.  For example, if there is a really large list
1397    // that spans several chunks.
1398    preamble: PreambleAction,
1399    // How many complete rows (not including the preamble or trailer) to skip
1400    //
1401    // If this is non-zero then preamble must not be Take
1402    rows_to_skip: u64,
1403    // How many rows to take.  If a row splits across chunks then we will count the row in the first
1404    // chunk that contains the row.
1405    rows_to_take: u64,
1406    // A "trailer" is when a chunk ends with a partial list.  If there is no repetition index there is
1407    // never a trailer.
1408    //
1409    // A chunk that is all preamble may or may not have a trailer.
1410    //
1411    // If this is true then we want to include the trailer
1412    take_trailer: bool,
1413}
1414
1415// First, we schedule a bunch of [`ChunkInstructions`] based on the user's ranges.  Then we
1416// start decoding them, based on a batch size, which might not align with what we scheduled.
1417//
1418// This results in `ChunkDrainInstructions` which targets a contiguous slice of a `ChunkInstructions`
1419//
1420// So if `ChunkInstructions` is "skip preamble, skip 10, take 50, take trailer" and we are decoding in
1421// batches of size 10 we might have a `ChunkDrainInstructions` that targets that chunk and has its own
1422// skip of 17 and take of 10.  This would mean we decode the chunk, skip the preamble and 27 rows, and
1423// then take 10 rows.
1424//
1425// One very confusing bit is that `rows_to_take` includes the trailer.  So if we have two chunks:
1426//  -no preamble, skip 5, take 10, take trailer
1427//  -take preamble, skip 0, take 50, no trailer
1428//
1429// and we are draining 20 rows then the drain instructions for the first batch will be:
1430//  - no preamble, skip 0 (from chunk 0), take 11 (from chunk 0)
1431//  - take preamble (from chunk 1), skip 0 (from chunk 1), take 9 (from chunk 1)
1432#[derive(Debug, PartialEq, Eq)]
1433struct ChunkDrainInstructions {
1434    chunk_instructions: ChunkInstructions,
1435    rows_to_skip: u64,
1436    rows_to_take: u64,
1437    preamble_action: PreambleAction,
1438}
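// Illustrative sketch (not part of the original source): one way a consumer could drive
// `drain_from_instruction` (defined below) to turn scheduled `ChunkInstructions` into the
// `ChunkDrainInstructions` for a single batch.  The function name and shape are assumptions
// for illustration only; the real consumer lives in the mini-block decoder's drain path.
#[allow(dead_code)]
fn sketch_drain_one_batch(
    instructions: &[ChunkInstructions],
    mut rows_desired: u64,
) -> Vec<ChunkDrainInstructions> {
    let mut drains = Vec::new();
    // In the real decoder these two pieces of state persist across batches
    let mut need_preamble = false;
    let mut skip_in_chunk = 0_u64;
    let mut idx = 0;
    while rows_desired > 0 && idx < instructions.len() {
        let (drain, consumed_chunk) = instructions[idx].drain_from_instruction(
            &mut rows_desired,
            &mut need_preamble,
            &mut skip_in_chunk,
        );
        drains.push(drain);
        if consumed_chunk {
            // This chunk is exhausted; the next drain starts on the following chunk
            idx += 1;
        }
    }
    drains
}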
1439
1440impl ChunkInstructions {
1441    // Given a repetition index and a set of user ranges we need to figure out how to read from the chunks
1442    //
1443    // We assume that `user_ranges` are in sorted order and non-overlapping
1444    //
1445    // The output will be a set of `ChunkInstructions` which tell us how to read from the chunks
1446    fn schedule_instructions(
1447        rep_index: &MiniBlockRepIndex,
1448        user_ranges: &[Range<u64>],
1449    ) -> Vec<Self> {
1450        // This is an inexact capacity guess but pretty good.  The actual number of instructions can be
1451        // smaller if instructions are merged.  It can be larger if there are multiple instructions
1452        // per row which can happen with lists.
1453        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1454
1455        for user_range in user_ranges {
1456            let mut rows_needed = user_range.end - user_range.start;
1457            let mut need_preamble = false;
1458
1459            // Need to find the first chunk with a first row >= user_range.start.  If there are
1460            // multiple chunks with the same first row we need to take the first one.
1461            let mut block_index = match rep_index
1462                .blocks
1463                .binary_search_by_key(&user_range.start, |block| block.first_row)
1464            {
1465                Ok(idx) => {
1466                    // Slightly tricky case, we may need to walk backwards a bit to make sure we
1467                    // are grabbing the first eligible chunk
1468                    let mut idx = idx;
1469                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1470                        idx -= 1;
1471                    }
1472                    idx
1473                }
1474                // Easy case.  idx is greater, and idx - 1 is smaller, so idx - 1 contains the start
1475                Err(idx) => idx - 1,
1476            };
1477
1478            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1479
1480            while rows_needed > 0 || need_preamble {
1481                // Check if we've gone past the last block (should not happen)
1482                if block_index >= rep_index.blocks.len() {
1483                    log::warn!("schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early");
1484                    break;
1485                }
1486
1487                let chunk = &rep_index.blocks[block_index];
1488                let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);
1489
1490                // Handle blocks that are entirely preamble (rows_avail = 0)
1491                // These blocks have no rows to take but may have a preamble we need
1492                // We only look for preamble if to_skip == 0 (we're not skipping rows)
1493                if rows_avail == 0 && to_skip == 0 {
1494                    // Only process if this chunk has a preamble we need
1495                    if chunk.has_preamble && need_preamble {
1496                        chunk_instructions.push(Self {
1497                            chunk_idx: block_index,
1498                            preamble: PreambleAction::Take,
1499                            rows_to_skip: 0,
1500                            rows_to_take: 0,
1501                            // We still need to look at has_trailer to distinguish between "all preamble
1502                            // and row ends at end of chunk" and "all preamble and row bleeds into next
1503                            // chunk".  Both cases will have 0 rows available.
1504                            take_trailer: chunk.has_trailer,
1505                        });
1506                        // Only set need_preamble = false if the chunk has at least one row
1507                        // or we have reached the last block.  Otherwise the chunk is entirely
1508                        // preamble and we need the next chunk's preamble too.
1509                        if chunk.starts_including_trailer > 0
1510                            || block_index == rep_index.blocks.len() - 1
1511                        {
1512                            need_preamble = false;
1513                        }
1514                    }
1515                    // Move to next block
1516                    block_index += 1;
1517                    continue;
1518                }
1519
1520                // Edge case: if rows_avail == 0 but to_skip > 0
1521                // This theoretically shouldn't happen (binary search should avoid it)
1522                // but handle it for safety
1523                if rows_avail == 0 && to_skip > 0 {
1524                    // This block doesn't have enough rows to skip, move to next block
1525                    // Adjust to_skip by the number of rows in this block
1526                    to_skip -= chunk.starts_including_trailer;
1527                    block_index += 1;
1528                    continue;
1529                }
1530
1531                let rows_to_take = rows_avail.min(rows_needed);
1532                rows_needed -= rows_to_take;
1533
1534                let mut take_trailer = false;
1535                let preamble = if chunk.has_preamble {
1536                    if need_preamble {
1537                        PreambleAction::Take
1538                    } else {
1539                        PreambleAction::Skip
1540                    }
1541                } else {
1542                    PreambleAction::Absent
1543                };
1544
1545                // Are we taking the trailer?  If so, make sure we mark that we need the preamble
1546                if rows_to_take == rows_avail && chunk.has_trailer {
1547                    take_trailer = true;
1548                    need_preamble = true;
1549                } else {
1550                    need_preamble = false;
1551                };
1552
1553                chunk_instructions.push(Self {
1554                    preamble,
1555                    chunk_idx: block_index,
1556                    rows_to_skip: to_skip,
1557                    rows_to_take,
1558                    take_trailer,
1559                });
1560
1561                to_skip = 0;
1562                block_index += 1;
1563            }
1564        }
1565
1566        // If there were multiple ranges we may have multiple instructions for a single chunk.  Merge them now if they
1567        // are _adjacent_ (i.e. don't merge "take first row of chunk 0" and "take third row of chunk 0" into "take 2
1568        // rows of chunk 0 starting at 0")
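        //
        // For example (illustrative numbers): "skip 0, take 3" followed by "skip 3, take 2" on the
        // same chunk merges into "skip 0, take 5", because the second starts exactly where the
        // first ends.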
1569        if user_ranges.len() > 1 {
1570            // TODO: Could probably optimize this allocation away
1571            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1572            let mut instructions_iter = chunk_instructions.into_iter();
1573            merged_instructions.push(instructions_iter.next().unwrap());
1574            for instruction in instructions_iter {
1575                let last = merged_instructions.last_mut().unwrap();
1576                if last.chunk_idx == instruction.chunk_idx
1577                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1578                {
1579                    last.rows_to_take += instruction.rows_to_take;
1580                    last.take_trailer |= instruction.take_trailer;
1581                } else {
1582                    merged_instructions.push(instruction);
1583                }
1584            }
1585            merged_instructions
1586        } else {
1587            chunk_instructions
1588        }
1589    }
1590
1591    fn drain_from_instruction(
1592        &self,
1593        rows_desired: &mut u64,
1594        need_preamble: &mut bool,
1595        skip_in_chunk: &mut u64,
1596    ) -> (ChunkDrainInstructions, bool) {
1597        // If we need the preamble then we shouldn't be skipping anything
1598        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1599        let rows_avail = self.rows_to_take - *skip_in_chunk;
1600        let has_preamble = self.preamble != PreambleAction::Absent;
1601        let preamble_action = match (*need_preamble, has_preamble) {
1602            (true, true) => PreambleAction::Take,
1603            (true, false) => panic!("Need preamble but there isn't one"),
1604            (false, true) => PreambleAction::Skip,
1605            (false, false) => PreambleAction::Absent,
1606        };
1607
1608        // How many rows are we actually taking in this take step (including the preamble
1609        // and trailer both as individual rows)
1610        let rows_taking = if *rows_desired >= rows_avail {
1611            // We want all the rows.  If there is a trailer we are grabbing it and will
1612            // need the preamble of the next chunk.
1613            //
1614            // This also covers a chunk that is entirely preamble: it needs the next
1615            // chunk's preamble only if it ends with a trailer (i.e. the row it belongs
1616            // to bleeds into the next chunk); otherwise that row ends here and the next
1617            // chunk's preamble is not needed.
1618            *need_preamble = self.take_trailer;
1619            rows_avail
1620        } else {
1621            // We aren't taking all the rows.  Even if there is a trailer we aren't taking
1622            // it so we will not need the preamble
1623            *need_preamble = false;
1624            *rows_desired
1625        };
1626        let rows_skipped = *skip_in_chunk;
1627
1628        // Update the state for the next iteration
1629        let consumed_chunk = if *rows_desired >= rows_avail {
1630            *rows_desired -= rows_avail;
1631            *skip_in_chunk = 0;
1632            true
1633        } else {
1634            *skip_in_chunk += *rows_desired;
1635            *rows_desired = 0;
1636            false
1637        };
1638
1639        (
1640            ChunkDrainInstructions {
1641                chunk_instructions: self.clone(),
1642                rows_to_skip: rows_skipped,
1643                rows_to_take: rows_taking,
1644                preamble_action,
1645            },
1646            consumed_chunk,
1647        )
1648    }
1649}
1650
1651enum Words {
1652    U16(ScalarBuffer<u16>),
1653    U32(ScalarBuffer<u32>),
1654}
1655
1656struct WordsIter<'a> {
1657    iter: Box<dyn Iterator<Item = u32> + 'a>,
1658}
1659
1660impl Words {
1661    pub fn len(&self) -> usize {
1662        match self {
1663            Self::U16(b) => b.len(),
1664            Self::U32(b) => b.len(),
1665        }
1666    }
1667
1668    pub fn iter(&self) -> WordsIter<'_> {
1669        match self {
1670            Self::U16(buf) => WordsIter {
1671                iter: Box::new(buf.iter().map(|&x| x as u32)),
1672            },
1673            Self::U32(buf) => WordsIter {
1674                iter: Box::new(buf.iter().copied()),
1675            },
1676        }
1677    }
1678
1679    pub fn from_bytes(bytes: Bytes, has_large_chunk: bool) -> Result<Self> {
1680        let bytes_per_value = if has_large_chunk { 4 } else { 2 };
1681        assert_eq!(bytes.len() % bytes_per_value, 0);
1682        let buffer = LanceBuffer::from_bytes(bytes, bytes_per_value as u64);
1683        if has_large_chunk {
1684            Ok(Self::U32(buffer.borrow_to_typed_slice::<u32>()))
1685        } else {
1686            Ok(Self::U16(buffer.borrow_to_typed_slice::<u16>()))
1687        }
1688    }
1689}
1690
1691impl<'a> Iterator for WordsIter<'a> {
1692    type Item = u32;
1693
1694    fn next(&mut self) -> Option<Self::Item> {
1695        self.iter.next()
1696    }
1697}
1698
1699impl StructuralPageScheduler for MiniBlockScheduler {
1700    fn initialize<'a>(
1701        &'a mut self,
1702        io: &Arc<dyn EncodingsIo>,
1703    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1704        // We always need to fetch chunk metadata.  We may also need to fetch a dictionary and
1705        // we may also need to fetch the repetition index.  Here, we gather what buffers we
1706        // need.
1707        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
1708        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
1709        let mut bufs_needed = 1;
1710        if self.dictionary.is_some() {
1711            bufs_needed += 1;
1712        }
1713        if self.repetition_index_depth > 0 {
1714            bufs_needed += 1;
1715        }
1716        let mut required_ranges = Vec::with_capacity(bufs_needed);
1717        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
1718        if let Some(ref dictionary) = self.dictionary {
1719            required_ranges.push(
1720                dictionary.dictionary_buf_position_and_size.0
1721                    ..dictionary.dictionary_buf_position_and_size.0
1722                        + dictionary.dictionary_buf_position_and_size.1,
1723            );
1724        }
1725        if self.repetition_index_depth > 0 {
1726            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
1727            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
1728        }
1729        let io_req = io.submit_request(required_ranges, 0);
1730
1731        async move {
1732            let mut buffers = io_req.await?.into_iter().fuse();
1733            let meta_bytes = buffers.next().unwrap();
1734            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
1735            let rep_index_bytes = buffers.next();
1736
1737            // Parse the metadata and build the chunk meta
1738            let words = Words::from_bytes(meta_bytes, self.has_large_chunk)?;
1739            let mut chunk_meta = Vec::with_capacity(words.len());
1740
1741            let mut rows_counter = 0;
1742            let mut offset_bytes = value_buf_position;
1743            for (word_idx, word) in words.iter().enumerate() {
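                // Each metadata word packs two fields, decoded just below (the concrete example
                // values here are for illustration only):
                //   - low 4 bits:  log2 of the number of values in the chunk
                //   - high bits:   (chunk size in bytes / MINIBLOCK_ALIGNMENT) - 1
                // e.g. word 0x146 -> log_num_values = 6 (64 values) and
                // num_bytes = (0x14 + 1) * MINIBLOCK_ALIGNMENT = 21 * MINIBLOCK_ALIGNMENT.
                // The final word is special: its value count is however many items remain.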
1744                let log_num_values = word & 0x0F;
1745                let divided_bytes = word >> 4;
1746                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
1747                debug_assert!(num_bytes > 0);
1748                let num_values = if word_idx < words.len() - 1 {
1749                    debug_assert!(log_num_values > 0);
1750                    1 << log_num_values
1751                } else {
1752                    debug_assert!(
1753                        log_num_values == 0
1754                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
1755                    );
1756                    self.items_in_page - rows_counter
1757                };
1758                rows_counter += num_values;
1759
1760                chunk_meta.push(ChunkMeta {
1761                    num_values,
1762                    chunk_size_bytes: num_bytes as u64,
1763                    offset_bytes,
1764                });
1765                offset_bytes += num_bytes as u64;
1766            }
1767
1768            // Build the repetition index
1769            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
1770                assert!(rep_index_data.len() % 8 == 0);
1771                let stride = self.repetition_index_depth as usize + 1;
1772                MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
1773            } else {
1774                MiniBlockRepIndex::default_from_chunks(&chunk_meta)
1775            };
1776
1777            let mut page_meta = MiniBlockCacheableState {
1778                chunk_meta,
1779                rep_index,
1780                dictionary: None,
1781            };
1782
1783            // decode dictionary
1784            if let Some(ref mut dictionary) = self.dictionary {
1785                let dictionary_data = dictionary_bytes.unwrap();
1786                page_meta.dictionary =
1787                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
1788                        LanceBuffer::from_bytes(
1789                            dictionary_data,
1790                            dictionary.dictionary_data_alignment,
1791                        ),
1792                        dictionary.num_dictionary_items,
1793                    )?));
1794            };
1795            let page_meta = Arc::new(page_meta);
1796            self.page_meta = Some(page_meta.clone());
1797            Ok(page_meta as Arc<dyn CachedPageData>)
1798        }
1799        .boxed()
1800    }
1801
1802    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1803        self.page_meta = Some(
1804            data.clone()
1805                .as_arc_any()
1806                .downcast::<MiniBlockCacheableState>()
1807                .unwrap(),
1808        );
1809    }
1810
1811    fn schedule_ranges(
1812        &self,
1813        ranges: &[Range<u64>],
1814        io: &Arc<dyn EncodingsIo>,
1815    ) -> Result<Vec<PageLoadTask>> {
1816        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1817
1818        let page_meta = self.page_meta.as_ref().unwrap();
1819
1820        let chunk_instructions =
1821            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
1822
1823        debug_assert_eq!(
1824            num_rows,
1825            chunk_instructions
1826                .iter()
1827                .map(|ci| ci.rows_to_take)
1828                .sum::<u64>()
1829        );
1830
1831        let chunks_needed = chunk_instructions
1832            .iter()
1833            .map(|ci| ci.chunk_idx)
1834            .unique()
1835            .collect::<Vec<_>>();
1836
1837        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1838        let chunk_ranges = loaded_chunks
1839            .iter()
1840            .map(|c| c.byte_range.clone())
1841            .collect::<Vec<_>>();
1842        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1843
1844        let rep_decompressor = self.rep_decompressor.clone();
1845        let def_decompressor = self.def_decompressor.clone();
1846        let value_decompressor = self.value_decompressor.clone();
1847        let num_buffers = self.num_buffers;
1848        let has_large_chunk = self.has_large_chunk;
1849        let dictionary = page_meta.dictionary.clone();
1853        let def_meaning = self.def_meaning.clone();
1854
1855        let res = async move {
1856            let loaded_chunk_data = loaded_chunk_data.await?;
1857            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1858                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
1859            }
1860
1861            Ok(Box::new(MiniBlockDecoder {
1862                rep_decompressor,
1863                def_decompressor,
1864                value_decompressor,
1865                def_meaning,
1866                loaded_chunks: VecDeque::from_iter(loaded_chunks),
1867                instructions: VecDeque::from(chunk_instructions),
1868                offset_in_current_chunk: 0,
1869                dictionary,
1870                num_rows,
1871                num_buffers,
1872                has_large_chunk,
1873            }) as Box<dyn StructuralPageDecoder>)
1874        }
1875        .boxed();
1876        let page_load_task = PageLoadTask {
1877            decoder_fut: res,
1878            num_rows,
1879        };
1880        Ok(vec![page_load_task])
1881    }
1882}
1883
1884#[derive(Debug, Clone, Copy)]
1885struct FullZipRepIndexDetails {
1886    buf_position: u64,
1887    bytes_per_value: u64, // Will be 1, 2, 4, or 8
1888}
1889
1890#[derive(Debug)]
1891enum PerValueDecompressor {
1892    Fixed(Arc<dyn FixedPerValueDecompressor>),
1893    Variable(Arc<dyn VariablePerValueDecompressor>),
1894}
1895
1896#[derive(Debug)]
1897struct FullZipDecodeDetails {
1898    value_decompressor: PerValueDecompressor,
1899    def_meaning: Arc<[DefinitionInterpretation]>,
1900    ctrl_word_parser: ControlWordParser,
1901    max_rep: u16,
1902    max_visible_def: u16,
1903}
1904
1905/// A scheduler for full-zip encoded data
1906///
1907/// When the data type has a fixed width we simply need to map from
1908/// row ranges to byte ranges using that fixed width.
1909///
1910/// When the data type is variable-width or has any repetition then a
1911/// repetition index is required.
1912#[derive(Debug)]
1913pub struct FullZipScheduler {
1914    data_buf_position: u64,
1915    rep_index: Option<FullZipRepIndexDetails>,
1916    priority: u64,
1917    rows_in_page: u64,
1918    bits_per_offset: u8,
1919    details: Arc<FullZipDecodeDetails>,
1920    /// Cached state containing the decoded repetition index
1921    cached_state: Option<Arc<FullZipCacheableState>>,
1922    /// Whether to enable caching of repetition indices
1923    enable_cache: bool,
1924}
1925
1926impl FullZipScheduler {
1927    fn try_new(
1928        buffer_offsets_and_sizes: &[(u64, u64)],
1929        priority: u64,
1930        rows_in_page: u64,
1931        layout: &pb21::FullZipLayout,
1932        decompressors: &dyn DecompressionStrategy,
1933    ) -> Result<Self> {
1934        // We don't need the data_buf_size because either the data type is
1935        // fixed-width (and we can tell size from rows_in_page) or it is not
1936        // and we have a repetition index.
1937        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
1938        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
1939            let num_reps = rows_in_page + 1;
1940            let bytes_per_rep = len / num_reps;
1941            debug_assert_eq!(len % num_reps, 0);
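            // Worked example (illustrative numbers): a page with 100 rows has 101 rep index
            // entries, so a 404-byte rep index buffer implies 4 bytes per entry.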
1942            debug_assert!(
1943                bytes_per_rep == 1
1944                    || bytes_per_rep == 2
1945                    || bytes_per_rep == 4
1946                    || bytes_per_rep == 8
1947            );
1948            FullZipRepIndexDetails {
1949                buf_position: *pos,
1950                bytes_per_value: bytes_per_rep,
1951            }
1952        });
1953
1954        let value_decompressor = match layout.details {
1955            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
1956                let decompressor = decompressors.create_fixed_per_value_decompressor(
1957                    layout.value_compression.as_ref().unwrap(),
1958                )?;
1959                PerValueDecompressor::Fixed(decompressor.into())
1960            }
1961            Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
1962                let decompressor = decompressors.create_variable_per_value_decompressor(
1963                    layout.value_compression.as_ref().unwrap(),
1964                )?;
1965                PerValueDecompressor::Variable(decompressor.into())
1966            }
1967            None => {
1968                panic!("Full-zip layout must have a `details` field");
1969            }
1970        };
1971        let ctrl_word_parser = ControlWordParser::new(
1972            layout.bits_rep.try_into().unwrap(),
1973            layout.bits_def.try_into().unwrap(),
1974        );
1975        let def_meaning = layout
1976            .layers
1977            .iter()
1978            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
1979            .collect::<Vec<_>>();
1980
1981        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
1982        let max_visible_def = def_meaning
1983            .iter()
1984            .filter(|d| !d.is_list())
1985            .map(|d| d.num_def_levels())
1986            .sum();
1987
1988        let bits_per_offset = match layout.details {
1989            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
1990            Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
1991                bits_per_offset as u8
1992            }
1993            None => panic!("Full-zip layout must have a `details` field"),
1994        };
1995
1996        let details = Arc::new(FullZipDecodeDetails {
1997            value_decompressor,
1998            def_meaning: def_meaning.into(),
1999            ctrl_word_parser,
2000            max_rep,
2001            max_visible_def,
2002        });
2003        Ok(Self {
2004            data_buf_position,
2005            rep_index,
2006            details,
2007            priority,
2008            rows_in_page,
2009            bits_per_offset,
2010            cached_state: None,
2011            enable_cache: false, // Default to false, will be set later
2012        })
2013    }
2014
2015    /// Creates a decoder from the loaded data
2016    fn create_decoder(
2017        details: Arc<FullZipDecodeDetails>,
2018        data: VecDeque<LanceBuffer>,
2019        num_rows: u64,
2020        bits_per_offset: u8,
2021    ) -> Result<Box<dyn StructuralPageDecoder>> {
2022        match &details.value_decompressor {
2023            PerValueDecompressor::Fixed(decompressor) => {
2024                let bits_per_value = decompressor.bits_per_value();
2025                if bits_per_value == 0 {
2026                    return Err(lance_core::Error::Internal {
2027                        message: "Invalid encoding: bits_per_value must be greater than 0".into(),
2028                        location: location!(),
2029                    });
2030                }
2031                if bits_per_value % 8 != 0 {
2032                    return Err(lance_core::Error::NotSupported {
2033                        source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
2034                        location: location!(),
2035                    });
2036                }
2037                let bytes_per_value = bits_per_value / 8;
2038                let total_bytes_per_value =
2039                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
2040                Ok(Box::new(FixedFullZipDecoder {
2041                    details,
2042                    data,
2043                    num_rows,
2044                    offset_in_current: 0,
2045                    bytes_per_value: bytes_per_value as usize,
2046                    total_bytes_per_value,
2047                }) as Box<dyn StructuralPageDecoder>)
2048            }
2049            PerValueDecompressor::Variable(_decompressor) => {
2050                Ok(Box::new(VariableFullZipDecoder::new(
2051                    details,
2052                    data,
2053                    num_rows,
2054                    bits_per_offset,
2055                    bits_per_offset,
2056                )))
2057            }
2058        }
2059    }
2060
2061    /// Extracts byte ranges from a repetition index buffer
2062    /// The buffer contains pairs of (start, end) values for each range
2063    fn extract_byte_ranges_from_pairs(
2064        buffer: LanceBuffer,
2065        bytes_per_value: u64,
2066        data_buf_position: u64,
2067    ) -> Vec<Range<u64>> {
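        // Worked example (illustrative numbers): with bytes_per_value = 2, a buffer holding the
        // little-endian values [10, 30, 50, 70] and data_buf_position = 1000 decodes to the pairs
        // (10, 30) and (50, 70), i.e. byte ranges 1010..1030 and 1050..1070.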
2068        ByteUnpacker::new(buffer, bytes_per_value as usize)
2069            .chunks(2)
2070            .into_iter()
2071            .map(|mut c| {
2072                let start = c.next().unwrap() + data_buf_position;
2073                let end = c.next().unwrap() + data_buf_position;
2074                start..end
2075            })
2076            .collect::<Vec<_>>()
2077    }
2078
2079    /// Extracts byte ranges from a cached repetition index buffer
2080    /// The buffer contains all values and we need to extract specific ranges
2081    fn extract_byte_ranges_from_cached(
2082        buffer: &LanceBuffer,
2083        ranges: &[Range<u64>],
2084        bytes_per_value: u64,
2085        data_buf_position: u64,
2086    ) -> Vec<Range<u64>> {
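        // Worked example (illustrative numbers): with 4-byte entries, the row range 5..9 reads the
        // cached entries at byte offsets 20..24 and 36..40 of the buffer and turns them into a
        // byte range relative to data_buf_position.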
2087        ranges
2088            .iter()
2089            .map(|r| {
2090                let start_offset = (r.start * bytes_per_value) as usize;
2091                let end_offset = (r.end * bytes_per_value) as usize;
2092
2093                let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
2094                let start_val =
2095                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
2096                        .next()
2097                        .unwrap();
2098
2099                let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
2100                let end_val =
2101                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
2102                        .next()
2103                        .unwrap();
2104
2105                (data_buf_position + start_val)..(data_buf_position + end_val)
2106            })
2107            .collect()
2108    }
2109
2110    /// Computes the ranges in the repetition index that need to be loaded
2111    fn compute_rep_index_ranges(
2112        ranges: &[Range<u64>],
2113        rep_index: &FullZipRepIndexDetails,
2114    ) -> Vec<Range<u64>> {
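        // Worked example (illustrative numbers): for the row range 5..9 with 4-byte rep index
        // entries at buf_position P, this reads entries 5 and 9, i.e. byte ranges
        // P+20..P+24 and P+36..P+40.  Only the two endpoints are needed per range.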
2115        ranges
2116            .iter()
2117            .flat_map(|r| {
2118                let first_val_start =
2119                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
2120                let first_val_end = first_val_start + rep_index.bytes_per_value;
2121                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
2122                let last_val_end = last_val_start + rep_index.bytes_per_value;
2123                [first_val_start..first_val_end, last_val_start..last_val_end]
2124            })
2125            .collect()
2126    }
2127
2128    /// Resolves byte ranges from repetition index (either from cache or disk)
2129    async fn resolve_byte_ranges(
2130        data_buf_position: u64,
2131        ranges: &[Range<u64>],
2132        io: &Arc<dyn EncodingsIo>,
2133        rep_index: &FullZipRepIndexDetails,
2134        cached_state: Option<&Arc<FullZipCacheableState>>,
2135        priority: u64,
2136    ) -> Result<Vec<Range<u64>>> {
2137        if let Some(cached_state) = cached_state {
2138            // Use cached repetition index
2139            Ok(Self::extract_byte_ranges_from_cached(
2140                &cached_state.rep_index_buffer,
2141                ranges,
2142                rep_index.bytes_per_value,
2143                data_buf_position,
2144            ))
2145        } else {
2146            // Load from disk
2147            let rep_ranges = Self::compute_rep_index_ranges(ranges, rep_index);
2148            let rep_data = io.submit_request(rep_ranges, priority).await?;
2149            let rep_buffer = LanceBuffer::concat(
2150                &rep_data
2151                    .into_iter()
2152                    .map(|d| LanceBuffer::from_bytes(d, 1))
2153                    .collect::<Vec<_>>(),
2154            );
2155            Ok(Self::extract_byte_ranges_from_pairs(
2156                rep_buffer,
2157                rep_index.bytes_per_value,
2158                data_buf_position,
2159            ))
2160        }
2161    }
2162
2163    /// Schedules ranges in the presence of a repetition index
2164    fn schedule_ranges_rep(
2165        &self,
2166        ranges: &[Range<u64>],
2167        io: &Arc<dyn EncodingsIo>,
2168        rep_index: FullZipRepIndexDetails,
2169    ) -> Result<Vec<PageLoadTask>> {
2170        // Copy necessary fields to avoid lifetime issues
2171        let data_buf_position = self.data_buf_position;
2172        let cached_state = self.cached_state.clone();
2173        let priority = self.priority;
2174        let details = self.details.clone();
2175        let bits_per_offset = self.bits_per_offset;
2176        let ranges = ranges.to_vec();
2177        let io_clone = io.clone();
2178        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2179
2180        let load_task = async move {
2181            // Step 1: Resolve byte ranges from repetition index
2182            let byte_ranges = Self::resolve_byte_ranges(
2183                data_buf_position,
2184                &ranges,
2185                &io_clone,
2186                &rep_index,
2187                cached_state.as_ref(),
2188                priority,
2189            )
2190            .await?;
2191
2192            // Step 2: Load data
2193            let data = io_clone.submit_request(byte_ranges, priority).await?;
2194            let data = data
2195                .into_iter()
2196                .map(|d| LanceBuffer::from_bytes(d, 1))
2197                .collect::<VecDeque<_>>();
2198
2199            // Step 3: Calculate total rows
2200            let num_rows: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2201
2202            // Step 4: Create decoder
2203            Self::create_decoder(details, data, num_rows, bits_per_offset)
2204        }
2205        .boxed();
2206        let page_load_task = PageLoadTask {
2207            decoder_fut: load_task,
2208            num_rows,
2209        };
2210        Ok(vec![page_load_task])
2211    }
2212
2213    // In the simple case there is no repetition and we just have large fixed-width
2214    // rows of data.  We can just map row ranges to byte ranges directly using the
2215    // fixed-width of the data type.
2216    fn schedule_ranges_simple(
2217        &self,
2218        ranges: &[Range<u64>],
2219        io: &dyn EncodingsIo,
2220    ) -> Result<Vec<PageLoadTask>> {
2221        // Count the total number of rows requested
2222        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2223
2224        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
2225            unreachable!()
2226        };
2227
2228        // Convert item ranges to byte ranges (i.e. multiply by bytes per item)
2229        let bits_per_value = decompressor.bits_per_value();
2230        assert_eq!(bits_per_value % 8, 0);
2231        let bytes_per_value = bits_per_value / 8;
2232        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
2233        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
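        // Worked example (illustrative numbers): 4-byte values zipped with a 2-byte control word
        // give total_bytes_per_value = 6, so the row range 10..20 maps to the byte range
        // data_buf_position + 60 .. data_buf_position + 120.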
2234        let byte_ranges = ranges.iter().map(|r| {
2235            debug_assert!(r.end <= self.rows_in_page);
2236            let start = self.data_buf_position + r.start * total_bytes_per_value;
2237            let end = self.data_buf_position + r.end * total_bytes_per_value;
2238            start..end
2239        });
2240
2241        // Request byte ranges
2242        let data = io.submit_request(byte_ranges.collect(), self.priority);
2243
2244        let details = self.details.clone();
2245
2246        let load_task = async move {
2247            let data = data.await?;
2248            let data = data
2249                .into_iter()
2250                .map(|d| LanceBuffer::from_bytes(d, 1))
2251                .collect();
2252            Ok(Box::new(FixedFullZipDecoder {
2253                details,
2254                data,
2255                num_rows,
2256                offset_in_current: 0,
2257                bytes_per_value: bytes_per_value as usize,
2258                total_bytes_per_value: total_bytes_per_value as usize,
2259            }) as Box<dyn StructuralPageDecoder>)
2260        }
2261        .boxed();
2262        let page_load_task = PageLoadTask {
2263            decoder_fut: load_task,
2264            num_rows,
2265        };
2266        Ok(vec![page_load_task])
2267    }
2268}
2269
2270/// Cacheable state for FullZip encoding, storing the decoded repetition index
2271#[derive(Debug)]
2272struct FullZipCacheableState {
2273    /// The raw repetition index buffer for future decoding
2274    rep_index_buffer: LanceBuffer,
2275}
2276
2277impl DeepSizeOf for FullZipCacheableState {
2278    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
2279        self.rep_index_buffer.len()
2280    }
2281}
2282
2283impl CachedPageData for FullZipCacheableState {
2284    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2285        self
2286    }
2287}
2288
2289impl StructuralPageScheduler for FullZipScheduler {
2290    /// Initializes the scheduler. If there's a repetition index, loads and caches it.
2291    /// Otherwise returns NoCachedPageData.
2292    fn initialize<'a>(
2293        &'a mut self,
2294        io: &Arc<dyn EncodingsIo>,
2295    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2296        // Check if caching is enabled and we have a repetition index
2297        if self.enable_cache && self.rep_index.is_some() {
2298            let rep_index = self.rep_index.as_ref().unwrap();
2299            // Calculate the total size of the repetition index
2300            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2301            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2302
2303            // Load the repetition index buffer
2304            let io_clone = io.clone();
2305            let future = async move {
2306                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2307                let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);
2308
2309                // Create and return the cacheable state
2310                Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
2311            };
2312
2313            future.boxed()
2314        } else {
2315            // Caching disabled or no repetition index, skip caching
2316            std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2317        }
2318    }
2319
2320    /// Loads previously cached repetition index data from the cache system.
2321    /// This method is called when a scheduler instance needs to use cached data
2322    /// that was initialized by another instance or in a previous operation.
2323    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2324        // Try to downcast to our specific cache type
2325        if let Ok(cached_state) = cache
2326            .clone()
2327            .as_arc_any()
2328            .downcast::<FullZipCacheableState>()
2329        {
2330            // Store the cached state for use in schedule_ranges
2331            self.cached_state = Some(cached_state);
2332        }
2333    }
2334
2335    fn schedule_ranges(
2336        &self,
2337        ranges: &[Range<u64>],
2338        io: &Arc<dyn EncodingsIo>,
2339    ) -> Result<Vec<PageLoadTask>> {
2340        if let Some(rep_index) = self.rep_index {
2341            self.schedule_ranges_rep(ranges, io, rep_index)
2342        } else {
2343            self.schedule_ranges_simple(ranges, io.as_ref())
2344        }
2345    }
2346}
2347
2348/// A decoder for full-zip encoded data when the data has a fixed-width
2349///
2350/// Here we need to unzip the control words from the values themselves and
2351/// then decompress the requested values.
2352///
2353/// We use a PerValueDecompressor because we will only be decompressing the
2354/// requested data.  This decoder / scheduler does not do any read amplification.
2355#[derive(Debug)]
2356struct FixedFullZipDecoder {
2357    details: Arc<FullZipDecodeDetails>,
2358    data: VecDeque<LanceBuffer>,
2359    offset_in_current: usize,
2360    bytes_per_value: usize,
2361    total_bytes_per_value: usize,
2362    num_rows: u64,
2363}
2364
2365impl FixedFullZipDecoder {
2366    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2367        debug_assert!(num_rows > 0);
2368        let cur_buf = self.data.front_mut().unwrap();
2369        let start = self.offset_in_current;
2370        if self.details.ctrl_word_parser.has_rep() {
2371            // This is a slightly slower path.  In order to figure out where to split we need to
2372            // examine the repetition levels in the control words so we can map rows (lists) to items
2373            let mut rows_started = 0;
2374            // We always need at least one value.  Loop until we have started num_rows rows and
2375            // reached the beginning of the next row (or exhausted the buffer)
2376            let mut num_items = 0;
2377            while self.offset_in_current < cur_buf.len() {
2378                let control = self.details.ctrl_word_parser.parse_desc(
2379                    &cur_buf[self.offset_in_current..],
2380                    self.details.max_rep,
2381                    self.details.max_visible_def,
2382                );
2383                if control.is_new_row {
2384                    if rows_started == num_rows {
2385                        break;
2386                    }
2387                    rows_started += 1;
2388                }
2389                num_items += 1;
2390                if control.is_visible {
2391                    self.offset_in_current += self.total_bytes_per_value;
2392                } else {
2393                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2394                }
2395            }
2396
2397            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2398            if self.offset_in_current == cur_buf.len() {
2399                self.data.pop_front();
2400                self.offset_in_current = 0;
2401            }
2402
2403            FullZipDecodeTaskItem {
2404                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2405                    data: task_slice,
2406                    bits_per_value: self.bytes_per_value as u64 * 8,
2407                    num_values: num_items,
2408                    block_info: BlockInfo::new(),
2409                }),
2410                rows_in_buf: rows_started,
2411            }
2412        } else {
2413            // If there's no repetition we can calculate the slicing point by just multiplying
2414            // the number of rows by the total bytes per value
2415            let cur_buf = self.data.front_mut().unwrap();
2416            let bytes_avail = cur_buf.len() - self.offset_in_current;
2417            let offset_in_cur = self.offset_in_current;
2418
2419            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2420            let mut rows_taken = num_rows;
2421            let task_slice = if bytes_needed >= bytes_avail {
2422                self.offset_in_current = 0;
2423                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2424                self.data
2425                    .pop_front()
2426                    .unwrap()
2427                    .slice_with_length(offset_in_cur, bytes_avail)
2428            } else {
2429                self.offset_in_current += bytes_needed;
2430                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2431            };
2432            FullZipDecodeTaskItem {
2433                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2434                    data: task_slice,
2435                    bits_per_value: self.bytes_per_value as u64 * 8,
2436                    num_values: rows_taken,
2437                    block_info: BlockInfo::new(),
2438                }),
2439                rows_in_buf: rows_taken,
2440            }
2441        }
2442    }
2443}
2444
2445impl StructuralPageDecoder for FixedFullZipDecoder {
2446    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2447        let mut task_data = Vec::with_capacity(self.data.len());
2448        let mut remaining = num_rows;
2449        while remaining > 0 {
2450            let task_item = self.slice_next_task(remaining);
2451            remaining -= task_item.rows_in_buf;
2452            task_data.push(task_item);
2453        }
2454        Ok(Box::new(FixedFullZipDecodeTask {
2455            details: self.details.clone(),
2456            data: task_data,
2457            bytes_per_value: self.bytes_per_value,
2458            num_rows: num_rows as usize,
2459        }))
2460    }
2461
2462    fn num_rows(&self) -> u64 {
2463        self.num_rows
2464    }
2465}
2466
2467/// A decoder for full-zip encoded data when the data has a variable-width
2468///
2469/// Here we need to unzip the control words AND lengths from the values and
2470/// then decompress the requested values.
2471#[derive(Debug)]
2472struct VariableFullZipDecoder {
2473    details: Arc<FullZipDecodeDetails>,
2474    decompressor: Arc<dyn VariablePerValueDecompressor>,
2475    data: LanceBuffer,
2476    offsets: LanceBuffer,
2477    rep: ScalarBuffer<u16>,
2478    def: ScalarBuffer<u16>,
2479    repdef_starts: Vec<usize>,
2480    data_starts: Vec<usize>,
2481    offset_starts: Vec<usize>,
2482    visible_item_counts: Vec<u64>,
2483    bits_per_offset: u8,
2484    current_idx: usize,
2485    num_rows: u64,
2486}
2487
2488impl VariableFullZipDecoder {
2489    fn new(
2490        details: Arc<FullZipDecodeDetails>,
2491        data: VecDeque<LanceBuffer>,
2492        num_rows: u64,
2493        in_bits_per_length: u8,
2494        out_bits_per_offset: u8,
2495    ) -> Self {
2496        let decompressor = match details.value_decompressor {
2497            PerValueDecompressor::Variable(ref d) => d.clone(),
2498            _ => unreachable!(),
2499        };
2500
2501        assert_eq!(in_bits_per_length % 8, 0);
2502        assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2503
2504        let mut decoder = Self {
2505            details,
2506            decompressor,
2507            data: LanceBuffer::empty(),
2508            offsets: LanceBuffer::empty(),
2509            rep: LanceBuffer::empty().borrow_to_typed_slice(),
2510            def: LanceBuffer::empty().borrow_to_typed_slice(),
2511            bits_per_offset: out_bits_per_offset,
2512            repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2513            data_starts: Vec::with_capacity(num_rows as usize + 1),
2514            offset_starts: Vec::with_capacity(num_rows as usize + 1),
2515            visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2516            current_idx: 0,
2517            num_rows,
2518        };
2519
2520        // There's no great time to do this and this is the least worst time.  If we don't unzip then
2521        // we can't slice the data during the decode phase.  This is because we need the offsets to be
2522        // unpacked to know where the values start and end.
2523        //
2524        // We don't want to unzip on the decode thread because that is a single-threaded path
2525        // We don't want to unzip on the scheduling thread because that is a single-threaded path
2526        //
2527        // Fortunately, we know variable length data will always be read indirectly and so we can do it
2528        // here, which should be on the indirect thread.  The primary disadvantage to doing it here is that
2529        // we load all the data into memory and then throw it away only to load it all into memory again during
2530        // the decode.
2531        //
2532        // There are some alternatives to investigate:
2533        //   - Instead of just reading the beginning and end of the rep index we could read the entire
2534        //     range in between.  This will give us the break points that we need for slicing and won't increase
2535        //     the number of IOPs but it will mean we are doing more total I/O and we need to load the rep index
2536        //     even when doing a full scan.
2537        //   - We could force each decode task to do a full unzip of all the data.  Each decode task now
2538        //     has to do more work but the work is all fused.
2539        //   - We could just try doing this work on the decode thread and see if it is a problem.
2540        decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2541
2542        decoder
2543    }
2544
2545    unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2546        match bits_per_offset {
2547            8 => *data.get_unchecked(0) as u64,
2548            16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2549            32 => u32::from_le_bytes([
2550                *data.get_unchecked(0),
2551                *data.get_unchecked(1),
2552                *data.get_unchecked(2),
2553                *data.get_unchecked(3),
2554            ]) as u64,
2555            64 => u64::from_le_bytes([
2556                *data.get_unchecked(0),
2557                *data.get_unchecked(1),
2558                *data.get_unchecked(2),
2559                *data.get_unchecked(3),
2560                *data.get_unchecked(4),
2561                *data.get_unchecked(5),
2562                *data.get_unchecked(6),
2563                *data.get_unchecked(7),
2564            ]),
2565            _ => unreachable!(),
2566        }
2567    }
2568
2569    fn unzip(
2570        &mut self,
2571        data: VecDeque<LanceBuffer>,
2572        in_bits_per_length: u8,
2573        out_bits_per_offset: u8,
2574        num_rows: u64,
2575    ) {
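        // Layout of the zipped stream, as the loop below consumes it: each visible, valid item is
        // [control word][length][value bytes]; visible-but-null and invisible items carry only a
        // control word.  The length field is in_bits_per_length / 8 bytes wide.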
2576        // This undercounts if there are lists but, at this point, we don't really know how many items we have
2577        let mut rep = Vec::with_capacity(num_rows as usize);
2578        let mut def = Vec::with_capacity(num_rows as usize);
2579        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2580
2581        // This undercounts if there are lists
2582        // It can also overcount if there are invisible items
2583        let bytes_per_offset = out_bits_per_offset as usize / 8;
2584        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2585        let mut offsets_data = Vec::with_capacity(bytes_offsets);
2586
2587        let bytes_per_length = in_bits_per_length as usize / 8;
2588        let bytes_lengths = bytes_per_length * num_rows as usize;
2589
2590        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2591        // This overcounts since bytes_lengths and bytes_cw are undercounts
2592        // It can also undercount if there are invisible items (hence the saturating_sub)
2593        let mut unzipped_data =
2594            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2595
2596        let mut current_offset = 0_u64;
2597        let mut visible_item_count = 0_u64;
2598        for databuf in data.into_iter() {
2599            let mut databuf = databuf.as_ref();
2600            while !databuf.is_empty() {
2601                let data_start = unzipped_data.len();
2602                let offset_start = offsets_data.len();
2603                // We might have only rep, only def, neither, or both.  They advance at the
2604                // same rate though so we only need one index for both
2605                let repdef_start = rep.len().max(def.len());
2606                // TODO: Kind of inefficient we parse the control word twice here
2607                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2608                    databuf,
2609                    self.details.max_rep,
2610                    self.details.max_visible_def,
2611                );
2612                self.details
2613                    .ctrl_word_parser
2614                    .parse(databuf, &mut rep, &mut def);
2615                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2616
2617                if ctrl_desc.is_new_row {
2618                    self.repdef_starts.push(repdef_start);
2619                    self.data_starts.push(data_start);
2620                    self.offset_starts.push(offset_start);
2621                    self.visible_item_counts.push(visible_item_count);
2622                }
2623                if ctrl_desc.is_visible {
2624                    visible_item_count += 1;
2625                    if ctrl_desc.is_valid_item {
2626                        // Safety: Data should have at least bytes_per_length bytes remaining
2627                        debug_assert!(databuf.len() >= bytes_per_length);
2628                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2629                        match out_bits_per_offset {
2630                            32 => offsets_data
2631                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2632                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2633                            _ => unreachable!(),
2634                        };
2635                        databuf = &databuf[bytes_per_length..]; // skip past the length field
2636                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
2637                        databuf = &databuf[length as usize..];
2638                        current_offset += length;
2639                    } else {
2640                        // Null items still get an offset
2641                        match out_bits_per_offset {
2642                            32 => offsets_data
2643                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2644                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2645                            _ => unreachable!(),
2646                        }
2647                    }
2648                }
2649            }
2650        }
2651        self.repdef_starts.push(rep.len().max(def.len()));
2652        self.data_starts.push(unzipped_data.len());
2653        self.offset_starts.push(offsets_data.len());
2654        self.visible_item_counts.push(visible_item_count);
2655        match out_bits_per_offset {
2656            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2657            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2658            _ => unreachable!(),
2659        };
2660        self.rep = ScalarBuffer::from(rep);
2661        self.def = ScalarBuffer::from(def);
2662        self.data = LanceBuffer::from(unzipped_data);
2663        self.offsets = LanceBuffer::from(offsets_data);
2664    }
2665}
2666
2667impl StructuralPageDecoder for VariableFullZipDecoder {
2668    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2669        let start = self.current_idx;
2670        let end = start + num_rows as usize;
2671
2672        // This might seem a little peculiar.  We are returning the entire data for every single
2673        // batch.  This is because the offsets are relative to the start of the data.  In other words
2674        // imagine we have a data buffer that is 100 bytes long and the offsets are [0, 10, 20, 30, 40]
2675        // and we return in batches of two.  The second set of offsets will be [20, 30, 40].
2676        //
2677        // So either we pay for a copy to normalize the offsets or we just return the entire data buffer
2678        // which is slightly cheaper.
2679        let data = self.data.clone();
2680
2681        let offset_start = self.offset_starts[start];
2682        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2683        let offsets = self
2684            .offsets
2685            .slice_with_length(offset_start, offset_end - offset_start);
2686
2687        let repdef_start = self.repdef_starts[start];
2688        let repdef_end = self.repdef_starts[end];
2689        let rep = if self.rep.is_empty() {
2690            self.rep.clone()
2691        } else {
2692            self.rep.slice(repdef_start, repdef_end - repdef_start)
2693        };
2694        let def = if self.def.is_empty() {
2695            self.def.clone()
2696        } else {
2697            self.def.slice(repdef_start, repdef_end - repdef_start)
2698        };
2699
2700        let visible_item_counts_start = self.visible_item_counts[start];
2701        let visible_item_counts_end = self.visible_item_counts[end];
2702        let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2703
2704        self.current_idx += num_rows as usize;
2705
2706        Ok(Box::new(VariableFullZipDecodeTask {
2707            details: self.details.clone(),
2708            decompressor: self.decompressor.clone(),
2709            data,
2710            offsets,
2711            bits_per_offset: self.bits_per_offset,
2712            num_visible_items,
2713            rep,
2714            def,
2715        }))
2716    }
2717
2718    fn num_rows(&self) -> u64 {
2719        self.num_rows
2720    }
2721}
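
// A minimal illustration (not part of this module's API) of the design choice in
// `VariableFullZipDecoder::drain` above: offsets sliced for a later batch still
// index into the *full* data buffer, so we can hand out the whole buffer instead
// of paying to re-base the offsets.  The data and offsets here are made up.
#[allow(dead_code)]
fn _example_relative_offsets() {
    let data: Vec<u8> = (0..40u8).collect();
    let offsets: [u32; 5] = [0, 10, 20, 30, 40];
    // Offsets for rows 2..4, sliced but not re-based
    let sliced = &offsets[2..];
    for window in sliced.windows(2) {
        // Each window still points at the correct 10-byte value in `data`
        let value = &data[window[0] as usize..window[1] as usize];
        assert_eq!(value.len(), 10);
    }
}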
2722
2723#[derive(Debug)]
2724struct VariableFullZipDecodeTask {
2725    details: Arc<FullZipDecodeDetails>,
2726    decompressor: Arc<dyn VariablePerValueDecompressor>,
2727    data: LanceBuffer,
2728    offsets: LanceBuffer,
2729    bits_per_offset: u8,
2730    num_visible_items: u64,
2731    rep: ScalarBuffer<u16>,
2732    def: ScalarBuffer<u16>,
2733}
2734
2735impl DecodePageTask for VariableFullZipDecodeTask {
2736    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2737        let block = VariableWidthBlock {
2738            data: self.data,
2739            offsets: self.offsets,
2740            bits_per_offset: self.bits_per_offset,
2741            num_values: self.num_visible_items,
2742            block_info: BlockInfo::new(),
2743        };
2744        let decompressed = self.decompressor.decompress(block)?;
2745        let rep = if self.rep.is_empty() {
2746            None
2747        } else {
2748            Some(self.rep.to_vec())
2749        };
2750        let def = if self.def.is_empty() {
2751            None
2752        } else {
2753            Some(self.def.to_vec())
2754        };
2755        let unraveler = RepDefUnraveler::new(
2756            rep,
2757            def,
2758            self.details.def_meaning.clone(),
2759            self.num_visible_items,
2760        );
2761        Ok(DecodedPage {
2762            data: decompressed,
2763            repdef: unraveler,
2764        })
2765    }
2766}
2767
2768#[derive(Debug)]
2769struct FullZipDecodeTaskItem {
2770    data: PerValueDataBlock,
2771    rows_in_buf: u64,
2772}
2773
2774/// A task to unzip and decompress full-zip encoded data when that data
2775/// has a fixed-width.
2776#[derive(Debug)]
2777struct FixedFullZipDecodeTask {
2778    details: Arc<FullZipDecodeDetails>,
2779    data: Vec<FullZipDecodeTaskItem>,
2780    num_rows: usize,
2781    bytes_per_value: usize,
2782}
2783
2784impl DecodePageTask for FixedFullZipDecodeTask {
2785    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2786        // Multiply by 2 to make a stab at the size of the output buffer (which will be decompressed and thus bigger)
2787        let estimated_size_bytes = self
2788            .data
2789            .iter()
2790            .map(|task_item| task_item.data.data_size() as usize)
2791            .sum::<usize>()
2792            * 2;
2793        let mut data_builder =
2794            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2795
2796        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
2797            // Fast path, no need to unzip because there is no rep/def
2798            //
2799            // We decompress each buffer and add it to our output buffer
2800            for task_item in self.data.into_iter() {
2801                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2802                    unreachable!()
2803                };
2804                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2805                else {
2806                    unreachable!()
2807                };
2808                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
2809                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
2810                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
2811            }
2812
2813            let unraveler = RepDefUnraveler::new(
2814                None,
2815                None,
2816                self.details.def_meaning.clone(),
2817                self.num_rows as u64,
2818            );
2819
2820            Ok(DecodedPage {
2821                data: data_builder.finish(),
2822                repdef: unraveler,
2823            })
2824        } else {
2825            // Slow path, unzipping needed
2826            let mut rep = Vec::with_capacity(self.num_rows);
2827            let mut def = Vec::with_capacity(self.num_rows);
2828
2829            for task_item in self.data.into_iter() {
2830                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2831                    unreachable!()
2832                };
2833                let mut buf_slice = fixed_data.data.as_ref();
2834                let num_values = fixed_data.num_values as usize;
2835                // We will be unzipping repdef into `rep` and `def` and the
2836                // values into `values` (which contains the compressed values)
2837                let mut values = Vec::with_capacity(
2838                    fixed_data.data.len()
2839                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
2840                );
2841                let mut visible_items = 0;
2842                for _ in 0..num_values {
2843                    // Extract rep/def
2844                    self.details
2845                        .ctrl_word_parser
2846                        .parse(buf_slice, &mut rep, &mut def);
2847                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
2848
2849                    let is_visible = def
2850                        .last()
2851                        .map(|d| *d <= self.details.max_visible_def)
2852                        .unwrap_or(true);
2853                    if is_visible {
2854                        // Extract value
2855                        values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
2856                        buf_slice = &buf_slice[self.bytes_per_value..];
2857                        visible_items += 1;
2858                    }
2859                }
2860
2861                // Finally, we decompress the values and add them to our output buffer
2862                let values_buf = LanceBuffer::from(values);
2863                let fixed_data = FixedWidthDataBlock {
2864                    bits_per_value: self.bytes_per_value as u64 * 8,
2865                    block_info: BlockInfo::new(),
2866                    data: values_buf,
2867                    num_values: visible_items,
2868                };
2869                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2870                else {
2871                    unreachable!()
2872                };
2873                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
2874                data_builder.append(&decompressed, 0..visible_items);
2875            }
2876
2877            let repetition = if rep.is_empty() { None } else { Some(rep) };
2878            let definition = if def.is_empty() { None } else { Some(def) };
2879
2880            let unraveler = RepDefUnraveler::new(
2881                repetition,
2882                definition,
2883                self.details.def_meaning.clone(),
2884                self.num_rows as u64,
2885            );
2886            let data = data_builder.finish();
2887
2888            Ok(DecodedPage {
2889                data,
2890                repdef: unraveler,
2891            })
2892        }
2893    }
2894}
2895
2896#[derive(Debug)]
2897struct StructuralPrimitiveFieldSchedulingJob<'a> {
2898    scheduler: &'a StructuralPrimitiveFieldScheduler,
2899    ranges: Vec<Range<u64>>,
2900    page_idx: usize,
2901    range_idx: usize,
2902    global_row_offset: u64,
2903}
2904
2905impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
2906    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
2907        Self {
2908            scheduler,
2909            ranges,
2910            page_idx: 0,
2911            range_idx: 0,
2912            global_row_offset: 0,
2913        }
2914    }
2915}
2916
2917impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
2918    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
2919        if self.range_idx >= self.ranges.len() {
2920            return Ok(Vec::new());
2921        }
2922        // Get our current range
2923        let mut range = self.ranges[self.range_idx].clone();
2924        let priority = range.start;
2925
2926        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
2927        trace!(
2928            "Current range is {:?} and current page has {} rows",
2929            range,
2930            cur_page.num_rows
2931        );
2932        // Skip entire pages until we have some overlap with our next range
2933        while cur_page.num_rows + self.global_row_offset <= range.start {
2934            self.global_row_offset += cur_page.num_rows;
2935            self.page_idx += 1;
2936            trace!("Skipping entire page of {} rows", cur_page.num_rows);
2937            cur_page = &self.scheduler.page_schedulers[self.page_idx];
2938        }
2939
2940        // Now the cur_page has overlap with range.  Continue looping through ranges
2941        // until we find a range that exceeds the current page
2942
2943        let mut ranges_in_page = Vec::new();
2944        while cur_page.num_rows + self.global_row_offset > range.start {
2945            range.start = range.start.max(self.global_row_offset);
2946            let start_in_page = range.start - self.global_row_offset;
2947            let end_in_page = start_in_page + (range.end - range.start);
2948            let end_in_page = end_in_page.min(cur_page.num_rows);
2949            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
2950
2951            ranges_in_page.push(start_in_page..end_in_page);
2952            if last_in_range {
2953                self.range_idx += 1;
2954                if self.range_idx == self.ranges.len() {
2955                    break;
2956                }
2957                range = self.ranges[self.range_idx].clone();
2958            } else {
2959                break;
2960            }
2961        }
2962
2963        trace!(
2964            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
2965            ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
2966            ranges_in_page.len(),
2967            cur_page.num_rows,
2968            priority,
2969            self.scheduler.column_index,
2970            cur_page.page_index,
2971        );
2972
2973        self.global_row_offset += cur_page.num_rows;
2974        self.page_idx += 1;
2975
2976        let page_decoders = cur_page
2977            .scheduler
2978            .schedule_ranges(&ranges_in_page, context.io())?;
2979
2980        let cur_path = context.current_path();
2981        page_decoders
2982            .into_iter()
2983            .map(|page_load_task| {
2984                let cur_path = cur_path.clone();
2985                let page_decoder = page_load_task.decoder_fut;
2986                let unloaded_page = async move {
2987                    let page_decoder = page_decoder.await?;
2988                    Ok(LoadedPageShard {
2989                        decoder: page_decoder,
2990                        path: cur_path,
2991                    })
2992                }
2993                .boxed();
2994                Ok(ScheduledScanLine {
2995                    decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
2996                    rows_scheduled: page_load_task.num_rows,
2997                })
2998            })
2999            .collect::<Result<Vec<_>>>()
3000    }
3001}
3002
3003#[derive(Debug)]
3004struct PageInfoAndScheduler {
3005    page_index: usize,
3006    num_rows: u64,
3007    scheduler: Box<dyn StructuralPageScheduler>,
3008}
3009
3010/// A scheduler for a leaf node
3011///
3012/// Here we look at the layout of the various pages and delegate scheduling to a scheduler
3013/// appropriate for the layout of the page.
3014#[derive(Debug)]
3015pub struct StructuralPrimitiveFieldScheduler {
3016    page_schedulers: Vec<PageInfoAndScheduler>,
3017    column_index: u32,
3018}
3019
3020impl StructuralPrimitiveFieldScheduler {
3021    pub fn try_new(
3022        column_info: &ColumnInfo,
3023        decompressors: &dyn DecompressionStrategy,
3024        cache_repetition_index: bool,
3025        target_field: &Field,
3026    ) -> Result<Self> {
3027        let page_schedulers = column_info
3028            .page_infos
3029            .iter()
3030            .enumerate()
3031            .map(|(page_index, page_info)| {
3032                Self::page_info_to_scheduler(
3033                    page_info,
3034                    page_index,
3035                    decompressors,
3036                    cache_repetition_index,
3037                    target_field,
3038                )
3039            })
3040            .collect::<Result<Vec<_>>>()?;
3041        Ok(Self {
3042            page_schedulers,
3043            column_index: column_info.index,
3044        })
3045    }
3046
3047    fn page_layout_to_scheduler(
3048        page_info: &PageInfo,
3049        page_layout: &PageLayout,
3050        decompressors: &dyn DecompressionStrategy,
3051        cache_repetition_index: bool,
3052        target_field: &Field,
3053    ) -> Result<Box<dyn StructuralPageScheduler>> {
3054        use pb21::page_layout::Layout;
3055        Ok(match page_layout.layout.as_ref().expect_ok()? {
3056            Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
3057                &page_info.buffer_offsets_and_sizes,
3058                page_info.priority,
3059                mini_block.num_items,
3060                mini_block,
3061                decompressors,
3062            )?),
3063            Layout::FullZipLayout(full_zip) => {
3064                let mut scheduler = FullZipScheduler::try_new(
3065                    &page_info.buffer_offsets_and_sizes,
3066                    page_info.priority,
3067                    page_info.num_rows,
3068                    full_zip,
3069                    decompressors,
3070                )?;
3071                scheduler.enable_cache = cache_repetition_index;
3072                Box::new(scheduler)
3073            }
3074            Layout::AllNullLayout(all_null) => {
3075                let def_meaning = all_null
3076                    .layers
3077                    .iter()
3078                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3079                    .collect::<Vec<_>>();
3080                if def_meaning.len() == 1
3081                    && def_meaning[0] == DefinitionInterpretation::NullableItem
3082                {
3083                    Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
3084                } else {
3085                    Box::new(ComplexAllNullScheduler::new(
3086                        page_info.buffer_offsets_and_sizes.clone(),
3087                        def_meaning.into(),
3088                    )) as Box<dyn StructuralPageScheduler>
3089                }
3090            }
3091            Layout::BlobLayout(blob) => {
3092                let inner_scheduler = Self::page_layout_to_scheduler(
3093                    page_info,
3094                    blob.inner_layout.as_ref().expect_ok()?.as_ref(),
3095                    decompressors,
3096                    cache_repetition_index,
3097                    target_field,
3098                )?;
3099                let def_meaning = blob
3100                    .layers
3101                    .iter()
3102                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3103                    .collect::<Vec<_>>();
3104                if matches!(target_field.data_type(), DataType::Struct(_)) {
3105                    // User wants to decode blob into struct
3106                    Box::new(BlobDescriptionPageScheduler::new(
3107                        inner_scheduler,
3108                        def_meaning.into(),
3109                    ))
3110                } else {
3111                    // User wants to decode blob into binary data
3112                    Box::new(BlobPageScheduler::new(
3113                        inner_scheduler,
3114                        page_info.priority,
3115                        page_info.num_rows,
3116                        def_meaning.into(),
3117                    ))
3118                }
3119            }
3120        })
3121    }
3122
3123    fn page_info_to_scheduler(
3124        page_info: &PageInfo,
3125        page_index: usize,
3126        decompressors: &dyn DecompressionStrategy,
3127        cache_repetition_index: bool,
3128        target_field: &Field,
3129    ) -> Result<PageInfoAndScheduler> {
3130        let page_layout = page_info.encoding.as_structural();
3131        let scheduler = Self::page_layout_to_scheduler(
3132            page_info,
3133            page_layout,
3134            decompressors,
3135            cache_repetition_index,
3136            target_field,
3137        )?;
3138        Ok(PageInfoAndScheduler {
3139            page_index,
3140            num_rows: page_info.num_rows,
3141            scheduler,
3142        })
3143    }
3144}
3145
3146pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
3147    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
3148}
3149
3150pub struct NoCachedPageData;
3151
3152impl DeepSizeOf for NoCachedPageData {
3153    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
3154        0
3155    }
3156}
3157impl CachedPageData for NoCachedPageData {
3158    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
3159        self
3160    }
3161}
3162
3163pub struct CachedFieldData {
3164    pages: Vec<Arc<dyn CachedPageData>>,
3165}
3166
3167impl DeepSizeOf for CachedFieldData {
3168    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
3169        self.pages.deep_size_of_children(ctx)
3170    }
3171}
3172
3173// Cache key for field data
3174#[derive(Debug, Clone)]
3175pub struct FieldDataCacheKey {
3176    pub column_index: u32,
3177}
3178
3179impl CacheKey for FieldDataCacheKey {
3180    type ValueType = CachedFieldData;
3181
3182    fn key(&self) -> std::borrow::Cow<'_, str> {
3183        self.column_index.to_string().into()
3184    }
3185}
3186
3187impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
3188    fn initialize<'a>(
3189        &'a mut self,
3190        _filter: &'a FilterExpression,
3191        context: &'a SchedulerContext,
3192    ) -> BoxFuture<'a, Result<()>> {
3193        let cache_key = FieldDataCacheKey {
3194            column_index: self.column_index,
3195        };
3196        let cache = context.cache().clone();
3197
3198        async move {
3199            if let Some(cached_data) = cache.get_with_key(&cache_key).await {
3200                self.page_schedulers
3201                    .iter_mut()
3202                    .zip(cached_data.pages.iter())
3203                    .for_each(|(page_scheduler, cached_data)| {
3204                        page_scheduler.scheduler.load(cached_data);
3205                    });
3206                return Ok(());
3207            }
3208
3209            let page_data = self
3210                .page_schedulers
3211                .iter_mut()
3212                .map(|s| s.scheduler.initialize(context.io()))
3213                .collect::<FuturesOrdered<_>>();
3214
3215            let page_data = page_data.try_collect::<Vec<_>>().await?;
3216            let cached_data = Arc::new(CachedFieldData { pages: page_data });
3217            cache.insert_with_key(&cache_key, cached_data).await;
3218            Ok(())
3219        }
3220        .boxed()
3221    }
3222
3223    fn schedule_ranges<'a>(
3224        &'a self,
3225        ranges: &[Range<u64>],
3226        _filter: &FilterExpression,
3227    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
3228        let ranges = ranges.to_vec();
3229        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
3230            self, ranges,
3231        )))
3232    }
3233}
3234
3235/// Takes the output from several page decoders and
3236/// concatenates them.
3237#[derive(Debug)]
3238pub struct StructuralCompositeDecodeArrayTask {
3239    tasks: Vec<Box<dyn DecodePageTask>>,
3240    should_validate: bool,
3241    data_type: DataType,
3242}
3243
3244impl StructuralCompositeDecodeArrayTask {
3245    fn restore_validity(
3246        array: Arc<dyn Array>,
3247        unraveler: &mut CompositeRepDefUnraveler,
3248    ) -> Arc<dyn Array> {
3249        let validity = unraveler.unravel_validity(array.len());
3250        let Some(validity) = validity else {
3251            return array;
3252        };
3253        if array.data_type() == &DataType::Null {
3254            // We unravel from a null array but we don't add the null buffer because arrow-rs doesn't like it
3255            return array;
3256        }
3257        assert_eq!(validity.len(), array.len());
3258        // SAFETY: We should have already asserted that the buffers are all valid, we are just
3259        // adding a null buffer to the array here
3260        make_array(unsafe {
3261            array
3262                .to_data()
3263                .into_builder()
3264                .nulls(Some(validity))
3265                .build_unchecked()
3266        })
3267    }
3268}
3269
3270impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3271    fn decode(self: Box<Self>) -> Result<DecodedArray> {
3272        let mut arrays = Vec::with_capacity(self.tasks.len());
3273        let mut unravelers = Vec::with_capacity(self.tasks.len());
3274        for task in self.tasks {
3275            let decoded = task.decode()?;
3276            unravelers.push(decoded.repdef);
3277
3278            let array = make_array(
3279                decoded
3280                    .data
3281                    .into_arrow(self.data_type.clone(), self.should_validate)?,
3282            );
3283
3284            arrays.push(array);
3285        }
3286        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3287        let array = arrow_select::concat::concat(&array_refs)?;
3288        let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3289
3290        let array = Self::restore_validity(array, &mut repdef);
3291
3292        Ok(DecodedArray { array, repdef })
3293    }
3294}
3295
3296#[derive(Debug)]
3297pub struct StructuralPrimitiveFieldDecoder {
3298    field: Arc<ArrowField>,
3299    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
3300    should_validate: bool,
3301    rows_drained_in_current: u64,
3302}
3303
3304impl StructuralPrimitiveFieldDecoder {
3305    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3306        Self {
3307            field: field.clone(),
3308            page_decoders: VecDeque::new(),
3309            should_validate,
3310            rows_drained_in_current: 0,
3311        }
3312    }
3313}
3314
3315impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3316    fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
3317        assert!(child.path.is_empty());
3318        self.page_decoders.push_back(child.decoder);
3319        Ok(())
3320    }
3321
3322    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3323        let mut remaining = num_rows;
3324        let mut tasks = Vec::new();
3325        while remaining > 0 {
3326            let cur_page = self.page_decoders.front_mut().unwrap();
3327            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3328            let to_take = num_in_page.min(remaining);
3329
3330            let task = cur_page.drain(to_take)?;
3331            tasks.push(task);
3332
3333            if to_take == num_in_page {
3334                self.page_decoders.pop_front();
3335                self.rows_drained_in_current = 0;
3336            } else {
3337                self.rows_drained_in_current += to_take;
3338            }
3339
3340            remaining -= to_take;
3341        }
3342        Ok(Box::new(StructuralCompositeDecodeArrayTask {
3343            tasks,
3344            should_validate: self.should_validate,
3345            data_type: self.field.data_type().clone(),
3346        }))
3347    }
3348
3349    fn data_type(&self) -> &DataType {
3350        self.field.data_type()
3351    }
3352}
3353
3354/// The serialized representation of full-zip data
3355struct SerializedFullZip {
3356    /// The zipped values buffer
3357    values: LanceBuffer,
3358    /// The repetition index (only present if there is repetition)
3359    repetition_index: Option<LanceBuffer>,
3360}
3361
3362// We align and pad mini-blocks to 8 byte boundaries for two reasons.  First,
3363// to allow us to store a chunk size in 12 bits.
3364//
3365// If we directly record the size in bytes with 12 bits we would be limited to
3366// 4KiB which is too small.  Since we know each mini-block consists of 8 byte
3367// words we can store the # of words instead which gives us 32KiB.  We want
3368// at least 24KiB so we can handle even the worst case of
3369// - 4Ki values compressed into an 8186 byte buffer
3370// - 4 bytes to describe rep & def lengths
3371// - 16KiB of rep & def buffer (this will almost never happen but life is easier if we
3372//   plan for it)
3373//
3374// Second, each chunk in a mini-block is aligned to 8 bytes.  This allows multi-byte
3375// values like offsets to be stored in a mini-block and safely read back out.  It also
3376// helps ensure zero-copy reads in cases where zero-copy is possible (e.g. no decoding
3377// needed).
3378//
3379// Note: by "aligned to 8 bytes" we mean BOTH "aligned to 8 bytes from the start of
3380// the page" and "aligned to 8 bytes from the start of the file."
3381const MINIBLOCK_ALIGNMENT: usize = 8;
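
// Illustrative sketch only (not used by the encoder or decoder): how a chunk's
// u16 metadata word is packed in the non-large-chunk case, mirroring the format
// described and produced by `serialize_miniblocks` below.  The upper 12 bits hold
// the chunk size in 8-byte words minus one (so 0xFFF => 4096 words => 32KiB) and
// the lower 4 bits hold log2 of the number of values in the chunk.
#[allow(dead_code)]
fn _example_pack_chunk_metadata(chunk_bytes: usize, log_num_values: u8) -> u16 {
    debug_assert_eq!(chunk_bytes % MINIBLOCK_ALIGNMENT, 0);
    debug_assert!(chunk_bytes <= 32 * 1024);
    debug_assert!(log_num_values <= 12);
    let words_minus_one = (chunk_bytes / MINIBLOCK_ALIGNMENT - 1) as u16;
    (words_minus_one << 4) | log_num_values as u16
}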
3382
3383/// An encoder for primitive (leaf) arrays
3384///
3385/// This encoder is fairly complicated and follows a number of paths depending
3386/// on the data.
3387///
3388/// First, we convert the validity & offsets information into repetition and
3389/// definition levels.  Then we compress the data itself into a single buffer.
3390///
3391/// If the data is narrow then we encode the data in small chunks (each chunk
3392/// should be a few disk sectors and contains a buffer of repetition, a buffer
3393/// of definition, and a buffer of value data).  This approach is called
3394/// "mini-block".  These mini-blocks are stored into a single data buffer.
3395///
3396/// If the data is wide then we zip together the repetition and definition values
3397/// with the value data into a single buffer.  This approach is called "zipped".
3398///
3399/// If there is any repetition information then we create a repetition index.
3400///
3401/// In addition, the compression process may create zero or more metadata buffers.
3402/// For example, a dictionary compression will create dictionary metadata.  Any
3403/// mini-block approach has a metadata buffer of block sizes.  This metadata is
3404/// stored in a separate buffer on disk and read at initialization time.
3405///
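/// As a rough guide (illustrative, see the heuristics below): by default a value
/// type whose maximum serialized width is less than 256 bytes (`is_narrow`) is
/// considered narrow and gets the mini-block layout, while wider values fall back
/// to the zipped ("full-zip") layout.  Either layout can be forced via the
/// `STRUCTURAL_ENCODING_META_KEY` field metadata (see `prefers_miniblock` and
/// `prefers_fullzip`).
///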
3406/// TODO: We should concatenate metadata buffers from all pages into a single buffer
3407/// at (roughly) the end of the file so there is, at most, one read per column of
3408/// metadata per file.
3409pub struct PrimitiveStructuralEncoder {
3410    // Accumulates arrays until we have enough data to justify a disk page
3411    accumulation_queue: AccumulationQueue,
3412
3413    keep_original_array: bool,
3414    support_large_chunk: bool,
3415    accumulated_repdefs: Vec<RepDefBuilder>,
3416    // The compression strategy we will use to compress the data
3417    compression_strategy: Arc<dyn CompressionStrategy>,
3418    column_index: u32,
3419    field: Field,
3420    encoding_metadata: Arc<HashMap<String, String>>,
3421    version: LanceFileVersion,
3422}
3423
3424struct CompressedLevelsChunk {
3425    data: LanceBuffer,
3426    num_levels: u16,
3427}
3428
3429struct CompressedLevels {
3430    data: Vec<CompressedLevelsChunk>,
3431    compression: CompressiveEncoding,
3432    rep_index: Option<LanceBuffer>,
3433}
3434
3435struct SerializedMiniBlockPage {
3436    num_buffers: u64,
3437    data: LanceBuffer,
3438    metadata: LanceBuffer,
3439}
3440
3441impl PrimitiveStructuralEncoder {
3442    pub fn try_new(
3443        options: &EncodingOptions,
3444        compression_strategy: Arc<dyn CompressionStrategy>,
3445        column_index: u32,
3446        field: Field,
3447        encoding_metadata: Arc<HashMap<String, String>>,
3448    ) -> Result<Self> {
3449        Ok(Self {
3450            accumulation_queue: AccumulationQueue::new(
3451                options.cache_bytes_per_column,
3452                column_index,
3453                options.keep_original_array,
3454            ),
3455            support_large_chunk: options.support_large_chunk(),
3456            keep_original_array: options.keep_original_array,
3457            accumulated_repdefs: Vec::new(),
3458            column_index,
3459            compression_strategy,
3460            field,
3461            encoding_metadata,
3462            version: options.version,
3463        })
3464    }
3465
3466    // TODO: This is a heuristic we may need to tune at some point
3467    //
3468    // As data gets narrow the "zipping" process gets too expensive
3469    //   and we prefer mini-block.
3470    // As data gets wide the # of values per block shrinks (very wide data
3471    //   doesn't even fit in a mini-block), the per-block overhead gets
3472    //   too large, and we prefer zipped.
3473    fn is_narrow(data_block: &DataBlock) -> bool {
3474        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3475
3476        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3477            let max_len_array = max_len_array
3478                .as_any()
3479                .downcast_ref::<PrimitiveArray<UInt64Type>>()
3480                .unwrap();
3481            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3482                return true;
3483            }
3484        }
3485        false
3486    }
3487
3488    fn prefers_miniblock(
3489        data_block: &DataBlock,
3490        encoding_metadata: &HashMap<String, String>,
3491    ) -> bool {
3492        // If the user specifically requested miniblock then use it
3493        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3494            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3495        }
3496        // Otherwise only use miniblock if it is narrow
3497        Self::is_narrow(data_block)
3498    }
3499
3500    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3501        // Fullzip is the backup option so the only reason we wouldn't use it is if the
3502        // user specifically requested not to use it (in which case we're probably going
3503        // to emit an error)
3504        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3505            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3506        }
3507        true
3508    }
3509
3510    // Converts value data, repetition levels, and definition levels into a single
3511    // buffer of mini-blocks.  In addition, creates a buffer of mini-block metadata
3512    // which tells us the size of each block.  Finally, if repetition is present then
3513    // we also create a buffer for the repetition index.
3514    //
3515    // Each chunk is serialized as:
3516    // | num_levels (2 bytes) | rep_len (2 bytes, if rep) | def_len (2 bytes, if def) | buf_lens (2 or 4 bytes each) | P | rep | P | def | P | buf0 | P | ... | bufN | P |
3517    //
3518    // P - Padding inserted to ensure each buffer is 8-byte aligned and the buffer size is a multiple
3519    //     of 8 bytes (so that the next chunk is 8-byte aligned).
3520    //
3521    // Each block has a u16 word of metadata.  The upper 12 bits contain the
3522    // # of 8-byte words in the block (if the block does not fill the final word
3523    // then up to 7 bytes of padding are added).  The lower 4 bits describe the log_2
3524    // number of values (e.g. if there are 1024 then the lower 4 bits will be
3525    // 0xA).  All blocks except the last must have a power-of-two number of values.
3526    // This not only makes metadata smaller but it makes decoding easier since
3527    // batch sizes are typically a power of 2.  4 bits would allow us to express
3528    // up to 16Ki values but we restrict this further to 4Ki values.
3529    //
3530    // This means blocks can have 1 to 4Ki values and 8 - 32Ki bytes.
3531    //
3532    // All metadata words are serialized (as little endian) into a single buffer
3533    // of metadata values.
3534    //
3535    // If there is repetition then we also create a repetition index.  This is a
3536    // single buffer of integer vectors (stored in row major order).  There is one
3537    // entry for each chunk.  The size of the vector is based on the depth of random
3538    // access we want to support.
3539    //
3540    // A vector of size 2 is the minimum and will support row-based random access (e.g.
3541    // "take the 57th row").  A vector of size 3 will support 1 level of nested access
3542    // (e.g. "take the 3rd item in the 57th row").  A vector of size 4 will support 2
3543    // levels of nested access and so on.
3544    //
3545    // The first number in the vector is the number of top-level rows that complete in
3546    // the chunk.  The second number is the number of second-level rows that complete
3547    // after the final top-level row completed (or beginning of the chunk if no top-level
3548    // row completes in the chunk).  And so on.  The final number in the vector is always
3549    // the number of leftover items not covered by earlier entries in the vector.
3550    //
3551    // Currently we are limited to 0 levels of nested access but that will change in the
3552    // future.
3553    //
3554    // The repetition index and the chunk metadata are read at initialization time and
3555    // cached in memory.
3556    fn serialize_miniblocks(
3557        miniblocks: MiniBlockCompressed,
3558        rep: Option<Vec<CompressedLevelsChunk>>,
3559        def: Option<Vec<CompressedLevelsChunk>>,
3560        support_large_chunk: bool,
3561    ) -> SerializedMiniBlockPage {
3562        let bytes_rep = rep
3563            .as_ref()
3564            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3565            .unwrap_or(0);
3566        let bytes_def = def
3567            .as_ref()
3568            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3569            .unwrap_or(0);
3570        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3571        let mut num_buffers = miniblocks.data.len();
3572        if rep.is_some() {
3573            num_buffers += 1;
3574        }
3575        if def.is_some() {
3576            num_buffers += 1;
3577        }
3578        // 2 bytes for the length of each buffer and up to 7 bytes of padding per buffer
3579        let max_extra = 9 * num_buffers;
3580        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3581        let chunk_size_bytes = if support_large_chunk { 4 } else { 2 };
3582        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * chunk_size_bytes);
3583
3584        let mut rep_iter = rep.map(|r| r.into_iter());
3585        let mut def_iter = def.map(|d| d.into_iter());
3586
3587        let mut buffer_offsets = vec![0; miniblocks.data.len()];
3588        for chunk in miniblocks.chunks {
3589            let start_pos = data_buffer.len();
3590            // Start of chunk should be aligned
3591            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3592
3593            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3594            let def = def_iter.as_mut().map(|d| d.next().unwrap());
3595
3596            // Write the number of levels, or 0 if there is no rep/def
3597            let num_levels = rep
3598                .as_ref()
3599                .map(|r| r.num_levels)
3600                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3601            data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3602
3603            // Write the buffer lengths
3604            if let Some(rep) = rep.as_ref() {
3605                let bytes_rep = u16::try_from(rep.data.len()).unwrap();
3606                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3607            }
3608            if let Some(def) = def.as_ref() {
3609                let bytes_def = u16::try_from(def.data.len()).unwrap();
3610                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3611            }
3612
3613            if support_large_chunk {
3614                for &buffer_size in &chunk.buffer_sizes {
3615                    data_buffer.extend_from_slice(&buffer_size.to_le_bytes());
3616                }
3617            } else {
3618                for &buffer_size in &chunk.buffer_sizes {
3619                    data_buffer.extend_from_slice(&(buffer_size as u16).to_le_bytes());
3620                }
3621            }
3622
3623            // Pad
3624            let add_padding = |data_buffer: &mut Vec<u8>| {
3625                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3626                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
3627            };
3628            add_padding(&mut data_buffer);
3629
3630            // Write the buffers themselves
3631            if let Some(rep) = rep.as_ref() {
3632                data_buffer.extend_from_slice(&rep.data);
3633                add_padding(&mut data_buffer);
3634            }
3635            if let Some(def) = def.as_ref() {
3636                data_buffer.extend_from_slice(&def.data);
3637                add_padding(&mut data_buffer);
3638            }
3639            for (buffer_size, (buffer, buffer_offset)) in chunk
3640                .buffer_sizes
3641                .iter()
3642                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
3643            {
3644                let start = *buffer_offset;
3645                let end = start + *buffer_size as usize;
3646                *buffer_offset += *buffer_size as usize;
3647                data_buffer.extend_from_slice(&buffer[start..end]);
3648                add_padding(&mut data_buffer);
3649            }
3650
3651            let chunk_bytes = data_buffer.len() - start_pos;
3652            let max_chunk_size = if support_large_chunk {
3653                4 * 1024 * 1024 * 1024 // 4GB limit with u32 metadata
3654            } else {
3655                32 * 1024 // 32KiB limit with u16 metadata
3656            };
3657            assert!(chunk_bytes <= max_chunk_size);
3658            assert!(chunk_bytes > 0);
3659            assert_eq!(chunk_bytes % 8, 0);
3660            // 4Ki values max
3661            assert!(chunk.log_num_values <= 12);
3662            // We subtract 1 from the word count because we want to be able to express
3663            // a size of 32KiB and not (32Ki - 8)B, which is what we'd get otherwise with
3664            // 0xFFF
3665            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3666            let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3667
3668            let metadata = (divided_bytes_minus_one << 4) | chunk.log_num_values as u64;
3669            if support_large_chunk {
3670                meta_buffer.extend_from_slice(&(metadata as u32).to_le_bytes());
3671            } else {
3672                meta_buffer.extend_from_slice(&(metadata as u16).to_le_bytes());
3673            }
3674        }
3675
3676        let data_buffer = LanceBuffer::from(data_buffer);
3677        let metadata_buffer = LanceBuffer::from(meta_buffer);
3678
3679        SerializedMiniBlockPage {
3680            num_buffers: miniblocks.data.len() as u64,
3681            data: data_buffer,
3682            metadata: metadata_buffer,
3683        }
3684    }
3685
3686    /// Compresses a buffer of levels into chunks
3687    ///
3688    /// If these are repetition levels then we also calculate the repetition index here
3689    /// (returned in the `rep_index` field)
3690    fn compress_levels(
3691        mut levels: RepDefSlicer<'_>,
3692        num_elements: u64,
3693        compression_strategy: &dyn CompressionStrategy,
3694        chunks: &[MiniBlockChunk],
3695        // This will be 0 if we are compressing def levels
3696        max_rep: u16,
3697    ) -> Result<CompressedLevels> {
3698        let mut rep_index = if max_rep > 0 {
3699            Vec::with_capacity(chunks.len())
3700        } else {
3701            vec![]
3702        };
3703        // Make the levels into a FixedWidth data block
3704        let num_levels = levels.num_levels() as u64;
3705        let levels_buf = levels.all_levels().clone();
3706
3707        let mut fixed_width_block = FixedWidthDataBlock {
3708            data: levels_buf,
3709            bits_per_value: 16,
3710            num_values: num_levels,
3711            block_info: BlockInfo::new(),
3712        };
3713        // Compute statistics to enable optimal compression for rep/def levels
3714        fixed_width_block.compute_stat();
3715
3716        let levels_block = DataBlock::FixedWidth(fixed_width_block);
3717        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
3718        // Pick a block compressor
3719        let (compressor, compressor_desc) =
3720            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
3721        // Compress blocks of levels (sized according to the chunks)
3722        let mut level_chunks = Vec::with_capacity(chunks.len());
3723        let mut values_counter = 0;
3724        for (chunk_idx, chunk) in chunks.iter().enumerate() {
3725            let chunk_num_values = chunk.num_values(values_counter, num_elements);
3726            debug_assert!(chunk_num_values > 0);
3727            values_counter += chunk_num_values;
3728            let chunk_levels = if chunk_idx < chunks.len() - 1 {
3729                levels.slice_next(chunk_num_values as usize)
3730            } else {
3731                levels.slice_rest()
3732            };
3733            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
3734            if max_rep > 0 {
3735                // If max_rep > 0 then we are working with rep levels and we need
3736                // to calculate the repetition index.  The repetition index for a
3737                // chunk is currently 2 values (in the future it may be more).
3738                //
3739                // The first value is the number of rows that _finish_ in the
3740                // chunk.
3741                //
3742                // The second value is the number of "leftovers" after the last
3743                // finished row in the chunk.
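                //
                // For example (illustrative): with max_rep = 1 and rep levels
                // [1, 0, 0, 1, 0 | 1, 0, 0] split into two chunks at the `|`,
                // the first chunk initially records [1, 2] (one row finished, two
                // trailing levels).  Because the second chunk starts with a new
                // row, the fix-up below rewrites that entry to [2, 0], and the
                // final chunk records [1, 0].  The per-chunk row counts sum to 3.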
3744                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
3745                let rep_values = rep_values.as_ref();
3746
3747                // We skip 1 here because a max_rep at spot 0 doesn't count as a finished list (we
3748                // will count it in the previous chunk)
3749                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
3750                let num_leftovers = if chunk_idx < chunks.len() - 1 {
3751                    rep_values
3752                        .iter()
3753                        .rev()
3754                        .position(|v| *v == max_rep)
3755                        // # of leftovers includes the max_rep spot
3756                        .map(|pos| pos + 1)
3757                        .unwrap_or(rep_values.len())
3758                } else {
3759                    // Last chunk can't have leftovers
3760                    0
3761                };
3762
3763                if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
3764                    // This chunk starts with a new row and so, if we thought we had leftovers
3765                    // in the previous chunk, we were mistaken
3766                    // TODO: Can use unchecked here
3767                    let rep_len = rep_index.len();
3768                    if rep_index[rep_len - 1] != 0 {
3769                        // We thought we had leftovers but that was actually a full row
3770                        rep_index[rep_len - 2] += 1;
3771                        rep_index[rep_len - 1] = 0;
3772                    }
3773                }
3774
3775                if chunk_idx == chunks.len() - 1 {
3776                    // The final list
3777                    num_rows += 1;
3778                }
3779                rep_index.push(num_rows as u64);
3780                rep_index.push(num_leftovers as u64);
3781            }
3782            let mut chunk_fixed_width = FixedWidthDataBlock {
3783                data: chunk_levels,
3784                bits_per_value: 16,
3785                num_values: num_chunk_levels,
3786                block_info: BlockInfo::new(),
3787            };
3788            chunk_fixed_width.compute_stat();
3789            let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
3790            let compressed_levels = compressor.compress(chunk_levels_block)?;
3791            level_chunks.push(CompressedLevelsChunk {
3792                data: compressed_levels,
3793                num_levels: num_chunk_levels as u16,
3794            });
3795        }
3796        debug_assert_eq!(levels.num_levels_remaining(), 0);
3797        let rep_index = if rep_index.is_empty() {
3798            None
3799        } else {
3800            Some(LanceBuffer::reinterpret_vec(rep_index))
3801        };
3802        Ok(CompressedLevels {
3803            data: level_chunks,
3804            compression: compressor_desc,
3805            rep_index,
3806        })
3807    }
3808
3809    fn encode_simple_all_null(
3810        column_idx: u32,
3811        num_rows: u64,
3812        row_number: u64,
3813    ) -> Result<EncodedPage> {
3814        let description = ProtobufUtils21::simple_all_null_layout();
3815        Ok(EncodedPage {
3816            column_idx,
3817            data: vec![],
3818            description: PageEncoding::Structural(description),
3819            num_rows,
3820            row_number,
3821        })
3822    }
3823
3824    // Encodes a page where all values are null but we have rep/def
3825    // information that we need to store (e.g. to distinguish between
3826    // different kinds of null)
3827    fn encode_complex_all_null(
3828        column_idx: u32,
3829        repdefs: Vec<RepDefBuilder>,
3830        row_number: u64,
3831        num_rows: u64,
3832    ) -> Result<EncodedPage> {
3833        let repdef = RepDefBuilder::serialize(repdefs);
3834
3835        // TODO: Actually compress repdef
3836        let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
3837            LanceBuffer::reinterpret_slice(rep.clone())
3838        } else {
3839            LanceBuffer::empty()
3840        };
3841
3842        let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
3843            LanceBuffer::reinterpret_slice(def.clone())
3844        } else {
3845            LanceBuffer::empty()
3846        };
3847
3848        let description = ProtobufUtils21::all_null_layout(&repdef.def_meaning);
3849        Ok(EncodedPage {
3850            column_idx,
3851            data: vec![rep_bytes, def_bytes],
3852            description: PageEncoding::Structural(description),
3853            num_rows,
3854            row_number,
3855        })
3856    }
3857
3858    #[allow(clippy::too_many_arguments)]
3859    fn encode_miniblock(
3860        column_idx: u32,
3861        field: &Field,
3862        compression_strategy: &dyn CompressionStrategy,
3863        data: DataBlock,
3864        repdefs: Vec<RepDefBuilder>,
3865        row_number: u64,
3866        dictionary_data: Option<DataBlock>,
3867        num_rows: u64,
3868        support_large_chunk: bool,
3869    ) -> Result<EncodedPage> {
3870        let repdef = RepDefBuilder::serialize(repdefs);
3871
3872        if let DataBlock::AllNull(_null_block) = data {
3873            // We should not be using mini-block for all-null.  There are other structural
3874            // encodings for that.
3875            unreachable!()
3876        }
3877
3878        let num_items = data.num_values();
3879
3880        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
3881        let (compressed_data, value_encoding) = compressor.compress(data)?;
3882
3883        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
3884
3885        let mut compressed_rep = repdef
3886            .rep_slicer()
3887            .map(|rep_slicer| {
3888                Self::compress_levels(
3889                    rep_slicer,
3890                    num_items,
3891                    compression_strategy,
3892                    &compressed_data.chunks,
3893                    max_rep,
3894                )
3895            })
3896            .transpose()?;
3897
3898        let (rep_index, rep_index_depth) =
3899            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
3900                Some(rep_index) => (Some(rep_index.clone()), 1),
3901                None => (None, 0),
3902            };
3903
3904        let mut compressed_def = repdef
3905            .def_slicer()
3906            .map(|def_slicer| {
3907                Self::compress_levels(
3908                    def_slicer,
3909                    num_items,
3910                    compression_strategy,
3911                    &compressed_data.chunks,
3912                    /*max_rep=*/ 0,
3913                )
3914            })
3915            .transpose()?;
3916
3917        // TODO: Parquet sparsely encodes values here.  We could do the same but
3918        // then we won't have log2 values per chunk.  This means more metadata
3919        // and potentially more decoder asymmetry.  However, it may be worth
3920        // investigating at some point
3921
3922        let rep_data = compressed_rep
3923            .as_mut()
3924            .map(|cr| std::mem::take(&mut cr.data));
3925        let def_data = compressed_def
3926            .as_mut()
3927            .map(|cd| std::mem::take(&mut cd.data));
3928
3929        let serialized =
3930            Self::serialize_miniblocks(compressed_data, rep_data, def_data, support_large_chunk);
3931
3932        // Metadata, Data, Dictionary, (maybe) Repetition Index
3933        let mut data = Vec::with_capacity(4);
3934        data.push(serialized.metadata);
3935        data.push(serialized.data);
3936
3937        if let Some(dictionary_data) = dictionary_data {
3938            let num_dictionary_items = dictionary_data.num_values();
3939            // The `field` argument to `create_block_compressor` is not currently used.
3940            let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;
3941
3942            let (compressor, dictionary_encoding) = compression_strategy
3943                .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
3944            let dictionary_buffer = compressor.compress(dictionary_data)?;
3945
3946            data.push(dictionary_buffer);
3947            if let Some(rep_index) = rep_index {
3948                data.push(rep_index);
3949            }
3950
3951            let description = ProtobufUtils21::miniblock_layout(
3952                compressed_rep.map(|cr| cr.compression),
3953                compressed_def.map(|cd| cd.compression),
3954                value_encoding,
3955                rep_index_depth,
3956                serialized.num_buffers,
3957                Some((dictionary_encoding, num_dictionary_items)),
3958                &repdef.def_meaning,
3959                num_items,
3960                support_large_chunk,
3961            );
3962            Ok(EncodedPage {
3963                num_rows,
3964                column_idx,
3965                data,
3966                description: PageEncoding::Structural(description),
3967                row_number,
3968            })
3969        } else {
3970            let description = ProtobufUtils21::miniblock_layout(
3971                compressed_rep.map(|cr| cr.compression),
3972                compressed_def.map(|cd| cd.compression),
3973                value_encoding,
3974                rep_index_depth,
3975                serialized.num_buffers,
3976                None,
3977                &repdef.def_meaning,
3978                num_items,
3979                support_large_chunk,
3980            );
3981
3982            if let Some(rep_index) = rep_index {
3983                let view = rep_index.borrow_to_typed_slice::<u64>();
3984                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
3985                debug_assert_eq!(total, num_rows);
3986
3987                data.push(rep_index);
3988            }
3989
3990            Ok(EncodedPage {
3991                num_rows,
3992                column_idx,
3993                data,
3994                description: PageEncoding::Structural(description),
3995                row_number,
3996            })
3997        }
3998    }
3999
4000    // For fixed-size data we encode < control word | data > for each value
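    //
    // For example (illustrative): with a 1-byte control word and 4-byte values,
    // two visible values A and B zip as
    //   | ctrl | A0 A1 A2 A3 | ctrl | B0 B1 B2 B3 |
    // while an item that is not visible (hidden beneath a null or empty list)
    // contributes only its control word.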
4001    fn serialize_full_zip_fixed(
4002        fixed: FixedWidthDataBlock,
4003        mut repdef: ControlWordIterator,
4004        num_values: u64,
4005    ) -> SerializedFullZip {
4006        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
4007        let mut zipped_data = Vec::with_capacity(len);
4008
4009        let max_rep_index_val = if repdef.has_repetition() {
4010            len as u64
4011        } else {
4012            // Setting this to 0 means we won't write a repetition index
4013            0
4014        };
4015        let mut rep_index_builder =
4016            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
4017
4018        // We could pad to the nearest byte, but compression of large values is unlikely to
4019        // yield a result that is not byte aligned, so this shouldn't matter anytime soon.
4020        assert_eq!(
4021            fixed.bits_per_value % 8,
4022            0,
4023            "Non-byte aligned full-zip compression not yet supported"
4024        );
4025
4026        let bytes_per_value = fixed.bits_per_value as usize / 8;
4027        let mut offset = 0;
4028
4029        if bytes_per_value == 0 {
4030            // No data, just dump the repdef into the buffer
4031            while let Some(control) = repdef.append_next(&mut zipped_data) {
4032                if control.is_new_row {
4033                    // We have finished a row
4034                    debug_assert!(offset <= len);
4035                    // SAFETY: We know that `offset <= len`
4036                    unsafe { rep_index_builder.append(offset as u64) };
4037                }
4038                offset = zipped_data.len();
4039            }
4040        } else {
4041            // We have data, zip it with the repdef
4042            let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
4043            while let Some(control) = repdef.append_next(&mut zipped_data) {
4044                if control.is_new_row {
4045                    // We have finished a row
4046                    debug_assert!(offset <= len);
4047                    // SAFETY: We know that `offset <= len`
4048                    unsafe { rep_index_builder.append(offset as u64) };
4049                }
4050                if control.is_visible {
4051                    let value = data_iter.next().unwrap();
4052                    zipped_data.extend_from_slice(value);
4053                }
4054                offset = zipped_data.len();
4055            }
4056        }
4057
4058        debug_assert_eq!(zipped_data.len(), len);
4059        // Put the final value in the rep index
4060        // SAFETY: `zipped_data.len() == len`
4061        unsafe {
4062            rep_index_builder.append(zipped_data.len() as u64);
4063        }
4064
4065        let zipped_data = LanceBuffer::from(zipped_data);
4066        let rep_index = rep_index_builder.into_data();
4067        let rep_index = if rep_index.is_empty() {
4068            None
4069        } else {
4070            Some(LanceBuffer::from(rep_index))
4071        };
4072        SerializedFullZip {
4073            values: zipped_data,
4074            repetition_index: rep_index,
4075        }
4076    }
4077
4078    // For variable-size data we encode < control word | length | data > for each value
4079    //
4080    // In addition, we create a second buffer, the repetition index
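    //
    // For example (an illustrative sketch), with a 1-byte control word and 32-bit offsets
    // the row ["ab", "xyz"] is zipped roughly as:
    //   [ctrl][02 00 00 00][a b][ctrl][03 00 00 00][x y z]
    // Null and invisible items contribute only a control word.  The repetition index stores
    // the byte offset at which each row starts, plus a final end offset.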
4081    fn serialize_full_zip_variable(
4082        variable: VariableWidthBlock,
4083        mut repdef: ControlWordIterator,
4084        num_items: u64,
4085    ) -> SerializedFullZip {
4086        let bytes_per_offset = variable.bits_per_offset as usize / 8;
4087        assert_eq!(
4088            variable.bits_per_offset % 8,
4089            0,
4090            "Only byte-aligned offsets supported"
4091        );
4092        let len = variable.data.len()
4093            + repdef.bytes_per_word() * num_items as usize
4094            + bytes_per_offset * variable.num_values as usize;
4095        let mut buf = Vec::with_capacity(len);
4096
4097        let max_rep_index_val = len as u64;
4098        let mut rep_index_builder =
4099            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4100
4101        // TODO: byte pack the item lengths with varint encoding
4102        match bytes_per_offset {
4103            4 => {
4104                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
4105                let mut rep_offset = 0;
4106                let mut windows_iter = offs.as_ref().windows(2);
4107                while let Some(control) = repdef.append_next(&mut buf) {
4108                    if control.is_new_row {
4109                        // We have finished a row
4110                        debug_assert!(rep_offset <= len);
4111                        // SAFETY: We know that `rep_offset <= len`
4112                        unsafe { rep_index_builder.append(rep_offset as u64) };
4113                    }
4114                    if control.is_visible {
4115                        let window = windows_iter.next().unwrap();
4116                        if control.is_valid_item {
4117                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4118                            buf.extend_from_slice(
4119                                &variable.data[window[0] as usize..window[1] as usize],
4120                            );
4121                        }
4122                    }
4123                    rep_offset = buf.len();
4124                }
4125            }
4126            8 => {
4127                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4128                let mut rep_offset = 0;
4129                let mut windows_iter = offs.as_ref().windows(2);
4130                while let Some(control) = repdef.append_next(&mut buf) {
4131                    if control.is_new_row {
4132                        // We have finished a row
4133                        debug_assert!(rep_offset <= len);
4134                        // SAFETY: We know that `rep_offset <= len`
4135                        unsafe { rep_index_builder.append(rep_offset as u64) };
4136                    }
4137                    if control.is_visible {
4138                        let window = windows_iter.next().unwrap();
4139                        if control.is_valid_item {
4140                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4141                            buf.extend_from_slice(
4142                                &variable.data[window[0] as usize..window[1] as usize],
4143                            );
4144                        }
4145                    }
4146                    rep_offset = buf.len();
4147                }
4148            }
4149            _ => panic!("Unsupported offset size"),
4150        }
4151
4152        // We may have used fewer bytes than `len` because lengths are not written for null items.
4153        // However, if we are over `len` then we have a bug.
4154        debug_assert!(buf.len() <= len);
4155        // Put the final value in the rep index
4156        // SAFETY: `buf.len() <= len`
4157        unsafe {
4158            rep_index_builder.append(buf.len() as u64);
4159        }
4160
4161        let zipped_data = LanceBuffer::from(buf);
4162        let rep_index = rep_index_builder.into_data();
4163        debug_assert!(!rep_index.is_empty());
4164        let rep_index = Some(LanceBuffer::from(rep_index));
4165        SerializedFullZip {
4166            values: zipped_data,
4167            repetition_index: rep_index,
4168        }
4169    }
4170
4171    /// Serializes data into a single buffer according to the full-zip format which zips
4172    /// together the repetition, definition, and value data into a single buffer.
4173    fn serialize_full_zip(
4174        compressed_data: PerValueDataBlock,
4175        repdef: ControlWordIterator,
4176        num_items: u64,
4177    ) -> SerializedFullZip {
4178        match compressed_data {
4179            PerValueDataBlock::Fixed(fixed) => {
4180                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4181            }
4182            PerValueDataBlock::Variable(var) => {
4183                Self::serialize_full_zip_variable(var, repdef, num_items)
4184            }
4185        }
4186    }
4187
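    /// Encodes a page using the full-zip layout: rep/def levels are packed into per-value
    /// control words, the values are compressed with a per-value compressor, and the control
    /// words and compressed values are zipped into a single buffer (with the repetition
    /// index, if any, as a second buffer).  `num_lists` becomes the page's row count.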
4188    fn encode_full_zip(
4189        column_idx: u32,
4190        field: &Field,
4191        compression_strategy: &dyn CompressionStrategy,
4192        data: DataBlock,
4193        repdefs: Vec<RepDefBuilder>,
4194        row_number: u64,
4195        num_lists: u64,
4196    ) -> Result<EncodedPage> {
4197        let repdef = RepDefBuilder::serialize(repdefs);
4198        let max_rep = repdef
4199            .repetition_levels
4200            .as_ref()
4201            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4202        let max_def = repdef
4203            .definition_levels
4204            .as_ref()
4205            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4206
4207        // To handle FSL we just flatten
4208        // let data = data.flatten();
4209
4210        let (num_items, num_visible_items) =
4211            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4212                // If there are rep levels there may be "invisible" items and we need to encode
4213                // rep_levels.len() things which might be larger than data.num_values()
4214                (rep_levels.len() as u64, data.num_values())
4215            } else {
4216                // If there are no rep levels then we encode data.num_values() things
4217                (data.num_values(), data.num_values())
4218            };
4219
4220        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4221
4222        let repdef_iter = build_control_word_iterator(
4223            repdef.repetition_levels.as_deref(),
4224            max_rep,
4225            repdef.definition_levels.as_deref(),
4226            max_def,
4227            max_visible_def,
4228            num_items as usize,
4229        );
4230        let bits_rep = repdef_iter.bits_rep();
4231        let bits_def = repdef_iter.bits_def();
4232
4233        let compressor = compression_strategy.create_per_value(field, &data)?;
4234        let (compressed_data, value_encoding) = compressor.compress(data)?;
4235
4236        let description = match &compressed_data {
4237            PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
4238                bits_rep,
4239                bits_def,
4240                fixed.bits_per_value as u32,
4241                value_encoding,
4242                &repdef.def_meaning,
4243                num_items as u32,
4244                num_visible_items as u32,
4245            ),
4246            PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
4247                bits_rep,
4248                bits_def,
4249                variable.bits_per_offset as u32,
4250                value_encoding,
4251                &repdef.def_meaning,
4252                num_items as u32,
4253                num_visible_items as u32,
4254            ),
4255        };
4256
4257        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4258
4259        let data = if let Some(repindex) = zipped.repetition_index {
4260            vec![zipped.values, repindex]
4261        } else {
4262            vec![zipped.values]
4263        };
4264
4265        Ok(EncodedPage {
4266            num_rows: num_lists,
4267            column_idx,
4268            data,
4269            description: PageEncoding::Structural(description),
4270            row_number,
4271        })
4272    }
4273
4274    /// Estimates the total size of dictionary-encoded data
4275    ///
4276    /// Dictionary encoding splits data into two parts:
4277    /// 1. Dictionary: stores unique values
4278    /// 2. Indices: maps each value to a dictionary entry
4279    ///
4280    /// For FixedWidth (e.g., 128-bit Decimal):
4281    /// - Dictionary: cardinality × 16 bytes (128 bits per value)
4282    /// - Indices: num_values × 4 bytes (32-bit i32)
4283    ///
4284    /// For VariableWidth (strings/binary):
4285    /// - Dictionary values: cardinality × avg_value_size (actual data)
4286    /// - Dictionary offsets: cardinality × offset_size (32 or 64 bits)
4287    /// - Indices: num_values × offset_size (same as dictionary offsets)
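    ///
    /// Rough worked example (illustrative numbers): 1,000,000 128-bit decimal values with a
    /// cardinality of 1,000 estimate to about 1,000 × 16 B (dictionary) + 1,000,000 × 4 B
    /// (indices) ≈ 4 MB, versus roughly 16 MB of raw fixed-width data.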
4288    fn estimate_dict_size(data_block: &DataBlock, version: LanceFileVersion) -> Option<u64> {
4289        let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
4290            cardinality_array.as_primitive::<UInt64Type>().value(0)
4291        } else {
4292            return None;
4293        };
4294
4295        let num_values = data_block.num_values();
4296
4297        match data_block {
4298            DataBlock::FixedWidth(fixed) => {
4299                if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 {
4300                    return None;
4301                }
4302                // The current fixed-width dictionary encoding uses i32 indices.
4303                if cardinality > i32::MAX as u64 {
4304                    return None;
4305                }
4306                // We currently only support dictionary encoding for 64-bit and 128-bit fixed-width values.
4307                if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 {
4308                    return None;
4309                }
4310                if fixed.bits_per_value % 8 != 0 {
4311                    return None;
4312                }
4313                // Dictionary: cardinality unique values at value bit width
4314                let dict_size = cardinality * (fixed.bits_per_value / 8);
4315                // Indices: num_values indices at 32 bits each
4316                let indices_size = num_values * (DICT_INDICES_BITS_PER_VALUE / 8);
4317                Some(dict_size + indices_size)
4318            }
4319            DataBlock::VariableWidth(var) => {
4320                // Only 32-bit and 64-bit offsets are supported
4321                if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
4322                    return None;
4323                }
4324                let bits_per_offset = var.bits_per_offset as u64;
4325                if (bits_per_offset == 32 && cardinality > i32::MAX as u64)
4326                    || (bits_per_offset == 64 && cardinality > i64::MAX as u64)
4327                {
4328                    return None;
4329                }
4330
4331                let data_size = data_block.data_size();
4332                let avg_value_size = data_size / num_values;
4333
4334                // Dictionary values: actual bytes of unique strings/binary
4335                let dict_values_size = cardinality * avg_value_size;
4336                // Dictionary offsets: pointers into dictionary values
4337                let dict_offsets_size = cardinality * (bits_per_offset / 8);
4338                // Indices: map each row to dictionary entry
4339                let indices_size = num_values * (bits_per_offset / 8);
4340
4341                Some(dict_values_size + dict_offsets_size + indices_size)
4342            }
4343            _ => None,
4344        }
4345    }
4346
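    /// Decides whether a block should be dictionary encoded by comparing the estimated
    /// dictionary-encoded size against the raw data size.
    ///
    /// The threshold ratio comes from the field metadata key `DICT_SIZE_RATIO_META_KEY` or
    /// the `LANCE_ENCODING_DICT_SIZE_RATIO` env var and defaults to 0.8; blocks with fewer
    /// than `LANCE_ENCODING_DICT_TOO_SMALL` values (default 100) are never dictionary
    /// encoded.  Illustrative example: an estimated encoded size of 4 MB against 16 MB of
    /// raw data gives a ratio of 0.25 < 0.8, so the block would be dictionary encoded.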
4347    fn should_dictionary_encode(
4348        data_block: &DataBlock,
4349        field: &Field,
4350        version: LanceFileVersion,
4351    ) -> bool {
4352        // Since we only dictionary encode FixedWidth and VariableWidth blocks for now, we skip
4353        // estimating the size
4354        match data_block {
4355            DataBlock::FixedWidth(fixed) => {
4356                if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 {
4357                    return false;
4358                }
4359                if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 {
4360                    return false;
4361                }
4362            }
4363            DataBlock::VariableWidth(_) => {}
4364            _ => return false,
4365        }
4366
4367        // Don't dictionary encode tiny arrays
4368        let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
4369            .ok()
4370            .and_then(|val| val.parse().ok())
4371            .unwrap_or(100);
4372        if data_block.num_values() < too_small {
4373            return false;
4374        }
4375
4376        // Get size ratio from metadata or env var, default to 0.8
4377        let threshold_ratio = field
4378            .metadata
4379            .get(DICT_SIZE_RATIO_META_KEY)
4380            .and_then(|val| val.parse::<f64>().ok())
4381            .or_else(|| {
4382                env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
4383                    .ok()
4384                    .and_then(|val| val.parse().ok())
4385            })
4386            .unwrap_or(0.8);
4387
4388        // Validate size ratio is in valid range
4389        if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
4390            panic!(
4391                "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
4392                threshold_ratio
4393            );
4394        }
4395
4396        // Get raw data size
4397        let data_size = data_block.data_size();
4398
4399        // Estimate dictionary-encoded size
4400        let Some(encoded_size) = Self::estimate_dict_size(data_block, version) else {
4401            return false;
4402        };
4403
4404        let size_ratio_actual = if data_size > 0 {
4405            encoded_size as f64 / data_size as f64
4406        } else {
4407            return false;
4408        };
4409        size_ratio_actual < threshold_ratio
4410    }
4411
4412    // Creates an encode task, consuming all buffered data
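    //
    // Layout selection, in order (a summary of the branches below):
    //   1. all-null layouts (simple or complex) when every value is null; empty structs
    //      also use the simple all-null layout
    //   2. full-zip for packed structs with a variable-width child
    //   3. mini-block with a dictionary when the data is, or should be, dictionary encoded
    //   4. mini-block when the data (or field metadata) prefers it, otherwise full-zip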
4413    fn do_flush(
4414        &mut self,
4415        arrays: Vec<ArrayRef>,
4416        repdefs: Vec<RepDefBuilder>,
4417        row_number: u64,
4418        num_rows: u64,
4419    ) -> Result<Vec<EncodeTask>> {
4420        let column_idx = self.column_index;
4421        let compression_strategy = self.compression_strategy.clone();
4422        let field = self.field.clone();
4423        let encoding_metadata = self.encoding_metadata.clone();
4424        let support_large_chunk = self.support_large_chunk;
4425        let version = self.version;
4426        let task = spawn_cpu(move || {
4427            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
4428
4429            if num_values == 0 {
4430                // We should not encode empty arrays.  So if we get here that should mean that we
4431                // either have all empty lists or all null lists (or a mix).  We still need to encode
4432                // the rep/def information but we can skip the data encoding.
4433                log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows);
4434                return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
4435            }
4436            let num_nulls = arrays
4437                .iter()
4438                .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
4439                .sum::<u64>();
4440
4441            if num_values == num_nulls {
4442                return if repdefs.iter().all(|rd| rd.is_simple_validity()) {
4443                    log::debug!(
4444                        "Encoding column {} with {} items ({} rows) using simple-null layout",
4445                        column_idx,
4446                        num_values,
4447                        num_rows
4448                    );
4449                    // Simple case, no rep/def and all nulls, we don't need to encode any data
4450                    Self::encode_simple_all_null(column_idx, num_values, row_number)
4451                } else {
4452                    log::debug!(
4453                        "Encoding column {} with {} items ({} rows) using complex-null layout",
4454                        column_idx,
4455                        num_values,
4456                        num_rows
4457                    );
4458                    // If we get here then we have definition levels and we need to store those
4459                    Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
4460                };
4461            }
4462
4463            if let DataType::Struct(fields) = &field.data_type() {
4464                if fields.is_empty() {
4465                    if repdefs.iter().any(|rd| !rd.is_empty()) {
4466                        return Err(Error::InvalidInput { source: format!("Empty structs with rep/def information are not yet supported.  The field {} is an empty struct that either has nulls or is in a list.", field.name).into(), location: location!() });
4467                    }
4468                    // This is maybe a little confusing but the reader should never look at this anyway and it
4469                    // seems like overkill to invent a new layout just for "empty structs".
4470                    return Self::encode_simple_all_null(column_idx, num_values, row_number);
4471                }
4472            }
4473
4474            let data_block = DataBlock::from_arrays(&arrays, num_values);
4475
4476            let requires_full_zip_packed_struct =
4477                if let DataBlock::Struct(ref struct_data_block) = data_block {
4478                    struct_data_block.has_variable_width_child()
4479                } else {
4480                    false
4481                };
4482
4483            if requires_full_zip_packed_struct {
4484                log::debug!(
4485                    "Encoding column {} with {} items using full-zip packed struct layout",
4486                    column_idx,
4487                    num_values
4488                );
4489                return Self::encode_full_zip(
4490                    column_idx,
4491                    &field,
4492                    compression_strategy.as_ref(),
4493                    data_block,
4494                    repdefs,
4495                    row_number,
4496                    num_rows,
4497                );
4498            }
4499
4500            if let DataBlock::Dictionary(dict) = data_block {
4501                log::debug!("Encoding column {} with {} items using dictionary encoding (already dictionary encoded)", column_idx, num_values);
4502                let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
4503                // TODO: https://github.com/lancedb/lance/issues/4809
4504                // If we compute stats on dictionary_data_block => panic.
4505                // If we don't compute stats on indices_data_block => panic.
4506                // This is messy.  Don't make me call compute_stat ever.
4507                indices_data_block.compute_stat();
4508                Self::encode_miniblock(
4509                    column_idx,
4510                    &field,
4511                    compression_strategy.as_ref(),
4512                    indices_data_block,
4513                    repdefs,
4514                    row_number,
4515                    Some(dictionary_data_block),
4516                    num_rows,
4517                    support_large_chunk,
4518                )
4519            } else if Self::should_dictionary_encode(&data_block, &field, version) {
4520                log::debug!(
4521                    "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
4522                    column_idx,
4523                    num_values
4524                );
4525                let (indices_data_block, dictionary_data_block) =
4526                    dict::dictionary_encode(data_block);
4527                Self::encode_miniblock(
4528                    column_idx,
4529                    &field,
4530                    compression_strategy.as_ref(),
4531                    indices_data_block,
4532                    repdefs,
4533                    row_number,
4534                    Some(dictionary_data_block),
4535                    num_rows,
4536                    support_large_chunk,
4537                )
4538            } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
4539                log::debug!(
4540                    "Encoding column {} with {} items using mini-block layout",
4541                    column_idx,
4542                    num_values
4543                );
4544                Self::encode_miniblock(
4545                    column_idx,
4546                    &field,
4547                    compression_strategy.as_ref(),
4548                    data_block,
4549                    repdefs,
4550                    row_number,
4551                    None,
4552                    num_rows,
4553                    support_large_chunk,
4554                )
4555            } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
4556                log::debug!(
4557                    "Encoding column {} with {} items using full-zip layout",
4558                    column_idx,
4559                    num_values
4560                );
4561                Self::encode_full_zip(
4562                    column_idx,
4563                    &field,
4564                    compression_strategy.as_ref(),
4565                    data_block,
4566                    repdefs,
4567                    row_number,
4568                    num_rows,
4569                )
4570            } else {
4571                Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}.  This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() })
4572            }
4573        })
4574        .boxed();
4575        Ok(vec![task])
4576    }
4577
4578    fn extract_validity_buf(
4579        array: Arc<dyn Array>,
4580        repdef: &mut RepDefBuilder,
4581        keep_original_array: bool,
4582    ) -> Result<Arc<dyn Array>> {
4583        if let Some(validity) = array.nulls() {
4584            if keep_original_array {
4585                repdef.add_validity_bitmap(validity.clone());
4586            } else {
4587                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
4588            }
4589            let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
4590            Ok(make_array(data_no_nulls))
4591        } else {
4592            repdef.add_no_null(array.len());
4593            Ok(array)
4594        }
4595    }
4596
4597    fn extract_validity(
4598        mut array: Arc<dyn Array>,
4599        repdef: &mut RepDefBuilder,
4600        keep_original_array: bool,
4601    ) -> Result<Arc<dyn Array>> {
4602        match array.data_type() {
4603            DataType::Null => {
4604                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
4605                Ok(array)
4606            }
4607            DataType::Dictionary(_, _) => {
4608                array = dict::normalize_dict_nulls(array)?;
4609                Self::extract_validity_buf(array, repdef, keep_original_array)
4610            }
4611            // Extract our validity buf but NOT any child validity bufs (they will be encoded
4612            // as part of the values).  Note: for FSL we do not use repdef.add_fsl because we do
4613            // NOT want to increase the repdef depth.
4614            //
4615            // This would be quite catastrophic for something like vector embeddings.  Imagine we
4616            // had thousands of vectors and some were null but no vector contained null items.  If
4617            // we treated the vectors (primitive FSL) like we treat structural FSL we would end up
4618            // with a rep/def value for every single item in the vector.
4619            _ => Self::extract_validity_buf(array, repdef, keep_original_array),
4620        }
4621    }
4622}
4623
4624impl FieldEncoder for PrimitiveStructuralEncoder {
4625    // Buffers data.  If there is enough to write a page then we create an encode task
4626    fn maybe_encode(
4627        &mut self,
4628        array: ArrayRef,
4629        _external_buffers: &mut OutOfLineBuffers,
4630        mut repdef: RepDefBuilder,
4631        row_number: u64,
4632        num_rows: u64,
4633    ) -> Result<Vec<EncodeTask>> {
4634        let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
4635        self.accumulated_repdefs.push(repdef);
4636
4637        if let Some((arrays, row_number, num_rows)) =
4638            self.accumulation_queue.insert(array, row_number, num_rows)
4639        {
4640            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4641            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4642        } else {
4643            Ok(vec![])
4644        }
4645    }
4646
4647    // If there is any data left in the buffer then create an encode task from it
4648    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
4649        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
4650            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4651            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4652        } else {
4653            Ok(vec![])
4654        }
4655    }
4656
4657    fn num_columns(&self) -> u32 {
4658        1
4659    }
4660
4661    fn finish(
4662        &mut self,
4663        _external_buffers: &mut OutOfLineBuffers,
4664    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
4665        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
4666    }
4667}
4668
4669#[cfg(test)]
4670#[allow(clippy::single_range_in_vec_init)]
4671mod tests {
4672    use super::{
4673        ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
4674        FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails,
4675        FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor, PreambleAction,
4676        StructuralPageScheduler,
4677    };
4678    use crate::constants::{STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK};
4679    use crate::data::BlockInfo;
4680    use crate::decoder::PageEncoding;
4681    use crate::encodings::logical::primitive::{
4682        ChunkDrainInstructions, PrimitiveStructuralEncoder,
4683    };
4684    use crate::format::pb21;
4685    use crate::format::pb21::compressive_encoding::Compression;
4686    use crate::testing::{check_round_trip_encoding_of_data, TestCases};
4687    use crate::version::LanceFileVersion;
4688    use arrow_array::{ArrayRef, Int8Array, StringArray, UInt64Array};
4689    use arrow_schema::DataType;
4690    use std::collections::HashMap;
4691    use std::{collections::VecDeque, sync::Arc};
4692
4693    #[test]
4694    fn test_is_narrow() {
4695        let int8_array = Int8Array::from(vec![1, 2, 3]);
4696        let array_ref: ArrayRef = Arc::new(int8_array);
4697        let block = DataBlock::from_array(array_ref);
4698
4699        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4700
4701        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
4702        let block = DataBlock::from_array(string_array);
4703        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4704
4705        let string_array = StringArray::from(vec![
4706            Some("hello world".repeat(100)),
4707            Some("world".to_string()),
4708        ]);
4709        let block = DataBlock::from_array(string_array);
4710        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
4711    }
4712
4713    #[test]
4714    fn test_map_range() {
4715        // Null in the middle
4716        // [[A, B, C], [D, E], NULL, [F, G, H]]
4717        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
4718        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
4719        let max_visible_def = 0;
4720        let total_items = 8;
4721        let max_rep = 1;
4722
4723        let check = |range, expected_item_range, expected_level_range| {
4724            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4725                range,
4726                rep.as_ref(),
4727                def.as_ref(),
4728                max_rep,
4729                max_visible_def,
4730                total_items,
4731                PreambleAction::Absent,
4732            );
4733            assert_eq!(item_range, expected_item_range);
4734            assert_eq!(level_range, expected_level_range);
4735        };
4736
4737        check(0..1, 0..3, 0..3);
4738        check(1..2, 3..5, 3..5);
4739        check(2..3, 5..5, 5..6);
4740        check(3..4, 5..8, 6..9);
4741        check(0..2, 0..5, 0..5);
4742        check(1..3, 3..5, 3..6);
4743        check(2..4, 5..8, 5..9);
4744        check(0..3, 0..5, 0..6);
4745        check(1..4, 3..8, 3..9);
4746        check(0..4, 0..8, 0..9);
4747
4748        // Null at start
4749        // [NULL, [A, B], [C]]
4750        let rep = Some(vec![1, 1, 0, 1]);
4751        let def = Some(vec![1, 0, 0, 0]);
4752        let max_visible_def = 0;
4753        let total_items = 3;
4754
4755        let check = |range, expected_item_range, expected_level_range| {
4756            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4757                range,
4758                rep.as_ref(),
4759                def.as_ref(),
4760                max_rep,
4761                max_visible_def,
4762                total_items,
4763                PreambleAction::Absent,
4764            );
4765            assert_eq!(item_range, expected_item_range);
4766            assert_eq!(level_range, expected_level_range);
4767        };
4768
4769        check(0..1, 0..0, 0..1);
4770        check(1..2, 0..2, 1..3);
4771        check(2..3, 2..3, 3..4);
4772        check(0..2, 0..2, 0..3);
4773        check(1..3, 0..3, 1..4);
4774        check(0..3, 0..3, 0..4);
4775
4776        // Null at end
4777        // [[A], [B, C], NULL]
4778        let rep = Some(vec![1, 1, 0, 1]);
4779        let def = Some(vec![0, 0, 0, 1]);
4780        let max_visible_def = 0;
4781        let total_items = 3;
4782
4783        let check = |range, expected_item_range, expected_level_range| {
4784            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4785                range,
4786                rep.as_ref(),
4787                def.as_ref(),
4788                max_rep,
4789                max_visible_def,
4790                total_items,
4791                PreambleAction::Absent,
4792            );
4793            assert_eq!(item_range, expected_item_range);
4794            assert_eq!(level_range, expected_level_range);
4795        };
4796
4797        check(0..1, 0..1, 0..1);
4798        check(1..2, 1..3, 1..3);
4799        check(2..3, 3..3, 3..4);
4800        check(0..2, 0..3, 0..3);
4801        check(1..3, 1..3, 1..4);
4802        check(0..3, 0..3, 0..4);
4803
4804        // No nulls, with repetition
4805        // [[A, B], [C, D], [E, F]]
4806        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
4807        let def: Option<&[u16]> = None;
4808        let max_visible_def = 0;
4809        let total_items = 6;
4810
4811        let check = |range, expected_item_range, expected_level_range| {
4812            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4813                range,
4814                rep.as_ref(),
4815                def.as_ref(),
4816                max_rep,
4817                max_visible_def,
4818                total_items,
4819                PreambleAction::Absent,
4820            );
4821            assert_eq!(item_range, expected_item_range);
4822            assert_eq!(level_range, expected_level_range);
4823        };
4824
4825        check(0..1, 0..2, 0..2);
4826        check(1..2, 2..4, 2..4);
4827        check(2..3, 4..6, 4..6);
4828        check(0..2, 0..4, 0..4);
4829        check(1..3, 2..6, 2..6);
4830        check(0..3, 0..6, 0..6);
4831
4832        // No repetition, with nulls (this case is trivial)
4833        // [A, B, NULL, C]
4834        let rep: Option<&[u16]> = None;
4835        let def = Some(vec![0, 0, 1, 0]);
4836        let max_visible_def = 1;
4837        let total_items = 4;
4838
4839        let check = |range, expected_item_range, expected_level_range| {
4840            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4841                range,
4842                rep.as_ref(),
4843                def.as_ref(),
4844                max_rep,
4845                max_visible_def,
4846                total_items,
4847                PreambleAction::Absent,
4848            );
4849            assert_eq!(item_range, expected_item_range);
4850            assert_eq!(level_range, expected_level_range);
4851        };
4852
4853        check(0..1, 0..1, 0..1);
4854        check(1..2, 1..2, 1..2);
4855        check(2..3, 2..3, 2..3);
4856        check(0..2, 0..2, 0..2);
4857        check(1..3, 1..3, 1..3);
4858        check(0..3, 0..3, 0..3);
4859
4860        // Tricky case, this chunk is a continuation and starts with a rep-index = 0
4861        // [[..., A] [B, C], NULL]
4862        //
4863        // What we do will depend on the preamble action
4864        let rep = Some(vec![0, 1, 0, 1]);
4865        let def = Some(vec![0, 0, 0, 1]);
4866        let max_visible_def = 0;
4867        let total_items = 3;
4868
4869        let check = |range, expected_item_range, expected_level_range| {
4870            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4871                range,
4872                rep.as_ref(),
4873                def.as_ref(),
4874                max_rep,
4875                max_visible_def,
4876                total_items,
4877                PreambleAction::Take,
4878            );
4879            assert_eq!(item_range, expected_item_range);
4880            assert_eq!(level_range, expected_level_range);
4881        };
4882
4883        // If we are taking the preamble then the range must start at 0
4884        check(0..1, 0..3, 0..3);
4885        check(0..2, 0..3, 0..4);
4886
4887        let check = |range, expected_item_range, expected_level_range| {
4888            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4889                range,
4890                rep.as_ref(),
4891                def.as_ref(),
4892                max_rep,
4893                max_visible_def,
4894                total_items,
4895                PreambleAction::Skip,
4896            );
4897            assert_eq!(item_range, expected_item_range);
4898            assert_eq!(level_range, expected_level_range);
4899        };
4900
4901        check(0..1, 1..3, 1..3);
4902        check(1..2, 3..3, 3..4);
4903        check(0..2, 1..3, 1..4);
4904
4905        // Another preamble case but now it doesn't end with a new list
4906        // [[..., A], NULL, [D, E]]
4907        //
4908        // What we do will depend on the preamble action
4909        let rep = Some(vec![0, 1, 1, 0]);
4910        let def = Some(vec![0, 1, 0, 0]);
4911        let max_visible_def = 0;
4912        let total_items = 4;
4913
4914        let check = |range, expected_item_range, expected_level_range| {
4915            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4916                range,
4917                rep.as_ref(),
4918                def.as_ref(),
4919                max_rep,
4920                max_visible_def,
4921                total_items,
4922                PreambleAction::Take,
4923            );
4924            assert_eq!(item_range, expected_item_range);
4925            assert_eq!(level_range, expected_level_range);
4926        };
4927
4928        // If we are taking the preamble then the range must start at 0
4929        check(0..1, 0..1, 0..2);
4930        check(0..2, 0..3, 0..4);
4931
4932        let check = |range, expected_item_range, expected_level_range| {
4933            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4934                range,
4935                rep.as_ref(),
4936                def.as_ref(),
4937                max_rep,
4938                max_visible_def,
4939                total_items,
4940                PreambleAction::Skip,
4941            );
4942            assert_eq!(item_range, expected_item_range);
4943            assert_eq!(level_range, expected_level_range);
4944        };
4945
4946        // When skipping the preamble, requested rows are relative to the first row after it
4947        check(0..1, 1..1, 1..2);
4948        check(1..2, 1..3, 2..4);
4949        check(0..2, 1..3, 1..4);
4950
4951        // Now a preamble case without any definition levels
4952        // [[..., A] [B, C], [D]]
4953        let rep = Some(vec![0, 1, 0, 1]);
4954        let def: Option<Vec<u16>> = None;
4955        let max_visible_def = 0;
4956        let total_items = 4;
4957
4958        let check = |range, expected_item_range, expected_level_range| {
4959            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4960                range,
4961                rep.as_ref(),
4962                def.as_ref(),
4963                max_rep,
4964                max_visible_def,
4965                total_items,
4966                PreambleAction::Take,
4967            );
4968            assert_eq!(item_range, expected_item_range);
4969            assert_eq!(level_range, expected_level_range);
4970        };
4971
4972        // If we are taking the preamble then the range must start at 0
4973        check(0..1, 0..3, 0..3);
4974        check(0..2, 0..4, 0..4);
4975
4976        let check = |range, expected_item_range, expected_level_range| {
4977            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4978                range,
4979                rep.as_ref(),
4980                def.as_ref(),
4981                max_rep,
4982                max_visible_def,
4983                total_items,
4984                PreambleAction::Skip,
4985            );
4986            assert_eq!(item_range, expected_item_range);
4987            assert_eq!(level_range, expected_level_range);
4988        };
4989
4990        check(0..1, 1..3, 1..3);
4991        check(1..2, 3..4, 3..4);
4992        check(0..2, 1..4, 1..4);
4993
4994        // If we have nested lists then non-top level lists may be empty/null
4995        // and we need to make sure we still handle them as invisible items (we
4996        // failed to do this previously)
4997        let rep = Some(vec![2, 1, 2, 0, 1, 2]);
4998        let def = Some(vec![0, 1, 2, 0, 0, 0]);
4999        let max_rep = 2;
5000        let max_visible_def = 0;
5001        let total_items = 4;
5002
5003        let check = |range, expected_item_range, expected_level_range| {
5004            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5005                range,
5006                rep.as_ref(),
5007                def.as_ref(),
5008                max_rep,
5009                max_visible_def,
5010                total_items,
5011                PreambleAction::Absent,
5012            );
5013            assert_eq!(item_range, expected_item_range);
5014            assert_eq!(level_range, expected_level_range);
5015        };
5016
5017        check(0..3, 0..4, 0..6);
5018        check(0..1, 0..1, 0..2);
5019        check(1..2, 1..3, 2..5);
5020        check(2..3, 3..4, 5..6);
5021
5022        // Invisible items in a preamble that we are taking (regressing a previous failure)
5023        let rep = Some(vec![0, 0, 1, 0, 1, 1]);
5024        let def = Some(vec![0, 1, 0, 0, 0, 0]);
5025        let max_rep = 1;
5026        let max_visible_def = 0;
5027        let total_items = 5;
5028
5029        let check = |range, expected_item_range, expected_level_range| {
5030            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5031                range,
5032                rep.as_ref(),
5033                def.as_ref(),
5034                max_rep,
5035                max_visible_def,
5036                total_items,
5037                PreambleAction::Take,
5038            );
5039            assert_eq!(item_range, expected_item_range);
5040            assert_eq!(level_range, expected_level_range);
5041        };
5042
5043        check(0..0, 0..1, 0..2);
5044        check(0..1, 0..3, 0..4);
5045        check(0..2, 0..4, 0..5);
5046
5047        // Skip preamble (with invis items) and skip a few rows (with invis items)
5048        // and then take a few rows but not all the rows
5049        let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
5050        let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
5051        let max_rep = 1;
5052        let max_visible_def = 0;
5053        let total_items = 5;
5054
5055        let check = |range, expected_item_range, expected_level_range| {
5056            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5057                range,
5058                rep.as_ref(),
5059                def.as_ref(),
5060                max_rep,
5061                max_visible_def,
5062                total_items,
5063                PreambleAction::Skip,
5064            );
5065            assert_eq!(item_range, expected_item_range);
5066            assert_eq!(level_range, expected_level_range);
5067        };
5068
5069        check(2..3, 2..4, 5..7);
5070    }
5071
5072    #[test]
5073    fn test_schedule_instructions() {
5074        // Convert repetition index to bytes for testing
5075        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
5076        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5077        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5078
5079        let check = |user_ranges, expected_instructions| {
5080            let instructions =
5081                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
5082            assert_eq!(instructions, expected_instructions);
5083        };
5084
5085        // The instructions we expect if we're grabbing the whole range
5086        let expected_take_all = vec![
5087            ChunkInstructions {
5088                chunk_idx: 0,
5089                preamble: PreambleAction::Absent,
5090                rows_to_skip: 0,
5091                rows_to_take: 6,
5092                take_trailer: true,
5093            },
5094            ChunkInstructions {
5095                chunk_idx: 1,
5096                preamble: PreambleAction::Take,
5097                rows_to_skip: 0,
5098                rows_to_take: 2,
5099                take_trailer: false,
5100            },
5101            ChunkInstructions {
5102                chunk_idx: 2,
5103                preamble: PreambleAction::Absent,
5104                rows_to_skip: 0,
5105                rows_to_take: 5,
5106                take_trailer: true,
5107            },
5108            ChunkInstructions {
5109                chunk_idx: 3,
5110                preamble: PreambleAction::Take,
5111                rows_to_skip: 0,
5112                rows_to_take: 1,
5113                take_trailer: false,
5114            },
5115        ];
5116
5117        // Take all as 1 range
5118        check(&[0..14], expected_take_all.clone());
5119
5120        // Take all as individual rows
5121        check(
5122            &[
5123                0..1,
5124                1..2,
5125                2..3,
5126                3..4,
5127                4..5,
5128                5..6,
5129                6..7,
5130                7..8,
5131                8..9,
5132                9..10,
5133                10..11,
5134                11..12,
5135                12..13,
5136                13..14,
5137            ],
5138            expected_take_all,
5139        );
5140
5141        // Test some partial takes
5142
5143        // 2 rows in the same chunk but not contiguous
5144        check(
5145            &[0..1, 3..4],
5146            vec![
5147                ChunkInstructions {
5148                    chunk_idx: 0,
5149                    preamble: PreambleAction::Absent,
5150                    rows_to_skip: 0,
5151                    rows_to_take: 1,
5152                    take_trailer: false,
5153                },
5154                ChunkInstructions {
5155                    chunk_idx: 0,
5156                    preamble: PreambleAction::Absent,
5157                    rows_to_skip: 3,
5158                    rows_to_take: 1,
5159                    take_trailer: false,
5160                },
5161            ],
5162        );
5163
5164        // Taking just a trailer/preamble
5165        check(
5166            &[5..6],
5167            vec![
5168                ChunkInstructions {
5169                    chunk_idx: 0,
5170                    preamble: PreambleAction::Absent,
5171                    rows_to_skip: 5,
5172                    rows_to_take: 1,
5173                    take_trailer: true,
5174                },
5175                ChunkInstructions {
5176                    chunk_idx: 1,
5177                    preamble: PreambleAction::Take,
5178                    rows_to_skip: 0,
5179                    rows_to_take: 0,
5180                    take_trailer: false,
5181                },
5182            ],
5183        );
5184
5185        // Skipping an entire chunk
5186        check(
5187            &[7..10],
5188            vec![
5189                ChunkInstructions {
5190                    chunk_idx: 1,
5191                    preamble: PreambleAction::Skip,
5192                    rows_to_skip: 1,
5193                    rows_to_take: 1,
5194                    take_trailer: false,
5195                },
5196                ChunkInstructions {
5197                    chunk_idx: 2,
5198                    preamble: PreambleAction::Absent,
5199                    rows_to_skip: 0,
5200                    rows_to_take: 2,
5201                    take_trailer: false,
5202                },
5203            ],
5204        );
5205    }
5206
5207    #[test]
5208    fn test_drain_instructions() {
5209        fn drain_from_instructions(
5210            instructions: &mut VecDeque<ChunkInstructions>,
5211            mut rows_desired: u64,
5212            need_preamble: &mut bool,
5213            skip_in_chunk: &mut u64,
5214        ) -> Vec<ChunkDrainInstructions> {
5215            // Note: instructions.len() is an upper bound, we typically take much fewer
5216            let mut drain_instructions = Vec::with_capacity(instructions.len());
5217            while rows_desired > 0 || *need_preamble {
5218                let (next_instructions, consumed_chunk) = instructions
5219                    .front()
5220                    .unwrap()
5221                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
5222                if consumed_chunk {
5223                    instructions.pop_front();
5224                }
5225                drain_instructions.push(next_instructions);
5226            }
5227            drain_instructions
5228        }
5229
5230        // Convert repetition index to bytes for testing
5231        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
5232        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5233        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5234        let user_ranges = vec![1..7, 10..14];
5235
5236        // First, schedule the ranges
5237        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
5238
5239        let mut to_drain = VecDeque::from(scheduled.clone());
5240
5241        // Now we drain in batches of 4
5242
5243        let mut need_preamble = false;
5244        let mut skip_in_chunk = 0;
5245
5246        let next_batch =
5247            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
5248
5249        assert!(!need_preamble);
5250        assert_eq!(skip_in_chunk, 4);
5251        assert_eq!(
5252            next_batch,
5253            vec![ChunkDrainInstructions {
5254                chunk_instructions: scheduled[0].clone(),
5255                rows_to_take: 4,
5256                rows_to_skip: 0,
5257                preamble_action: PreambleAction::Absent,
5258            }]
5259        );
5260
5261        let next_batch =
5262            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
5263
5264        assert!(!need_preamble);
5265        assert_eq!(skip_in_chunk, 2);
5266
5267        assert_eq!(
5268            next_batch,
5269            vec![
5270                ChunkDrainInstructions {
5271                    chunk_instructions: scheduled[0].clone(),
5272                    rows_to_take: 1,
5273                    rows_to_skip: 4,
5274                    preamble_action: PreambleAction::Absent,
5275                },
5276                ChunkDrainInstructions {
5277                    chunk_instructions: scheduled[1].clone(),
5278                    rows_to_take: 1,
5279                    rows_to_skip: 0,
5280                    preamble_action: PreambleAction::Take,
5281                },
5282                ChunkDrainInstructions {
5283                    chunk_instructions: scheduled[2].clone(),
5284                    rows_to_take: 2,
5285                    rows_to_skip: 0,
5286                    preamble_action: PreambleAction::Absent,
5287                }
5288            ]
5289        );
5290
5291        let next_batch =
5292            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
5293
5294        assert!(!need_preamble);
5295        assert_eq!(skip_in_chunk, 0);
5296
5297        assert_eq!(
5298            next_batch,
5299            vec![
5300                ChunkDrainInstructions {
5301                    chunk_instructions: scheduled[2].clone(),
5302                    rows_to_take: 1,
5303                    rows_to_skip: 2,
5304                    preamble_action: PreambleAction::Absent,
5305                },
5306                ChunkDrainInstructions {
5307                    chunk_instructions: scheduled[3].clone(),
5308                    rows_to_take: 1,
5309                    rows_to_skip: 0,
5310                    preamble_action: PreambleAction::Take,
5311                },
5312            ]
5313        );
5314
5315        // Regression case.  Need a chunk with preamble, rows, and trailer (the middle chunk here)
5316        let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
5317        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5318        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5319        let user_ranges = vec![0..28];
5320
5321        // First, schedule the ranges
5322        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
5323
5324        let mut to_drain = VecDeque::from(scheduled.clone());
5325
5326        // Drain first chunk and some of second chunk
5327
5328        let mut need_preamble = false;
5329        let mut skip_in_chunk = 0;
5330
5331        let next_batch =
5332            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
5333
5334        assert_eq!(
5335            next_batch,
5336            vec![
5337                ChunkDrainInstructions {
5338                    chunk_instructions: scheduled[0].clone(),
5339                    rows_to_take: 6,
5340                    rows_to_skip: 0,
5341                    preamble_action: PreambleAction::Absent,
5342                },
5343                ChunkDrainInstructions {
5344                    chunk_instructions: scheduled[1].clone(),
5345                    rows_to_take: 1,
5346                    rows_to_skip: 0,
5347                    preamble_action: PreambleAction::Take,
5348                },
5349            ]
5350        );
5351
5352        assert!(!need_preamble);
5353        assert_eq!(skip_in_chunk, 1);
5354
5355        // Now, the tricky part.  We drain the second chunk, including the trailer, and need to make sure
5356        // we get a drain task to take the preamble of the third chunk (and nothing else)
5357        let next_batch =
5358            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
5359
5360        assert_eq!(
5361            next_batch,
5362            vec![
5363                ChunkDrainInstructions {
5364                    chunk_instructions: scheduled[1].clone(),
5365                    rows_to_take: 2,
5366                    rows_to_skip: 1,
5367                    preamble_action: PreambleAction::Skip,
5368                },
5369                ChunkDrainInstructions {
5370                    chunk_instructions: scheduled[2].clone(),
5371                    rows_to_take: 0,
5372                    rows_to_skip: 0,
5373                    preamble_action: PreambleAction::Take,
5374                },
5375            ]
5376        );
5377
5378        assert!(!need_preamble);
5379        assert_eq!(skip_in_chunk, 0);
5380    }
5381
5382    #[tokio::test]
5383    async fn test_fullzip_repetition_index_caching() {
5384        use crate::testing::SimulatedScheduler;
5385
5386        // Simplified FixedPerValueDecompressor for testing
5387        #[derive(Debug)]
5388        struct TestFixedDecompressor;
5389
5390        impl FixedPerValueDecompressor for TestFixedDecompressor {
5391            fn decompress(
5392                &self,
5393                _data: FixedWidthDataBlock,
5394                _num_rows: u64,
5395            ) -> crate::Result<DataBlock> {
5396                unimplemented!("Test decompressor")
5397            }
5398
5399            fn bits_per_value(&self) -> u64 {
5400                32
5401            }
5402        }
5403
5404        // Create test repetition index data
5405        let rows_in_page = 100u64;
5406        let bytes_per_value = 4u64;
5407        let _rep_index_size = (rows_in_page + 1) * bytes_per_value;
5408
5409        // Create mock repetition index data
5410        let mut rep_index_data = Vec::new();
5411        for i in 0..=rows_in_page {
5412            let offset = (i * 100) as u32; // Each row starts at i * 100 bytes
5413            rep_index_data.extend_from_slice(&offset.to_le_bytes());
5414        }
5415
5416        // Simulate storage with the repetition index at position 1000
5417        let mut full_data = vec![0u8; 1000];
5418        full_data.extend_from_slice(&rep_index_data);
5419        full_data.extend_from_slice(&vec![0u8; 10000]); // Add some data after
5420
5421        let data = bytes::Bytes::from(full_data);
5422        let io = Arc::new(SimulatedScheduler::new(data));
5423        let _cache = Arc::new(lance_core::cache::LanceCache::with_capacity(1024 * 1024));
5424
5425        // Create FullZipScheduler with repetition index
5426        let mut scheduler = FullZipScheduler {
5427            data_buf_position: 0,
5428            rep_index: Some(FullZipRepIndexDetails {
5429                buf_position: 1000,
5430                bytes_per_value,
5431            }),
5432            priority: 0,
5433            rows_in_page,
5434            bits_per_offset: 32,
5435            details: Arc::new(FullZipDecodeDetails {
5436                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
5437                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
5438                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
5439                max_rep: 0,
5440                max_visible_def: 0,
5441            }),
5442            cached_state: None,
5443            enable_cache: true, // Enable caching for test
5444        };
5445
5446        // First initialization should load and cache the repetition index
5447        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
5448        let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap();
5449
5450        // Verify that we got a FullZipCacheableState (not NoCachedPageData)
5451        let is_cached = cached_data1
5452            .clone()
5453            .as_arc_any()
5454            .downcast::<FullZipCacheableState>()
5455            .is_ok();
5456        assert!(
5457            is_cached,
5458            "Expected FullZipCacheableState, got NoCachedPageData"
5459        );
5460
5461        // Load the cached data into the scheduler
5462        scheduler.load(&cached_data1);
5463
5464        // Verify that cached_state is now populated
5465        assert!(
5466            scheduler.cached_state.is_some(),
5467            "cached_state should be populated after load"
5468        );
5469
5470        // Hold on to the cached state so we can later check that a second scheduler shares it
5471        let cached_state = scheduler.cached_state.as_ref().unwrap();
5472
5473        // Test that schedule_ranges_rep uses the cached data
5474        let ranges = vec![0..10, 20..30];
5475        let result = scheduler.schedule_ranges_rep(
5476            &ranges,
5477            &io_dyn,
5478            FullZipRepIndexDetails {
5479                buf_position: 1000,
5480                bytes_per_value,
5481            },
5482        );
5483
5484        // The result should be OK (not an error)
5485        assert!(
5486            result.is_ok(),
5487            "schedule_ranges_rep should succeed with cached data"
5488        );
5489
5490        // Second scheduler instance should be able to use the cached data
5491        let mut scheduler2 = FullZipScheduler {
5492            data_buf_position: 0,
5493            rep_index: Some(FullZipRepIndexDetails {
5494                buf_position: 1000,
5495                bytes_per_value,
5496            }),
5497            priority: 0,
5498            rows_in_page,
5499            bits_per_offset: 32,
5500            details: scheduler.details.clone(),
5501            cached_state: None,
5502            enable_cache: true, // Enable caching for test
5503        };
5504
5505        // Load cached data from the first scheduler
5506        scheduler2.load(&cached_data1);
5507        assert!(
5508            scheduler2.cached_state.is_some(),
5509            "Second scheduler should have cached_state after load"
5510        );
5511
5512        // Verify that both schedulers have the same cached data
5513        let cached_state2 = scheduler2.cached_state.as_ref().unwrap();
5514        assert!(
5515            Arc::ptr_eq(cached_state, cached_state2),
5516            "Both schedulers should share the same cached data"
5517        );
5518    }
5519
5520    #[tokio::test]
5521    async fn test_fullzip_cache_config_controls_caching() {
5522        use crate::testing::SimulatedScheduler;
5523
5524        // Simplified FixedPerValueDecompressor for testing
5525        #[derive(Debug)]
5526        struct TestFixedDecompressor;
5527
5528        impl FixedPerValueDecompressor for TestFixedDecompressor {
5529            fn decompress(
5530                &self,
5531                _data: FixedWidthDataBlock,
5532                _num_rows: u64,
5533            ) -> crate::Result<DataBlock> {
5534                unimplemented!("Test decompressor")
5535            }
5536
5537            fn bits_per_value(&self) -> u64 {
5538                32
5539            }
5540        }
5541
5542        // Test that enable_cache flag actually controls caching behavior
5543        let rows_in_page = 1000_u64;
5544        let bytes_per_value = 4_u64;
5545
5546        // Create simulated data
5547        let rep_index_data = vec![0u8; ((rows_in_page + 1) * bytes_per_value) as usize];
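        // All-zero offsets are fine here: this test only checks whether the repetition
        // index bytes get cached, not how they are interpreted.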
5548        let value_data = vec![0u8; 4000]; // Dummy value data
5549        let mut full_data = vec![0u8; 1000]; // Padding before rep index
5550        full_data.extend_from_slice(&rep_index_data);
5551        full_data.extend_from_slice(&value_data);
5552
5553        let data = bytes::Bytes::from(full_data);
5554        let io = Arc::new(SimulatedScheduler::new(data));
5555
5556        // Test 1: With caching disabled
5557        let mut scheduler_no_cache = FullZipScheduler {
5558            data_buf_position: 0,
5559            rep_index: Some(FullZipRepIndexDetails {
5560                buf_position: 1000,
5561                bytes_per_value,
5562            }),
5563            priority: 0,
5564            rows_in_page,
5565            bits_per_offset: 32,
5566            details: Arc::new(FullZipDecodeDetails {
5567                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
5568                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
5569                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
5570                max_rep: 0,
5571                max_visible_def: 0,
5572            }),
5573            cached_state: None,
5574            enable_cache: false, // Caching disabled
5575        };
5576
5577        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
5578        let cached_data = scheduler_no_cache.initialize(&io_dyn).await.unwrap();
5579
5580        // Should return NoCachedPageData when caching is disabled
5581        assert!(
5582            cached_data
5583                .as_arc_any()
5584                .downcast_ref::<super::NoCachedPageData>()
5585                .is_some(),
5586            "With enable_cache=false, should return NoCachedPageData"
5587        );
5588
5589        // Test 2: With caching enabled
5590        let mut scheduler_with_cache = FullZipScheduler {
5591            data_buf_position: 0,
5592            rep_index: Some(FullZipRepIndexDetails {
5593                buf_position: 1000,
5594                bytes_per_value,
5595            }),
5596            priority: 0,
5597            rows_in_page,
5598            bits_per_offset: 32,
5599            details: Arc::new(FullZipDecodeDetails {
5600                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
5601                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
5602                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
5603                max_rep: 0,
5604                max_visible_def: 0,
5605            }),
5606            cached_state: None,
5607            enable_cache: true, // Caching enabled
5608        };
5609
5610        let cached_data2 = scheduler_with_cache.initialize(&io_dyn).await.unwrap();
5611
5612        // Should return FullZipCacheableState when caching is enabled
5613        assert!(
5614            cached_data2
5615                .as_arc_any()
5616                .downcast_ref::<super::FullZipCacheableState>()
5617                .is_some(),
5618            "With enable_cache=true, should return FullZipCacheableState"
5619        );
5620    }
5621
5622    /// Reproduces the fuzz-test failure reported in https://github.com/lancedb/lance/issues/4492
5623    #[tokio::test]
5624    async fn test_fuzz_issue_4492_empty_rep_values() {
5625        use lance_datagen::{array, gen_batch, RowCount, Seed};
5626
5627        let seed = 1823859942947654717u64;
5628        let num_rows = 2741usize;
5629
5630        // Generate the exact same data that caused the failure
5631        let batch_gen = gen_batch().with_seed(Seed::from(seed));
5632        let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
5633        let list_generator = array::rand_list_any(base_generator, false);
5634
5635        let batch = batch_gen
5636            .anon_col(list_generator)
5637            .into_batch_rows(RowCount::from(num_rows as u64))
5638            .unwrap();
5639
5640        let list_array = batch.column(0).clone();
5641
5642        // Force miniblock encoding
5643        let mut metadata = HashMap::new();
5644        metadata.insert(
5645            STRUCTURAL_ENCODING_META_KEY.to_string(),
5646            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
5647        );
5648
5649        let test_cases = TestCases::default()
5650            .with_min_file_version(LanceFileVersion::V2_1)
5651            .with_batch_size(100)
5652            .with_range(0..num_rows.min(500) as u64)
5653            .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);
5654
5655        check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
5656    }
5657
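    /// Round-trips `string_data` through miniblock encoding with a caller-specified
    /// mini-chunk size (via `MINICHUNK_SIZE_META_KEY`) and checks the data is read back intact.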
5658    async fn test_minichunk_size_helper(
5659        string_data: Vec<Option<String>>,
5660        minichunk_size: u64,
5661        file_version: LanceFileVersion,
5662    ) {
5663        use crate::constants::MINICHUNK_SIZE_META_KEY;
5664        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
5665        use arrow_array::{ArrayRef, StringArray};
5666        use std::sync::Arc;
5667
5668        let string_array: ArrayRef = Arc::new(StringArray::from(string_data));
5669
5670        let mut metadata = HashMap::new();
5671        metadata.insert(
5672            MINICHUNK_SIZE_META_KEY.to_string(),
5673            minichunk_size.to_string(),
5674        );
5675        metadata.insert(
5676            STRUCTURAL_ENCODING_META_KEY.to_string(),
5677            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
5678        );
5679
5680        let test_cases = TestCases::default()
5681            .with_min_file_version(file_version)
5682            .with_batch_size(1000);
5683
5684        check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await;
5685    }
5686
5687    #[tokio::test]
5688    async fn test_minichunk_size_roundtrip() {
5689        // Test that minichunk size can be configured and works correctly in round-trip encoding
5690        let mut string_data = Vec::new();
5691        for i in 0..100 {
5692            string_data.push(Some(format!("test_string_{}", i).repeat(50)));
5693        }
5694        // Configure the minichunk size to 64 bytes (smaller than the default 4 KiB) for Lance 2.1
5695        test_minichunk_size_helper(string_data, 64, LanceFileVersion::V2_1).await;
5696    }
5697
5698    #[tokio::test]
5699    async fn test_minichunk_size_128kb_v2_2() {
5700        // Test that the minichunk size can be configured to 128 KiB and works correctly with Lance 2.2
5701        let mut string_data = Vec::new();
5702        // Create several megabytes of string data so that many 128 KiB chunks are produced
5703        for i in 0..10000 {
5704            string_data.push(Some(format!("test_string_{}", i).repeat(50)));
5705        }
5706        test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
5707    }
5708
5709    #[tokio::test]
5710    async fn test_binary_large_minichunk_size_over_max_miniblock_values() {
5711        let mut string_data = Vec::new();
5712        // 128 KiB per chunk / ~6 bytes per value ("t_9999") = 21845 values, which exceeds the 4096-items-per-chunk maximum
5713        for i in 0..10000 {
5714            string_data.push(Some(format!("t_{}", i)));
5715        }
5716        test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
5717    }
5718
5719    #[tokio::test]
5720    async fn test_large_dictionary_general_compression() {
5721        use arrow_array::{ArrayRef, StringArray};
5722        use std::collections::HashMap;
5723        use std::sync::Arc;
5724
5725        // Create large string dictionary data (>32KiB) with low cardinality
5726        // Use 100 unique strings, each 500 bytes long = 50KB dictionary
5727        let unique_values: Vec<String> = (0..100)
5728            .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
5729            .collect();
5730
5731        // Repeat these strings many times to create a large array
5732        let repeated_strings: Vec<_> = unique_values
5733            .iter()
5734            .cycle()
5735            .take(100_000)
5736            .map(|s| Some(s.as_str()))
5737            .collect();
5738
5739        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
5740
5741        // Configure test to use V2_2 and verify encoding
5742        let test_cases = TestCases::default()
5743            .with_min_file_version(LanceFileVersion::V2_2)
5744            .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
5745                assert_eq!(cols.len(), 1);
5746                let col = &cols[0];
5747
5748                // Navigate to the dictionary encoding in the page layout
5749                if let Some(PageEncoding::Structural(page_layout)) = &col.final_pages.first().map(|p| &p.description) {
5750                    // Check that dictionary is wrapped with general compression
5751                    if let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) = &page_layout.layout {
5752                        if let Some(dictionary_encoding) = &mini_block.dictionary {
5753                            match dictionary_encoding.compression.as_ref() {
5754                                Some(Compression::General(general)) => {
5755                                    // Verify it's using LZ4 or Zstd
5756                                    let compression = general.compression.as_ref().unwrap();
5757                                    assert!(
5758                                        compression.scheme() == pb21::CompressionScheme::CompressionAlgorithmLz4
5759                                        || compression.scheme() == pb21::CompressionScheme::CompressionAlgorithmZstd,
5760                                        "Expected LZ4 or Zstd compression for large dictionary"
5761                                    );
5762                                }
5763                                _ => panic!("Expected General compression for large dictionary"),
5764                            }
5765                        }
5766                    }
5767                }
5768            }));
5769
5770        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
5771    }
5772
5773    #[tokio::test]
5774    async fn test_dictionary_encode_int64() {
5775        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
5776        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
5777        use crate::version::LanceFileVersion;
5778        use arrow_array::{ArrayRef, Int64Array};
5779        use std::collections::HashMap;
5780        use std::sync::Arc;
5781
5782        // Low cardinality with poor RLE opportunity.
5783        let values = (0..1000)
5784            .map(|i| match i % 3 {
5785                0 => 10i64,
5786                1 => 20i64,
5787                _ => 30i64,
5788            })
5789            .collect::<Vec<_>>();
5790        let array = Arc::new(Int64Array::from(values)) as ArrayRef;
5791
5792        let mut metadata = HashMap::new();
5793        metadata.insert(
5794            STRUCTURAL_ENCODING_META_KEY.to_string(),
5795            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
5796        );
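        // A very permissive size ratio so the encoder picks the dictionary path for this
        // low-cardinality data.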
5797        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
5798
5799        let test_cases = TestCases::default()
5800            .with_min_file_version(LanceFileVersion::V2_2)
5801            .with_batch_size(1000)
5802            .with_range(0..1000)
5803            .with_indices(vec![0, 1, 10, 999])
5804            .with_expected_encoding("dictionary");
5805
5806        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
5807    }
5808
5809    #[tokio::test]
5810    async fn test_dictionary_encode_float64() {
5811        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
5812        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
5813        use crate::version::LanceFileVersion;
5814        use arrow_array::{ArrayRef, Float64Array};
5815        use std::collections::HashMap;
5816        use std::sync::Arc;
5817
5818        // Low cardinality with poor RLE opportunity.
5819        let values = (0..1000)
5820            .map(|i| match i % 3 {
5821                0 => 0.1f64,
5822                1 => 0.2f64,
5823                _ => 0.3f64,
5824            })
5825            .collect::<Vec<_>>();
5826        let array = Arc::new(Float64Array::from(values)) as ArrayRef;
5827
5828        let mut metadata = HashMap::new();
5829        metadata.insert(
5830            STRUCTURAL_ENCODING_META_KEY.to_string(),
5831            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
5832        );
5833        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
5834
5835        let test_cases = TestCases::default()
5836            .with_min_file_version(LanceFileVersion::V2_2)
5837            .with_batch_size(1000)
5838            .with_range(0..1000)
5839            .with_indices(vec![0, 1, 10, 999])
5840            .with_expected_encoding("dictionary");
5841
5842        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
5843    }
5844
5845    // Dictionary encoding decision tests
5846    /// Helper to create a FixedWidth test data block with an exact cardinality stat injected
5847    /// to ensure consistent test behavior (avoids HLL estimation error)
5848    fn create_test_fixed_data_block(
5849        num_values: u64,
5850        cardinality: u64,
5851        bits_per_value: u64,
5852    ) -> DataBlock {
5853        use crate::statistics::Stat;
5854
5855        let block_info = BlockInfo::default();
5856
5857        // Manually inject exact cardinality stat for consistent test behavior
5858        let cardinality_array = Arc::new(UInt64Array::from(vec![cardinality]));
5859        block_info
5860            .0
5861            .write()
5862            .unwrap()
5863            .insert(Stat::Cardinality, cardinality_array);
5864
5865        assert_eq!(bits_per_value % 8, 0);
5866        let bytes_per_value = bits_per_value / 8;
5867        DataBlock::FixedWidth(FixedWidthDataBlock {
5868            bits_per_value,
5869            data: crate::buffer::LanceBuffer::from(vec![
5870                0u8;
5871                (num_values * bytes_per_value) as usize
5872            ]),
5873            num_values,
5874            block_info,
5875        })
5876    }
5877
5878    /// Helper to create a VariableWidth (string) test data block with an exact cardinality
5879    fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
5880        use crate::statistics::Stat;
5881        use arrow_array::StringArray;
5882
5883        assert!(cardinality <= num_values && cardinality > 0);
5884
5885        let mut values = Vec::with_capacity(num_values as usize);
5886        for i in 0..num_values {
5887            values.push(format!("value_{:016}", i % cardinality));
5888        }
5889
5890        let array = StringArray::from(values);
5891        let block = DataBlock::from_array(Arc::new(array) as ArrayRef);
5892
5893        // Manually inject stats for consistent test behavior
5894        if let DataBlock::VariableWidth(ref var_block) = block {
5895            let mut info = var_block.block_info.0.write().unwrap();
5896            // Cardinality: exact value to avoid HLL estimation error
5897            info.insert(
5898                Stat::Cardinality,
5899                Arc::new(UInt64Array::from(vec![cardinality])),
5900            );
5901        }
5902
5903        block
5904    }
5905
5906    #[test]
5907    fn test_estimate_dict_size_fixed_width() {
5908        use crate::encodings::logical::primitive::dict::DICT_INDICES_BITS_PER_VALUE;
5909
5910        let bits_per_value = 128;
5911        let block = create_test_fixed_data_block(1000, 400, bits_per_value);
5912        let estimated_size =
5913            PrimitiveStructuralEncoder::estimate_dict_size(&block, LanceFileVersion::V2_1).unwrap();
5914
5915        // Dictionary: 400 * 16 bytes (128-bit values)
5916        // Indices: 1000 * 4 bytes (32-bit dictionary indices)
5917        let expected_dict_size = 400 * (bits_per_value / 8);
5918        let expected_indices_size = 1000 * (DICT_INDICES_BITS_PER_VALUE / 8);
5919        let expected_total = expected_dict_size + expected_indices_size;
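        // With 32-bit indices this works out to 6400 + 4000 = 10,400 bytes.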
5920
5921        assert_eq!(estimated_size, expected_total);
5922    }
5923
5924    #[test]
5925    fn test_estimate_dict_size_variable_width() {
5926        let block = create_test_variable_width_block(1000, 400);
5927        let estimated_size =
5928            PrimitiveStructuralEncoder::estimate_dict_size(&block, LanceFileVersion::V2_1).unwrap();
5929
5930        // Get actual data size
5931        let data_size = block.data_size();
5932        let avg_value_size = data_size / 1000;
5933
5934        let expected = 400 * avg_value_size + 400 * 4 + 1000 * 4;
5935
5936        assert_eq!(estimated_size, expected);
5937    }
5938
5939    #[test]
5940    fn test_should_dictionary_encode() {
5941        use crate::constants::DICT_SIZE_RATIO_META_KEY;
5942        use lance_core::datatypes::Field as LanceField;
5943
5944        // Create data where dict encoding saves space
5945        let block = create_test_variable_width_block(1000, 10);
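        // Only 10 distinct values across 1000 rows, so dictionary encoding should clearly pay off.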
5946
5947        let mut metadata = HashMap::new();
5948        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
5949        let arrow_field =
5950            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
5951        let field = LanceField::try_from(&arrow_field).unwrap();
5952
5953        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
5954            &block,
5955            &field,
5956            LanceFileVersion::V2_1,
5957        );
5958
5959        assert!(result, "Should use dictionary encode based on size");
5960    }
5961
5962    #[test]
5963    fn test_should_not_dictionary_encode() {
5964        use crate::constants::DICT_SIZE_RATIO_META_KEY;
5965        use lance_core::datatypes::Field as LanceField;
5966
5967        let block = create_test_fixed_data_block(1000, 1000, 128);
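        // Every value is distinct (cardinality == num_values), so a dictionary adds index
        // overhead without removing any redundancy.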
5968
5969        let mut metadata = HashMap::new();
5970        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
5971        let arrow_field =
5972            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
5973        let field = LanceField::try_from(&arrow_field).unwrap();
5974
5975        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
5976            &block,
5977            &field,
5978            LanceFileVersion::V2_1,
5979        );
5980
5981        assert!(!result, "Should not use dictionary encode based on size");
5982    }
5983}