lance_encoding/encodings/logical/primitive.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    collections::{HashMap, VecDeque},
7    env,
8    fmt::Debug,
9    iter,
10    ops::Range,
11    sync::Arc,
12    vec,
13};
14
15use crate::{
16    constants::{
17        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
18    },
19    data::DictionaryDataBlock,
20    encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
21    format::{
22        pb21::{self, compressive_encoding::Compression, CompressiveEncoding, PageLayout},
23        ProtobufUtils21,
24    },
25};
26use arrow_array::{cast::AsArray, make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
27use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};
28use arrow_schema::{DataType, Field as ArrowField};
29use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
30use itertools::Itertools;
31use lance_arrow::deepcopy::deep_copy_nulls;
32use lance_core::{
33    cache::{CacheKey, Context, DeepSizeOf},
34    error::{Error, LanceOptionExt},
35    utils::bit::pad_bytes,
36};
37use log::trace;
38use snafu::location;
39
40use crate::{
41    compression::{
42        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
43    },
44    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
45    utils::bytepack::BytepackedIntegerEncoder,
46};
47use crate::{
48    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
49    encodings::logical::primitive::fullzip::PerValueDataBlock,
50};
51use crate::{
52    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
53};
54use crate::{
55    encodings::logical::primitive::miniblock::MiniBlockCompressed,
56    statistics::{ComputeStat, GetStat, Stat},
57};
58use crate::{
59    repdef::{
60        build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator,
61        ControlWordParser, DefinitionInterpretation, RepDefSlicer,
62    },
63    utils::accumulation::AccumulationQueue,
64};
65use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};
66
67use crate::constants::DICT_SIZE_RATIO_META_KEY;
68use crate::encodings::logical::primitive::dict::{
69    DICT_FIXED_WIDTH_BITS_PER_VALUE, DICT_INDICES_BITS_PER_VALUE,
70};
71use crate::{
72    buffer::LanceBuffer,
73    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
74    decoder::{
75        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
76        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
77        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
78        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
79    },
80    encoder::{
81        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
82    },
83    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
84    EncodingsIo,
85};
86
87pub mod blob;
88pub mod dict;
89pub mod fullzip;
90pub mod miniblock;
91
92const FILL_BYTE: u8 = 0xFE;
93
94struct PageLoadTask {
95    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
96    num_rows: u64,
97}
98
99/// A trait for figuring out how to schedule the data within
100/// a single page.
101trait StructuralPageScheduler: std::fmt::Debug + Send {
102    /// Fetches any metadata required for the page
103    fn initialize<'a>(
104        &'a mut self,
105        io: &Arc<dyn EncodingsIo>,
106    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
107    /// Loads metadata from a previous initialize call
108    fn load(&mut self, data: &Arc<dyn CachedPageData>);
109    /// Schedules the read of the given ranges in the page
110    ///
111    /// The read may be split into multiple "shards" if the page is extremely large.
112    /// Each shard maps to one or more rows and can be decoded independently.
113    ///
114    /// Note: this sharding is for splitting up very large pages into smaller reads to
115    /// avoid buffering too much data in memory.  It is not related to the batch size or
116    /// compute units in any way.
117    fn schedule_ranges(
118        &self,
119        ranges: &[Range<u64>],
120        io: &Arc<dyn EncodingsIo>,
121    ) -> Result<Vec<PageLoadTask>>;
122}
123
124/// Metadata describing a single mini-block chunk: its value count, size in bytes, and byte offset
125#[derive(Debug)]
126struct ChunkMeta {
127    num_values: u64,
128    chunk_size_bytes: u64,
129    offset_bytes: u64,
130}
131
132/// A mini-block chunk that has been decoded and decompressed
133#[derive(Debug)]
134struct DecodedMiniBlockChunk {
135    rep: Option<ScalarBuffer<u16>>,
136    def: Option<ScalarBuffer<u16>>,
137    values: DataBlock,
138}
139
140/// A task to decode one or more mini-blocks of data into an output batch
141///
142/// Note: Two batches might share the same mini-block of data.  When this happens,
143/// each batch gets a copy of the block and decodes it independently.
144///
145/// This means we have duplicated work but it is necessary to avoid having to synchronize
146/// the decoding of the block. (TODO: test this theory)
147#[derive(Debug)]
148struct DecodeMiniBlockTask {
149    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
150    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
151    value_decompressor: Arc<dyn MiniBlockDecompressor>,
152    dictionary_data: Option<Arc<DataBlock>>,
153    def_meaning: Arc<[DefinitionInterpretation]>,
154    num_buffers: u64,
155    max_visible_level: u16,
156    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
157}
158
159impl DecodeMiniBlockTask {
160    fn decode_levels(
161        rep_decompressor: &dyn BlockDecompressor,
162        levels: LanceBuffer,
163        num_levels: u16,
164    ) -> Result<ScalarBuffer<u16>> {
165        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
166        let rep = rep.as_fixed_width().unwrap();
167        debug_assert_eq!(rep.num_values, num_levels as u64);
168        debug_assert_eq!(rep.bits_per_value, 16);
169        Ok(rep.data.borrow_to_typed_slice::<u16>())
170    }
171
172    // We are building a LevelBuffer (levels) and want to copy into it `total_len`
173    // values from `level_buf` starting at `offset`.
174    //
175    // We need to handle both the case where `levels` is None (no nulls encountered
176    // yet) and the case where `level_buf` is None (the input we are copying from has
177    // no nulls)
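    //
    // For illustration (made-up values): with dest_offset = 2, range = 1..3, and
    // level_buf = Some([0, 1, 0, 2]), a `levels` of None is first backfilled to [0, 0]
    // (the two earlier all-valid positions) and then extended to [0, 0, 1, 0].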
178    fn extend_levels(
179        range: Range<u64>,
180        levels: &mut Option<LevelBuffer>,
181        level_buf: &Option<impl AsRef<[u16]>>,
182        dest_offset: usize,
183    ) {
184        if let Some(level_buf) = level_buf {
185            if levels.is_none() {
186                // This is the first non-empty def buf we've hit, fill in the past
187                // with 0 (valid)
188                let mut new_levels_vec =
189                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
190                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
191                *levels = Some(new_levels_vec);
192            }
193            levels.as_mut().unwrap().extend(
194                level_buf.as_ref()[range.start as usize..range.end as usize]
195                    .iter()
196                    .copied(),
197            );
198        } else if let Some(levels) = levels {
199            let num_values = (range.end - range.start) as usize;
200            // This is an all-valid level_buf but we had nulls earlier and so we
201            // need to materialize it
202            levels.extend(iter::repeat_n(0, num_values));
203        }
204    }
205
206    /// Maps a range of rows to a range of items and a range of levels
207    ///
208    /// If there is no repetition information this just returns the range as-is.
209    ///
210    /// If there is repetition information then we need to do some work to figure out what
211    /// range of items corresponds to the requested range of rows.
212    ///
213    /// For example, if the data is [[1, 2, 3], [4, 5], [6, 7]] and the range is 1..2 (i.e. just row
214    /// 1) then the user actually wants items 3..5.  In the above case the rep levels would be:
215    ///
216    /// Idx: 0 1 2 3 4 5 6
217    /// Rep: 1 0 0 1 0 1 0
218    ///
219    /// So the start (1) maps to the second 1 (idx=3) and the end (2) maps to the third 1 (idx=5)
220    ///
221    /// If there are invisible items then we don't count them when calculating the range of items we
222    /// are interested in but we do count them when calculating the range of levels we are interested
223    /// in.  As a result we have to return both the item range (first return value) and the level range
224    /// (second return value).
225    ///
226    /// For example, if the data is [[1, 2, 3], [4, 5], NULL, [6, 7, 8]] and the range is 2..4 then the
227    /// user wants items 5..8 but they want levels 5..9.  In the above case the rep/def levels would be:
228    ///
229    /// Idx: 0 1 2 3 4 5 6 7 8
230    /// Rep: 1 0 0 1 0 1 1 0 0
231    /// Def: 0 0 0 0 0 1 0 0 0
232    /// Itm: 1 2 3 4 5 _ 6 7 8  (no item for the NULL list at idx 5)
233    ///
234    /// Finally, we have to contend with the fact that chunks may or may not start with a "preamble" of
235    /// values that finish up a list begun in the previous chunk.  In this case the first rep level is not
236    /// max_rep because it is a continuation of the previous chunk.  For our purposes we do
237    /// not consider this a "row" and so the range 0..1 will refer to the first row AFTER the preamble.
238    ///
239    /// We have a separate parameter (`preamble_action`) to control whether we want the preamble or not.
240    ///
241    /// Note that the "trailer" is considered a "row" and if we want it we should include it in the range.
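    ///
    /// Illustrative example (invented for this comment): if a chunk's rep levels are
    /// 0 0 1 0 1 0 with max_rep = 1, the first two values are a preamble finishing the
    /// previous chunk's list.  With `preamble_action = Skip` and no def levels, the range
    /// 0..1 maps to items 2..4 (the first complete row after the preamble).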
242    fn map_range(
243        range: Range<u64>,
244        rep: Option<&impl AsRef<[u16]>>,
245        def: Option<&impl AsRef<[u16]>>,
246        max_rep: u16,
247        max_visible_def: u16,
248        // The total number of items (not rows) in the chunk.  This is not quite the same as
249        // rep.len() / def.len() because it doesn't count invisible items
250        total_items: u64,
251        preamble_action: PreambleAction,
252    ) -> (Range<u64>, Range<u64>) {
253        if let Some(rep) = rep {
254            let mut rep = rep.as_ref();
255            // If there is a preamble and we need to skip it then do that first.  The work is the same
256            // whether there is def information or not
257            let mut items_in_preamble = 0_u64;
258            let first_row_start = match preamble_action {
259                PreambleAction::Skip | PreambleAction::Take => {
260                    let first_row_start = if let Some(def) = def.as_ref() {
261                        let mut first_row_start = None;
262                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
263                            if *rep == max_rep {
264                                first_row_start = Some(idx as u64);
265                                break;
266                            }
267                            if *def <= max_visible_def {
268                                items_in_preamble += 1;
269                            }
270                        }
271                        first_row_start
272                    } else {
273                        let first_row_start =
274                            rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
275                        items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
276                        first_row_start
277                    };
278                    // It is possible for a chunk to be entirely partial values but if it is then it
279                    // should never show up as a preamble to skip
280                    if first_row_start.is_none() {
281                        assert!(preamble_action == PreambleAction::Take);
282                        return (0..total_items, 0..rep.len() as u64);
283                    }
284                    let first_row_start = first_row_start.unwrap();
285                    rep = &rep[first_row_start as usize..];
286                    first_row_start
287                }
288                PreambleAction::Absent => {
289                    debug_assert!(rep[0] == max_rep);
290                    0
291                }
292            };
293
294            // We hit this case when all we needed was the preamble
295            if range.start == range.end {
296                debug_assert!(preamble_action == PreambleAction::Take);
297                debug_assert!(items_in_preamble <= total_items);
298                return (0..items_in_preamble, 0..first_row_start);
299            }
300            assert!(range.start < range.end);
301
302            let mut rows_seen = 0;
303            let mut new_start = 0;
304            let mut new_levels_start = 0;
305
306            if let Some(def) = def {
307                let def = &def.as_ref()[first_row_start as usize..];
308
309                // range.start == 0 always maps to 0 (even with invis items), otherwise we need to walk
310                let mut lead_invis_seen = 0;
311
312                if range.start > 0 {
313                    if def[0] > max_visible_def {
314                        lead_invis_seen += 1;
315                    }
316                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
317                        if *rep == max_rep {
318                            rows_seen += 1;
319                            if rows_seen == range.start {
320                                new_start = idx as u64 + 1 - lead_invis_seen;
321                                new_levels_start = idx as u64 + 1;
322                                break;
323                            }
324                        }
325                        if *def > max_visible_def {
326                            lead_invis_seen += 1;
327                        }
328                    }
329                }
330
331                rows_seen += 1;
332
333                let mut new_end = u64::MAX;
334                let mut new_levels_end = rep.len() as u64;
335                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
336                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
337                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
338                    .iter()
339                    .zip(&def[(new_levels_start + 1) as usize..])
340                    .enumerate()
341                {
342                    if *rep == max_rep {
343                        rows_seen += 1;
344                        if rows_seen == range.end + 1 {
345                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
346                            new_levels_end = idx as u64 + new_levels_start + 1;
347                            break;
348                        }
349                    }
350                    if *def > max_visible_def {
351                        tail_invis_seen += 1;
352                    }
353                }
354
355                if new_end == u64::MAX {
356                    new_levels_end = rep.len() as u64;
357                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
358                    new_end = rep.len() as u64 - total_invis_seen;
359                }
360
361                assert_ne!(new_end, u64::MAX);
362
363                // Adjust for any skipped preamble
364                if preamble_action == PreambleAction::Skip {
365                    new_start += items_in_preamble;
366                    new_end += items_in_preamble;
367                    new_levels_start += first_row_start;
368                    new_levels_end += first_row_start;
369                } else if preamble_action == PreambleAction::Take {
370                    debug_assert_eq!(new_start, 0);
371                    debug_assert_eq!(new_levels_start, 0);
372                    new_end += items_in_preamble;
373                    new_levels_end += first_row_start;
374                }
375
376                debug_assert!(new_end <= total_items);
377                (new_start..new_end, new_levels_start..new_levels_end)
378            } else {
379                // Easy case, there are no invisible items, so we don't need to check for them
380                // The items range and levels range will be the same.  We do still need to walk
381                // the rep levels to find the row boundaries
382
383                // range.start == 0 always maps to 0, otherwise we need to walk
384                if range.start > 0 {
385                    for (idx, rep) in rep.iter().skip(1).enumerate() {
386                        if *rep == max_rep {
387                            rows_seen += 1;
388                            if rows_seen == range.start {
389                                new_start = idx as u64 + 1;
390                                break;
391                            }
392                        }
393                    }
394                }
395                let mut new_end = rep.len() as u64;
396                // range.end == total_items always maps to rep.len(), otherwise we need to walk
397                if range.end < total_items {
398                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
399                        if *rep == max_rep {
400                            rows_seen += 1;
401                            if rows_seen == range.end {
402                                new_end = idx as u64 + new_start + 1;
403                                break;
404                            }
405                        }
406                    }
407                }
408
409                // Adjust for any skipped preamble
410                if preamble_action == PreambleAction::Skip {
411                    new_start += first_row_start;
412                    new_end += first_row_start;
413                } else if preamble_action == PreambleAction::Take {
414                    debug_assert_eq!(new_start, 0);
415                    new_end += first_row_start;
416                }
417
418                debug_assert!(new_end <= total_items);
419                (new_start..new_end, new_start..new_end)
420            }
421        } else {
422            // No repetition info, easy case, just use the range as-is and the item
423            // and level ranges are the same
424            (range.clone(), range)
425        }
426    }
427
428    // Deserialize a mini-block chunk into its rep/def levels and decompressed values
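    //
    // The chunk header is a sequence of little-endian u16 values (as parsed below): num_levels,
    // then rep_size (only when a rep decompressor is present), then def_size (only when a def
    // decompressor is present), then one size per value buffer.  The header is padded to
    // MINIBLOCK_ALIGNMENT and each of the rep, def, and value buffers that follow is padded
    // the same way.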
429    fn decode_miniblock_chunk(
430        &self,
431        buf: &LanceBuffer,
432        items_in_chunk: u64,
433    ) -> Result<DecodedMiniBlockChunk> {
434        let mut offset = 0;
435        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
436        offset += 2;
437
438        let rep_size = if self.rep_decompressor.is_some() {
439            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
440            offset += 2;
441            Some(rep_size)
442        } else {
443            None
444        };
445        let def_size = if self.def_decompressor.is_some() {
446            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
447            offset += 2;
448            Some(def_size)
449        } else {
450            None
451        };
452        let buffer_sizes = (0..self.num_buffers)
453            .map(|_| {
454                let size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
455                offset += 2;
456                size
457            })
458            .collect::<Vec<_>>();
459
460        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
461
462        let rep = rep_size.map(|rep_size| {
463            let rep = buf.slice_with_length(offset, rep_size as usize);
464            offset += rep_size as usize;
465            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
466            rep
467        });
468
469        let def = def_size.map(|def_size| {
470            let def = buf.slice_with_length(offset, def_size as usize);
471            offset += def_size as usize;
472            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
473            def
474        });
475
476        let buffers = buffer_sizes
477            .into_iter()
478            .map(|buf_size| {
479                let buf = buf.slice_with_length(offset, buf_size as usize);
480                offset += buf_size as usize;
481                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
482                buf
483            })
484            .collect::<Vec<_>>();
485
486        let values = self
487            .value_decompressor
488            .decompress(buffers, items_in_chunk)?;
489
490        let rep = rep
491            .map(|rep| {
492                Self::decode_levels(
493                    self.rep_decompressor.as_ref().unwrap().as_ref(),
494                    rep,
495                    num_levels,
496                )
497            })
498            .transpose()?;
499        let def = def
500            .map(|def| {
501                Self::decode_levels(
502                    self.def_decompressor.as_ref().unwrap().as_ref(),
503                    def,
504                    num_levels,
505                )
506            })
507            .transpose()?;
508
509        Ok(DecodedMiniBlockChunk { rep, def, values })
510    }
511}
512
513impl DecodePageTask for DecodeMiniBlockTask {
514    fn decode(self: Box<Self>) -> Result<DecodedPage> {
515        // First, we create output buffers for the rep and def and data
516        let mut repbuf: Option<LevelBuffer> = None;
517        let mut defbuf: Option<LevelBuffer> = None;
518
519        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
520
521        // This is probably an over-estimate but it's quick and easy to calculate
522        let estimated_size_bytes = self
523            .instructions
524            .iter()
525            .map(|(_, chunk)| chunk.data.len())
526            .sum::<usize>()
527            * 2;
528        let mut data_builder =
529            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
530
531        // We need to keep track of the offset into repbuf/defbuf that we are building up
532        let mut level_offset = 0;
533        // Now we iterate through each instruction and process it
534        for (instructions, chunk) in self.instructions.iter() {
535            // TODO: It's very possible that we have duplicate `buf` in self.instructions and we
536            // don't want to decode the buf again and again on the same thread.
537
538            let DecodedMiniBlockChunk { rep, def, values } =
539                self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;
540
541            // Our instructions tell us which rows we want to take from this chunk
542            let row_range_start =
543                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
544            let row_range_end = row_range_start + instructions.rows_to_take;
545
546            // We use the rep info to map the row range to an item range / levels range
547            let (item_range, level_range) = Self::map_range(
548                row_range_start..row_range_end,
549                rep.as_ref(),
550                def.as_ref(),
551                max_rep,
552                self.max_visible_level,
553                chunk.items_in_chunk,
554                instructions.preamble_action,
555            );
556            if item_range.end - item_range.start > chunk.items_in_chunk {
557                return Err(lance_core::Error::Internal {
558                    message: format!(
559                        "Item range {:?} is larger than the number of items in the chunk ({:?})",
560                        item_range, chunk.items_in_chunk
561                    ),
562                    location: location!(),
563                });
564            }
565
566            // Now we append the data to the output buffers
567            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
568            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
569            level_offset += (level_range.end - level_range.start) as usize;
570            data_builder.append(&values, item_range);
571        }
572
573        let mut data = data_builder.finish();
574
575        let unraveler =
576            RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());
577
578        if let Some(dictionary) = &self.dictionary_data {
579            // Don't decode here, that happens later (if needed)
580            let DataBlock::FixedWidth(indices) = data else {
581                return Err(lance_core::Error::Internal {
582                    message: format!(
583                        "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
584                        data
585                    ),
586                    location: location!(),
587                });
588            };
589            data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
590                indices,
591                dictionary.as_ref().clone(),
592            ));
593        }
594
595        Ok(DecodedPage {
596            data,
597            repdef: unraveler,
598        })
599    }
600}
601
602/// A chunk that has been loaded by the miniblock scheduler (but not
603/// yet decoded)
604#[derive(Debug)]
605struct LoadedChunk {
606    data: LanceBuffer,
607    items_in_chunk: u64,
608    byte_range: Range<u64>,
609    chunk_idx: usize,
610}
611
612impl Clone for LoadedChunk {
613    fn clone(&self) -> Self {
614        Self {
615            // Safe as we always create borrowed buffers here
616            data: self.data.clone(),
617            items_in_chunk: self.items_in_chunk,
618            byte_range: self.byte_range.clone(),
619            chunk_idx: self.chunk_idx,
620        }
621    }
622}
623
624/// Decodes mini-block formatted data.  See [`PrimitiveStructuralEncoder`] for more
625/// details on the different layouts.
626#[derive(Debug)]
627struct MiniBlockDecoder {
628    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
629    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
630    value_decompressor: Arc<dyn MiniBlockDecompressor>,
631    def_meaning: Arc<[DefinitionInterpretation]>,
632    loaded_chunks: VecDeque<LoadedChunk>,
633    instructions: VecDeque<ChunkInstructions>,
634    offset_in_current_chunk: u64,
635    num_rows: u64,
636    num_buffers: u64,
637    dictionary: Option<Arc<DataBlock>>,
638}
639
640/// See [`MiniBlockScheduler`] for more details on the scheduling and decoding
641/// process for miniblock encoded data.
642impl StructuralPageDecoder for MiniBlockDecoder {
643    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
644        let mut items_desired = num_rows;
645        let mut need_preamble = false;
646        let mut skip_in_chunk = self.offset_in_current_chunk;
647        let mut drain_instructions = Vec::new();
648        while items_desired > 0 || need_preamble {
649            let (instructions, consumed) = self
650                .instructions
651                .front()
652                .unwrap()
653                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);
654
655            while self.loaded_chunks.front().unwrap().chunk_idx
656                != instructions.chunk_instructions.chunk_idx
657            {
658                self.loaded_chunks.pop_front();
659            }
660            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
661            if consumed {
662                self.instructions.pop_front();
663            }
664        }
665        // We can throw away need_preamble here because it must be false.  If it were true it would mean
666        // we were still in the middle of loading rows.  We do need to latch skip_in_chunk though.
667        self.offset_in_current_chunk = skip_in_chunk;
668
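        // Definition levels up to this sum still correspond to visible (materialized) items;
        // higher levels indicate null or empty lists whose items are not present in the values.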
669        let max_visible_level = self
670            .def_meaning
671            .iter()
672            .take_while(|l| !l.is_list())
673            .map(|l| l.num_def_levels())
674            .sum::<u16>();
675
676        Ok(Box::new(DecodeMiniBlockTask {
677            instructions: drain_instructions,
678            def_decompressor: self.def_decompressor.clone(),
679            rep_decompressor: self.rep_decompressor.clone(),
680            value_decompressor: self.value_decompressor.clone(),
681            dictionary_data: self.dictionary.clone(),
682            def_meaning: self.def_meaning.clone(),
683            num_buffers: self.num_buffers,
684            max_visible_level,
685        }))
686    }
687
688    fn num_rows(&self) -> u64 {
689        self.num_rows
690    }
691}
692
693#[derive(Debug)]
694struct CachedComplexAllNullState {
695    rep: Option<ScalarBuffer<u16>>,
696    def: Option<ScalarBuffer<u16>>,
697}
698
699impl DeepSizeOf for CachedComplexAllNullState {
700    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
701        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
702            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
703    }
704}
705
706impl CachedPageData for CachedComplexAllNullState {
707    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
708        self
709    }
710}
711
712/// A scheduler for all-null data that has repetition and definition levels
713///
714/// We still need to do some I/O in this case because we need to figure out what kind of null we
715/// are dealing with (null list, null struct, what level null struct, etc.)
716///
717/// TODO: Right now we just load the entire rep/def at initialization time and cache it.  This is a touch
718/// RAM aggressive and maybe we want something more lazy in the future.  On the other hand, it's simple
719/// and fast so...maybe not :)
720#[derive(Debug)]
721pub struct ComplexAllNullScheduler {
722    // Set from protobuf
723    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
724    def_meaning: Arc<[DefinitionInterpretation]>,
725    repdef: Option<Arc<CachedComplexAllNullState>>,
726    max_visible_level: u16,
727}
728
729impl ComplexAllNullScheduler {
730    pub fn new(
731        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
732        def_meaning: Arc<[DefinitionInterpretation]>,
733    ) -> Self {
734        let max_visible_level = def_meaning
735            .iter()
736            .take_while(|l| !l.is_list())
737            .map(|l| l.num_def_levels())
738            .sum::<u16>();
739        Self {
740            buffer_offsets_and_sizes,
741            def_meaning,
742            repdef: None,
743            max_visible_level,
744        }
745    }
746}
747
748impl StructuralPageScheduler for ComplexAllNullScheduler {
749    fn initialize<'a>(
750        &'a mut self,
751        io: &Arc<dyn EncodingsIo>,
752    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
753        // Fully load the rep & def buffers, as needed
754        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
755        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
756        let has_rep = rep_size > 0;
757        let has_def = def_size > 0;
758
759        let mut reads = Vec::with_capacity(2);
760        if has_rep {
761            reads.push(rep_pos..rep_pos + rep_size);
762        }
763        if has_def {
764            reads.push(def_pos..def_pos + def_size);
765        }
766
767        let data = io.submit_request(reads, 0);
768
769        async move {
770            let data = data.await?;
771            let mut data_iter = data.into_iter();
772
773            let rep = if has_rep {
774                let rep = data_iter.next().unwrap();
775                let rep = LanceBuffer::from_bytes(rep, 2);
776                let rep = rep.borrow_to_typed_slice::<u16>();
777                Some(rep)
778            } else {
779                None
780            };
781
782            let def = if has_def {
783                let def = data_iter.next().unwrap();
784                let def = LanceBuffer::from_bytes(def, 2);
785                let def = def.borrow_to_typed_slice::<u16>();
786                Some(def)
787            } else {
788                None
789            };
790
791            let repdef = Arc::new(CachedComplexAllNullState { rep, def });
792
793            self.repdef = Some(repdef.clone());
794
795            Ok(repdef as Arc<dyn CachedPageData>)
796        }
797        .boxed()
798    }
799
800    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
801        self.repdef = Some(
802            data.clone()
803                .as_arc_any()
804                .downcast::<CachedComplexAllNullState>()
805                .unwrap(),
806        );
807    }
808
809    fn schedule_ranges(
810        &self,
811        ranges: &[Range<u64>],
812        _io: &Arc<dyn EncodingsIo>,
813    ) -> Result<Vec<PageLoadTask>> {
814        let ranges = VecDeque::from_iter(ranges.iter().cloned());
815        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
816        let decoder = Box::new(ComplexAllNullPageDecoder {
817            ranges,
818            rep: self.repdef.as_ref().unwrap().rep.clone(),
819            def: self.repdef.as_ref().unwrap().def.clone(),
820            num_rows,
821            def_meaning: self.def_meaning.clone(),
822            max_visible_level: self.max_visible_level,
823        }) as Box<dyn StructuralPageDecoder>;
824        let page_load_task = PageLoadTask {
825            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
826            num_rows,
827        };
828        Ok(vec![page_load_task])
829    }
830}
831
832#[derive(Debug)]
833pub struct ComplexAllNullPageDecoder {
834    ranges: VecDeque<Range<u64>>,
835    rep: Option<ScalarBuffer<u16>>,
836    def: Option<ScalarBuffer<u16>>,
837    num_rows: u64,
838    def_meaning: Arc<[DefinitionInterpretation]>,
839    max_visible_level: u16,
840}
841
842impl ComplexAllNullPageDecoder {
843    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
844        let mut rows_desired = num_rows;
845        let mut ranges = Vec::with_capacity(self.ranges.len());
846        while rows_desired > 0 {
847            let front = self.ranges.front_mut().unwrap();
848            let avail = front.end - front.start;
849            if avail > rows_desired {
850                ranges.push(front.start..front.start + rows_desired);
851                front.start += rows_desired;
852                rows_desired = 0;
853            } else {
854                ranges.push(self.ranges.pop_front().unwrap());
855                rows_desired -= avail;
856            }
857        }
858        ranges
859    }
860}
861
862impl StructuralPageDecoder for ComplexAllNullPageDecoder {
863    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
864        let drained_ranges = self.drain_ranges(num_rows);
865        Ok(Box::new(DecodeComplexAllNullTask {
866            ranges: drained_ranges,
867            rep: self.rep.clone(),
868            def: self.def.clone(),
869            def_meaning: self.def_meaning.clone(),
870            max_visible_level: self.max_visible_level,
871        }))
872    }
873
874    fn num_rows(&self) -> u64 {
875        self.num_rows
876    }
877}
878
879/// We use `ranges` to slice into `rep` and `def` and create rep/def buffers
880/// for the null data.
881#[derive(Debug)]
882pub struct DecodeComplexAllNullTask {
883    ranges: Vec<Range<u64>>,
884    rep: Option<ScalarBuffer<u16>>,
885    def: Option<ScalarBuffer<u16>>,
886    def_meaning: Arc<[DefinitionInterpretation]>,
887    max_visible_level: u16,
888}
889
890impl DecodeComplexAllNullTask {
891    fn decode_level(
892        &self,
893        levels: &Option<ScalarBuffer<u16>>,
894        num_values: u64,
895    ) -> Option<Vec<u16>> {
896        levels.as_ref().map(|levels| {
897            let mut referenced_levels = Vec::with_capacity(num_values as usize);
898            for range in &self.ranges {
899                referenced_levels.extend(
900                    levels[range.start as usize..range.end as usize]
901                        .iter()
902                        .copied(),
903                );
904            }
905            referenced_levels
906        })
907    }
908}
909
910impl DecodePageTask for DecodeComplexAllNullTask {
911    fn decode(self: Box<Self>) -> Result<DecodedPage> {
912        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
913        let rep = self.decode_level(&self.rep, num_values);
914        let def = self.decode_level(&self.def, num_values);
915
916        // If there are definition levels there may be empty / null lists which are not visible
917        // in the items array.  We need to account for that here to figure out how many values
918        // should be in the items array.
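        // For example (made-up values): if def = [0, 2, 0] and max_visible_level = 1 then only
        // the two zeros are visible and num_values becomes 2.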
919        let num_values = if let Some(def) = &def {
920            def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
921        } else {
922            num_values
923        };
924
925        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
926        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
927        Ok(DecodedPage {
928            data,
929            repdef: unraveler,
930        })
931    }
932}
933
934/// A scheduler for simple all-null data
935///
936/// "simple" all-null data is data that is all null and only has a single level of definition and
937/// no repetition.  We don't need to read any data at all in this case.
938#[derive(Debug, Default)]
939pub struct SimpleAllNullScheduler {}
940
941impl StructuralPageScheduler for SimpleAllNullScheduler {
942    fn initialize<'a>(
943        &'a mut self,
944        _io: &Arc<dyn EncodingsIo>,
945    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
946        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
947    }
948
949    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
950
951    fn schedule_ranges(
952        &self,
953        ranges: &[Range<u64>],
954        _io: &Arc<dyn EncodingsIo>,
955    ) -> Result<Vec<PageLoadTask>> {
956        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
957        let decoder =
958            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
959        let page_load_task = PageLoadTask {
960            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
961            num_rows,
962        };
963        Ok(vec![page_load_task])
964    }
965}
966
967/// A page decode task for all-null data without any
968/// repetition and only a single level of definition
969#[derive(Debug)]
970struct SimpleAllNullDecodePageTask {
971    num_values: u64,
972}
973impl DecodePageTask for SimpleAllNullDecodePageTask {
974    fn decode(self: Box<Self>) -> Result<DecodedPage> {
975        let unraveler = RepDefUnraveler::new(
976            None,
977            Some(vec![1; self.num_values as usize]),
978            Arc::new([DefinitionInterpretation::NullableItem]),
979            self.num_values,
980        );
981        Ok(DecodedPage {
982            data: DataBlock::AllNull(AllNullDataBlock {
983                num_values: self.num_values,
984            }),
985            repdef: unraveler,
986        })
987    }
988}
989
990#[derive(Debug)]
991pub struct SimpleAllNullPageDecoder {
992    num_rows: u64,
993}
994
995impl StructuralPageDecoder for SimpleAllNullPageDecoder {
996    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
997        Ok(Box::new(SimpleAllNullDecodePageTask {
998            num_values: num_rows,
999        }))
1000    }
1001
1002    fn num_rows(&self) -> u64 {
1003        self.num_rows
1004    }
1005}
1006
1007#[derive(Debug, Clone)]
1008struct MiniBlockSchedulerDictionary {
1009    // These come from the protobuf
1010    dictionary_decompressor: Arc<dyn BlockDecompressor>,
1011    dictionary_buf_position_and_size: (u64, u64),
1012    dictionary_data_alignment: u64,
1013    num_dictionary_items: u64,
1014}
1015
1016/// Individual block metadata within a MiniBlock repetition index.
1017#[derive(Debug)]
1018struct MiniBlockRepIndexBlock {
1019    // The index of the first row that starts after the beginning of this block.  If the block
1020    // has a preamble this will be the row after the preamble.  If the block is entirely preamble
1021    // then this will be a row that starts in some future block.
1022    first_row: u64,
1023    // The number of rows in the block, including the trailer but not the preamble.
1024    // Can be 0 if the block is entirely preamble
1025    starts_including_trailer: u64,
1026    // Whether the block has a preamble
1027    has_preamble: bool,
1028    // Whether the block has a trailer
1029    has_trailer: bool,
1030}
1031
1032impl DeepSizeOf for MiniBlockRepIndexBlock {
1033    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
1034        0
1035    }
1036}
1037
1038/// Repetition index for MiniBlock encoding.
1039///
1040/// Stores block-level offset information to enable efficient random
1041/// access to nested data structures within mini-blocks.
1042#[derive(Debug)]
1043struct MiniBlockRepIndex {
1044    blocks: Vec<MiniBlockRepIndexBlock>,
1045}
1046
1047impl DeepSizeOf for MiniBlockRepIndex {
1048    fn deep_size_of_children(&self, context: &mut Context) -> usize {
1049        self.blocks.deep_size_of_children(context)
1050    }
1051}
1052
1053impl MiniBlockRepIndex {
1054    /// Decode repetition index from chunk metadata using default values.
1055    ///
1056    /// This creates a repetition index where each chunk has no partial values
1057    /// and no trailers, suitable for simple sequential data layouts.
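    ///
    /// For example (illustrative values): chunks with num_values [10, 10, 5] produce blocks
    /// with first_row [0, 10, 20], each taking its chunk's num_values as starts_including_trailer.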
1058    pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
1059        let mut blocks = Vec::with_capacity(chunks.len());
1060        let mut offset: u64 = 0;
1061
1062        for c in chunks {
1063            blocks.push(MiniBlockRepIndexBlock {
1064                first_row: offset,
1065                starts_including_trailer: c.num_values,
1066                has_preamble: false,
1067                has_trailer: false,
1068            });
1069
1070            offset += c.num_values;
1071        }
1072
1073        Self { blocks }
1074    }
1075
1076    /// Decode repetition index from raw bytes in little-endian format.
1077    ///
1078    /// The bytes should contain u64 values arranged in groups of `stride` elements,
1079    /// where the first two values of each group represent ends_count and partial_count.
1080    /// Returns an empty index if no bytes are provided.
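    ///
    /// Rough illustration (values invented for this comment): with `stride = 2` and u64 values
    /// [3, 1, 2, 0], the first block gets first_row = 0 and starts_including_trailer = 4 (three
    /// rows ending in the block plus a trailer), and the second block gets first_row = 4, a
    /// preamble (continuing that trailer), and starts_including_trailer = 1.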
1081    pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
1082        // Convert bytes to u64 slice, handling alignment automatically
1083        let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
1084        let u64_slice = buffer.borrow_to_typed_slice::<u64>();
1085        let n = u64_slice.len() / stride;
1086
1087        let mut blocks = Vec::with_capacity(n);
1088        let mut chunk_has_preamble = false;
1089        let mut offset: u64 = 0;
1090
1091        // Extract first two values from each block: ends_count and partial_count
1092        for i in 0..n {
1093            let base_idx = i * stride;
1094            let ends = u64_slice[base_idx];
1095            let partial = u64_slice[base_idx + 1];
1096
1097            let has_trailer = partial > 0;
1098            // Convert branches to arithmetic for better compiler optimization
1099            let starts_including_trailer =
1100                ends + (has_trailer as u64) - (chunk_has_preamble as u64);
1101
1102            blocks.push(MiniBlockRepIndexBlock {
1103                first_row: offset,
1104                starts_including_trailer,
1105                has_preamble: chunk_has_preamble,
1106                has_trailer,
1107            });
1108
1109            chunk_has_preamble = has_trailer;
1110            offset += starts_including_trailer;
1111        }
1112
1113        Self { blocks }
1114    }
1115}
1116
1117/// State that is loaded once and cached for future lookups
1118#[derive(Debug)]
1119struct MiniBlockCacheableState {
1120    /// Metadata that describes each chunk in the page
1121    chunk_meta: Vec<ChunkMeta>,
1122    /// The decoded repetition index
1123    rep_index: MiniBlockRepIndex,
1124    /// The dictionary for the page, if any
1125    dictionary: Option<Arc<DataBlock>>,
1126}
1127
1128impl DeepSizeOf for MiniBlockCacheableState {
1129    fn deep_size_of_children(&self, context: &mut Context) -> usize {
1130        self.rep_index.deep_size_of_children(context)
1131            + self
1132                .dictionary
1133                .as_ref()
1134                .map(|dict| dict.data_size() as usize)
1135                .unwrap_or(0)
1136    }
1137}
1138
1139impl CachedPageData for MiniBlockCacheableState {
1140    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
1141        self
1142    }
1143}
1144
1145/// A scheduler for a page that has been encoded with the mini-block layout
1146///
1147/// Scheduling mini-block encoded data is simple in concept and somewhat complex
1148/// in practice.
1149///
1150/// First, during initialization, we load the chunk metadata, the repetition index,
1151/// and the dictionary (these last two may not be present)
1152///
1153/// Then, during scheduling, we use the user's requested row ranges and the repetition
1154/// index to determine which chunks we need and which rows we need from those chunks.
1155///
1156/// For example, if the repetition index is: [50, 3], [50, 0], [10, 0] and the range
1157/// from the user is 40..60 then we need to:
1158///
1159///  - Read the first chunk and skip the first 40 rows, then read 10 full rows, and
1160///    then read 3 items for the 11th row of our range.
1161///  - Read the second chunk and read the remaining items in our 11th row and then read
1162///    the remaining 9 full rows.
1163///
1164/// Then, if we are going to decode that in batches of 5, we need to make decode tasks.
1165/// The first two decode tasks will just need the first chunk.  The third decode task will
1166/// need the first chunk (for the trailer which has the 11th row in our range) and the second
1167/// chunk.  The final decode task will just need the second chunk.
1168///
1169/// The above prose descriptions are what are represented by [`ChunkInstructions`] and
1170/// [`ChunkDrainInstructions`].
1171#[derive(Debug)]
1172pub struct MiniBlockScheduler {
1173    // These come from the protobuf
1174    buffer_offsets_and_sizes: Vec<(u64, u64)>,
1175    priority: u64,
1176    items_in_page: u64,
1177    repetition_index_depth: u16,
1178    num_buffers: u64,
1179    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
1180    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
1181    value_decompressor: Arc<dyn MiniBlockDecompressor>,
1182    def_meaning: Arc<[DefinitionInterpretation]>,
1183    dictionary: Option<MiniBlockSchedulerDictionary>,
1184    // This is set after initialization
1185    page_meta: Option<Arc<MiniBlockCacheableState>>,
1186}
1187
1188impl MiniBlockScheduler {
1189    fn try_new(
1190        buffer_offsets_and_sizes: &[(u64, u64)],
1191        priority: u64,
1192        items_in_page: u64,
1193        layout: &pb21::MiniBlockLayout,
1194        decompressors: &dyn DecompressionStrategy,
1195    ) -> Result<Self> {
1196        let rep_decompressor = layout
1197            .rep_compression
1198            .as_ref()
1199            .map(|rep_compression| {
1200                decompressors
1201                    .create_block_decompressor(rep_compression)
1202                    .map(Arc::from)
1203            })
1204            .transpose()?;
1205        let def_decompressor = layout
1206            .def_compression
1207            .as_ref()
1208            .map(|def_compression| {
1209                decompressors
1210                    .create_block_decompressor(def_compression)
1211                    .map(Arc::from)
1212            })
1213            .transpose()?;
1214        let def_meaning = layout
1215            .layers
1216            .iter()
1217            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
1218            .collect::<Vec<_>>();
1219        let value_decompressor = decompressors.create_miniblock_decompressor(
1220            layout.value_compression.as_ref().unwrap(),
1221            decompressors,
1222        )?;
1223
1224        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
1225            let num_dictionary_items = layout.num_dictionary_items;
1226            match dictionary_encoding.compression.as_ref().unwrap() {
1227                Compression::Variable(_) => Some(MiniBlockSchedulerDictionary {
1228                    dictionary_decompressor: decompressors
1229                        .create_block_decompressor(dictionary_encoding)?
1230                        .into(),
1231                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1232                    dictionary_data_alignment: 4,
1233                    num_dictionary_items,
1234                }),
1235                Compression::Flat(_) => Some(MiniBlockSchedulerDictionary {
1236                    dictionary_decompressor: decompressors
1237                        .create_block_decompressor(dictionary_encoding)?
1238                        .into(),
1239                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1240                    dictionary_data_alignment: 16,
1241                    num_dictionary_items,
1242                }),
1243                Compression::General(_) => Some(MiniBlockSchedulerDictionary {
1244                    dictionary_decompressor: decompressors
1245                        .create_block_decompressor(dictionary_encoding)?
1246                        .into(),
1247                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1248                    dictionary_data_alignment: 1,
1249                    num_dictionary_items,
1250                }),
1251                _ => unreachable!(
1252                    "Mini-block dictionary encoding must use Variable, Flat, or General compression"
1253                ),
1254            }
1255        } else {
1256            None
1257        };
1258
1259        Ok(Self {
1260            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
1261            rep_decompressor,
1262            def_decompressor,
1263            value_decompressor: value_decompressor.into(),
1264            repetition_index_depth: layout.repetition_index_depth as u16,
1265            num_buffers: layout.num_buffers,
1266            priority,
1267            items_in_page,
1268            dictionary,
1269            def_meaning: def_meaning.into(),
1270            page_meta: None,
1271        })
1272    }
1273
1274    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
1275        let page_meta = self.page_meta.as_ref().unwrap();
1276        chunk_indices
1277            .iter()
1278            .map(|&chunk_idx| {
1279                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
1280                let bytes_start = chunk_meta.offset_bytes;
1281                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
1282                LoadedChunk {
1283                    byte_range: bytes_start..bytes_end,
1284                    items_in_chunk: chunk_meta.num_values,
1285                    chunk_idx,
1286                    data: LanceBuffer::empty(),
1287                }
1288            })
1289            .collect()
1290    }
1291}
1292
1293#[derive(Debug, PartialEq, Eq, Clone, Copy)]
1294enum PreambleAction {
1295    Take,
1296    Skip,
1297    Absent,
1298}
1299
1300// When we schedule a chunk we use the repetition index (or, if none exists, just the # of items
1301// in each chunk) to map a user requested range into a set of ChunkInstruction objects which tell
1302// us how exactly to read from the chunk.
1303//
1304// Examples:
1305//
1306// | Chunk 0     | Chunk 1   | Chunk 2   | Chunk 3 |
1307// | xxxxyyyyzzz | zzzzzzzzz | zzzzzzzzz | aaabbcc |
1308//
1309// Full read (0..6)
1310//
1311// Chunk 0: (several rows, ends with trailer)
1312//   preamble: absent
1313//   rows_to_skip: 0
1314//   rows_to_take: 3 (x, y, z)
1315//   take_trailer: true
1316//
1317// Chunk 1: (all preamble, ends with trailer)
1318//   preamble: take
1319//   rows_to_skip: 0
1320//   rows_to_take: 0
1321//   take_trailer: true
1322//
1323// Chunk 2: (all preamble, no trailer)
1324//   preamble: take
1325//   rows_to_skip: 0
1326//   rows_to_take: 0
1327//   take_trailer: false
1328//
1329// Chunk 3: (several rows, no trailer or preamble)
1330//   preamble: absent
1331//   rows_to_skip: 0
1332//   rows_to_take: 3 (a, b, c)
1333//   take_trailer: false
1334#[derive(Clone, Debug, PartialEq, Eq)]
1335struct ChunkInstructions {
1336    // The index of the chunk to read
1337    chunk_idx: usize,
1338    // A "preamble" is when a chunk begins with a continuation of the previous chunk's list.  If there
1339    // is no repetition index there is never a preamble.
1340    //
1341    // It's possible for a chunk to be entirely preamble.  For example, if there is a really large list
1342    // that spans several chunks.
1343    preamble: PreambleAction,
1344    // How many complete rows (not including the preamble or trailer) to skip
1345    //
1346    // If this is non-zero then preamble must not be Take
1347    rows_to_skip: u64,
1348    // How many rows to take.  If a row splits across chunks then we will count the row in the first
1349    // chunk that contains the row.
1350    rows_to_take: u64,
1351    // A "trailer" is when a chunk ends with a partial list.  If there is no repetition index there is
1352    // never a trailer.
1353    //
1354    // A chunk that is all preamble may or may not have a trailer.
1355    //
1356    // If this is true then we want to include the trailer
1357    take_trailer: bool,
1358}
1359
1360// First, we schedule a bunch of [`ChunkInstructions`] based on the user's ranges.  Then we
1361// start decoding them, based on a batch size, which might not align with what we scheduled.
1362//
1363// This results in `ChunkDrainInstructions` which targets a contiguous slice of a `ChunkInstructions`
1364//
1365// So if `ChunkInstructions` is "skip preamble, skip 10, take 50, take trailer" and we are decoding in
1366// batches of size 10 we might have a `ChunkDrainInstructions` that targets that chunk and has its own
1367// skip of 17 and take of 10.  This would mean we decode the chunk, skip the preamble and 27 rows, and
1368// then take 10 rows.
1369//
1370// One very confusing bit is that `rows_to_take` includes the trailer.  So if we have two chunks:
1371//  -no preamble, skip 5, take 10, take trailer
1372//  -take preamble, skip 0, take 50, no trailer
1373//
1374// and we are draining 20 rows then the drain instructions for the first batch will be:
1375//  - no preamble, skip 0 (from chunk 0), take 11 (from chunk 0)
1376//  - take preamble (from chunk 1), skip 0 (from chunk 1), take 9 (from chunk 1)
1377#[derive(Debug, PartialEq, Eq)]
1378struct ChunkDrainInstructions {
1379    chunk_instructions: ChunkInstructions,
1380    rows_to_skip: u64,
1381    rows_to_take: u64,
1382    preamble_action: PreambleAction,
1383}
1384
1385impl ChunkInstructions {
1386    // Given a repetition index and a set of user ranges we need to figure out how to read from the chunks
1387    //
1388    // We assume that `user_ranges` are in sorted order and non-overlapping
1389    //
1390    // The output will be a set of `ChunkInstructions` which tell us how to read from the chunks
1391    fn schedule_instructions(
1392        rep_index: &MiniBlockRepIndex,
1393        user_ranges: &[Range<u64>],
1394    ) -> Vec<Self> {
1395        // This is an inexact capacity guess but pretty good.  The actual number of instructions can be
1396        // smaller if instructions are merged.  It can be larger if there are multiple instructions
1397        // per row, which can happen with lists.
1398        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1399
1400        for user_range in user_ranges {
1401            let mut rows_needed = user_range.end - user_range.start;
1402            let mut need_preamble = false;
1403
1404            // Need to find the chunk that contains user_range.start.  If there are
1405            // multiple chunks with the same first row we need to take the first one.
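            // For example (hypothetical): if the chunks' first rows are [0, 10, 10, 20] (the second
            // chunk is entirely preamble) and user_range.start is 10, the binary search may land on
            // either of the two entries equal to 10 and we walk back to the earlier one.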
1406            let mut block_index = match rep_index
1407                .blocks
1408                .binary_search_by_key(&user_range.start, |block| block.first_row)
1409            {
1410                Ok(idx) => {
1411                    // Slightly tricky case, we may need to walk backwards a bit to make sure we
1412                    // are grabbing the first eligible chunk
1413                    let mut idx = idx;
1414                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1415                        idx -= 1;
1416                    }
1417                    idx
1418                }
1419                // Easy case.  idx is greater, and idx - 1 is smaller, so idx - 1 contains the start
1420                Err(idx) => idx - 1,
1421            };
1422
1423            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1424
1425            while rows_needed > 0 || need_preamble {
1426                // Check if we've gone past the last block (should not happen)
1427                if block_index >= rep_index.blocks.len() {
1428                    log::warn!("schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early");
1429                    break;
1430                }
1431
1432                let chunk = &rep_index.blocks[block_index];
1433                let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);
1434
1435                // Handle blocks that are entirely preamble (rows_avail = 0)
1436                // These blocks have no rows to take but may have a preamble we need
1437                // We only look for preamble if to_skip == 0 (we're not skipping rows)
1438                if rows_avail == 0 && to_skip == 0 {
1439                    // Only process if this chunk has a preamble we need
1440                    if chunk.has_preamble && need_preamble {
1441                        chunk_instructions.push(Self {
1442                            chunk_idx: block_index,
1443                            preamble: PreambleAction::Take,
1444                            rows_to_skip: 0,
1445                            rows_to_take: 0,
1446                            // We still need to look at has_trailer to distinguish between "all preamble
1447                            // and row ends at end of chunk" and "all preamble and row bleeds into next
1448                            // chunk".  Both cases will have 0 rows available.
1449                            take_trailer: chunk.has_trailer,
1450                        });
1451                        // Only set need_preamble = false if the chunk has at least one row
1452                        // or we have reached the last block.  Otherwise, the chunk is entirely
1453                        // preamble and we need the next chunk's preamble too.
1454                        if chunk.starts_including_trailer > 0
1455                            || block_index == rep_index.blocks.len() - 1
1456                        {
1457                            need_preamble = false;
1458                        }
1459                    }
1460                    // Move to next block
1461                    block_index += 1;
1462                    continue;
1463                }
1464
1465                // Edge case: if rows_avail == 0 but to_skip > 0
1466                // This theoretically shouldn't happen (binary search should avoid it)
1467                // but handle it for safety
1468                if rows_avail == 0 && to_skip > 0 {
1469                    // This block doesn't have enough rows to skip, move to next block
1470                    // Adjust to_skip by the number of rows in this block
1471                    to_skip -= chunk.starts_including_trailer;
1472                    block_index += 1;
1473                    continue;
1474                }
1475
1476                let rows_to_take = rows_avail.min(rows_needed);
1477                rows_needed -= rows_to_take;
1478
1479                let mut take_trailer = false;
1480                let preamble = if chunk.has_preamble {
1481                    if need_preamble {
1482                        PreambleAction::Take
1483                    } else {
1484                        PreambleAction::Skip
1485                    }
1486                } else {
1487                    PreambleAction::Absent
1488                };
1489
1490                // Are we taking the trailer?  If so, make sure we mark that we need the preamble
1491                if rows_to_take == rows_avail && chunk.has_trailer {
1492                    take_trailer = true;
1493                    need_preamble = true;
1494                } else {
1495                    need_preamble = false;
1496                };
1497
1498                chunk_instructions.push(Self {
1499                    preamble,
1500                    chunk_idx: block_index,
1501                    rows_to_skip: to_skip,
1502                    rows_to_take,
1503                    take_trailer,
1504                });
1505
1506                to_skip = 0;
1507                block_index += 1;
1508            }
1509        }
1510
1511        // If there were multiple ranges we may have multiple instructions for a single chunk.  Merge them now if they
1512        // are _adjacent_ (i.e. don't merge "take first row of chunk 0" and "take third row of chunk 0" into "take 2
1513        // rows of chunk 0 starting at 0")
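        //
        // For example (hypothetical): "chunk 0, skip 0, take 3" followed by "chunk 0, skip 3, take 2"
        // merges into "chunk 0, skip 0, take 5", while "chunk 0, skip 0, take 1" followed by
        // "chunk 0, skip 2, take 1" is left as two separate instructions.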
1514        if user_ranges.len() > 1 {
1515            // TODO: Could probably optimize this allocation away
1516            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1517            let mut instructions_iter = chunk_instructions.into_iter();
1518            merged_instructions.push(instructions_iter.next().unwrap());
1519            for instruction in instructions_iter {
1520                let last = merged_instructions.last_mut().unwrap();
1521                if last.chunk_idx == instruction.chunk_idx
1522                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1523                {
1524                    last.rows_to_take += instruction.rows_to_take;
1525                    last.take_trailer |= instruction.take_trailer;
1526                } else {
1527                    merged_instructions.push(instruction);
1528                }
1529            }
1530            merged_instructions
1531        } else {
1532            chunk_instructions
1533        }
1534    }
1535
1536    fn drain_from_instruction(
1537        &self,
1538        rows_desired: &mut u64,
1539        need_preamble: &mut bool,
1540        skip_in_chunk: &mut u64,
1541    ) -> (ChunkDrainInstructions, bool) {
1542        // If we need the preamble then we shouldn't be skipping anything
1543        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1544        let rows_avail = self.rows_to_take - *skip_in_chunk;
1545        let has_preamble = self.preamble != PreambleAction::Absent;
1546        let preamble_action = match (*need_preamble, has_preamble) {
1547            (true, true) => PreambleAction::Take,
1548            (true, false) => panic!("Need preamble but there isn't one"),
1549            (false, true) => PreambleAction::Skip,
1550            (false, false) => PreambleAction::Absent,
1551        };
1552
1553        // How many rows are we actually taking in this take step (including the preamble
1554        // and trailer both as individual rows)
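        //
        // Example (hypothetical): if this instruction takes 10 rows, skip_in_chunk is 3, and the
        // caller still wants 20 rows, then rows_avail is 7, we take all 7, and the caller's
        // rows_desired drops to 13.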
1555        let rows_taking = if *rows_desired >= rows_avail {
1556            // We want all the rows.  If there is a trailer we are grabbing it and will need
1557            // the preamble of the next chunk.  This also covers the case where this chunk is
1558            // entirely preamble and has a trailer (the row bleeds into the next chunk).
1563            *need_preamble = self.take_trailer;
1564            rows_avail
1565        } else {
1566            // We aren't taking all the rows.  Even if there is a trailer we aren't taking
1567            // it so we will not need the preamble
1568            *need_preamble = false;
1569            *rows_desired
1570        };
1571        let rows_skipped = *skip_in_chunk;
1572
1573        // Update the state for the next iteration
1574        let consumed_chunk = if *rows_desired >= rows_avail {
1575            *rows_desired -= rows_avail;
1576            *skip_in_chunk = 0;
1577            true
1578        } else {
1579            *skip_in_chunk += *rows_desired;
1580            *rows_desired = 0;
1581            false
1582        };
1583
1584        (
1585            ChunkDrainInstructions {
1586                chunk_instructions: self.clone(),
1587                rows_to_skip: rows_skipped,
1588                rows_to_take: rows_taking,
1589                preamble_action,
1590            },
1591            consumed_chunk,
1592        )
1593    }
1594}
1595
1596impl StructuralPageScheduler for MiniBlockScheduler {
1597    fn initialize<'a>(
1598        &'a mut self,
1599        io: &Arc<dyn EncodingsIo>,
1600    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1601        // We always need to fetch chunk metadata.  We may also need to fetch a dictionary and
1602        // we may also need to fetch the repetition index.  Here, we gather what buffers we
1603        // need.
1604        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
1605        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
1606        let mut bufs_needed = 1;
1607        if self.dictionary.is_some() {
1608            bufs_needed += 1;
1609        }
1610        if self.repetition_index_depth > 0 {
1611            bufs_needed += 1;
1612        }
1613        let mut required_ranges = Vec::with_capacity(bufs_needed);
1614        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
1615        if let Some(ref dictionary) = self.dictionary {
1616            required_ranges.push(
1617                dictionary.dictionary_buf_position_and_size.0
1618                    ..dictionary.dictionary_buf_position_and_size.0
1619                        + dictionary.dictionary_buf_position_and_size.1,
1620            );
1621        }
1622        if self.repetition_index_depth > 0 {
1623            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
1624            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
1625        }
1626        let io_req = io.submit_request(required_ranges, 0);
1627
1628        async move {
1629            let mut buffers = io_req.await?.into_iter().fuse();
1630            let meta_bytes = buffers.next().unwrap();
1631            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
1632            let rep_index_bytes = buffers.next();
1633
1634            // Parse the metadata and build the chunk meta
1635            assert!(meta_bytes.len() % 2 == 0);
1636            let bytes = LanceBuffer::from_bytes(meta_bytes, 2);
1637            let words = bytes.borrow_to_typed_slice::<u16>();
1638            let words = words.as_ref();
1639
1640            let mut chunk_meta = Vec::with_capacity(words.len());
1641
1642            let mut rows_counter = 0;
1643            let mut offset_bytes = value_buf_position;
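            // Each 16-bit metadata word packs two fields: the low 4 bits hold log2 of the number of
            // values in the chunk (for the last chunk we instead use however many values remain in
            // the page) and the high 12 bits hold (chunk_size_bytes / MINIBLOCK_ALIGNMENT) - 1.
            // For example (hypothetical), the word 0x0146 describes a chunk of 1 << 6 = 64 values
            // occupying (0x14 + 1) * MINIBLOCK_ALIGNMENT bytes.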
1644            for (word_idx, word) in words.iter().enumerate() {
1645                let log_num_values = word & 0x0F;
1646                let divided_bytes = word >> 4;
1647                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
1648                debug_assert!(num_bytes > 0);
1649                let num_values = if word_idx < words.len() - 1 {
1650                    debug_assert!(log_num_values > 0);
1651                    1 << log_num_values
1652                } else {
1653                    debug_assert!(
1654                        log_num_values == 0
1655                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
1656                    );
1657                    self.items_in_page - rows_counter
1658                };
1659                rows_counter += num_values;
1660
1661                chunk_meta.push(ChunkMeta {
1662                    num_values,
1663                    chunk_size_bytes: num_bytes as u64,
1664                    offset_bytes,
1665                });
1666                offset_bytes += num_bytes as u64;
1667            }
1668
1669            // Build the repetition index
1670            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
1671                assert!(rep_index_data.len() % 8 == 0);
1672                let stride = self.repetition_index_depth as usize + 1;
1673                MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
1674            } else {
1675                MiniBlockRepIndex::default_from_chunks(&chunk_meta)
1676            };
1677
1678            let mut page_meta = MiniBlockCacheableState {
1679                chunk_meta,
1680                rep_index,
1681                dictionary: None,
1682            };
1683
1684            // decode dictionary
1685            if let Some(ref mut dictionary) = self.dictionary {
1686                let dictionary_data = dictionary_bytes.unwrap();
1687                page_meta.dictionary =
1688                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
1689                        LanceBuffer::from_bytes(
1690                            dictionary_data,
1691                            dictionary.dictionary_data_alignment,
1692                        ),
1693                        dictionary.num_dictionary_items,
1694                    )?));
1695            };
1696            let page_meta = Arc::new(page_meta);
1697            self.page_meta = Some(page_meta.clone());
1698            Ok(page_meta as Arc<dyn CachedPageData>)
1699        }
1700        .boxed()
1701    }
1702
1703    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1704        self.page_meta = Some(
1705            data.clone()
1706                .as_arc_any()
1707                .downcast::<MiniBlockCacheableState>()
1708                .unwrap(),
1709        );
1710    }
1711
1712    fn schedule_ranges(
1713        &self,
1714        ranges: &[Range<u64>],
1715        io: &Arc<dyn EncodingsIo>,
1716    ) -> Result<Vec<PageLoadTask>> {
1717        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1718
1719        let page_meta = self.page_meta.as_ref().unwrap();
1720
1721        let chunk_instructions =
1722            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
1723
1724        debug_assert_eq!(
1725            num_rows,
1726            chunk_instructions
1727                .iter()
1728                .map(|ci| ci.rows_to_take)
1729                .sum::<u64>()
1730        );
1731
1732        let chunks_needed = chunk_instructions
1733            .iter()
1734            .map(|ci| ci.chunk_idx)
1735            .unique()
1736            .collect::<Vec<_>>();
1737
1738        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1739        let chunk_ranges = loaded_chunks
1740            .iter()
1741            .map(|c| c.byte_range.clone())
1742            .collect::<Vec<_>>();
1743        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1744
1745        let rep_decompressor = self.rep_decompressor.clone();
1746        let def_decompressor = self.def_decompressor.clone();
1747        let value_decompressor = self.value_decompressor.clone();
1748        let num_buffers = self.num_buffers;
1749        let dictionary = page_meta
1750            .dictionary
1751            .as_ref()
1752            .map(|dictionary| dictionary.clone());
1753        let def_meaning = self.def_meaning.clone();
1754
1755        let res = async move {
1756            let loaded_chunk_data = loaded_chunk_data.await?;
1757            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1758                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
1759            }
1760
1761            Ok(Box::new(MiniBlockDecoder {
1762                rep_decompressor,
1763                def_decompressor,
1764                value_decompressor,
1765                def_meaning,
1766                loaded_chunks: VecDeque::from_iter(loaded_chunks),
1767                instructions: VecDeque::from(chunk_instructions),
1768                offset_in_current_chunk: 0,
1769                dictionary,
1770                num_rows,
1771                num_buffers,
1772            }) as Box<dyn StructuralPageDecoder>)
1773        }
1774        .boxed();
1775        let page_load_task = PageLoadTask {
1776            decoder_fut: res,
1777            num_rows,
1778        };
1779        Ok(vec![page_load_task])
1780    }
1781}
1782
1783#[derive(Debug, Clone, Copy)]
1784struct FullZipRepIndexDetails {
1785    buf_position: u64,
1786    bytes_per_value: u64, // Will be 1, 2, 4, or 8
1787}
1788
1789#[derive(Debug)]
1790enum PerValueDecompressor {
1791    Fixed(Arc<dyn FixedPerValueDecompressor>),
1792    Variable(Arc<dyn VariablePerValueDecompressor>),
1793}
1794
1795#[derive(Debug)]
1796struct FullZipDecodeDetails {
1797    value_decompressor: PerValueDecompressor,
1798    def_meaning: Arc<[DefinitionInterpretation]>,
1799    ctrl_word_parser: ControlWordParser,
1800    max_rep: u16,
1801    max_visible_def: u16,
1802}
1803
1804/// A scheduler for full-zip encoded data
1805///
1806/// When the data type has a fixed width then we simply need to map from
1807/// row ranges to byte ranges using the fixed width of the data type.
1808///
1809/// When the data type is variable-width or has any repetition then a
1810/// repetition index is required.
1811#[derive(Debug)]
1812pub struct FullZipScheduler {
1813    data_buf_position: u64,
1814    rep_index: Option<FullZipRepIndexDetails>,
1815    priority: u64,
1816    rows_in_page: u64,
1817    bits_per_offset: u8,
1818    details: Arc<FullZipDecodeDetails>,
1819    /// Cached state containing the decoded repetition index
1820    cached_state: Option<Arc<FullZipCacheableState>>,
1821    /// Whether to enable caching of repetition indices
1822    enable_cache: bool,
1823}
1824
1825impl FullZipScheduler {
1826    fn try_new(
1827        buffer_offsets_and_sizes: &[(u64, u64)],
1828        priority: u64,
1829        rows_in_page: u64,
1830        layout: &pb21::FullZipLayout,
1831        decompressors: &dyn DecompressionStrategy,
1832    ) -> Result<Self> {
1833        // We don't need the data_buf_size because either the data type is
1834        // fixed-width (and we can tell size from rows_in_page) or it is not
1835        // and we have a repetition index.
1836        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
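        // If present, the second buffer is the repetition index: one entry per row plus one trailing
        // entry, all the same byte width.  For example (hypothetical), a page with 999 rows and a
        // 4000-byte repetition index buffer has 1000 entries of 4 bytes each.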
1837        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
1838            let num_reps = rows_in_page + 1;
1839            let bytes_per_rep = len / num_reps;
1840            debug_assert_eq!(len % num_reps, 0);
1841            debug_assert!(
1842                bytes_per_rep == 1
1843                    || bytes_per_rep == 2
1844                    || bytes_per_rep == 4
1845                    || bytes_per_rep == 8
1846            );
1847            FullZipRepIndexDetails {
1848                buf_position: *pos,
1849                bytes_per_value: bytes_per_rep,
1850            }
1851        });
1852
1853        let value_decompressor = match layout.details {
1854            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
1855                let decompressor = decompressors.create_fixed_per_value_decompressor(
1856                    layout.value_compression.as_ref().unwrap(),
1857                )?;
1858                PerValueDecompressor::Fixed(decompressor.into())
1859            }
1860            Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
1861                let decompressor = decompressors.create_variable_per_value_decompressor(
1862                    layout.value_compression.as_ref().unwrap(),
1863                )?;
1864                PerValueDecompressor::Variable(decompressor.into())
1865            }
1866            None => {
1867                panic!("Full-zip layout must have a `details` field");
1868            }
1869        };
1870        let ctrl_word_parser = ControlWordParser::new(
1871            layout.bits_rep.try_into().unwrap(),
1872            layout.bits_def.try_into().unwrap(),
1873        );
1874        let def_meaning = layout
1875            .layers
1876            .iter()
1877            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
1878            .collect::<Vec<_>>();
1879
1880        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
1881        let max_visible_def = def_meaning
1882            .iter()
1883            .filter(|d| !d.is_list())
1884            .map(|d| d.num_def_levels())
1885            .sum();
1886
1887        let bits_per_offset = match layout.details {
1888            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
1889            Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
1890                bits_per_offset as u8
1891            }
1892            None => panic!("Full-zip layout must have a `details` field"),
1893        };
1894
1895        let details = Arc::new(FullZipDecodeDetails {
1896            value_decompressor,
1897            def_meaning: def_meaning.into(),
1898            ctrl_word_parser,
1899            max_rep,
1900            max_visible_def,
1901        });
1902        Ok(Self {
1903            data_buf_position,
1904            rep_index,
1905            details,
1906            priority,
1907            rows_in_page,
1908            bits_per_offset,
1909            cached_state: None,
1910            enable_cache: false, // Default to false, will be set later
1911        })
1912    }
1913
1914    /// Creates a decoder from the loaded data
1915    fn create_decoder(
1916        details: Arc<FullZipDecodeDetails>,
1917        data: VecDeque<LanceBuffer>,
1918        num_rows: u64,
1919        bits_per_offset: u8,
1920    ) -> Result<Box<dyn StructuralPageDecoder>> {
1921        match &details.value_decompressor {
1922            PerValueDecompressor::Fixed(decompressor) => {
1923                let bits_per_value = decompressor.bits_per_value();
1924                if bits_per_value == 0 {
1925                    return Err(lance_core::Error::Internal {
1926                        message: "Invalid encoding: bits_per_value must be greater than 0".into(),
1927                        location: location!(),
1928                    });
1929                }
1930                if bits_per_value % 8 != 0 {
1931                    return Err(lance_core::Error::NotSupported {
1932                        source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
1933                        location: location!(),
1934                    });
1935                }
1936                let bytes_per_value = bits_per_value / 8;
1937                let total_bytes_per_value =
1938                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
1939                Ok(Box::new(FixedFullZipDecoder {
1940                    details,
1941                    data,
1942                    num_rows,
1943                    offset_in_current: 0,
1944                    bytes_per_value: bytes_per_value as usize,
1945                    total_bytes_per_value,
1946                }) as Box<dyn StructuralPageDecoder>)
1947            }
1948            PerValueDecompressor::Variable(_decompressor) => {
1949                Ok(Box::new(VariableFullZipDecoder::new(
1950                    details,
1951                    data,
1952                    num_rows,
1953                    bits_per_offset,
1954                    bits_per_offset,
1955                )))
1956            }
1957        }
1958    }
1959
1960    /// Extracts byte ranges from a repetition index buffer
1961    /// The buffer contains pairs of (start, end) values for each range
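    ///
    /// For example (hypothetical): with `bytes_per_value = 4`, a buffer holding the values
    /// `[10, 50, 80, 120]`, and `data_buf_position = 1000`, this yields `1010..1050` and `1080..1120`.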
1962    fn extract_byte_ranges_from_pairs(
1963        buffer: LanceBuffer,
1964        bytes_per_value: u64,
1965        data_buf_position: u64,
1966    ) -> Vec<Range<u64>> {
1967        ByteUnpacker::new(buffer, bytes_per_value as usize)
1968            .chunks(2)
1969            .into_iter()
1970            .map(|mut c| {
1971                let start = c.next().unwrap() + data_buf_position;
1972                let end = c.next().unwrap() + data_buf_position;
1973                start..end
1974            })
1975            .collect::<Vec<_>>()
1976    }
1977
1978    /// Extracts byte ranges from a cached repetition index buffer
1979    /// The buffer contains all values and we need to extract specific ranges
1980    fn extract_byte_ranges_from_cached(
1981        buffer: &LanceBuffer,
1982        ranges: &[Range<u64>],
1983        bytes_per_value: u64,
1984        data_buf_position: u64,
1985    ) -> Vec<Range<u64>> {
1986        ranges
1987            .iter()
1988            .map(|r| {
1989                let start_offset = (r.start * bytes_per_value) as usize;
1990                let end_offset = (r.end * bytes_per_value) as usize;
1991
1992                let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
1993                let start_val =
1994                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
1995                        .next()
1996                        .unwrap();
1997
1998                let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
1999                let end_val =
2000                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
2001                        .next()
2002                        .unwrap();
2003
2004                (data_buf_position + start_val)..(data_buf_position + end_val)
2005            })
2006            .collect()
2007    }
2008
2009    /// Computes the ranges in the repetition index that need to be loaded
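    ///
    /// For each row range we load two entries from the repetition index: the entry at the range
    /// start and the entry at the range end.  For example (hypothetical), with
    /// `bytes_per_value = 8`, the row range `5..9` produces two 8-byte reads, at offsets
    /// `buf_position + 40` and `buf_position + 72`.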
2010    fn compute_rep_index_ranges(
2011        ranges: &[Range<u64>],
2012        rep_index: &FullZipRepIndexDetails,
2013    ) -> Vec<Range<u64>> {
2014        ranges
2015            .iter()
2016            .flat_map(|r| {
2017                let first_val_start =
2018                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
2019                let first_val_end = first_val_start + rep_index.bytes_per_value;
2020                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
2021                let last_val_end = last_val_start + rep_index.bytes_per_value;
2022                [first_val_start..first_val_end, last_val_start..last_val_end]
2023            })
2024            .collect()
2025    }
2026
2027    /// Resolves byte ranges from repetition index (either from cache or disk)
2028    async fn resolve_byte_ranges(
2029        data_buf_position: u64,
2030        ranges: &[Range<u64>],
2031        io: &Arc<dyn EncodingsIo>,
2032        rep_index: &FullZipRepIndexDetails,
2033        cached_state: Option<&Arc<FullZipCacheableState>>,
2034        priority: u64,
2035    ) -> Result<Vec<Range<u64>>> {
2036        if let Some(cached_state) = cached_state {
2037            // Use cached repetition index
2038            Ok(Self::extract_byte_ranges_from_cached(
2039                &cached_state.rep_index_buffer,
2040                ranges,
2041                rep_index.bytes_per_value,
2042                data_buf_position,
2043            ))
2044        } else {
2045            // Load from disk
2046            let rep_ranges = Self::compute_rep_index_ranges(ranges, rep_index);
2047            let rep_data = io.submit_request(rep_ranges, priority).await?;
2048            let rep_buffer = LanceBuffer::concat(
2049                &rep_data
2050                    .into_iter()
2051                    .map(|d| LanceBuffer::from_bytes(d, 1))
2052                    .collect::<Vec<_>>(),
2053            );
2054            Ok(Self::extract_byte_ranges_from_pairs(
2055                rep_buffer,
2056                rep_index.bytes_per_value,
2057                data_buf_position,
2058            ))
2059        }
2060    }
2061
2062    /// Schedules ranges in the presence of a repetition index
2063    fn schedule_ranges_rep(
2064        &self,
2065        ranges: &[Range<u64>],
2066        io: &Arc<dyn EncodingsIo>,
2067        rep_index: FullZipRepIndexDetails,
2068    ) -> Result<Vec<PageLoadTask>> {
2069        // Copy necessary fields to avoid lifetime issues
2070        let data_buf_position = self.data_buf_position;
2071        let cached_state = self.cached_state.clone();
2072        let priority = self.priority;
2073        let details = self.details.clone();
2074        let bits_per_offset = self.bits_per_offset;
2075        let ranges = ranges.to_vec();
2076        let io_clone = io.clone();
2077        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2078
2079        let load_task = async move {
2080            // Step 1: Resolve byte ranges from repetition index
2081            let byte_ranges = Self::resolve_byte_ranges(
2082                data_buf_position,
2083                &ranges,
2084                &io_clone,
2085                &rep_index,
2086                cached_state.as_ref(),
2087                priority,
2088            )
2089            .await?;
2090
2091            // Step 2: Load data
2092            let data = io_clone.submit_request(byte_ranges, priority).await?;
2093            let data = data
2094                .into_iter()
2095                .map(|d| LanceBuffer::from_bytes(d, 1))
2096                .collect::<VecDeque<_>>();
2097
2098            // Step 3: Calculate total rows
2099            let num_rows: u64 = ranges.iter().map(|r| r.end - r.start).sum();
2100
2101            // Step 4: Create decoder
2102            Self::create_decoder(details, data, num_rows, bits_per_offset)
2103        }
2104        .boxed();
2105        let page_load_task = PageLoadTask {
2106            decoder_fut: load_task,
2107            num_rows,
2108        };
2109        Ok(vec![page_load_task])
2110    }
2111
2112    // In the simple case there is no repetition and we just have large fixed-width
2113    // rows of data.  We can just map row ranges to byte ranges directly using the
2114    // fixed-width of the data type.
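    //
    // For example (hypothetical): with 8-byte values and a 2-byte control word, each row occupies
    // 10 bytes, so the row range 10..20 maps to the byte range
    // (data_buf_position + 100)..(data_buf_position + 200).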
2115    fn schedule_ranges_simple(
2116        &self,
2117        ranges: &[Range<u64>],
2118        io: &dyn EncodingsIo,
2119    ) -> Result<Vec<PageLoadTask>> {
2120        // Calculate the total number of rows requested across all ranges
2121        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2122
2123        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
2124            unreachable!()
2125        };
2126
2127        // Convert item ranges to byte ranges (i.e. multiply by bytes per item)
2128        let bits_per_value = decompressor.bits_per_value();
2129        assert_eq!(bits_per_value % 8, 0);
2130        let bytes_per_value = bits_per_value / 8;
2131        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
2132        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
2133        let byte_ranges = ranges.iter().map(|r| {
2134            debug_assert!(r.end <= self.rows_in_page);
2135            let start = self.data_buf_position + r.start * total_bytes_per_value;
2136            let end = self.data_buf_position + r.end * total_bytes_per_value;
2137            start..end
2138        });
2139
2140        // Request byte ranges
2141        let data = io.submit_request(byte_ranges.collect(), self.priority);
2142
2143        let details = self.details.clone();
2144
2145        let load_task = async move {
2146            let data = data.await?;
2147            let data = data
2148                .into_iter()
2149                .map(|d| LanceBuffer::from_bytes(d, 1))
2150                .collect();
2151            Ok(Box::new(FixedFullZipDecoder {
2152                details,
2153                data,
2154                num_rows,
2155                offset_in_current: 0,
2156                bytes_per_value: bytes_per_value as usize,
2157                total_bytes_per_value: total_bytes_per_value as usize,
2158            }) as Box<dyn StructuralPageDecoder>)
2159        }
2160        .boxed();
2161        let page_load_task = PageLoadTask {
2162            decoder_fut: load_task,
2163            num_rows,
2164        };
2165        Ok(vec![page_load_task])
2166    }
2167}
2168
2169/// Cacheable state for FullZip encoding, storing the decoded repetition index
2170#[derive(Debug)]
2171struct FullZipCacheableState {
2172    /// The raw repetition index buffer for future decoding
2173    rep_index_buffer: LanceBuffer,
2174}
2175
2176impl DeepSizeOf for FullZipCacheableState {
2177    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
2178        self.rep_index_buffer.len()
2179    }
2180}
2181
2182impl CachedPageData for FullZipCacheableState {
2183    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2184        self
2185    }
2186}
2187
2188impl StructuralPageScheduler for FullZipScheduler {
2189    /// Initializes the scheduler. If there's a repetition index, loads and caches it.
2190    /// Otherwise returns NoCachedPageData.
2191    fn initialize<'a>(
2192        &'a mut self,
2193        io: &Arc<dyn EncodingsIo>,
2194    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2195        // Check if caching is enabled and we have a repetition index
2196        if self.enable_cache && self.rep_index.is_some() {
2197            let rep_index = self.rep_index.as_ref().unwrap();
2198            // Calculate the total size of the repetition index
2199            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2200            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2201
2202            // Load the repetition index buffer
2203            let io_clone = io.clone();
2204            let future = async move {
2205                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2206                let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);
2207
2208                // Create and return the cacheable state
2209                Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
2210            };
2211
2212            future.boxed()
2213        } else {
2214            // Caching disabled or no repetition index, skip caching
2215            std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2216        }
2217    }
2218
2219    /// Loads previously cached repetition index data from the cache system.
2220    /// This method is called when a scheduler instance needs to use cached data
2221    /// that was initialized by another instance or in a previous operation.
2222    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2223        // Try to downcast to our specific cache type
2224        if let Ok(cached_state) = cache
2225            .clone()
2226            .as_arc_any()
2227            .downcast::<FullZipCacheableState>()
2228        {
2229            // Store the cached state for use in schedule_ranges
2230            self.cached_state = Some(cached_state);
2231        }
2232    }
2233
2234    fn schedule_ranges(
2235        &self,
2236        ranges: &[Range<u64>],
2237        io: &Arc<dyn EncodingsIo>,
2238    ) -> Result<Vec<PageLoadTask>> {
2239        if let Some(rep_index) = self.rep_index {
2240            self.schedule_ranges_rep(ranges, io, rep_index)
2241        } else {
2242            self.schedule_ranges_simple(ranges, io.as_ref())
2243        }
2244    }
2245}
2246
2247/// A decoder for full-zip encoded data when the data has a fixed width
2248///
2249/// Here we need to unzip the control words from the values themselves and
2250/// then decompress the requested values.
2251///
2252/// We use a PerValueDecompressor because we will only be decompressing the
2253/// requested data.  This decoder / scheduler does not do any read amplification.
2254#[derive(Debug)]
2255struct FixedFullZipDecoder {
2256    details: Arc<FullZipDecodeDetails>,
2257    data: VecDeque<LanceBuffer>,
2258    offset_in_current: usize,
2259    bytes_per_value: usize,
2260    total_bytes_per_value: usize,
2261    num_rows: u64,
2262}
2263
2264impl FixedFullZipDecoder {
2265    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2266        debug_assert!(num_rows > 0);
2267        let cur_buf = self.data.front_mut().unwrap();
2268        let start = self.offset_in_current;
2269        if self.details.ctrl_word_parser.has_rep() {
2270            // This is a slightly slower path.  In order to figure out where to split we need to
2271            // examine the control words (rep levels) so we can convert the requested row count into an item count
2272            let mut rows_started = 0;
2273            // We always need at least one value.  Now loop through until we have passed num_rows
2274            // rows
2275            let mut num_items = 0;
2276            while self.offset_in_current < cur_buf.len() {
2277                let control = self.details.ctrl_word_parser.parse_desc(
2278                    &cur_buf[self.offset_in_current..],
2279                    self.details.max_rep,
2280                    self.details.max_visible_def,
2281                );
2282                if control.is_new_row {
2283                    if rows_started == num_rows {
2284                        break;
2285                    }
2286                    rows_started += 1;
2287                }
2288                num_items += 1;
2289                if control.is_visible {
2290                    self.offset_in_current += self.total_bytes_per_value;
2291                } else {
2292                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2293                }
2294            }
2295
2296            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2297            if self.offset_in_current == cur_buf.len() {
2298                self.data.pop_front();
2299                self.offset_in_current = 0;
2300            }
2301
2302            FullZipDecodeTaskItem {
2303                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2304                    data: task_slice,
2305                    bits_per_value: self.bytes_per_value as u64 * 8,
2306                    num_values: num_items,
2307                    block_info: BlockInfo::new(),
2308                }),
2309                rows_in_buf: rows_started,
2310            }
2311        } else {
2312            // If there's no repetition we can calculate the slicing point by just multiplying
2313            // the number of rows by the total bytes per value
2314            let cur_buf = self.data.front_mut().unwrap();
2315            let bytes_avail = cur_buf.len() - self.offset_in_current;
2316            let offset_in_cur = self.offset_in_current;
2317
2318            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2319            let mut rows_taken = num_rows;
2320            let task_slice = if bytes_needed >= bytes_avail {
2321                self.offset_in_current = 0;
2322                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2323                self.data
2324                    .pop_front()
2325                    .unwrap()
2326                    .slice_with_length(offset_in_cur, bytes_avail)
2327            } else {
2328                self.offset_in_current += bytes_needed;
2329                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2330            };
2331            FullZipDecodeTaskItem {
2332                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2333                    data: task_slice,
2334                    bits_per_value: self.bytes_per_value as u64 * 8,
2335                    num_values: rows_taken,
2336                    block_info: BlockInfo::new(),
2337                }),
2338                rows_in_buf: rows_taken,
2339            }
2340        }
2341    }
2342}
2343
2344impl StructuralPageDecoder for FixedFullZipDecoder {
2345    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2346        let mut task_data = Vec::with_capacity(self.data.len());
2347        let mut remaining = num_rows;
2348        while remaining > 0 {
2349            let task_item = self.slice_next_task(remaining);
2350            remaining -= task_item.rows_in_buf;
2351            task_data.push(task_item);
2352        }
2353        Ok(Box::new(FixedFullZipDecodeTask {
2354            details: self.details.clone(),
2355            data: task_data,
2356            bytes_per_value: self.bytes_per_value,
2357            num_rows: num_rows as usize,
2358        }))
2359    }
2360
2361    fn num_rows(&self) -> u64 {
2362        self.num_rows
2363    }
2364}
2365
2366/// A decoder for full-zip encoded data when the data has a variable width
2367///
2368/// Here we need to unzip the control words AND lengths from the values and
2369/// then decompress the requested values.
2370#[derive(Debug)]
2371struct VariableFullZipDecoder {
2372    details: Arc<FullZipDecodeDetails>,
2373    decompressor: Arc<dyn VariablePerValueDecompressor>,
2374    data: LanceBuffer,
2375    offsets: LanceBuffer,
2376    rep: ScalarBuffer<u16>,
2377    def: ScalarBuffer<u16>,
2378    repdef_starts: Vec<usize>,
2379    data_starts: Vec<usize>,
2380    offset_starts: Vec<usize>,
2381    visible_item_counts: Vec<u64>,
2382    bits_per_offset: u8,
2383    current_idx: usize,
2384    num_rows: u64,
2385}
2386
2387impl VariableFullZipDecoder {
2388    fn new(
2389        details: Arc<FullZipDecodeDetails>,
2390        data: VecDeque<LanceBuffer>,
2391        num_rows: u64,
2392        in_bits_per_length: u8,
2393        out_bits_per_offset: u8,
2394    ) -> Self {
2395        let decompressor = match details.value_decompressor {
2396            PerValueDecompressor::Variable(ref d) => d.clone(),
2397            _ => unreachable!(),
2398        };
2399
2400        assert_eq!(in_bits_per_length % 8, 0);
2401        assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2402
2403        let mut decoder = Self {
2404            details,
2405            decompressor,
2406            data: LanceBuffer::empty(),
2407            offsets: LanceBuffer::empty(),
2408            rep: LanceBuffer::empty().borrow_to_typed_slice(),
2409            def: LanceBuffer::empty().borrow_to_typed_slice(),
2410            bits_per_offset: out_bits_per_offset,
2411            repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2412            data_starts: Vec::with_capacity(num_rows as usize + 1),
2413            offset_starts: Vec::with_capacity(num_rows as usize + 1),
2414            visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2415            current_idx: 0,
2416            num_rows,
2417        };
2418
2419        // There's no great time to do this and this is the least worst time.  If we don't unzip then
2420        // we can't slice the data during the decode phase.  This is because we need the offsets to be
2421        // unpacked to know where the values start and end.
2422        //
2423        // We don't want to unzip on the decode thread because that is a single-threaded path
2424        // We don't want to unzip on the scheduling thread because that is a single-threaded path
2425        //
2426        // Fortunately, we know variable length data will always be read indirectly and so we can do it
2427        // here, which should be on the indirect thread.  The primary disadvantage to doing it here is that
2428        // we load all the data into memory and then throw it away only to load it all into memory again during
2429        // the decode.
2430        //
2431        // There are some alternatives to investigate:
2432        //   - Instead of just reading the beginning and end of the rep index we could read the entire
2433        //     range in between.  This will give us the break points that we need for slicing and won't increase
2434        //     the number of IOPs but it will mean we are doing more total I/O and we need to load the rep index
2435        //     even when doing a full scan.
2436        //   - We could force each decode task to do a full unzip of all the data.  Each decode task now
2437        //     has to do more work but the work is all fused.
2438        //   - We could just try doing this work on the decode thread and see if it is a problem.
2439        decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2440
2441        decoder
2442    }
2443
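    /// # Safety
    ///
    /// `data` must contain at least `bits_per_offset / 8` readable bytes; the bytes are read with
    /// `get_unchecked` and no bounds checking is performed.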
2444    unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2445        match bits_per_offset {
2446            8 => *data.get_unchecked(0) as u64,
2447            16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2448            32 => u32::from_le_bytes([
2449                *data.get_unchecked(0),
2450                *data.get_unchecked(1),
2451                *data.get_unchecked(2),
2452                *data.get_unchecked(3),
2453            ]) as u64,
2454            64 => u64::from_le_bytes([
2455                *data.get_unchecked(0),
2456                *data.get_unchecked(1),
2457                *data.get_unchecked(2),
2458                *data.get_unchecked(3),
2459                *data.get_unchecked(4),
2460                *data.get_unchecked(5),
2461                *data.get_unchecked(6),
2462                *data.get_unchecked(7),
2463            ]),
2464            _ => unreachable!(),
2465        }
2466    }
2467
2468    fn unzip(
2469        &mut self,
2470        data: VecDeque<LanceBuffer>,
2471        in_bits_per_length: u8,
2472        out_bits_per_offset: u8,
2473        num_rows: u64,
2474    ) {
2475        // This undercounts if there are lists but, at this point, we don't really know how many items we have
2476        let mut rep = Vec::with_capacity(num_rows as usize);
2477        let mut def = Vec::with_capacity(num_rows as usize);
2478        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2479
2480        // This undercounts if there are lists
2481        // It can also overcount if there are invisible items
2482        let bytes_per_offset = out_bits_per_offset as usize / 8;
2483        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2484        let mut offsets_data = Vec::with_capacity(bytes_offsets);
2485
2486        let bytes_per_length = in_bits_per_length as usize / 8;
2487        let bytes_lengths = bytes_per_length * num_rows as usize;
2488
2489        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2490        // This overcounts since bytes_lengths and bytes_cw are undercounts
2491        // It can also undercount if there are invisible items (hence the saturating_sub)
2492        let mut unzipped_data =
2493            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2494
2495        let mut current_offset = 0_u64;
2496        let mut visible_item_count = 0_u64;
2497        for databuf in data.into_iter() {
2498            let mut databuf = databuf.as_ref();
2499            while !databuf.is_empty() {
2500                let data_start = unzipped_data.len();
2501                let offset_start = offsets_data.len();
2502                // We might have only-rep or only-def, neither, or both.  They move at the same
2503                // speed though so we only need one index into it
2504                let repdef_start = rep.len().max(def.len());
2505                // TODO: Kind of inefficient we parse the control word twice here
2506                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2507                    databuf,
2508                    self.details.max_rep,
2509                    self.details.max_visible_def,
2510                );
2511                self.details
2512                    .ctrl_word_parser
2513                    .parse(databuf, &mut rep, &mut def);
2514                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2515
2516                if ctrl_desc.is_new_row {
2517                    self.repdef_starts.push(repdef_start);
2518                    self.data_starts.push(data_start);
2519                    self.offset_starts.push(offset_start);
2520                    self.visible_item_counts.push(visible_item_count);
2521                }
2522                if ctrl_desc.is_visible {
2523                    visible_item_count += 1;
2524                    if ctrl_desc.is_valid_item {
2525                        // Safety: Data should have at least bytes_per_length bytes remaining
2526                        debug_assert!(databuf.len() >= bytes_per_length);
2527                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2528                        match out_bits_per_offset {
2529                            32 => offsets_data
2530                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2531                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2532                            _ => unreachable!(),
2533                        };
2534                        databuf = &databuf[bytes_per_length..];
2535                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
2536                        databuf = &databuf[length as usize..];
2537                        current_offset += length;
2538                    } else {
2539                        // Null items still get an offset
2540                        match out_bits_per_offset {
2541                            32 => offsets_data
2542                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2543                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2544                            _ => unreachable!(),
2545                        }
2546                    }
2547                }
2548            }
2549        }
2550        self.repdef_starts.push(rep.len().max(def.len()));
2551        self.data_starts.push(unzipped_data.len());
2552        self.offset_starts.push(offsets_data.len());
2553        self.visible_item_counts.push(visible_item_count);
2554        match out_bits_per_offset {
2555            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2556            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2557            _ => unreachable!(),
2558        };
2559        self.rep = ScalarBuffer::from(rep);
2560        self.def = ScalarBuffer::from(def);
2561        self.data = LanceBuffer::from(unzipped_data);
2562        self.offsets = LanceBuffer::from(offsets_data);
2563    }
2564}
2565
2566impl StructuralPageDecoder for VariableFullZipDecoder {
2567    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2568        let start = self.current_idx;
2569        let end = start + num_rows as usize;
2570
2571        // This might seem a little peculiar.  We are returning the entire data for every single
2572        // batch.  This is because the offsets are relative to the start of the data.  In other words
2573        // imagine we have a data buffer that is 100 bytes long and the offsets are [0, 10, 20, 30, 40]
2574        // and we return in batches of two.  The second set of offsets will be [20, 30, 40].
2575        //
2576        // So either we pay for a copy to normalize the offsets or we just return the entire data buffer
2577        // which is slightly cheaper.
2578        let data = self.data.clone();
2579
2580        let offset_start = self.offset_starts[start];
2581        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2582        let offsets = self
2583            .offsets
2584            .slice_with_length(offset_start, offset_end - offset_start);
2585
2586        let repdef_start = self.repdef_starts[start];
2587        let repdef_end = self.repdef_starts[end];
2588        let rep = if self.rep.is_empty() {
2589            self.rep.clone()
2590        } else {
2591            self.rep.slice(repdef_start, repdef_end - repdef_start)
2592        };
2593        let def = if self.def.is_empty() {
2594            self.def.clone()
2595        } else {
2596            self.def.slice(repdef_start, repdef_end - repdef_start)
2597        };
2598
2599        let visible_item_counts_start = self.visible_item_counts[start];
2600        let visible_item_counts_end = self.visible_item_counts[end];
2601        let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2602
2603        self.current_idx += num_rows as usize;
2604
2605        Ok(Box::new(VariableFullZipDecodeTask {
2606            details: self.details.clone(),
2607            decompressor: self.decompressor.clone(),
2608            data,
2609            offsets,
2610            bits_per_offset: self.bits_per_offset,
2611            num_visible_items,
2612            rep,
2613            def,
2614        }))
2615    }
2616
2617    fn num_rows(&self) -> u64 {
2618        self.num_rows
2619    }
2620}
2621
2622#[derive(Debug)]
2623struct VariableFullZipDecodeTask {
2624    details: Arc<FullZipDecodeDetails>,
2625    decompressor: Arc<dyn VariablePerValueDecompressor>,
2626    data: LanceBuffer,
2627    offsets: LanceBuffer,
2628    bits_per_offset: u8,
2629    num_visible_items: u64,
2630    rep: ScalarBuffer<u16>,
2631    def: ScalarBuffer<u16>,
2632}
2633
2634impl DecodePageTask for VariableFullZipDecodeTask {
2635    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2636        let block = VariableWidthBlock {
2637            data: self.data,
2638            offsets: self.offsets,
2639            bits_per_offset: self.bits_per_offset,
2640            num_values: self.num_visible_items,
2641            block_info: BlockInfo::new(),
2642        };
2643        let decompressed = self.decompressor.decompress(block)?;
2644        let rep = if self.rep.is_empty() {
2645            None
2646        } else {
2647            Some(self.rep.to_vec())
2648        };
2649        let def = if self.def.is_empty() {
2650            None
2651        } else {
2652            Some(self.def.to_vec())
2653        };
2654        let unraveler = RepDefUnraveler::new(
2655            rep,
2656            def,
2657            self.details.def_meaning.clone(),
2658            self.num_visible_items,
2659        );
2660        Ok(DecodedPage {
2661            data: decompressed,
2662            repdef: unraveler,
2663        })
2664    }
2665}
2666
2667#[derive(Debug)]
2668struct FullZipDecodeTaskItem {
2669    data: PerValueDataBlock,
2670    rows_in_buf: u64,
2671}
2672
2673/// A task to unzip and decompress full-zip encoded data when that data
2674/// has a fixed-width.
2675#[derive(Debug)]
2676struct FixedFullZipDecodeTask {
2677    details: Arc<FullZipDecodeDetails>,
2678    data: Vec<FullZipDecodeTaskItem>,
2679    num_rows: usize,
2680    bytes_per_value: usize,
2681}
2682
2683impl DecodePageTask for FixedFullZipDecodeTask {
2684    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2685        // Multiply by 2 to make a stab at the size of the output buffer (which will be decompressed and thus bigger)
2686        let estimated_size_bytes = self
2687            .data
2688            .iter()
2689            .map(|task_item| task_item.data.data_size() as usize)
2690            .sum::<usize>()
2691            * 2;
2692        let mut data_builder =
2693            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2694
2695        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
2696            // Fast path, no need to unzip because there is no rep/def
2697            //
2698            // We decompress each buffer and add it to our output buffer
2699            for task_item in self.data.into_iter() {
2700                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2701                    unreachable!()
2702                };
2703                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2704                else {
2705                    unreachable!()
2706                };
2707                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
2708                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
2709                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
2710            }
2711
2712            let unraveler = RepDefUnraveler::new(
2713                None,
2714                None,
2715                self.details.def_meaning.clone(),
2716                self.num_rows as u64,
2717            );
2718
2719            Ok(DecodedPage {
2720                data: data_builder.finish(),
2721                repdef: unraveler,
2722            })
2723        } else {
2724            // Slow path, unzipping needed
2725            let mut rep = Vec::with_capacity(self.num_rows);
2726            let mut def = Vec::with_capacity(self.num_rows);
2727
2728            for task_item in self.data.into_iter() {
2729                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2730                    unreachable!()
2731                };
2732                let mut buf_slice = fixed_data.data.as_ref();
2733                let num_values = fixed_data.num_values as usize;
2734                // We will be unzipping the repdef into `rep` and `def` and the
2735                // values into `values` (which will hold the still-compressed values)
2736                let mut values = Vec::with_capacity(
2737                    fixed_data.data.len()
2738                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
2739                );
2740                let mut visible_items = 0;
2741                for _ in 0..num_values {
2742                    // Extract rep/def
2743                    self.details
2744                        .ctrl_word_parser
2745                        .parse(buf_slice, &mut rep, &mut def);
2746                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
2747
2748                    let is_visible = def
2749                        .last()
2750                        .map(|d| *d <= self.details.max_visible_def)
2751                        .unwrap_or(true);
2752                    if is_visible {
2753                        // Extract value
2754                        values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
2755                        buf_slice = &buf_slice[self.bytes_per_value..];
2756                        visible_items += 1;
2757                    }
2758                }
2759
2760                // Finally, we decompress the values and add them to our output buffer
2761                let values_buf = LanceBuffer::from(values);
2762                let fixed_data = FixedWidthDataBlock {
2763                    bits_per_value: self.bytes_per_value as u64 * 8,
2764                    block_info: BlockInfo::new(),
2765                    data: values_buf,
2766                    num_values: visible_items,
2767                };
2768                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2769                else {
2770                    unreachable!()
2771                };
2772                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
2773                data_builder.append(&decompressed, 0..visible_items);
2774            }
2775
2776            let repetition = if rep.is_empty() { None } else { Some(rep) };
2777            let definition = if def.is_empty() { None } else { Some(def) };
2778
2779            let unraveler = RepDefUnraveler::new(
2780                repetition,
2781                definition,
2782                self.details.def_meaning.clone(),
2783                self.num_rows as u64,
2784            );
2785            let data = data_builder.finish();
2786
2787            Ok(DecodedPage {
2788                data,
2789                repdef: unraveler,
2790            })
2791        }
2792    }
2793}
2794
2795#[derive(Debug)]
2796struct StructuralPrimitiveFieldSchedulingJob<'a> {
2797    scheduler: &'a StructuralPrimitiveFieldScheduler,
2798    ranges: Vec<Range<u64>>,
2799    page_idx: usize,
2800    range_idx: usize,
2801    global_row_offset: u64,
2802}
2803
2804impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
2805    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
2806        Self {
2807            scheduler,
2808            ranges,
2809            page_idx: 0,
2810            range_idx: 0,
2811            global_row_offset: 0,
2812        }
2813    }
2814}
2815
2816impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
2817    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
2818        if self.range_idx >= self.ranges.len() {
2819            return Ok(Vec::new());
2820        }
2821        // Get our current range
2822        let mut range = self.ranges[self.range_idx].clone();
2823        let priority = range.start;
2824
2825        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
2826        trace!(
2827            "Current range is {:?} and current page has {} rows",
2828            range,
2829            cur_page.num_rows
2830        );
2831        // Skip entire pages until we have some overlap with our next range
2832        while cur_page.num_rows + self.global_row_offset <= range.start {
2833            self.global_row_offset += cur_page.num_rows;
2834            self.page_idx += 1;
2835            trace!("Skipping entire page of {} rows", cur_page.num_rows);
2836            cur_page = &self.scheduler.page_schedulers[self.page_idx];
2837        }
2838
2839        // Now the cur_page has overlap with range.  Continue looping through ranges
2840        // until we find a range that exceeds the current page
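        // For example, if every page has 100 rows and the requested range is 150..250 then
        // this call schedules rows 50..100 of page 1 and the next call schedules rows 0..50
        // of page 2.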
2841
2842        let mut ranges_in_page = Vec::new();
2843        while cur_page.num_rows + self.global_row_offset > range.start {
2844            range.start = range.start.max(self.global_row_offset);
2845            let start_in_page = range.start - self.global_row_offset;
2846            let end_in_page = start_in_page + (range.end - range.start);
2847            let end_in_page = end_in_page.min(cur_page.num_rows);
2848            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
2849
2850            ranges_in_page.push(start_in_page..end_in_page);
2851            if last_in_range {
2852                self.range_idx += 1;
2853                if self.range_idx == self.ranges.len() {
2854                    break;
2855                }
2856                range = self.ranges[self.range_idx].clone();
2857            } else {
2858                break;
2859            }
2860        }
2861
2862        trace!(
2863            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
2864            ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
2865            ranges_in_page.len(),
2866            cur_page.num_rows,
2867            priority,
2868            self.scheduler.column_index,
2869            cur_page.page_index,
2870        );
2871
2872        self.global_row_offset += cur_page.num_rows;
2873        self.page_idx += 1;
2874
2875        let page_decoders = cur_page
2876            .scheduler
2877            .schedule_ranges(&ranges_in_page, context.io())?;
2878
2879        let cur_path = context.current_path();
2880        page_decoders
2881            .into_iter()
2882            .map(|page_load_task| {
2883                let cur_path = cur_path.clone();
2884                let page_decoder = page_load_task.decoder_fut;
2885                let unloaded_page = async move {
2886                    let page_decoder = page_decoder.await?;
2887                    Ok(LoadedPageShard {
2888                        decoder: page_decoder,
2889                        path: cur_path,
2890                    })
2891                }
2892                .boxed();
2893                Ok(ScheduledScanLine {
2894                    decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
2895                    rows_scheduled: page_load_task.num_rows,
2896                })
2897            })
2898            .collect::<Result<Vec<_>>>()
2899    }
2900}
2901
2902#[derive(Debug)]
2903struct PageInfoAndScheduler {
2904    page_index: usize,
2905    num_rows: u64,
2906    scheduler: Box<dyn StructuralPageScheduler>,
2907}
2908
2909/// A scheduler for a leaf node
2910///
2911/// Here we look at the layout of the various pages and delegate scheduling to a scheduler
2912/// appropriate for the layout of the page.
2913#[derive(Debug)]
2914pub struct StructuralPrimitiveFieldScheduler {
2915    page_schedulers: Vec<PageInfoAndScheduler>,
2916    column_index: u32,
2917}
2918
2919impl StructuralPrimitiveFieldScheduler {
2920    pub fn try_new(
2921        column_info: &ColumnInfo,
2922        decompressors: &dyn DecompressionStrategy,
2923        cache_repetition_index: bool,
2924        target_field: &Field,
2925    ) -> Result<Self> {
2926        let page_schedulers = column_info
2927            .page_infos
2928            .iter()
2929            .enumerate()
2930            .map(|(page_index, page_info)| {
2931                Self::page_info_to_scheduler(
2932                    page_info,
2933                    page_index,
2934                    decompressors,
2935                    cache_repetition_index,
2936                    target_field,
2937                )
2938            })
2939            .collect::<Result<Vec<_>>>()?;
2940        Ok(Self {
2941            page_schedulers,
2942            column_index: column_info.index,
2943        })
2944    }
2945
2946    fn page_layout_to_scheduler(
2947        page_info: &PageInfo,
2948        page_layout: &PageLayout,
2949        decompressors: &dyn DecompressionStrategy,
2950        cache_repetition_index: bool,
2951        target_field: &Field,
2952    ) -> Result<Box<dyn StructuralPageScheduler>> {
2953        use pb21::page_layout::Layout;
2954        Ok(match page_layout.layout.as_ref().expect_ok()? {
2955            Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
2956                &page_info.buffer_offsets_and_sizes,
2957                page_info.priority,
2958                mini_block.num_items,
2959                mini_block,
2960                decompressors,
2961            )?),
2962            Layout::FullZipLayout(full_zip) => {
2963                let mut scheduler = FullZipScheduler::try_new(
2964                    &page_info.buffer_offsets_and_sizes,
2965                    page_info.priority,
2966                    page_info.num_rows,
2967                    full_zip,
2968                    decompressors,
2969                )?;
2970                scheduler.enable_cache = cache_repetition_index;
2971                Box::new(scheduler)
2972            }
2973            Layout::AllNullLayout(all_null) => {
2974                let def_meaning = all_null
2975                    .layers
2976                    .iter()
2977                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
2978                    .collect::<Vec<_>>();
2979                if def_meaning.len() == 1
2980                    && def_meaning[0] == DefinitionInterpretation::NullableItem
2981                {
2982                    Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
2983                } else {
2984                    Box::new(ComplexAllNullScheduler::new(
2985                        page_info.buffer_offsets_and_sizes.clone(),
2986                        def_meaning.into(),
2987                    )) as Box<dyn StructuralPageScheduler>
2988                }
2989            }
2990            Layout::BlobLayout(blob) => {
2991                let inner_scheduler = Self::page_layout_to_scheduler(
2992                    page_info,
2993                    blob.inner_layout.as_ref().expect_ok()?.as_ref(),
2994                    decompressors,
2995                    cache_repetition_index,
2996                    target_field,
2997                )?;
2998                let def_meaning = blob
2999                    .layers
3000                    .iter()
3001                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3002                    .collect::<Vec<_>>();
3003                if matches!(target_field.data_type(), DataType::Struct(_)) {
3004                    // User wants to decode blob into struct
3005                    Box::new(BlobDescriptionPageScheduler::new(
3006                        inner_scheduler,
3007                        def_meaning.into(),
3008                    ))
3009                } else {
3010                    // User wants to decode blob into binary data
3011                    Box::new(BlobPageScheduler::new(
3012                        inner_scheduler,
3013                        page_info.priority,
3014                        page_info.num_rows,
3015                        def_meaning.into(),
3016                    ))
3017                }
3018            }
3019        })
3020    }
3021
3022    fn page_info_to_scheduler(
3023        page_info: &PageInfo,
3024        page_index: usize,
3025        decompressors: &dyn DecompressionStrategy,
3026        cache_repetition_index: bool,
3027        target_field: &Field,
3028    ) -> Result<PageInfoAndScheduler> {
3029        let page_layout = page_info.encoding.as_structural();
3030        let scheduler = Self::page_layout_to_scheduler(
3031            page_info,
3032            page_layout,
3033            decompressors,
3034            cache_repetition_index,
3035            target_field,
3036        )?;
3037        Ok(PageInfoAndScheduler {
3038            page_index,
3039            num_rows: page_info.num_rows,
3040            scheduler,
3041        })
3042    }
3043}
3044
3045pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
3046    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
3047}
3048
3049pub struct NoCachedPageData;
3050
3051impl DeepSizeOf for NoCachedPageData {
3052    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
3053        0
3054    }
3055}
3056impl CachedPageData for NoCachedPageData {
3057    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
3058        self
3059    }
3060}
3061
3062pub struct CachedFieldData {
3063    pages: Vec<Arc<dyn CachedPageData>>,
3064}
3065
3066impl DeepSizeOf for CachedFieldData {
3067    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
3068        self.pages.deep_size_of_children(ctx)
3069    }
3070}
3071
3072// Cache key for field data
3073#[derive(Debug, Clone)]
3074pub struct FieldDataCacheKey {
3075    pub column_index: u32,
3076}
3077
3078impl CacheKey for FieldDataCacheKey {
3079    type ValueType = CachedFieldData;
3080
3081    fn key(&self) -> std::borrow::Cow<'_, str> {
3082        self.column_index.to_string().into()
3083    }
3084}
3085
3086impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
3087    fn initialize<'a>(
3088        &'a mut self,
3089        _filter: &'a FilterExpression,
3090        context: &'a SchedulerContext,
3091    ) -> BoxFuture<'a, Result<()>> {
3092        let cache_key = FieldDataCacheKey {
3093            column_index: self.column_index,
3094        };
3095        let cache = context.cache().clone();
3096
3097        async move {
3098            if let Some(cached_data) = cache.get_with_key(&cache_key).await {
3099                self.page_schedulers
3100                    .iter_mut()
3101                    .zip(cached_data.pages.iter())
3102                    .for_each(|(page_scheduler, cached_data)| {
3103                        page_scheduler.scheduler.load(cached_data);
3104                    });
3105                return Ok(());
3106            }
3107
3108            let page_data = self
3109                .page_schedulers
3110                .iter_mut()
3111                .map(|s| s.scheduler.initialize(context.io()))
3112                .collect::<FuturesOrdered<_>>();
3113
3114            let page_data = page_data.try_collect::<Vec<_>>().await?;
3115            let cached_data = Arc::new(CachedFieldData { pages: page_data });
3116            cache.insert_with_key(&cache_key, cached_data).await;
3117            Ok(())
3118        }
3119        .boxed()
3120    }
3121
3122    fn schedule_ranges<'a>(
3123        &'a self,
3124        ranges: &[Range<u64>],
3125        _filter: &FilterExpression,
3126    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
3127        let ranges = ranges.to_vec();
3128        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
3129            self, ranges,
3130        )))
3131    }
3132}
3133
3134/// Takes the output from several page decoders and
3135/// concatenates them.
3136#[derive(Debug)]
3137pub struct StructuralCompositeDecodeArrayTask {
3138    tasks: Vec<Box<dyn DecodePageTask>>,
3139    should_validate: bool,
3140    data_type: DataType,
3141}
3142
3143impl StructuralCompositeDecodeArrayTask {
3144    fn restore_validity(
3145        array: Arc<dyn Array>,
3146        unraveler: &mut CompositeRepDefUnraveler,
3147    ) -> Arc<dyn Array> {
3148        let validity = unraveler.unravel_validity(array.len());
3149        let Some(validity) = validity else {
3150            return array;
3151        };
3152        if array.data_type() == &DataType::Null {
3153            // We unravel from a null array but we don't add the null buffer because arrow-rs doesn't like it
3154            return array;
3155        }
3156        assert_eq!(validity.len(), array.len());
3157        // SAFETY: We should have already asserted that the buffers are all valid; we are just
3158        // adding null buffers to the array here
3159        make_array(unsafe {
3160            array
3161                .to_data()
3162                .into_builder()
3163                .nulls(Some(validity))
3164                .build_unchecked()
3165        })
3166    }
3167}
3168
3169impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3170    fn decode(self: Box<Self>) -> Result<DecodedArray> {
3171        let mut arrays = Vec::with_capacity(self.tasks.len());
3172        let mut unravelers = Vec::with_capacity(self.tasks.len());
3173        for task in self.tasks {
3174            let decoded = task.decode()?;
3175            unravelers.push(decoded.repdef);
3176
3177            let array = make_array(
3178                decoded
3179                    .data
3180                    .into_arrow(self.data_type.clone(), self.should_validate)?,
3181            );
3182
3183            arrays.push(array);
3184        }
3185        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3186        let array = arrow_select::concat::concat(&array_refs)?;
3187        let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3188
3189        let array = Self::restore_validity(array, &mut repdef);
3190
3191        Ok(DecodedArray { array, repdef })
3192    }
3193}
3194
3195#[derive(Debug)]
3196pub struct StructuralPrimitiveFieldDecoder {
3197    field: Arc<ArrowField>,
3198    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
3199    should_validate: bool,
3200    rows_drained_in_current: u64,
3201}
3202
3203impl StructuralPrimitiveFieldDecoder {
3204    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3205        Self {
3206            field: field.clone(),
3207            page_decoders: VecDeque::new(),
3208            should_validate,
3209            rows_drained_in_current: 0,
3210        }
3211    }
3212}
3213
3214impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3215    fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
3216        assert!(child.path.is_empty());
3217        self.page_decoders.push_back(child.decoder);
3218        Ok(())
3219    }
3220
3221    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
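        // Drain rows from the front pages, splitting the last page if needed.  For example,
        // draining 150 rows from pages of 100 rows takes all of page 0 plus the first 50 rows
        // of page 1 (leaving `rows_drained_in_current` at 50).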
3222        let mut remaining = num_rows;
3223        let mut tasks = Vec::new();
3224        while remaining > 0 {
3225            let cur_page = self.page_decoders.front_mut().unwrap();
3226            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3227            let to_take = num_in_page.min(remaining);
3228
3229            let task = cur_page.drain(to_take)?;
3230            tasks.push(task);
3231
3232            if to_take == num_in_page {
3233                self.page_decoders.pop_front();
3234                self.rows_drained_in_current = 0;
3235            } else {
3236                self.rows_drained_in_current += to_take;
3237            }
3238
3239            remaining -= to_take;
3240        }
3241        Ok(Box::new(StructuralCompositeDecodeArrayTask {
3242            tasks,
3243            should_validate: self.should_validate,
3244            data_type: self.field.data_type().clone(),
3245        }))
3246    }
3247
3248    fn data_type(&self) -> &DataType {
3249        self.field.data_type()
3250    }
3251}
3252
3253/// The serialized representation of full-zip data
3254struct SerializedFullZip {
3255    /// The zipped values buffer
3256    values: LanceBuffer,
3257    /// The repetition index (only present if there is repetition)
3258    repetition_index: Option<LanceBuffer>,
3259}
3260
3261// We align and pad mini-blocks to 8 byte boundaries for two reasons.  First,
3262// to allow us to store a chunk size in 12 bits.
3263//
3264// If we directly record the size in bytes with 12 bits we would be limited to
3265// 4KiB which is too small.  Since we know each mini-block consists of 8 byte
3266// words we can store the # of words instead which gives us 32KiB.  We want
3267// at least 24KiB so we can handle even the worst case of
3268// - 4Ki values compressed into an 8186 byte buffer
3269// - 4 bytes to describe rep & def lengths
3270// - 16KiB of rep & def buffer (this will almost never happen but life is easier if we
3271//   plan for it)
3272//
3273// Second, each chunk in a mini-block is aligned to 8 bytes.  This allows multi-byte
3274// values like offsets to be stored in a mini-block and safely read back out.  It also
3275// helps ensure zero-copy reads in cases where zero-copy is possible (e.g. no decoding
3276// needed).
3277//
3278// Note: by "aligned to 8 bytes" we mean BOTH "aligned to 8 bytes from the start of
3279// the page" and "aligned to 8 bytes from the start of the file."
3280const MINIBLOCK_ALIGNMENT: usize = 8;
3281
3282/// An encoder for primitive (leaf) arrays
3283///
3284/// This encoder is fairly complicated and follows a number of paths depending
3285/// on the data.
3286///
3287/// First, we convert the validity & offsets information into repetition and
3288/// definition levels.  Then we compress the data itself into a single buffer.
3289///
3290/// If the data is narrow then we encode the data in small chunks (each chunk
3291/// should be a few disk sectors and contains a buffer of repetition, a buffer
3292/// of definition, and a buffer of value data).  This approach is called
3293/// "mini-block".  These mini-blocks are stored into a single data buffer.
3294///
3295/// If the data is wide then we zip together the repetition and definition levels
3296/// with the value data into a single buffer.  This approach is called "zipped".
3297///
3298/// If there is any repetition information then we create a repetition index.
3299///
3300/// In addition, the compression process may create zero or more metadata buffers.
3301/// For example, a dictionary compression will create dictionary metadata.  Any
3302/// mini-block approach has a metadata buffer of block sizes.  This metadata is
3303/// stored in a separate buffer on disk and read at initialization time.
3304///
3305/// TODO: We should concatenate metadata buffers from all pages into a single buffer
3306/// at (roughly) the end of the file so there is, at most, one read per column of
3307/// metadata per file.
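///
/// Note: callers can force a particular layout by setting STRUCTURAL_ENCODING_META_KEY in the
/// encoding metadata to STRUCTURAL_ENCODING_MINIBLOCK or STRUCTURAL_ENCODING_FULLZIP (see
/// `prefers_miniblock` and `prefers_fullzip` below).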
3308pub struct PrimitiveStructuralEncoder {
3309    // Accumulates arrays until we have enough data to justify a disk page
3310    accumulation_queue: AccumulationQueue,
3311
3312    keep_original_array: bool,
3313    accumulated_repdefs: Vec<RepDefBuilder>,
3314    // The compression strategy we will use to compress the data
3315    compression_strategy: Arc<dyn CompressionStrategy>,
3316    column_index: u32,
3317    field: Field,
3318    encoding_metadata: Arc<HashMap<String, String>>,
3319}
3320
3321struct CompressedLevelsChunk {
3322    data: LanceBuffer,
3323    num_levels: u16,
3324}
3325
3326struct CompressedLevels {
3327    data: Vec<CompressedLevelsChunk>,
3328    compression: CompressiveEncoding,
3329    rep_index: Option<LanceBuffer>,
3330}
3331
3332struct SerializedMiniBlockPage {
3333    num_buffers: u64,
3334    data: LanceBuffer,
3335    metadata: LanceBuffer,
3336}
3337
3338impl PrimitiveStructuralEncoder {
3339    pub fn try_new(
3340        options: &EncodingOptions,
3341        compression_strategy: Arc<dyn CompressionStrategy>,
3342        column_index: u32,
3343        field: Field,
3344        encoding_metadata: Arc<HashMap<String, String>>,
3345    ) -> Result<Self> {
3346        Ok(Self {
3347            accumulation_queue: AccumulationQueue::new(
3348                options.cache_bytes_per_column,
3349                column_index,
3350                options.keep_original_array,
3351            ),
3352            keep_original_array: options.keep_original_array,
3353            accumulated_repdefs: Vec::new(),
3354            column_index,
3355            compression_strategy,
3356            field,
3357            encoding_metadata,
3358        })
3359    }
3360
3361    // TODO: This is a heuristic we may need to tune at some point
3362    //
3363    // As data gets narrower the "zipping" process gets too expensive
3364    //   and we prefer mini-block.
3365    // As data gets wider the # of values per block shrinks; very wide
3366    //   data doesn't even fit in a mini-block, the block overhead gets
3367    //   too large, and we prefer zipped.
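    //
    // Concretely, data is considered "narrow" when its largest value is smaller than
    // MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE (256 bytes); see `is_narrow` below.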
3368    fn is_narrow(data_block: &DataBlock) -> bool {
3369        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3370
3371        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3372            let max_len_array = max_len_array
3373                .as_any()
3374                .downcast_ref::<PrimitiveArray<UInt64Type>>()
3375                .unwrap();
3376            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3377                return true;
3378            }
3379        }
3380        false
3381    }
3382
3383    fn prefers_miniblock(
3384        data_block: &DataBlock,
3385        encoding_metadata: &HashMap<String, String>,
3386    ) -> bool {
3387        // If the user specifically requested miniblock then use it
3388        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3389            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3390        }
3391        // Otherwise only use miniblock if it is narrow
3392        Self::is_narrow(data_block)
3393    }
3394
3395    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3396        // Fullzip is the backup option so the only reason we wouldn't use it is if the
3397        // user specifically requested not to use it (in which case we're probably going
3398        // to emit an error)
3399        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3400            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3401        }
3402        true
3403    }
3404
3405    // Converts value data, repetition levels, and definition levels into a single
3406    // buffer of mini-blocks.  In addition, creates a buffer of mini-block metadata
3407    // which tells us the size of each block.  Finally, if repetition is present then
3408    // we also create a buffer for the repetition index.
3409    //
3410    // Each chunk is serialized as:
3411    // | num_bufs (1 byte) | buf_lens (2 bytes per buffer) | P | buf0 | P | buf1 | ... | bufN | P |
3412    //
3413    // P - Padding inserted to ensure each buffer is 8-byte aligned and the buffer size is a multiple
3414    //     of 8 bytes (so that the next chunk is 8-byte aligned).
3415    //
3416    // Each block has a u16 word of metadata.  The upper 12 bits contain the
3417    // # of 8-byte words in the block (if the block does not fill the final word
3418    // then up to 7 bytes of padding are added).  The lower 4 bits describe the log_2
3419    // number of values (e.g. if there are 1024 then the lower 4 bits will be
3420    // 0xA)  All blocks except the last must have power-of-two number of values.
3421    // This not only makes metadata smaller but it makes decoding easier since
3422    // batch sizes are typically a power of 2.  4 bits would allow us to express
3423    // up to 16Ki values but we restrict this further to 4Ki values.
3424    //
3425    // This means blocks can have 1 to 4Ki values and 8 - 32Ki bytes.
3426    //
3427    // All metadata words are serialized (as little endian) into a single buffer
3428    // of metadata values.
3429    //
3430    // If there is repetition then we also create a repetition index.  This is a
3431    // single buffer of integer vectors (stored in row major order).  There is one
3432    // entry for each chunk.  The size of the vector is based on the depth of random
3433    // access we want to support.
3434    //
3435    // A vector of size 2 is the minimum and will support row-based random access (e.g.
3436    // "take the 57th row").  A vector of size 3 will support 1 level of nested access
3437    // (e.g. "take the 3rd item in the 57th row").  A vector of size 4 will support 2
3438    // levels of nested access and so on.
3439    //
3440    // The first number in the vector is the number of top-level rows that complete in
3441    // the chunk.  The second number is the number of second-level rows that complete
3442    // after the final top-level row completed (or beginning of the chunk if no top-level
3443    // row completes in the chunk).  And so on.  The final number in the vector is always
3444    // the number of leftover items not covered by earlier entries in the vector.
3445    //
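    // For example (in a non-final chunk), rep levels of [1, 0, 0, 1, 0] mean one top-level
    // row finishes in the chunk (the items before the second 1) and 2 leftover items remain
    // for a row that continues into the next chunk, so the vector is [1, 2].
    //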
3446    // Currently we are limited to 0 levels of nested access but that will change in the
3447    // future.
3448    //
3449    // The repetition index and the chunk metadata are read at initialization time and
3450    // cached in memory.
3451    fn serialize_miniblocks(
3452        miniblocks: MiniBlockCompressed,
3453        rep: Option<Vec<CompressedLevelsChunk>>,
3454        def: Option<Vec<CompressedLevelsChunk>>,
3455    ) -> SerializedMiniBlockPage {
3456        let bytes_rep = rep
3457            .as_ref()
3458            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3459            .unwrap_or(0);
3460        let bytes_def = def
3461            .as_ref()
3462            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3463            .unwrap_or(0);
3464        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3465        let mut num_buffers = miniblocks.data.len();
3466        if rep.is_some() {
3467            num_buffers += 1;
3468        }
3469        if def.is_some() {
3470            num_buffers += 1;
3471        }
3472        // 2 bytes for the length of each buffer and up to 7 bytes of padding per buffer
3473        let max_extra = 9 * num_buffers;
3474        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3475        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2);
3476
3477        let mut rep_iter = rep.map(|r| r.into_iter());
3478        let mut def_iter = def.map(|d| d.into_iter());
3479
3480        let mut buffer_offsets = vec![0; miniblocks.data.len()];
3481        for chunk in miniblocks.chunks {
3482            let start_pos = data_buffer.len();
3483            // Start of chunk should be aligned
3484            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3485
3486            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3487            let def = def_iter.as_mut().map(|d| d.next().unwrap());
3488
3489            // Write the number of levels, or 0 if there is no rep/def
3490            let num_levels = rep
3491                .as_ref()
3492                .map(|r| r.num_levels)
3493                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3494            data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3495
3496            // Write the buffer lengths
3497            if let Some(rep) = rep.as_ref() {
3498                let bytes_rep = u16::try_from(rep.data.len()).unwrap();
3499                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3500            }
3501            if let Some(def) = def.as_ref() {
3502                let bytes_def = u16::try_from(def.data.len()).unwrap();
3503                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3504            }
3505
3506            for buffer_size in &chunk.buffer_sizes {
3507                let bytes = *buffer_size;
3508                data_buffer.extend_from_slice(&bytes.to_le_bytes());
3509            }
3510
3511            // Pad
3512            let add_padding = |data_buffer: &mut Vec<u8>| {
3513                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3514                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
3515            };
3516            add_padding(&mut data_buffer);
3517
3518            // Write the buffers themselves
3519            if let Some(rep) = rep.as_ref() {
3520                data_buffer.extend_from_slice(&rep.data);
3521                add_padding(&mut data_buffer);
3522            }
3523            if let Some(def) = def.as_ref() {
3524                data_buffer.extend_from_slice(&def.data);
3525                add_padding(&mut data_buffer);
3526            }
3527            for (buffer_size, (buffer, buffer_offset)) in chunk
3528                .buffer_sizes
3529                .iter()
3530                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
3531            {
3532                let start = *buffer_offset;
3533                let end = start + *buffer_size as usize;
3534                *buffer_offset += *buffer_size as usize;
3535                data_buffer.extend_from_slice(&buffer[start..end]);
3536                add_padding(&mut data_buffer);
3537            }
3538
3539            let chunk_bytes = data_buffer.len() - start_pos;
3540            assert!(chunk_bytes <= 32 * 1024);
3541            assert!(chunk_bytes > 0);
3542            assert_eq!(chunk_bytes % 8, 0);
3543            // We subtract 1 from the word count because we want to be able to express
3544            // a size of 32KiB and not (32Ki - 8)B which is what we'd get otherwise with
3545            // 0xFFF
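            //
            // For example, a chunk of 4096 bytes holding 1024 values is encoded as
            // ((4096 / 8 - 1) << 4) | 10 = 0x1FFA.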
3546            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3547            let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3548
3549            let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
3550            meta_buffer.extend_from_slice(&metadata.to_le_bytes());
3551        }
3552
3553        let data_buffer = LanceBuffer::from(data_buffer);
3554        let metadata_buffer = LanceBuffer::from(meta_buffer);
3555
3556        SerializedMiniBlockPage {
3557            num_buffers: miniblocks.data.len() as u64,
3558            data: data_buffer,
3559            metadata: metadata_buffer,
3560        }
3561    }
3562
3563    /// Compresses a buffer of levels into chunks
3564    ///
3565    /// If these are repetition levels then we also calculate the repetition index here
3566    /// (returned as `rep_index` in the result)
3567    fn compress_levels(
3568        mut levels: RepDefSlicer<'_>,
3569        num_elements: u64,
3570        compression_strategy: &dyn CompressionStrategy,
3571        chunks: &[MiniBlockChunk],
3572        // This will be 0 if we are compressing def levels
3573        max_rep: u16,
3574    ) -> Result<CompressedLevels> {
3575        let mut rep_index = if max_rep > 0 {
3576            Vec::with_capacity(chunks.len())
3577        } else {
3578            vec![]
3579        };
3580        // Make the levels into a FixedWidth data block
3581        let num_levels = levels.num_levels() as u64;
3582        let levels_buf = levels.all_levels().clone();
3583
3584        let mut fixed_width_block = FixedWidthDataBlock {
3585            data: levels_buf,
3586            bits_per_value: 16,
3587            num_values: num_levels,
3588            block_info: BlockInfo::new(),
3589        };
3590        // Compute statistics to enable optimal compression for rep/def levels
3591        fixed_width_block.compute_stat();
3592
3593        let levels_block = DataBlock::FixedWidth(fixed_width_block);
3594        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
3595        // Pick a block compressor
3596        let (compressor, compressor_desc) =
3597            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
3598        // Compress blocks of levels (sized according to the chunks)
3599        let mut level_chunks = Vec::with_capacity(chunks.len());
3600        let mut values_counter = 0;
3601        for (chunk_idx, chunk) in chunks.iter().enumerate() {
3602            let chunk_num_values = chunk.num_values(values_counter, num_elements);
3603            debug_assert!(chunk_num_values > 0);
3604            values_counter += chunk_num_values;
3605            let chunk_levels = if chunk_idx < chunks.len() - 1 {
3606                levels.slice_next(chunk_num_values as usize)
3607            } else {
3608                levels.slice_rest()
3609            };
3610            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
3611            if max_rep > 0 {
3612                // If max_rep > 0 then we are working with rep levels and we need
3613                // to calculate the repetition index.  The repetition index for a
3614                // chunk is currently 2 values (in the future it may be more).
3615                //
3616                // The first value is the number of rows that _finish_ in the
3617                // chunk.
3618                //
3619                // The second value is the number of "leftovers" after the last
3620                // finished row in the chunk.
3621                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
3622                let rep_values = rep_values.as_ref();
3623
3624                // We skip 1 here because a max_rep at spot 0 doesn't count as a finished list (we
3625                // will count it in the previous chunk)
3626                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
3627                let num_leftovers = if chunk_idx < chunks.len() - 1 {
3628                    rep_values
3629                        .iter()
3630                        .rev()
3631                        .position(|v| *v == max_rep)
3632                        // # of leftovers includes the max_rep spot
3633                        .map(|pos| pos + 1)
3634                        .unwrap_or(rep_values.len())
3635                } else {
3636                    // Last chunk can't have leftovers
3637                    0
3638                };
3639
3640                if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
3641                    // This chunk starts with a new row and so, if we thought we had leftovers
3642                    // in the previous chunk, we were mistaken
3643                    // TODO: Can use unchecked here
3644                    let rep_len = rep_index.len();
3645                    if rep_index[rep_len - 1] != 0 {
3646                        // We thought we had leftovers but that was actually a full row
3647                        rep_index[rep_len - 2] += 1;
3648                        rep_index[rep_len - 1] = 0;
3649                    }
3650                }
3651
3652                if chunk_idx == chunks.len() - 1 {
3653                    // The final list
3654                    num_rows += 1;
3655                }
3656                rep_index.push(num_rows as u64);
3657                rep_index.push(num_leftovers as u64);
3658            }
3659            let mut chunk_fixed_width = FixedWidthDataBlock {
3660                data: chunk_levels,
3661                bits_per_value: 16,
3662                num_values: num_chunk_levels,
3663                block_info: BlockInfo::new(),
3664            };
3665            chunk_fixed_width.compute_stat();
3666            let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
3667            let compressed_levels = compressor.compress(chunk_levels_block)?;
3668            level_chunks.push(CompressedLevelsChunk {
3669                data: compressed_levels,
3670                num_levels: num_chunk_levels as u16,
3671            });
3672        }
3673        debug_assert_eq!(levels.num_levels_remaining(), 0);
3674        let rep_index = if rep_index.is_empty() {
3675            None
3676        } else {
3677            Some(LanceBuffer::reinterpret_vec(rep_index))
3678        };
3679        Ok(CompressedLevels {
3680            data: level_chunks,
3681            compression: compressor_desc,
3682            rep_index,
3683        })
3684    }
3685
3686    fn encode_simple_all_null(
3687        column_idx: u32,
3688        num_rows: u64,
3689        row_number: u64,
3690    ) -> Result<EncodedPage> {
3691        let description = ProtobufUtils21::simple_all_null_layout();
3692        Ok(EncodedPage {
3693            column_idx,
3694            data: vec![],
3695            description: PageEncoding::Structural(description),
3696            num_rows,
3697            row_number,
3698        })
3699    }
3700
3701    // Encodes a page where all values are null but we have rep/def
3702    // information that we need to store (e.g. to distinguish between
3703    // different kinds of null)
3704    fn encode_complex_all_null(
3705        column_idx: u32,
3706        repdefs: Vec<RepDefBuilder>,
3707        row_number: u64,
3708        num_rows: u64,
3709    ) -> Result<EncodedPage> {
3710        let repdef = RepDefBuilder::serialize(repdefs);
3711
3712        // TODO: Actually compress repdef
3713        let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
3714            LanceBuffer::reinterpret_slice(rep.clone())
3715        } else {
3716            LanceBuffer::empty()
3717        };
3718
3719        let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
3720            LanceBuffer::reinterpret_slice(def.clone())
3721        } else {
3722            LanceBuffer::empty()
3723        };
3724
3725        let description = ProtobufUtils21::all_null_layout(&repdef.def_meaning);
3726        Ok(EncodedPage {
3727            column_idx,
3728            data: vec![rep_bytes, def_bytes],
3729            description: PageEncoding::Structural(description),
3730            num_rows,
3731            row_number,
3732        })
3733    }
3734
3735    #[allow(clippy::too_many_arguments)]
3736    fn encode_miniblock(
3737        column_idx: u32,
3738        field: &Field,
3739        compression_strategy: &dyn CompressionStrategy,
3740        data: DataBlock,
3741        repdefs: Vec<RepDefBuilder>,
3742        row_number: u64,
3743        dictionary_data: Option<DataBlock>,
3744        num_rows: u64,
3745    ) -> Result<EncodedPage> {
3746        let repdef = RepDefBuilder::serialize(repdefs);
3747
3748        if let DataBlock::AllNull(_null_block) = data {
3749            // We should not be using mini-block for all-null.  There are other structural
3750            // encodings for that.
3751            unreachable!()
3752        }
3753
3754        let num_items = data.num_values();
3755
3756        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
3757        let (compressed_data, value_encoding) = compressor.compress(data)?;
3758
3759        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
3760
3761        let mut compressed_rep = repdef
3762            .rep_slicer()
3763            .map(|rep_slicer| {
3764                Self::compress_levels(
3765                    rep_slicer,
3766                    num_items,
3767                    compression_strategy,
3768                    &compressed_data.chunks,
3769                    max_rep,
3770                )
3771            })
3772            .transpose()?;
3773
3774        let (rep_index, rep_index_depth) =
3775            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
3776                Some(rep_index) => (Some(rep_index.clone()), 1),
3777                None => (None, 0),
3778            };
3779
3780        let mut compressed_def = repdef
3781            .def_slicer()
3782            .map(|def_slicer| {
3783                Self::compress_levels(
3784                    def_slicer,
3785                    num_items,
3786                    compression_strategy,
3787                    &compressed_data.chunks,
3788                    /*max_rep=*/ 0,
3789                )
3790            })
3791            .transpose()?;
3792
3793        // TODO: Parquet sparsely encodes values here.  We could do the same but
3794        // then we won't have log2 values per chunk.  This means more metadata
3795        // and potentially more decoder asymmetry.  However, it may be worth
3796        // investigating at some point
3797
3798        let rep_data = compressed_rep
3799            .as_mut()
3800            .map(|cr| std::mem::take(&mut cr.data));
3801        let def_data = compressed_def
3802            .as_mut()
3803            .map(|cd| std::mem::take(&mut cd.data));
3804
3805        let serialized = Self::serialize_miniblocks(compressed_data, rep_data, def_data);
3806
3807        // Metadata, Data, Dictionary, (maybe) Repetition Index
3808        let mut data = Vec::with_capacity(4);
3809        data.push(serialized.metadata);
3810        data.push(serialized.data);
3811
3812        if let Some(dictionary_data) = dictionary_data {
3813            let num_dictionary_items = dictionary_data.num_values();
3814            // The `field` argument to `create_block_compressor` is currently unused.
3815            let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;
3816
3817            let (compressor, dictionary_encoding) = compression_strategy
3818                .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
3819            let dictionary_buffer = compressor.compress(dictionary_data)?;
3820
3821            data.push(dictionary_buffer);
3822            if let Some(rep_index) = rep_index {
3823                data.push(rep_index);
3824            }
3825
3826            let description = ProtobufUtils21::miniblock_layout(
3827                compressed_rep.map(|cr| cr.compression),
3828                compressed_def.map(|cd| cd.compression),
3829                value_encoding,
3830                rep_index_depth,
3831                serialized.num_buffers,
3832                Some((dictionary_encoding, num_dictionary_items)),
3833                &repdef.def_meaning,
3834                num_items,
3835            );
3836            Ok(EncodedPage {
3837                num_rows,
3838                column_idx,
3839                data,
3840                description: PageEncoding::Structural(description),
3841                row_number,
3842            })
3843        } else {
3844            let description = ProtobufUtils21::miniblock_layout(
3845                compressed_rep.map(|cr| cr.compression),
3846                compressed_def.map(|cd| cd.compression),
3847                value_encoding,
3848                rep_index_depth,
3849                serialized.num_buffers,
3850                None,
3851                &repdef.def_meaning,
3852                num_items,
3853            );
3854
3855            if let Some(rep_index) = rep_index {
3856                let view = rep_index.borrow_to_typed_slice::<u64>();
3857                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
3858                debug_assert_eq!(total, num_rows);
3859
3860                data.push(rep_index);
3861            }
3862
3863            Ok(EncodedPage {
3864                num_rows,
3865                column_idx,
3866                data,
3867                description: PageEncoding::Structural(description),
3868                row_number,
3869            })
3870        }
3871    }
3872
3873    // For fixed-size data we encode < control word | data > for each value
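    //
    // The zipped buffer looks like:
    //
    //   | cw0 | value0 | cw1 | value1 | ... | cwN | valueN |
    //
    // (invisible items contribute only a control word).  The repetition index, when present,
    // records the byte offset at which each row starts plus the final buffer length.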
3874    fn serialize_full_zip_fixed(
3875        fixed: FixedWidthDataBlock,
3876        mut repdef: ControlWordIterator,
3877        num_values: u64,
3878    ) -> SerializedFullZip {
3879        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
3880        let mut zipped_data = Vec::with_capacity(len);
3881
3882        let max_rep_index_val = if repdef.has_repetition() {
3883            len as u64
3884        } else {
3885            // Setting this to 0 means we won't write a repetition index
3886            0
3887        };
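        // Capacity is an upper bound: the index holds one entry per new row plus a final
        // end-of-buffer offset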
3888        let mut rep_index_builder =
3889            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
3890
3891        // We could pad to the nearest byte, but this is unlikely to matter anytime soon because
3892        // compression of large values rarely yields a result that is not byte aligned
3893        assert_eq!(
3894            fixed.bits_per_value % 8,
3895            0,
3896            "Non-byte aligned full-zip compression not yet supported"
3897        );
3898
3899        let bytes_per_value = fixed.bits_per_value as usize / 8;
3900        let mut offset = 0;
3901
3902        if bytes_per_value == 0 {
3903            // No data, just dump the repdef into the buffer
3904            while let Some(control) = repdef.append_next(&mut zipped_data) {
3905                if control.is_new_row {
3906                    // We have finished a row
3907                    debug_assert!(offset <= len);
3908                    // SAFETY: We know that `offset <= len`
3909                    unsafe { rep_index_builder.append(offset as u64) };
3910                }
3911                offset = zipped_data.len();
3912            }
3913        } else {
3914            // We have data, zip it with the repdef
3915            let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
3916            while let Some(control) = repdef.append_next(&mut zipped_data) {
3917                if control.is_new_row {
3918                    // We have finished a row
3919                    debug_assert!(offset <= len);
3920                    // SAFETY: We know that `offset <= len`
3921                    unsafe { rep_index_builder.append(offset as u64) };
3922                }
3923                if control.is_visible {
3924                    let value = data_iter.next().unwrap();
3925                    zipped_data.extend_from_slice(value);
3926                }
3927                offset = zipped_data.len();
3928            }
3929        }
3930
3931        debug_assert_eq!(zipped_data.len(), len);
3932        // Put the final value in the rep index
3933        // SAFETY: `zipped_data.len() == len`
3934        unsafe {
3935            rep_index_builder.append(zipped_data.len() as u64);
3936        }
3937
3938        let zipped_data = LanceBuffer::from(zipped_data);
3939        let rep_index = rep_index_builder.into_data();
3940        let rep_index = if rep_index.is_empty() {
3941            None
3942        } else {
3943            Some(LanceBuffer::from(rep_index))
3944        };
3945        SerializedFullZip {
3946            values: zipped_data,
3947            repetition_index: rep_index,
3948        }
3949    }
3950
3951    // For variable-size data we encode < control word | length | data > for each value
3952    //
3953    // In addition, we create a second buffer, the repetition index
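    //
    // Null (or invisible) items contribute only a control word; no length or data bytes
    // are written for them.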
3954    fn serialize_full_zip_variable(
3955        variable: VariableWidthBlock,
3956        mut repdef: ControlWordIterator,
3957        num_items: u64,
3958    ) -> SerializedFullZip {
3959        let bytes_per_offset = variable.bits_per_offset as usize / 8;
3960        assert_eq!(
3961            variable.bits_per_offset % 8,
3962            0,
3963            "Only byte-aligned offsets supported"
3964        );
3965        let len = variable.data.len()
3966            + repdef.bytes_per_word() * num_items as usize
3967            + bytes_per_offset * variable.num_values as usize;
3968        let mut buf = Vec::with_capacity(len);
3969
3970        let max_rep_index_val = len as u64;
3971        let mut rep_index_builder =
3972            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
3973
3974        // TODO: byte pack the item lengths with varint encoding
3975        match bytes_per_offset {
3976            4 => {
3977                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
3978                let mut rep_offset = 0;
3979                let mut windows_iter = offs.as_ref().windows(2);
3980                while let Some(control) = repdef.append_next(&mut buf) {
3981                    if control.is_new_row {
3982                        // We have finished a row
3983                        debug_assert!(rep_offset <= len);
3984                        // SAFETY: We know that `rep_offset <= len`
3985                        unsafe { rep_index_builder.append(rep_offset as u64) };
3986                    }
3987                    if control.is_visible {
3988                        let window = windows_iter.next().unwrap();
3989                        if control.is_valid_item {
3990                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
3991                            buf.extend_from_slice(
3992                                &variable.data[window[0] as usize..window[1] as usize],
3993                            );
3994                        }
3995                    }
3996                    rep_offset = buf.len();
3997                }
3998            }
3999            8 => {
4000                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4001                let mut rep_offset = 0;
4002                let mut windows_iter = offs.as_ref().windows(2);
4003                while let Some(control) = repdef.append_next(&mut buf) {
4004                    if control.is_new_row {
4005                        // We have finished a row
4006                        debug_assert!(rep_offset <= len);
4007                        // SAFETY: We know that `rep_offset <= len`
4008                        unsafe { rep_index_builder.append(rep_offset as u64) };
4009                    }
4010                    if control.is_visible {
4011                        let window = windows_iter.next().unwrap();
4012                        if control.is_valid_item {
4013                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4014                            buf.extend_from_slice(
4015                                &variable.data[window[0] as usize..window[1] as usize],
4016                            );
4017                        }
4018                    }
4019                    rep_offset = buf.len();
4020                }
4021            }
4022            _ => panic!("Unsupported offset size"),
4023        }
4024
4025        // We may have used fewer bytes than `len` since we skip the length/data bytes for
4026        // null and invisible items.  However, if we are over `len` then we have a bug.
4027        debug_assert!(buf.len() <= len);
4028        // Put the final value in the rep index
4029        // SAFETY: `buf.len() <= len` and `max_rep_index_val == len`
4030        unsafe {
4031            rep_index_builder.append(buf.len() as u64);
4032        }
4033
4034        let zipped_data = LanceBuffer::from(buf);
4035        let rep_index = rep_index_builder.into_data();
4036        debug_assert!(!rep_index.is_empty());
4037        let rep_index = Some(LanceBuffer::from(rep_index));
4038        SerializedFullZip {
4039            values: zipped_data,
4040            repetition_index: rep_index,
4041        }
4042    }
4043
4044    /// Serializes data according to the full-zip format, which zips the repetition,
4045    /// definition, and value data together into a single buffer.
4046    fn serialize_full_zip(
4047        compressed_data: PerValueDataBlock,
4048        repdef: ControlWordIterator,
4049        num_items: u64,
4050    ) -> SerializedFullZip {
4051        match compressed_data {
4052            PerValueDataBlock::Fixed(fixed) => {
4053                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4054            }
4055            PerValueDataBlock::Variable(var) => {
4056                Self::serialize_full_zip_variable(var, repdef, num_items)
4057            }
4058        }
4059    }
4060
4061    fn encode_full_zip(
4062        column_idx: u32,
4063        field: &Field,
4064        compression_strategy: &dyn CompressionStrategy,
4065        data: DataBlock,
4066        repdefs: Vec<RepDefBuilder>,
4067        row_number: u64,
4068        num_lists: u64,
4069    ) -> Result<EncodedPage> {
4070        let repdef = RepDefBuilder::serialize(repdefs);
4071        let max_rep = repdef
4072            .repetition_levels
4073            .as_ref()
4074            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4075        let max_def = repdef
4076            .definition_levels
4077            .as_ref()
4078            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4079
4080        // To handle FSL we just flatten
4081        // let data = data.flatten();
4082
4083        let (num_items, num_visible_items) =
4084            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4085                // If there are rep levels there may be "invisible" items, so we need to encode
4086                // rep_levels.len() entries, which may be more than data.num_values()
4087                (rep_levels.len() as u64, data.num_values())
4088            } else {
4089                // If there are no rep levels then we encode data.num_values() entries
4090                (data.num_values(), data.num_values())
4091            };
4092
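        // Items whose definition level exceeds `max_visible_def` (e.g. items inside null or
        // empty lists) are "invisible": they get a control word but no value bytes in the
        // zipped buffer.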
4093        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4094
4095        let repdef_iter = build_control_word_iterator(
4096            repdef.repetition_levels.as_deref(),
4097            max_rep,
4098            repdef.definition_levels.as_deref(),
4099            max_def,
4100            max_visible_def,
4101            num_items as usize,
4102        );
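        // Each value will be preceded by a control word packing its repetition and definition
        // levels; the width of that word (possibly zero bytes) is reported by
        // ControlWordIterator::bytes_per_word().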
4103        let bits_rep = repdef_iter.bits_rep();
4104        let bits_def = repdef_iter.bits_def();
4105
4106        let compressor = compression_strategy.create_per_value(field, &data)?;
4107        let (compressed_data, value_encoding) = compressor.compress(data)?;
4108
4109        let description = match &compressed_data {
4110            PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
4111                bits_rep,
4112                bits_def,
4113                fixed.bits_per_value as u32,
4114                value_encoding,
4115                &repdef.def_meaning,
4116                num_items as u32,
4117                num_visible_items as u32,
4118            ),
4119            PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
4120                bits_rep,
4121                bits_def,
4122                variable.bits_per_offset as u32,
4123                value_encoding,
4124                &repdef.def_meaning,
4125                num_items as u32,
4126                num_visible_items as u32,
4127            ),
4128        };
4129
4130        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4131
4132        let data = if let Some(repindex) = zipped.repetition_index {
4133            vec![zipped.values, repindex]
4134        } else {
4135            vec![zipped.values]
4136        };
4137
4138        Ok(EncodedPage {
4139            num_rows: num_lists,
4140            column_idx,
4141            data,
4142            description: PageEncoding::Structural(description),
4143            row_number,
4144        })
4145    }
4146
4147    /// Estimates the total size of dictionary-encoded data
4148    ///
4149    /// Dictionary encoding splits data into two parts:
4150    /// 1. Dictionary: stores unique values
4151    /// 2. Indices: maps each value to a dictionary entry
4152    ///
4153    /// For FixedWidth (e.g., 128-bit Decimal):
4154    /// - Dictionary: cardinality × 16 bytes (128 bits per value)
4155    /// - Indices: num_values × 4 bytes (32-bit i32)
4156    ///
4157    /// For VariableWidth (strings/binary):
4158    /// - Dictionary values: cardinality × avg_value_size (actual data)
4159    /// - Dictionary offsets: cardinality × offset_size (32 or 64 bits)
4160    /// - Indices: num_values × offset_size (same as dictionary offsets)
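    ///
    /// Illustrative example (hypothetical numbers): a VariableWidth block with 1,000 values,
    /// cardinality 100, 8,000 bytes of data (avg 8 bytes/value), and 32-bit offsets is
    /// estimated at 100 * 8 + 100 * 4 + 1,000 * 4 = 5,200 bytes, versus 8,000 bytes raw.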
4161    fn estimate_dict_size(data_block: &DataBlock) -> Option<u64> {
4162        let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
4163            cardinality_array.as_primitive::<UInt64Type>().value(0)
4164        } else {
4165            return None;
4166        };
4167
4168        let num_values = data_block.num_values();
4169
4170        match data_block {
4171            DataBlock::FixedWidth(_) => {
4172                // Dictionary: cardinality unique values at 128 bits each
4173                let dict_size = cardinality * (DICT_FIXED_WIDTH_BITS_PER_VALUE / 8);
4174                // Indices: num_values indices at 32 bits each
4175                let indices_size = num_values * (DICT_INDICES_BITS_PER_VALUE / 8);
4176                Some(dict_size + indices_size)
4177            }
4178            DataBlock::VariableWidth(var) => {
4179                // Only 32-bit and 64-bit offsets are supported
4180                if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
4181                    return None;
4182                }
4183                let bits_per_offset = var.bits_per_offset as u64;
4184
4185                let data_size = data_block.data_size();
4186                let avg_value_size = data_size / num_values;
4187
4188                // Dictionary values: actual bytes of unique strings/binary
4189                let dict_values_size = cardinality * avg_value_size;
4190                // Dictionary offsets: pointers into dictionary values
4191                let dict_offsets_size = cardinality * (bits_per_offset / 8);
4192                // Indices: map each row to dictionary entry
4193                let indices_size = num_values * (bits_per_offset / 8);
4194
4195                Some(dict_values_size + dict_offsets_size + indices_size)
4196            }
4197            _ => None,
4198        }
4199    }
4200
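    /// Decides whether a block is worth dictionary encoding.
    ///
    /// Only FixedWidth and VariableWidth blocks are considered.  A block is dictionary encoded
    /// when the estimated encoded size (see `estimate_dict_size`) divided by the raw data size
    /// is below a threshold ratio.  The ratio defaults to 0.8 and can be overridden per-field
    /// via the `DICT_SIZE_RATIO_META_KEY` metadata key or globally via the
    /// `LANCE_ENCODING_DICT_SIZE_RATIO` environment variable.  Blocks with fewer than
    /// `LANCE_ENCODING_DICT_TOO_SMALL` values (default 100) are never dictionary encoded.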
4201    fn should_dictionary_encode(data_block: &DataBlock, field: &Field) -> bool {
4202        // For now we only dictionary encode FixedWidth and VariableWidth blocks, so bail out
4203        // early (without estimating the size) for any other block type
4204        if !matches!(
4205            data_block,
4206            DataBlock::FixedWidth(_) | DataBlock::VariableWidth(_)
4207        ) {
4208            return false;
4209        }
4210
4211        // Don't dictionary encode tiny arrays
4212        let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
4213            .ok()
4214            .and_then(|val| val.parse().ok())
4215            .unwrap_or(100);
4216        if data_block.num_values() < too_small {
4217            return false;
4218        }
4219
4220        // Get size ratio from metadata or env var, default to 0.8
4221        let threshold_ratio = field
4222            .metadata
4223            .get(DICT_SIZE_RATIO_META_KEY)
4224            .and_then(|val| val.parse::<f64>().ok())
4225            .or_else(|| {
4226                env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
4227                    .ok()
4228                    .and_then(|val| val.parse().ok())
4229            })
4230            .unwrap_or(0.8);
4231
4232        // Validate size ratio is in valid range
4233        if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
4234            panic!(
4235                "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
4236                threshold_ratio
4237            );
4238        }
4239
4240        // Get raw data size
4241        let data_size = data_block.data_size();
4242
4243        // Estimate dictionary-encoded size
4244        let Some(encoded_size) = Self::estimate_dict_size(data_block) else {
4245            return false;
4246        };
4247
4248        let size_ratio_actual = if data_size > 0 {
4249            encoded_size as f64 / data_size as f64
4250        } else {
4251            return false;
4252        };
4253        size_ratio_actual < threshold_ratio
4254    }
4255
4256    // Creates an encode task, consuming all buffered data
4257    fn do_flush(
4258        &mut self,
4259        arrays: Vec<ArrayRef>,
4260        repdefs: Vec<RepDefBuilder>,
4261        row_number: u64,
4262        num_rows: u64,
4263    ) -> Result<Vec<EncodeTask>> {
4264        let column_idx = self.column_index;
4265        let compression_strategy = self.compression_strategy.clone();
4266        let field = self.field.clone();
4267        let encoding_metadata = self.encoding_metadata.clone();
4268        let task = spawn_cpu(move || {
4269            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
4270
4271            if num_values == 0 {
4272                // We should not encode empty arrays.  So if we get here that should mean that we
4273                // either have all empty lists or all null lists (or a mix).  We still need to encode
4274                // the rep/def information but we can skip the data encoding.
4275                log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows);
4276                return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
4277            }
4278            let num_nulls = arrays
4279                .iter()
4280                .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
4281                .sum::<u64>();
4282
4283            if num_values == num_nulls {
4284                return if repdefs.iter().all(|rd| rd.is_simple_validity()) {
4285                    log::debug!(
4286                        "Encoding column {} with {} items ({} rows) using simple-null layout",
4287                        column_idx,
4288                        num_values,
4289                        num_rows
4290                    );
4291                    // Simple case, no rep/def and all nulls, we don't need to encode any data
4292                    Self::encode_simple_all_null(column_idx, num_values, row_number)
4293                } else {
4294                    log::debug!(
4295                        "Encoding column {} with {} items ({} rows) using complex-null layout",
4296                        column_idx,
4297                        num_values,
4298                        num_rows
4299                    );
4300                    // If we get here then we have definition levels and we need to store those
4301                    Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
4302                };
4303            }
4304
4305            if let DataType::Struct(fields) = &field.data_type() {
4306                if fields.is_empty() {
4307                    if repdefs.iter().any(|rd| !rd.is_empty()) {
4308                        return Err(Error::InvalidInput { source: format!("Empty structs with rep/def information are not yet supported.  The field {} is an empty struct that either has nulls or is in a list.", field.name).into(), location: location!() });
4309                    }
4310                    // This is maybe a little confusing but the reader should never look at this anyway and it
4311                    // seems like overkill to invent a new layout just for "empty structs".
4312                    return Self::encode_simple_all_null(column_idx, num_values, row_number);
4313                }
4314            }
4315
4316            let data_block = DataBlock::from_arrays(&arrays, num_values);
4317
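            // Layout selection, in priority order: packed structs with a variable-width child use
            // the full-zip layout; blocks that are already dictionary encoded, or that
            // should_dictionary_encode deems worthwhile, use the mini-block layout with a
            // dictionary; otherwise we pick mini-block or full-zip based on prefers_miniblock /
            // prefers_fullzip.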
4318            let requires_full_zip_packed_struct =
4319                if let DataBlock::Struct(ref struct_data_block) = data_block {
4320                    struct_data_block.has_variable_width_child()
4321                } else {
4322                    false
4323                };
4324
4325            if requires_full_zip_packed_struct {
4326                log::debug!(
4327                    "Encoding column {} with {} items using full-zip packed struct layout",
4328                    column_idx,
4329                    num_values
4330                );
4331                return Self::encode_full_zip(
4332                    column_idx,
4333                    &field,
4334                    compression_strategy.as_ref(),
4335                    data_block,
4336                    repdefs,
4337                    row_number,
4338                    num_rows,
4339                );
4340            }
4341
4342            if let DataBlock::Dictionary(dict) = data_block {
4343                log::debug!("Encoding column {} with {} items using dictionary encoding (already dictionary encoded)", column_idx, num_values);
4344                let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
4345                // TODO: https://github.com/lancedb/lance/issues/4809
4346                // If we compute stats on dictionary_data_block => panic.
4347                // If we don't compute stats on indices_data_block => panic.
4348                // This is messy.  Don't make me call compute_stat ever.
4349                indices_data_block.compute_stat();
4350                Self::encode_miniblock(
4351                    column_idx,
4352                    &field,
4353                    compression_strategy.as_ref(),
4354                    indices_data_block,
4355                    repdefs,
4356                    row_number,
4357                    Some(dictionary_data_block),
4358                    num_rows,
4359                )
4360            } else if Self::should_dictionary_encode(&data_block, &field) {
4361                log::debug!(
4362                    "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
4363                    column_idx,
4364                    num_values
4365                );
4366                let (indices_data_block, dictionary_data_block) =
4367                    dict::dictionary_encode(data_block);
4368                Self::encode_miniblock(
4369                    column_idx,
4370                    &field,
4371                    compression_strategy.as_ref(),
4372                    indices_data_block,
4373                    repdefs,
4374                    row_number,
4375                    Some(dictionary_data_block),
4376                    num_rows,
4377                )
4378            } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
4379                log::debug!(
4380                    "Encoding column {} with {} items using mini-block layout",
4381                    column_idx,
4382                    num_values
4383                );
4384                Self::encode_miniblock(
4385                    column_idx,
4386                    &field,
4387                    compression_strategy.as_ref(),
4388                    data_block,
4389                    repdefs,
4390                    row_number,
4391                    None,
4392                    num_rows,
4393                )
4394            } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
4395                log::debug!(
4396                    "Encoding column {} with {} items using full-zip layout",
4397                    column_idx,
4398                    num_values
4399                );
4400                Self::encode_full_zip(
4401                    column_idx,
4402                    &field,
4403                    compression_strategy.as_ref(),
4404                    data_block,
4405                    repdefs,
4406                    row_number,
4407                    num_rows,
4408                )
4409            } else {
4410                Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}.  This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() })
4411            }
4412        })
4413        .boxed();
4414        Ok(vec![task])
4415    }
4416
4417    fn extract_validity_buf(
4418        array: Arc<dyn Array>,
4419        repdef: &mut RepDefBuilder,
4420        keep_original_array: bool,
4421    ) -> Result<Arc<dyn Array>> {
4422        if let Some(validity) = array.nulls() {
4423            if keep_original_array {
4424                repdef.add_validity_bitmap(validity.clone());
4425            } else {
4426                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
4427            }
4428            let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
4429            Ok(make_array(data_no_nulls))
4430        } else {
4431            repdef.add_no_null(array.len());
4432            Ok(array)
4433        }
4434    }
4435
4436    fn extract_validity(
4437        mut array: Arc<dyn Array>,
4438        repdef: &mut RepDefBuilder,
4439        keep_original_array: bool,
4440    ) -> Result<Arc<dyn Array>> {
4441        match array.data_type() {
4442            DataType::Null => {
4443                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
4444                Ok(array)
4445            }
4446            DataType::Dictionary(_, _) => {
4447                array = dict::normalize_dict_nulls(array)?;
4448                Self::extract_validity_buf(array, repdef, keep_original_array)
4449            }
4450            // Extract our validity buf but NOT any child validity bufs (they will be encoded
4451            // as part of the values).  Note: for FSL we do not use repdef.add_fsl because we do
4452            // NOT want to increase the repdef depth.
4453            //
4454            // This would be quite catastrophic for something like vector embeddings.  Imagine we
4455            // had thousands of vectors and some were null but no vector contained null items.  If
4456            // we treated the vectors (primitive FSL) like we treat structural FSL we would end up
4457            // with a rep/def value for every single item in the vector.
4458            _ => Self::extract_validity_buf(array, repdef, keep_original_array),
4459        }
4460    }
4461}
4462
4463impl FieldEncoder for PrimitiveStructuralEncoder {
4464    // Buffers data; if there is enough to write a page then we create an encode task
4465    fn maybe_encode(
4466        &mut self,
4467        array: ArrayRef,
4468        _external_buffers: &mut OutOfLineBuffers,
4469        mut repdef: RepDefBuilder,
4470        row_number: u64,
4471        num_rows: u64,
4472    ) -> Result<Vec<EncodeTask>> {
4473        let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
4474        self.accumulated_repdefs.push(repdef);
4475
4476        if let Some((arrays, row_number, num_rows)) =
4477            self.accumulation_queue.insert(array, row_number, num_rows)
4478        {
4479            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4480            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4481        } else {
4482            Ok(vec![])
4483        }
4484    }
4485
4486    // If there is any data left in the buffer then create an encode task from it
4487    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
4488        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
4489            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4490            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4491        } else {
4492            Ok(vec![])
4493        }
4494    }
4495
4496    fn num_columns(&self) -> u32 {
4497        1
4498    }
4499
4500    fn finish(
4501        &mut self,
4502        _external_buffers: &mut OutOfLineBuffers,
4503    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
4504        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
4505    }
4506}
4507
4508#[cfg(test)]
4509#[allow(clippy::single_range_in_vec_init)]
4510mod tests {
4511    use super::{
4512        ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
4513        FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails,
4514        FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor, PreambleAction,
4515        StructuralPageScheduler,
4516    };
4517    use crate::constants::{STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK};
4518    use crate::data::BlockInfo;
4519    use crate::decoder::PageEncoding;
4520    use crate::encodings::logical::primitive::{
4521        ChunkDrainInstructions, PrimitiveStructuralEncoder,
4522    };
4523    use crate::format::pb21;
4524    use crate::format::pb21::compressive_encoding::Compression;
4525    use crate::testing::{check_round_trip_encoding_of_data, TestCases};
4526    use crate::version::LanceFileVersion;
4527    use arrow_array::{ArrayRef, Int8Array, StringArray, UInt64Array};
4528    use arrow_schema::DataType;
4529    use std::collections::HashMap;
4530    use std::{collections::VecDeque, sync::Arc};
4531
4532    #[test]
4533    fn test_is_narrow() {
4534        let int8_array = Int8Array::from(vec![1, 2, 3]);
4535        let array_ref: ArrayRef = Arc::new(int8_array);
4536        let block = DataBlock::from_array(array_ref);
4537
4538        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4539
4540        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
4541        let block = DataBlock::from_array(string_array);
4542        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4543
4544        let string_array = StringArray::from(vec![
4545            Some("hello world".repeat(100)),
4546            Some("world".to_string()),
4547        ]);
4548        let block = DataBlock::from_array(string_array);
4549        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
4550    }
4551
4552    #[test]
4553    fn test_map_range() {
4554        // Null in the middle
4555        // [[A, B, C], [D, E], NULL, [F, G, H]]
4556        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
4557        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
4558        let max_visible_def = 0;
4559        let total_items = 8;
4560        let max_rep = 1;
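        // Here a repetition level of 1 marks the start of a new list and a definition level of
        // 1 marks a null list; the null list occupies one rep/def slot but contributes no items
        // (def > max_visible_def means "invisible").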
4561
4562        let check = |range, expected_item_range, expected_level_range| {
4563            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4564                range,
4565                rep.as_ref(),
4566                def.as_ref(),
4567                max_rep,
4568                max_visible_def,
4569                total_items,
4570                PreambleAction::Absent,
4571            );
4572            assert_eq!(item_range, expected_item_range);
4573            assert_eq!(level_range, expected_level_range);
4574        };
4575
4576        check(0..1, 0..3, 0..3);
4577        check(1..2, 3..5, 3..5);
4578        check(2..3, 5..5, 5..6);
4579        check(3..4, 5..8, 6..9);
4580        check(0..2, 0..5, 0..5);
4581        check(1..3, 3..5, 3..6);
4582        check(2..4, 5..8, 5..9);
4583        check(0..3, 0..5, 0..6);
4584        check(1..4, 3..8, 3..9);
4585        check(0..4, 0..8, 0..9);
4586
4587        // Null at start
4588        // [NULL, [A, B], [C]]
4589        let rep = Some(vec![1, 1, 0, 1]);
4590        let def = Some(vec![1, 0, 0, 0]);
4591        let max_visible_def = 0;
4592        let total_items = 3;
4593
4594        let check = |range, expected_item_range, expected_level_range| {
4595            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4596                range,
4597                rep.as_ref(),
4598                def.as_ref(),
4599                max_rep,
4600                max_visible_def,
4601                total_items,
4602                PreambleAction::Absent,
4603            );
4604            assert_eq!(item_range, expected_item_range);
4605            assert_eq!(level_range, expected_level_range);
4606        };
4607
4608        check(0..1, 0..0, 0..1);
4609        check(1..2, 0..2, 1..3);
4610        check(2..3, 2..3, 3..4);
4611        check(0..2, 0..2, 0..3);
4612        check(1..3, 0..3, 1..4);
4613        check(0..3, 0..3, 0..4);
4614
4615        // Null at end
4616        // [[A], [B, C], NULL]
4617        let rep = Some(vec![1, 1, 0, 1]);
4618        let def = Some(vec![0, 0, 0, 1]);
4619        let max_visible_def = 0;
4620        let total_items = 3;
4621
4622        let check = |range, expected_item_range, expected_level_range| {
4623            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4624                range,
4625                rep.as_ref(),
4626                def.as_ref(),
4627                max_rep,
4628                max_visible_def,
4629                total_items,
4630                PreambleAction::Absent,
4631            );
4632            assert_eq!(item_range, expected_item_range);
4633            assert_eq!(level_range, expected_level_range);
4634        };
4635
4636        check(0..1, 0..1, 0..1);
4637        check(1..2, 1..3, 1..3);
4638        check(2..3, 3..3, 3..4);
4639        check(0..2, 0..3, 0..3);
4640        check(1..3, 1..3, 1..4);
4641        check(0..3, 0..3, 0..4);
4642
4643        // No nulls, with repetition
4644        // [[A, B], [C, D], [E, F]]
4645        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
4646        let def: Option<&[u16]> = None;
4647        let max_visible_def = 0;
4648        let total_items = 6;
4649
4650        let check = |range, expected_item_range, expected_level_range| {
4651            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4652                range,
4653                rep.as_ref(),
4654                def.as_ref(),
4655                max_rep,
4656                max_visible_def,
4657                total_items,
4658                PreambleAction::Absent,
4659            );
4660            assert_eq!(item_range, expected_item_range);
4661            assert_eq!(level_range, expected_level_range);
4662        };
4663
4664        check(0..1, 0..2, 0..2);
4665        check(1..2, 2..4, 2..4);
4666        check(2..3, 4..6, 4..6);
4667        check(0..2, 0..4, 0..4);
4668        check(1..3, 2..6, 2..6);
4669        check(0..3, 0..6, 0..6);
4670
4671        // No repetition, with nulls (this case is trivial)
4672        // [A, B, NULL, C]
4673        let rep: Option<&[u16]> = None;
4674        let def = Some(vec![0, 0, 1, 0]);
4675        let max_visible_def = 1;
4676        let total_items = 4;
4677
4678        let check = |range, expected_item_range, expected_level_range| {
4679            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4680                range,
4681                rep.as_ref(),
4682                def.as_ref(),
4683                max_rep,
4684                max_visible_def,
4685                total_items,
4686                PreambleAction::Absent,
4687            );
4688            assert_eq!(item_range, expected_item_range);
4689            assert_eq!(level_range, expected_level_range);
4690        };
4691
4692        check(0..1, 0..1, 0..1);
4693        check(1..2, 1..2, 1..2);
4694        check(2..3, 2..3, 2..3);
4695        check(0..2, 0..2, 0..2);
4696        check(1..3, 1..3, 1..3);
4697        check(0..3, 0..3, 0..3);
4698
4699        // Tricky case: this chunk is a continuation and starts with a repetition level of 0
4700        // [[..., A] [B, C], NULL]
4701        //
4702        // What we do will depend on the preamble action
4703        let rep = Some(vec![0, 1, 0, 1]);
4704        let def = Some(vec![0, 0, 0, 1]);
4705        let max_visible_def = 0;
4706        let total_items = 3;
4707
4708        let check = |range, expected_item_range, expected_level_range| {
4709            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4710                range,
4711                rep.as_ref(),
4712                def.as_ref(),
4713                max_rep,
4714                max_visible_def,
4715                total_items,
4716                PreambleAction::Take,
4717            );
4718            assert_eq!(item_range, expected_item_range);
4719            assert_eq!(level_range, expected_level_range);
4720        };
4721
4722        // If we are taking the preamble then the range must start at 0
4723        check(0..1, 0..3, 0..3);
4724        check(0..2, 0..3, 0..4);
4725
4726        let check = |range, expected_item_range, expected_level_range| {
4727            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4728                range,
4729                rep.as_ref(),
4730                def.as_ref(),
4731                max_rep,
4732                max_visible_def,
4733                total_items,
4734                PreambleAction::Skip,
4735            );
4736            assert_eq!(item_range, expected_item_range);
4737            assert_eq!(level_range, expected_level_range);
4738        };
4739
4740        check(0..1, 1..3, 1..3);
4741        check(1..2, 3..3, 3..4);
4742        check(0..2, 1..3, 1..4);
4743
4744        // Another preamble case but now it doesn't end with a new list
4745        // [[..., A], NULL, [D, E]]
4746        //
4747        // What we do will depend on the preamble action
4748        let rep = Some(vec![0, 1, 1, 0]);
4749        let def = Some(vec![0, 1, 0, 0]);
4750        let max_visible_def = 0;
4751        let total_items = 4;
4752
4753        let check = |range, expected_item_range, expected_level_range| {
4754            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4755                range,
4756                rep.as_ref(),
4757                def.as_ref(),
4758                max_rep,
4759                max_visible_def,
4760                total_items,
4761                PreambleAction::Take,
4762            );
4763            assert_eq!(item_range, expected_item_range);
4764            assert_eq!(level_range, expected_level_range);
4765        };
4766
4767        // If we are taking the preamble then the range must start at 0
4768        check(0..1, 0..1, 0..2);
4769        check(0..2, 0..3, 0..4);
4770
4771        let check = |range, expected_item_range, expected_level_range| {
4772            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4773                range,
4774                rep.as_ref(),
4775                def.as_ref(),
4776                max_rep,
4777                max_visible_def,
4778                total_items,
4779                PreambleAction::Skip,
4780            );
4781            assert_eq!(item_range, expected_item_range);
4782            assert_eq!(level_range, expected_level_range);
4783        };
4784
4785        // If we are skipping the preamble then row 0 is the first row after the preamble
4786        check(0..1, 1..1, 1..2);
4787        check(1..2, 1..3, 2..4);
4788        check(0..2, 1..3, 1..4);
4789
4790        // Now a preamble case without any definition levels
4791        // [[..., A] [B, C], [D]]
4792        let rep = Some(vec![0, 1, 0, 1]);
4793        let def: Option<Vec<u16>> = None;
4794        let max_visible_def = 0;
4795        let total_items = 4;
4796
4797        let check = |range, expected_item_range, expected_level_range| {
4798            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4799                range,
4800                rep.as_ref(),
4801                def.as_ref(),
4802                max_rep,
4803                max_visible_def,
4804                total_items,
4805                PreambleAction::Take,
4806            );
4807            assert_eq!(item_range, expected_item_range);
4808            assert_eq!(level_range, expected_level_range);
4809        };
4810
4811        // If we are taking the preamble then the range must start at 0
4812        check(0..1, 0..3, 0..3);
4813        check(0..2, 0..4, 0..4);
4814
4815        let check = |range, expected_item_range, expected_level_range| {
4816            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4817                range,
4818                rep.as_ref(),
4819                def.as_ref(),
4820                max_rep,
4821                max_visible_def,
4822                total_items,
4823                PreambleAction::Skip,
4824            );
4825            assert_eq!(item_range, expected_item_range);
4826            assert_eq!(level_range, expected_level_range);
4827        };
4828
4829        check(0..1, 1..3, 1..3);
4830        check(1..2, 3..4, 3..4);
4831        check(0..2, 1..4, 1..4);
4832
4833        // If we have nested lists then non-top level lists may be empty/null
4834        // and we need to make sure we still handle them as invisible items (we
4835        // failed to do this previously)
4836        let rep = Some(vec![2, 1, 2, 0, 1, 2]);
4837        let def = Some(vec![0, 1, 2, 0, 0, 0]);
4838        let max_rep = 2;
4839        let max_visible_def = 0;
4840        let total_items = 4;
4841
4842        let check = |range, expected_item_range, expected_level_range| {
4843            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4844                range,
4845                rep.as_ref(),
4846                def.as_ref(),
4847                max_rep,
4848                max_visible_def,
4849                total_items,
4850                PreambleAction::Absent,
4851            );
4852            assert_eq!(item_range, expected_item_range);
4853            assert_eq!(level_range, expected_level_range);
4854        };
4855
4856        check(0..3, 0..4, 0..6);
4857        check(0..1, 0..1, 0..2);
4858        check(1..2, 1..3, 2..5);
4859        check(2..3, 3..4, 5..6);
4860
4861        // Invisible items in a preamble that we are taking (regressing a previous failure)
4862        let rep = Some(vec![0, 0, 1, 0, 1, 1]);
4863        let def = Some(vec![0, 1, 0, 0, 0, 0]);
4864        let max_rep = 1;
4865        let max_visible_def = 0;
4866        let total_items = 5;
4867
4868        let check = |range, expected_item_range, expected_level_range| {
4869            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4870                range,
4871                rep.as_ref(),
4872                def.as_ref(),
4873                max_rep,
4874                max_visible_def,
4875                total_items,
4876                PreambleAction::Take,
4877            );
4878            assert_eq!(item_range, expected_item_range);
4879            assert_eq!(level_range, expected_level_range);
4880        };
4881
4882        check(0..0, 0..1, 0..2);
4883        check(0..1, 0..3, 0..4);
4884        check(0..2, 0..4, 0..5);
4885
4886        // Skip preamble (with invis items) and skip a few rows (with invis items)
4887        // and then take a few rows but not all the rows
4888        let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
4889        let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
4890        let max_rep = 1;
4891        let max_visible_def = 0;
4892        let total_items = 5;
4893
4894        let check = |range, expected_item_range, expected_level_range| {
4895            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4896                range,
4897                rep.as_ref(),
4898                def.as_ref(),
4899                max_rep,
4900                max_visible_def,
4901                total_items,
4902                PreambleAction::Skip,
4903            );
4904            assert_eq!(item_range, expected_item_range);
4905            assert_eq!(level_range, expected_level_range);
4906        };
4907
4908        check(2..3, 2..4, 5..7);
4909    }
4910
4911    #[test]
4912    fn test_schedule_instructions() {
4913        // Convert repetition index to bytes for testing
4914        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
4915        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
4916        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
4917
4918        let check = |user_ranges, expected_instructions| {
4919            let instructions =
4920                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
4921            assert_eq!(instructions, expected_instructions);
4922        };
4923
4924        // The instructions we expect if we're grabbing the whole range
4925        let expected_take_all = vec![
4926            ChunkInstructions {
4927                chunk_idx: 0,
4928                preamble: PreambleAction::Absent,
4929                rows_to_skip: 0,
4930                rows_to_take: 6,
4931                take_trailer: true,
4932            },
4933            ChunkInstructions {
4934                chunk_idx: 1,
4935                preamble: PreambleAction::Take,
4936                rows_to_skip: 0,
4937                rows_to_take: 2,
4938                take_trailer: false,
4939            },
4940            ChunkInstructions {
4941                chunk_idx: 2,
4942                preamble: PreambleAction::Absent,
4943                rows_to_skip: 0,
4944                rows_to_take: 5,
4945                take_trailer: true,
4946            },
4947            ChunkInstructions {
4948                chunk_idx: 3,
4949                preamble: PreambleAction::Take,
4950                rows_to_skip: 0,
4951                rows_to_take: 1,
4952                take_trailer: false,
4953            },
4954        ];
4955
4956        // Take all as 1 range
4957        check(&[0..14], expected_take_all.clone());
4958
4959        // Take all as individual rows
4960        check(
4961            &[
4962                0..1,
4963                1..2,
4964                2..3,
4965                3..4,
4966                4..5,
4967                5..6,
4968                6..7,
4969                7..8,
4970                8..9,
4971                9..10,
4972                10..11,
4973                11..12,
4974                12..13,
4975                13..14,
4976            ],
4977            expected_take_all,
4978        );
4979
4980        // Test some partial takes
4981
4982        // 2 rows in the same chunk but not contiguous
4983        check(
4984            &[0..1, 3..4],
4985            vec![
4986                ChunkInstructions {
4987                    chunk_idx: 0,
4988                    preamble: PreambleAction::Absent,
4989                    rows_to_skip: 0,
4990                    rows_to_take: 1,
4991                    take_trailer: false,
4992                },
4993                ChunkInstructions {
4994                    chunk_idx: 0,
4995                    preamble: PreambleAction::Absent,
4996                    rows_to_skip: 3,
4997                    rows_to_take: 1,
4998                    take_trailer: false,
4999                },
5000            ],
5001        );
5002
5003        // Taking just a trailer/preamble
5004        check(
5005            &[5..6],
5006            vec![
5007                ChunkInstructions {
5008                    chunk_idx: 0,
5009                    preamble: PreambleAction::Absent,
5010                    rows_to_skip: 5,
5011                    rows_to_take: 1,
5012                    take_trailer: true,
5013                },
5014                ChunkInstructions {
5015                    chunk_idx: 1,
5016                    preamble: PreambleAction::Take,
5017                    rows_to_skip: 0,
5018                    rows_to_take: 0,
5019                    take_trailer: false,
5020                },
5021            ],
5022        );
5023
5024        // Skipping an entire chunk
5025        check(
5026            &[7..10],
5027            vec![
5028                ChunkInstructions {
5029                    chunk_idx: 1,
5030                    preamble: PreambleAction::Skip,
5031                    rows_to_skip: 1,
5032                    rows_to_take: 1,
5033                    take_trailer: false,
5034                },
5035                ChunkInstructions {
5036                    chunk_idx: 2,
5037                    preamble: PreambleAction::Absent,
5038                    rows_to_skip: 0,
5039                    rows_to_take: 2,
5040                    take_trailer: false,
5041                },
5042            ],
5043        );
5044    }
5045
5046    #[test]
5047    fn test_drain_instructions() {
5048        fn drain_from_instructions(
5049            instructions: &mut VecDeque<ChunkInstructions>,
5050            mut rows_desired: u64,
5051            need_preamble: &mut bool,
5052            skip_in_chunk: &mut u64,
5053        ) -> Vec<ChunkDrainInstructions> {
5054            // Note: instructions.len() is an upper bound, we typically take much fewer
5055            let mut drain_instructions = Vec::with_capacity(instructions.len());
5056            while rows_desired > 0 || *need_preamble {
5057                let (next_instructions, consumed_chunk) = instructions
5058                    .front()
5059                    .unwrap()
5060                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
5061                if consumed_chunk {
5062                    instructions.pop_front();
5063                }
5064                drain_instructions.push(next_instructions);
5065            }
5066            drain_instructions
5067        }
5068
5069        // Convert repetition index to bytes for testing
5070        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
5071        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5072        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5073        let user_ranges = vec![1..7, 10..14];
5074
5075        // First, schedule the ranges
5076        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
5077
5078        let mut to_drain = VecDeque::from(scheduled.clone());
5079
5080        // Now we drain in batches of 4
5081
5082        let mut need_preamble = false;
5083        let mut skip_in_chunk = 0;
5084
5085        let next_batch =
5086            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
5087
5088        assert!(!need_preamble);
5089        assert_eq!(skip_in_chunk, 4);
5090        assert_eq!(
5091            next_batch,
5092            vec![ChunkDrainInstructions {
5093                chunk_instructions: scheduled[0].clone(),
5094                rows_to_take: 4,
5095                rows_to_skip: 0,
5096                preamble_action: PreambleAction::Absent,
5097            }]
5098        );
5099
5100        let next_batch =
5101            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
5102
5103        assert!(!need_preamble);
5104        assert_eq!(skip_in_chunk, 2);
5105
5106        assert_eq!(
5107            next_batch,
5108            vec![
5109                ChunkDrainInstructions {
5110                    chunk_instructions: scheduled[0].clone(),
5111                    rows_to_take: 1,
5112                    rows_to_skip: 4,
5113                    preamble_action: PreambleAction::Absent,
5114                },
5115                ChunkDrainInstructions {
5116                    chunk_instructions: scheduled[1].clone(),
5117                    rows_to_take: 1,
5118                    rows_to_skip: 0,
5119                    preamble_action: PreambleAction::Take,
5120                },
5121                ChunkDrainInstructions {
5122                    chunk_instructions: scheduled[2].clone(),
5123                    rows_to_take: 2,
5124                    rows_to_skip: 0,
5125                    preamble_action: PreambleAction::Absent,
5126                }
5127            ]
5128        );
5129
5130        let next_batch =
5131            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
5132
5133        assert!(!need_preamble);
5134        assert_eq!(skip_in_chunk, 0);
5135
5136        assert_eq!(
5137            next_batch,
5138            vec![
5139                ChunkDrainInstructions {
5140                    chunk_instructions: scheduled[2].clone(),
5141                    rows_to_take: 1,
5142                    rows_to_skip: 2,
5143                    preamble_action: PreambleAction::Absent,
5144                },
5145                ChunkDrainInstructions {
5146                    chunk_instructions: scheduled[3].clone(),
5147                    rows_to_take: 1,
5148                    rows_to_skip: 0,
5149                    preamble_action: PreambleAction::Take,
5150                },
5151            ]
5152        );
5153
5154        // Regression case.  Need a chunk with preamble, rows, and trailer (the middle chunk here)
5155        let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
5156        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5157        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5158        let user_ranges = vec![0..28];
5159
5160        // First, schedule the ranges
5161        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
5162
5163        let mut to_drain = VecDeque::from(scheduled.clone());
5164
5165        // Drain first chunk and some of second chunk
5166
5167        let mut need_preamble = false;
5168        let mut skip_in_chunk = 0;
5169
5170        let next_batch =
5171            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
5172
5173        assert_eq!(
5174            next_batch,
5175            vec![
5176                ChunkDrainInstructions {
5177                    chunk_instructions: scheduled[0].clone(),
5178                    rows_to_take: 6,
5179                    rows_to_skip: 0,
5180                    preamble_action: PreambleAction::Absent,
5181                },
5182                ChunkDrainInstructions {
5183                    chunk_instructions: scheduled[1].clone(),
5184                    rows_to_take: 1,
5185                    rows_to_skip: 0,
5186                    preamble_action: PreambleAction::Take,
5187                },
5188            ]
5189        );
5190
5191        assert!(!need_preamble);
5192        assert_eq!(skip_in_chunk, 1);
5193
5194        // Now, the tricky part.  We drain the second chunk, including the trailer, and need to make sure
5195        // we get a drain task to take the preamble of the third chunk (and nothing else)
5196        let next_batch =
5197            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
5198
5199        assert_eq!(
5200            next_batch,
5201            vec![
5202                ChunkDrainInstructions {
5203                    chunk_instructions: scheduled[1].clone(),
5204                    rows_to_take: 2,
5205                    rows_to_skip: 1,
5206                    preamble_action: PreambleAction::Skip,
5207                },
5208                ChunkDrainInstructions {
5209                    chunk_instructions: scheduled[2].clone(),
5210                    rows_to_take: 0,
5211                    rows_to_skip: 0,
5212                    preamble_action: PreambleAction::Take,
5213                },
5214            ]
5215        );
5216
5217        assert!(!need_preamble);
5218        assert_eq!(skip_in_chunk, 0);
5219    }
5220
5221    #[tokio::test]
5222    async fn test_fullzip_repetition_index_caching() {
5223        use crate::testing::SimulatedScheduler;
5224
5225        // Simplified FixedPerValueDecompressor for testing
5226        #[derive(Debug)]
5227        struct TestFixedDecompressor;
5228
5229        impl FixedPerValueDecompressor for TestFixedDecompressor {
5230            fn decompress(
5231                &self,
5232                _data: FixedWidthDataBlock,
5233                _num_rows: u64,
5234            ) -> crate::Result<DataBlock> {
5235                unimplemented!("Test decompressor")
5236            }
5237
5238            fn bits_per_value(&self) -> u64 {
5239                32
5240            }
5241        }
5242
5243        // Create test repetition index data
5244        let rows_in_page = 100u64;
5245        let bytes_per_value = 4u64;
5246        let _rep_index_size = (rows_in_page + 1) * bytes_per_value;
5247
5248        // Create mock repetition index data
5249        let mut rep_index_data = Vec::new();
5250        for i in 0..=rows_in_page {
5251            let offset = (i * 100) as u32; // Each row starts at i * 100 bytes
5252            rep_index_data.extend_from_slice(&offset.to_le_bytes());
5253        }
5254
5255        // Simulate storage with the repetition index at position 1000
5256        let mut full_data = vec![0u8; 1000];
5257        full_data.extend_from_slice(&rep_index_data);
5258        full_data.extend_from_slice(&vec![0u8; 10000]); // Add some data after
5259
5260        let data = bytes::Bytes::from(full_data);
5261        let io = Arc::new(SimulatedScheduler::new(data));
5262        let _cache = Arc::new(lance_core::cache::LanceCache::with_capacity(1024 * 1024));
5263
5264        // Create FullZipScheduler with repetition index
5265        let mut scheduler = FullZipScheduler {
5266            data_buf_position: 0,
5267            rep_index: Some(FullZipRepIndexDetails {
5268                buf_position: 1000,
5269                bytes_per_value,
5270            }),
5271            priority: 0,
5272            rows_in_page,
5273            bits_per_offset: 32,
5274            details: Arc::new(FullZipDecodeDetails {
5275                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
5276                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
5277                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
5278                max_rep: 0,
5279                max_visible_def: 0,
5280            }),
5281            cached_state: None,
5282            enable_cache: true, // Enable caching for test
5283        };
5284
5285        // First initialization should load and cache the repetition index
5286        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
5287        let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap();
5288
5289        // Verify that we got a FullZipCacheableState (not NoCachedPageData)
5290        let is_cached = cached_data1
5291            .clone()
5292            .as_arc_any()
5293            .downcast::<FullZipCacheableState>()
5294            .is_ok();
5295        assert!(
5296            is_cached,
5297            "Expected FullZipCacheableState, got NoCachedPageData"
5298        );
5299
5300        // Load the cached data into the scheduler
5301        scheduler.load(&cached_data1);
5302
5303        // Verify that cached_state is now populated
5304        assert!(
5305            scheduler.cached_state.is_some(),
5306            "cached_state should be populated after load"
5307        );
5308
5309        // Verify the cached data contains the repetition index
5310        let cached_state = scheduler.cached_state.as_ref().unwrap();
5311
5312        // Test that schedule_ranges_rep uses the cached data
5313        let ranges = vec![0..10, 20..30];
5314        let result = scheduler.schedule_ranges_rep(
5315            &ranges,
5316            &io_dyn,
5317            FullZipRepIndexDetails {
5318                buf_position: 1000,
5319                bytes_per_value,
5320            },
5321        );
5322
5323        // The result should be OK (not an error)
5324        assert!(
5325            result.is_ok(),
5326            "schedule_ranges_rep should succeed with cached data"
5327        );
5328
5329        // Second scheduler instance should be able to use the cached data
5330        let mut scheduler2 = FullZipScheduler {
5331            data_buf_position: 0,
5332            rep_index: Some(FullZipRepIndexDetails {
5333                buf_position: 1000,
5334                bytes_per_value,
5335            }),
5336            priority: 0,
5337            rows_in_page,
5338            bits_per_offset: 32,
5339            details: scheduler.details.clone(),
5340            cached_state: None,
5341            enable_cache: true, // Enable caching for test
5342        };
5343
5344        // Load cached data from the first scheduler
5345        scheduler2.load(&cached_data1);
5346        assert!(
5347            scheduler2.cached_state.is_some(),
5348            "Second scheduler should have cached_state after load"
5349        );
5350
5351        // Verify that both schedulers have the same cached data
5352        let cached_state2 = scheduler2.cached_state.as_ref().unwrap();
5353        assert!(
5354            Arc::ptr_eq(cached_state, cached_state2),
5355            "Both schedulers should share the same cached data"
5356        );
5357    }
5358
5359    #[tokio::test]
5360    async fn test_fullzip_cache_config_controls_caching() {
5361        use crate::testing::SimulatedScheduler;
5362
5363        // Simplified FixedPerValueDecompressor for testing
5364        #[derive(Debug)]
5365        struct TestFixedDecompressor;
5366
5367        impl FixedPerValueDecompressor for TestFixedDecompressor {
5368            fn decompress(
5369                &self,
5370                _data: FixedWidthDataBlock,
5371                _num_rows: u64,
5372            ) -> crate::Result<DataBlock> {
5373                unimplemented!("Test decompressor")
5374            }
5375
5376            fn bits_per_value(&self) -> u64 {
5377                32
5378            }
5379        }
5380
5381        // Test that enable_cache flag actually controls caching behavior
5382        let rows_in_page = 1000_u64;
5383        let bytes_per_value = 4_u64;
5384
5385        // Create simulated data
5386        let rep_index_data = vec![0u8; ((rows_in_page + 1) * bytes_per_value) as usize];
5387        let value_data = vec![0u8; 4000]; // Dummy value data
5388        let mut full_data = vec![0u8; 1000]; // Padding before rep index
5389        full_data.extend_from_slice(&rep_index_data);
5390        full_data.extend_from_slice(&value_data);

        let data = bytes::Bytes::from(full_data);
        let io = Arc::new(SimulatedScheduler::new(data));

        // Test 1: With caching disabled
        let mut scheduler_no_cache = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: false, // Caching disabled
        };

        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
        let cached_data = scheduler_no_cache.initialize(&io_dyn).await.unwrap();

        // Should return NoCachedPageData when caching is disabled
        assert!(
            cached_data
                .as_arc_any()
                .downcast_ref::<super::NoCachedPageData>()
                .is_some(),
            "With enable_cache=false, should return NoCachedPageData"
        );

        // Test 2: With caching enabled
        let mut scheduler_with_cache = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: true, // Caching enabled
        };

        let cached_data2 = scheduler_with_cache.initialize(&io_dyn).await.unwrap();

        // Should return FullZipCacheableState when caching is enabled
        assert!(
            cached_data2
                .as_arc_any()
                .downcast_ref::<super::FullZipCacheableState>()
                .is_some(),
            "With enable_cache=true, should return FullZipCacheableState"
        );
    }

    /// Reproduces the fuzz-test failure reported in https://github.com/lancedb/lance/issues/4492
    #[tokio::test]
    async fn test_fuzz_issue_4492_empty_rep_values() {
        use lance_datagen::{array, gen_batch, RowCount, Seed};

        let seed = 1823859942947654717u64;
        let num_rows = 2741usize;

        // Generate the exact same data that caused the failure
        let batch_gen = gen_batch().with_seed(Seed::from(seed));
        let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
        let list_generator = array::rand_list_any(base_generator, false);

        let batch = batch_gen
            .anon_col(list_generator)
            .into_batch_rows(RowCount::from(num_rows as u64))
            .unwrap();

        let list_array = batch.column(0).clone();

        // Force miniblock encoding
        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_batch_size(100)
            .with_range(0..num_rows.min(500) as u64)
            .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);

        check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
    }

    #[tokio::test]
    async fn test_large_dictionary_general_compression() {
        use arrow_array::{ArrayRef, StringArray};
        use std::collections::HashMap;
        use std::sync::Arc;

        // Create large string dictionary data (>32KiB) with low cardinality
        // Use 100 unique strings, each 500 bytes long = 50KB dictionary
        let unique_values: Vec<String> = (0..100)
            .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
            .collect();
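        // Sanity math: each value is "value_NNNN_" (11 bytes) plus 500 'x' bytes,
        // so 100 unique values give a dictionary of roughly 51 KB -- comfortably
        // over the 32 KiB threshold referenced in the comment above.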

        // Repeat these strings many times to create a large array
        let repeated_strings: Vec<_> = unique_values
            .iter()
            .cycle()
            .take(100_000)
            .map(|s| Some(s.as_str()))
            .collect();

        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;

        // Configure test to use V2_2 and verify encoding
        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
                assert_eq!(cols.len(), 1);
                let col = &cols[0];

                // Navigate to the dictionary encoding in the page layout
                if let Some(PageEncoding::Structural(page_layout)) = &col.final_pages.first().map(|p| &p.description) {
                    // Check that dictionary is wrapped with general compression
                    if let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) = &page_layout.layout {
                        if let Some(dictionary_encoding) = &mini_block.dictionary {
                            match dictionary_encoding.compression.as_ref() {
                                Some(Compression::General(general)) => {
                                    // Verify it's using LZ4 or Zstd
                                    let compression = general.compression.as_ref().unwrap();
                                    assert!(
                                        compression.scheme() == pb21::CompressionScheme::CompressionAlgorithmLz4
                                        || compression.scheme() == pb21::CompressionScheme::CompressionAlgorithmZstd,
                                        "Expected LZ4 or Zstd compression for large dictionary"
                                    );
                                }
                                _ => panic!("Expected General compression for large dictionary"),
                            }
                        }
                    }
                }
            }));

        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    // Dictionary encoding decision tests
    /// Helper to create a FixedWidth test data block with an exact cardinality stat injected
    /// to ensure consistent test behavior (avoids HLL estimation error)
    fn create_test_fixed_data_block(num_values: u64, cardinality: u64) -> DataBlock {
        use crate::statistics::Stat;

        let block_info = BlockInfo::default();

        // Manually inject exact cardinality stat for consistent test behavior
        let cardinality_array = Arc::new(UInt64Array::from(vec![cardinality]));
        block_info
            .0
            .write()
            .unwrap()
            .insert(Stat::Cardinality, cardinality_array);

        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: crate::buffer::LanceBuffer::from(vec![0u8; (num_values * 4) as usize]),
            num_values,
            block_info,
        })
    }

    /// Helper to create a VariableWidth (string) test data block with exact cardinality
    fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
        use crate::statistics::Stat;
        use arrow_array::StringArray;

        assert!(cardinality <= num_values && cardinality > 0);

        let mut values = Vec::with_capacity(num_values as usize);
        for i in 0..num_values {
            values.push(format!("value_{:016}", i % cardinality));
        }

        let array = StringArray::from(values);
        let block = DataBlock::from_array(Arc::new(array) as ArrayRef);

        // Manually inject stats for consistent test behavior
        if let DataBlock::VariableWidth(ref var_block) = block {
            let mut info = var_block.block_info.0.write().unwrap();
            // Cardinality: exact value to avoid HLL estimation error
            info.insert(
                Stat::Cardinality,
                Arc::new(UInt64Array::from(vec![cardinality])),
            );
        }

        block
    }

    #[test]
    fn test_estimate_dict_size_fixed_width() {
        use crate::encodings::logical::primitive::dict::{
            DICT_FIXED_WIDTH_BITS_PER_VALUE, DICT_INDICES_BITS_PER_VALUE,
        };

        let block = create_test_fixed_data_block(1000, 400);
        let estimated_size = PrimitiveStructuralEncoder::estimate_dict_size(&block).unwrap();

        // Dictionary: 400 * 16 bytes (128-bit values)
        // Indices: 1000 * 4 bytes (32-bit i32)
        let expected_dict_size = 400 * (DICT_FIXED_WIDTH_BITS_PER_VALUE / 8);
        let expected_indices_size = 1000 * (DICT_INDICES_BITS_PER_VALUE / 8);
        let expected_total = expected_dict_size + expected_indices_size;
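        // Assuming the constants match the per-value sizes documented above
        // (128-bit dictionary entries, 32-bit indices), this works out to
        // 400 * 16 + 1000 * 4 = 10_400 bytes in total.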

        assert_eq!(estimated_size, expected_total);
    }

    #[test]
    fn test_estimate_dict_size_variable_width() {
        let block = create_test_variable_width_block(1000, 400);
        let estimated_size = PrimitiveStructuralEncoder::estimate_dict_size(&block).unwrap();

        // Get actual data size
        let data_size = block.data_size();
        let avg_value_size = data_size / 1000;

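        // Expected estimate = dictionary values (400 entries * average value size)
        //                   + dictionary offsets (400 * 4 bytes, assuming 32-bit offsets)
        //                   + indices (1000 * 4 bytes)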
        let expected = 400 * avg_value_size + 400 * 4 + 1000 * 4;

        assert_eq!(estimated_size, expected);
    }

    #[test]
    fn test_should_dictionary_encode() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

        // Create data where dict encoding saves space
        let block = create_test_variable_width_block(1000, 10);
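        // Only 10 distinct ~22-byte strings among 1000 values, so the estimated
        // dictionary-encoded size is a small fraction of the plain size -- which is
        // presumably why it clears the 0.8 ratio configured below.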

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(&block, &field);

        assert!(result, "Should use dictionary encoding based on size");
    }

    #[test]
    fn test_should_not_dictionary_encode() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

        let block = create_test_fixed_data_block(1000, 10);
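        // Plain size is 1000 * 4 = 4000 bytes, while (using the 128-bit/32-bit
        // constants noted in the estimate test above) the dictionary estimate is
        // 10 * 16 + 1000 * 4 = 4160 bytes, so dictionary encoding offers no
        // saving here -- presumably why the 0.8 ratio rejects it.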

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(&block, &field);

        assert!(!result, "Should not use dictionary encoding based on size");
    }
}