lance_encoding/encodings/logical/primitive.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    collections::{HashMap, VecDeque},
7    env,
8    fmt::Debug,
9    iter,
10    ops::Range,
11    sync::Arc,
12    vec,
13};
14
15use crate::{
16    constants::{
17        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
18    },
19    data::DictionaryDataBlock,
20    encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
21    format::{
22        pb21::{self, compressive_encoding::Compression, CompressiveEncoding, PageLayout},
23        ProtobufUtils21,
24    },
25};
26use arrow_array::{cast::AsArray, make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
27use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};
28use arrow_schema::{DataType, Field as ArrowField};
29use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
30use itertools::Itertools;
31use lance_arrow::deepcopy::deep_copy_nulls;
32use lance_core::{
33    cache::{CacheKey, Context, DeepSizeOf},
34    error::{Error, LanceOptionExt},
35    utils::bit::pad_bytes,
36};
37use log::trace;
38use snafu::location;
39
40use crate::{
41    compression::{
42        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
43    },
44    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
45    utils::bytepack::BytepackedIntegerEncoder,
46};
47use crate::{
48    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
49    encodings::logical::primitive::fullzip::PerValueDataBlock,
50};
51use crate::{
52    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
53};
54use crate::{
55    encodings::logical::primitive::miniblock::MiniBlockCompressed,
56    statistics::{ComputeStat, GetStat, Stat},
57};
58use crate::{
59    repdef::{
60        build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator,
61        ControlWordParser, DefinitionInterpretation, RepDefSlicer,
62    },
63    utils::accumulation::AccumulationQueue,
64};
65use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};
66
67use crate::constants::DICT_SIZE_RATIO_META_KEY;
68use crate::encodings::logical::primitive::dict::{
69    DICT_FIXED_WIDTH_BITS_PER_VALUE, DICT_INDICES_BITS_PER_VALUE,
70};
71use crate::{
72    buffer::LanceBuffer,
73    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
74    decoder::{
75        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
76        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
77        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
78        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
79    },
80    encoder::{
81        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
82    },
83    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
84    EncodingsIo,
85};
86
87pub mod blob;
88pub mod dict;
89pub mod fullzip;
90pub mod miniblock;
91
92const FILL_BYTE: u8 = 0xFE;
93
94struct PageLoadTask {
95    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
96    num_rows: u64,
97}
98
99/// A trait for figuring out how to schedule the data within
100/// a single page.
101trait StructuralPageScheduler: std::fmt::Debug + Send {
102    /// Fetches any metadata required for the page
103    fn initialize<'a>(
104        &'a mut self,
105        io: &Arc<dyn EncodingsIo>,
106    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
107    /// Loads metadata from a previous initialize call
108    fn load(&mut self, data: &Arc<dyn CachedPageData>);
109    /// Schedules the read of the given ranges in the page
110    ///
111    /// The read may be split into multiple "shards" if the page is extremely large.
112    /// Each shard maps to one or more rows and can be decoded independently.
113    ///
114    /// Note: this sharding is for splitting up very large pages into smaller reads to
115    /// avoid buffering too much data in memory.  It is not related to the batch size or
116    /// compute units in any way.
117    fn schedule_ranges(
118        &self,
119        ranges: &[Range<u64>],
120        io: &Arc<dyn EncodingsIo>,
121    ) -> Result<Vec<PageLoadTask>>;
122}
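// A rough sketch of how a `StructuralPageScheduler` is typically driven (the scheduler, io
// handle, and ranges below are hypothetical, not a fixed API of this module):
//
//     let cached = scheduler.initialize(&io).await?;        // first open: read + cache metadata
//     later_scheduler.load(&cached);                         // later opens: reuse cached metadata
//     let load_tasks = scheduler.schedule_ranges(&[0..100, 500..600], &io)?;
//     for task in load_tasks {
//         let mut decoder = task.decoder_fut.await?;         // one decoder per shard
//         let decode_task = decoder.drain(task.num_rows)?;   // drain in batches (here: all rows)
//         let _page = decode_task.decode()?;
//     }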
123
124/// Metadata describing the decoded size of a mini-block
125#[derive(Debug)]
126struct ChunkMeta {
127    num_values: u64,
128    chunk_size_bytes: u64,
129    offset_bytes: u64,
130}
131
132/// A mini-block chunk that has been decoded and decompressed
133#[derive(Debug, Clone)]
134struct DecodedMiniBlockChunk {
135    rep: Option<ScalarBuffer<u16>>,
136    def: Option<ScalarBuffer<u16>>,
137    values: DataBlock,
138}
139
/// A task to decode one or more mini-blocks of data into an output batch
///
/// Note: Two batches might share the same mini-block of data.  When this happens,
/// each batch gets a copy of the block and each batch decodes the block independently.
144///
145/// This means we have duplicated work but it is necessary to avoid having to synchronize
146/// the decoding of the block. (TODO: test this theory)
147#[derive(Debug)]
148struct DecodeMiniBlockTask {
149    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
150    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
151    value_decompressor: Arc<dyn MiniBlockDecompressor>,
152    dictionary_data: Option<Arc<DataBlock>>,
153    def_meaning: Arc<[DefinitionInterpretation]>,
154    num_buffers: u64,
155    max_visible_level: u16,
156    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
157}
158
159impl DecodeMiniBlockTask {
160    fn decode_levels(
161        rep_decompressor: &dyn BlockDecompressor,
162        levels: LanceBuffer,
163        num_levels: u16,
164    ) -> Result<ScalarBuffer<u16>> {
165        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
166        let rep = rep.as_fixed_width().unwrap();
167        debug_assert_eq!(rep.num_values, num_levels as u64);
168        debug_assert_eq!(rep.bits_per_value, 16);
169        Ok(rep.data.borrow_to_typed_slice::<u16>())
170    }
171
172    // We are building a LevelBuffer (levels) and want to copy into it `total_len`
173    // values from `level_buf` starting at `offset`.
174    //
175    // We need to handle both the case where `levels` is None (no nulls encountered
176    // yet) and the case where `level_buf` is None (the input we are copying from has
177    // no nulls)
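    // A small worked example (hypothetical values): with `dest_offset = 3`, `levels = None`,
    // `level_buf = Some([0, 1, 0, 1, 0])`, and `range = 2..4`, we first materialize three
    // leading zeros and then copy `level_buf[2..4]`, leaving `levels = Some([0, 0, 0, 0, 1])`.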
178    fn extend_levels(
179        range: Range<u64>,
180        levels: &mut Option<LevelBuffer>,
181        level_buf: &Option<impl AsRef<[u16]>>,
182        dest_offset: usize,
183    ) {
184        if let Some(level_buf) = level_buf {
185            if levels.is_none() {
186                // This is the first non-empty def buf we've hit, fill in the past
187                // with 0 (valid)
188                let mut new_levels_vec =
189                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
190                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
191                *levels = Some(new_levels_vec);
192            }
193            levels.as_mut().unwrap().extend(
194                level_buf.as_ref()[range.start as usize..range.end as usize]
195                    .iter()
196                    .copied(),
197            );
198        } else if let Some(levels) = levels {
199            let num_values = (range.end - range.start) as usize;
200            // This is an all-valid level_buf but we had nulls earlier and so we
201            // need to materialize it
202            levels.extend(iter::repeat_n(0, num_values));
203        }
204    }
205
206    /// Maps a range of rows to a range of items and a range of levels
207    ///
208    /// If there is no repetition information this just returns the range as-is.
209    ///
210    /// If there is repetition information then we need to do some work to figure out what
211    /// range of items corresponds to the requested range of rows.
212    ///
213    /// For example, if the data is [[1, 2, 3], [4, 5], [6, 7]] and the range is 1..2 (i.e. just row
214    /// 1) then the user actually wants items 3..5.  In the above case the rep levels would be:
215    ///
216    /// Idx: 0 1 2 3 4 5 6
217    /// Rep: 1 0 0 1 0 1 0
218    ///
219    /// So the start (1) maps to the second 1 (idx=3) and the end (2) maps to the third 1 (idx=5)
220    ///
221    /// If there are invisible items then we don't count them when calculating the range of items we
222    /// are interested in but we do count them when calculating the range of levels we are interested
223    /// in.  As a result we have to return both the item range (first return value) and the level range
224    /// (second return value).
225    ///
226    /// For example, if the data is [[1, 2, 3], [4, 5], NULL, [6, 7, 8]] and the range is 2..4 then the
227    /// user wants items 5..8 but they want levels 5..9.  In the above case the rep/def levels would be:
228    ///
229    /// Idx: 0 1 2 3 4 5 6 7 8
230    /// Rep: 1 0 0 1 0 1 1 0 0
231    /// Def: 0 0 0 0 0 1 0 0 0
232    /// Itm: 1 2 3 4 5 6 7 8
233    ///
234    /// Finally, we have to contend with the fact that chunks may or may not start with a "preamble" of
235    /// trailing values that finish up a list from the previous chunk.  In this case the first item does
236    /// not start at max_rep because it is a continuation of the previous chunk.  For our purposes we do
237    /// not consider this a "row" and so the range 0..1 will refer to the first row AFTER the preamble.
238    ///
239    /// We have a separate parameter (`preamble_action`) to control whether we want the preamble or not.
240    ///
241    /// Note that the "trailer" is considered a "row" and if we want it we should include it in the range.
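    ///
    /// As a (hypothetical) illustration of the preamble handling: if the chunk's rep levels are
    /// 0 0 1 0 1 0 (max_rep=1, no def levels, so the first two items continue the previous
    /// chunk's list) and we call this with range 0..1 and PreambleAction::Skip, the first row
    /// starts at index 2 and the result is item range 2..4 and level range 2..4.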
242    fn map_range(
243        range: Range<u64>,
244        rep: Option<&impl AsRef<[u16]>>,
245        def: Option<&impl AsRef<[u16]>>,
246        max_rep: u16,
247        max_visible_def: u16,
248        // The total number of items (not rows) in the chunk.  This is not quite the same as
249        // rep.len() / def.len() because it doesn't count invisible items
250        total_items: u64,
251        preamble_action: PreambleAction,
252    ) -> (Range<u64>, Range<u64>) {
253        if let Some(rep) = rep {
254            let mut rep = rep.as_ref();
255            // If there is a preamble and we need to skip it then do that first.  The work is the same
256            // whether there is def information or not
257            let mut items_in_preamble = 0_u64;
258            let first_row_start = match preamble_action {
259                PreambleAction::Skip | PreambleAction::Take => {
260                    let first_row_start = if let Some(def) = def.as_ref() {
261                        let mut first_row_start = None;
262                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
263                            if *rep == max_rep {
264                                first_row_start = Some(idx as u64);
265                                break;
266                            }
267                            if *def <= max_visible_def {
268                                items_in_preamble += 1;
269                            }
270                        }
271                        first_row_start
272                    } else {
273                        let first_row_start =
274                            rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
275                        items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
276                        first_row_start
277                    };
278                    // It is possible for a chunk to be entirely partial values but if it is then it
279                    // should never show up as a preamble to skip
280                    if first_row_start.is_none() {
281                        assert!(preamble_action == PreambleAction::Take);
282                        return (0..total_items, 0..rep.len() as u64);
283                    }
284                    let first_row_start = first_row_start.unwrap();
285                    rep = &rep[first_row_start as usize..];
286                    first_row_start
287                }
288                PreambleAction::Absent => {
289                    debug_assert!(rep[0] == max_rep);
290                    0
291                }
292            };
293
294            // We hit this case when all we needed was the preamble
295            if range.start == range.end {
296                debug_assert!(preamble_action == PreambleAction::Take);
297                debug_assert!(items_in_preamble <= total_items);
298                return (0..items_in_preamble, 0..first_row_start);
299            }
300            assert!(range.start < range.end);
301
302            let mut rows_seen = 0;
303            let mut new_start = 0;
304            let mut new_levels_start = 0;
305
306            if let Some(def) = def {
307                let def = &def.as_ref()[first_row_start as usize..];
308
309                // range.start == 0 always maps to 0 (even with invis items), otherwise we need to walk
310                let mut lead_invis_seen = 0;
311
312                if range.start > 0 {
313                    if def[0] > max_visible_def {
314                        lead_invis_seen += 1;
315                    }
316                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
317                        if *rep == max_rep {
318                            rows_seen += 1;
319                            if rows_seen == range.start {
320                                new_start = idx as u64 + 1 - lead_invis_seen;
321                                new_levels_start = idx as u64 + 1;
322                                break;
323                            }
324                        }
325                        if *def > max_visible_def {
326                            lead_invis_seen += 1;
327                        }
328                    }
329                }
330
331                rows_seen += 1;
332
333                let mut new_end = u64::MAX;
334                let mut new_levels_end = rep.len() as u64;
335                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
336                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
337                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
338                    .iter()
339                    .zip(&def[(new_levels_start + 1) as usize..])
340                    .enumerate()
341                {
342                    if *rep == max_rep {
343                        rows_seen += 1;
344                        if rows_seen == range.end + 1 {
345                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
346                            new_levels_end = idx as u64 + new_levels_start + 1;
347                            break;
348                        }
349                    }
350                    if *def > max_visible_def {
351                        tail_invis_seen += 1;
352                    }
353                }
354
355                if new_end == u64::MAX {
356                    new_levels_end = rep.len() as u64;
357                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
358                    new_end = rep.len() as u64 - total_invis_seen;
359                }
360
361                assert_ne!(new_end, u64::MAX);
362
363                // Adjust for any skipped preamble
364                if preamble_action == PreambleAction::Skip {
365                    new_start += items_in_preamble;
366                    new_end += items_in_preamble;
367                    new_levels_start += first_row_start;
368                    new_levels_end += first_row_start;
369                } else if preamble_action == PreambleAction::Take {
370                    debug_assert_eq!(new_start, 0);
371                    debug_assert_eq!(new_levels_start, 0);
372                    new_end += items_in_preamble;
373                    new_levels_end += first_row_start;
374                }
375
376                debug_assert!(new_end <= total_items);
377                (new_start..new_end, new_levels_start..new_levels_end)
378            } else {
379                // Easy case, there are no invisible items, so we don't need to check for them
380                // The items range and levels range will be the same.  We do still need to walk
381                // the rep levels to find the row boundaries
382
383                // range.start == 0 always maps to 0, otherwise we need to walk
384                if range.start > 0 {
385                    for (idx, rep) in rep.iter().skip(1).enumerate() {
386                        if *rep == max_rep {
387                            rows_seen += 1;
388                            if rows_seen == range.start {
389                                new_start = idx as u64 + 1;
390                                break;
391                            }
392                        }
393                    }
394                }
395                let mut new_end = rep.len() as u64;
396                // range.end == max_items always maps to rep.len(), otherwise we need to walk
397                if range.end < total_items {
398                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
399                        if *rep == max_rep {
400                            rows_seen += 1;
401                            if rows_seen == range.end {
402                                new_end = idx as u64 + new_start + 1;
403                                break;
404                            }
405                        }
406                    }
407                }
408
409                // Adjust for any skipped preamble
410                if preamble_action == PreambleAction::Skip {
411                    new_start += first_row_start;
412                    new_end += first_row_start;
413                } else if preamble_action == PreambleAction::Take {
414                    debug_assert_eq!(new_start, 0);
415                    new_end += first_row_start;
416                }
417
418                debug_assert!(new_end <= total_items);
419                (new_start..new_end, new_start..new_end)
420            }
421        } else {
422            // No repetition info, easy case, just use the range as-is and the item
423            // and level ranges are the same
424            (range.clone(), range)
425        }
426    }
427
    // Deserialize a mini-block chunk into its rep/def levels and decompressed values
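    //
    // The serialized chunk parsed below is laid out roughly as follows (all sizes are
    // little-endian u16 and each section is padded to MINIBLOCK_ALIGNMENT):
    //
    //   [num_levels][rep_size?][def_size?][buffer size x num_buffers][pad]
    //   [rep levels][pad][def levels][pad][buffer 0][pad]...[buffer N][pad]
    //
    // where rep_size/def_size are only present when the corresponding decompressor exists.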
429    fn decode_miniblock_chunk(
430        &self,
431        buf: &LanceBuffer,
432        items_in_chunk: u64,
433    ) -> Result<DecodedMiniBlockChunk> {
434        let mut offset = 0;
435        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
436        offset += 2;
437
438        let rep_size = if self.rep_decompressor.is_some() {
439            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
440            offset += 2;
441            Some(rep_size)
442        } else {
443            None
444        };
445        let def_size = if self.def_decompressor.is_some() {
446            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
447            offset += 2;
448            Some(def_size)
449        } else {
450            None
451        };
452        let buffer_sizes = (0..self.num_buffers)
453            .map(|_| {
454                let size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
455                offset += 2;
456                size
457            })
458            .collect::<Vec<_>>();
459
460        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
461
462        let rep = rep_size.map(|rep_size| {
463            let rep = buf.slice_with_length(offset, rep_size as usize);
464            offset += rep_size as usize;
465            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
466            rep
467        });
468
469        let def = def_size.map(|def_size| {
470            let def = buf.slice_with_length(offset, def_size as usize);
471            offset += def_size as usize;
472            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
473            def
474        });
475
476        let buffers = buffer_sizes
477            .into_iter()
478            .map(|buf_size| {
479                let buf = buf.slice_with_length(offset, buf_size as usize);
480                offset += buf_size as usize;
481                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
482                buf
483            })
484            .collect::<Vec<_>>();
485
486        let values = self
487            .value_decompressor
488            .decompress(buffers, items_in_chunk)?;
489
490        let rep = rep
491            .map(|rep| {
492                Self::decode_levels(
493                    self.rep_decompressor.as_ref().unwrap().as_ref(),
494                    rep,
495                    num_levels,
496                )
497            })
498            .transpose()?;
499        let def = def
500            .map(|def| {
501                Self::decode_levels(
502                    self.def_decompressor.as_ref().unwrap().as_ref(),
503                    def,
504                    num_levels,
505                )
506            })
507            .transpose()?;
508
509        Ok(DecodedMiniBlockChunk { rep, def, values })
510    }
511}
512
513impl DecodePageTask for DecodeMiniBlockTask {
514    fn decode(self: Box<Self>) -> Result<DecodedPage> {
515        // First, we create output buffers for the rep and def and data
516        let mut repbuf: Option<LevelBuffer> = None;
517        let mut defbuf: Option<LevelBuffer> = None;
518
519        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
520
521        // This is probably an over-estimate but it's quick and easy to calculate
522        let estimated_size_bytes = self
523            .instructions
524            .iter()
525            .map(|(_, chunk)| chunk.data.len())
526            .sum::<usize>()
527            * 2;
528        let mut data_builder =
529            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
530
531        // We need to keep track of the offset into repbuf/defbuf that we are building up
532        let mut level_offset = 0;
533
534        // Pre-compute caching needs for each chunk by checking if the next chunk is the same
535        let needs_caching: Vec<bool> = self
536            .instructions
537            .windows(2)
538            .map(|w| w[0].1.chunk_idx == w[1].1.chunk_idx)
539            .chain(std::iter::once(false)) // the last one never needs caching
540            .collect();
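        // e.g. (hypothetically) chunk indices [5, 5, 6] yield [true, false, false]: only the
        // first instruction for chunk 5 caches its decoded result for reuse by the second.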
541
542        // Cache for storing decoded chunks when beneficial
543        let mut chunk_cache: Option<(usize, DecodedMiniBlockChunk)> = None;
544
545        // Now we iterate through each instruction and process it
546        for (idx, (instructions, chunk)) in self.instructions.iter().enumerate() {
547            let should_cache_this_chunk = needs_caching[idx];
548
549            let decoded_chunk = match &chunk_cache {
550                Some((cached_chunk_idx, ref cached_chunk))
551                    if *cached_chunk_idx == chunk.chunk_idx =>
552                {
553                    // Clone only when we have a cache hit (much cheaper than decoding)
554                    cached_chunk.clone()
555                }
556                _ => {
557                    // Cache miss, need to decode
558                    let decoded = self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;
559
560                    // Only update cache if this chunk will benefit the next access
561                    if should_cache_this_chunk {
562                        chunk_cache = Some((chunk.chunk_idx, decoded.clone()));
563                    }
564                    decoded
565                }
566            };
567
568            let DecodedMiniBlockChunk { rep, def, values } = decoded_chunk;
569
570            // Our instructions tell us which rows we want to take from this chunk
571            let row_range_start =
572                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
573            let row_range_end = row_range_start + instructions.rows_to_take;
574
575            // We use the rep info to map the row range to an item range / levels range
576            let (item_range, level_range) = Self::map_range(
577                row_range_start..row_range_end,
578                rep.as_ref(),
579                def.as_ref(),
580                max_rep,
581                self.max_visible_level,
582                chunk.items_in_chunk,
583                instructions.preamble_action,
584            );
585            if item_range.end - item_range.start > chunk.items_in_chunk {
586                return Err(lance_core::Error::Internal {
587                    message: format!(
588                        "Item range {:?} is greater than chunk items in chunk {:?}",
589                        item_range, chunk.items_in_chunk
590                    ),
591                    location: location!(),
592                });
593            }
594
595            // Now we append the data to the output buffers
596            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
597            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
598            level_offset += (level_range.end - level_range.start) as usize;
599            data_builder.append(&values, item_range);
600        }
601
602        let mut data = data_builder.finish();
603
604        let unraveler =
605            RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());
606
607        if let Some(dictionary) = &self.dictionary_data {
608            // Don't decode here, that happens later (if needed)
609            let DataBlock::FixedWidth(indices) = data else {
610                return Err(lance_core::Error::Internal {
611                    message: format!(
612                        "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
613                        data
614                    ),
615                    location: location!(),
616                });
617            };
618            data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
619                indices,
620                dictionary.as_ref().clone(),
621            ));
622        }
623
624        Ok(DecodedPage {
625            data,
626            repdef: unraveler,
627        })
628    }
629}
630
631/// A chunk that has been loaded by the miniblock scheduler (but not
632/// yet decoded)
633#[derive(Debug)]
634struct LoadedChunk {
635    data: LanceBuffer,
636    items_in_chunk: u64,
637    byte_range: Range<u64>,
638    chunk_idx: usize,
639}
640
641impl Clone for LoadedChunk {
642    fn clone(&self) -> Self {
643        Self {
644            // Safe as we always create borrowed buffers here
645            data: self.data.clone(),
646            items_in_chunk: self.items_in_chunk,
647            byte_range: self.byte_range.clone(),
648            chunk_idx: self.chunk_idx,
649        }
650    }
651}
652
653/// Decodes mini-block formatted data.  See [`PrimitiveStructuralEncoder`] for more
654/// details on the different layouts.
655#[derive(Debug)]
656struct MiniBlockDecoder {
657    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
658    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
659    value_decompressor: Arc<dyn MiniBlockDecompressor>,
660    def_meaning: Arc<[DefinitionInterpretation]>,
661    loaded_chunks: VecDeque<LoadedChunk>,
662    instructions: VecDeque<ChunkInstructions>,
663    offset_in_current_chunk: u64,
664    num_rows: u64,
665    num_buffers: u64,
666    dictionary: Option<Arc<DataBlock>>,
667}
668
669/// See [`MiniBlockScheduler`] for more details on the scheduling and decoding
670/// process for miniblock encoded data.
671impl StructuralPageDecoder for MiniBlockDecoder {
672    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
673        let mut items_desired = num_rows;
674        let mut need_preamble = false;
675        let mut skip_in_chunk = self.offset_in_current_chunk;
676        let mut drain_instructions = Vec::new();
677        while items_desired > 0 || need_preamble {
678            let (instructions, consumed) = self
679                .instructions
680                .front()
681                .unwrap()
682                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);
683
684            while self.loaded_chunks.front().unwrap().chunk_idx
685                != instructions.chunk_instructions.chunk_idx
686            {
687                self.loaded_chunks.pop_front();
688            }
689            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
690            if consumed {
691                self.instructions.pop_front();
692            }
693        }
694        // We can throw away need_preamble here because it must be false.  If it were true it would mean
695        // we were still in the middle of loading rows.  We do need to latch skip_in_chunk though.
696        self.offset_in_current_chunk = skip_in_chunk;
697
698        let max_visible_level = self
699            .def_meaning
700            .iter()
701            .take_while(|l| !l.is_list())
702            .map(|l| l.num_def_levels())
703            .sum::<u16>();
704
705        Ok(Box::new(DecodeMiniBlockTask {
706            instructions: drain_instructions,
707            def_decompressor: self.def_decompressor.clone(),
708            rep_decompressor: self.rep_decompressor.clone(),
709            value_decompressor: self.value_decompressor.clone(),
710            dictionary_data: self.dictionary.clone(),
711            def_meaning: self.def_meaning.clone(),
712            num_buffers: self.num_buffers,
713            max_visible_level,
714        }))
715    }
716
717    fn num_rows(&self) -> u64 {
718        self.num_rows
719    }
720}
721
722#[derive(Debug)]
723struct CachedComplexAllNullState {
724    rep: Option<ScalarBuffer<u16>>,
725    def: Option<ScalarBuffer<u16>>,
726}
727
728impl DeepSizeOf for CachedComplexAllNullState {
729    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
730        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
731            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
732    }
733}
734
735impl CachedPageData for CachedComplexAllNullState {
736    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
737        self
738    }
739}
740
741/// A scheduler for all-null data that has repetition and definition levels
742///
743/// We still need to do some I/O in this case because we need to figure out what kind of null we
744/// are dealing with (null list, null struct, what level null struct, etc.)
745///
746/// TODO: Right now we just load the entire rep/def at initialization time and cache it.  This is a touch
747/// RAM aggressive and maybe we want something more lazy in the future.  On the other hand, it's simple
748/// and fast so...maybe not :)
749#[derive(Debug)]
750pub struct ComplexAllNullScheduler {
751    // Set from protobuf
752    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
753    def_meaning: Arc<[DefinitionInterpretation]>,
754    repdef: Option<Arc<CachedComplexAllNullState>>,
755    max_visible_level: u16,
756}
757
758impl ComplexAllNullScheduler {
759    pub fn new(
760        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
761        def_meaning: Arc<[DefinitionInterpretation]>,
762    ) -> Self {
763        let max_visible_level = def_meaning
764            .iter()
765            .take_while(|l| !l.is_list())
766            .map(|l| l.num_def_levels())
767            .sum::<u16>();
768        Self {
769            buffer_offsets_and_sizes,
770            def_meaning,
771            repdef: None,
772            max_visible_level,
773        }
774    }
775}
776
777impl StructuralPageScheduler for ComplexAllNullScheduler {
778    fn initialize<'a>(
779        &'a mut self,
780        io: &Arc<dyn EncodingsIo>,
781    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
782        // Fully load the rep & def buffers, as needed
783        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
784        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
785        let has_rep = rep_size > 0;
786        let has_def = def_size > 0;
787
788        let mut reads = Vec::with_capacity(2);
789        if has_rep {
790            reads.push(rep_pos..rep_pos + rep_size);
791        }
792        if has_def {
793            reads.push(def_pos..def_pos + def_size);
794        }
795
796        let data = io.submit_request(reads, 0);
797
798        async move {
799            let data = data.await?;
800            let mut data_iter = data.into_iter();
801
802            let rep = if has_rep {
803                let rep = data_iter.next().unwrap();
804                let rep = LanceBuffer::from_bytes(rep, 2);
805                let rep = rep.borrow_to_typed_slice::<u16>();
806                Some(rep)
807            } else {
808                None
809            };
810
811            let def = if has_def {
812                let def = data_iter.next().unwrap();
813                let def = LanceBuffer::from_bytes(def, 2);
814                let def = def.borrow_to_typed_slice::<u16>();
815                Some(def)
816            } else {
817                None
818            };
819
820            let repdef = Arc::new(CachedComplexAllNullState { rep, def });
821
822            self.repdef = Some(repdef.clone());
823
824            Ok(repdef as Arc<dyn CachedPageData>)
825        }
826        .boxed()
827    }
828
829    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
830        self.repdef = Some(
831            data.clone()
832                .as_arc_any()
833                .downcast::<CachedComplexAllNullState>()
834                .unwrap(),
835        );
836    }
837
838    fn schedule_ranges(
839        &self,
840        ranges: &[Range<u64>],
841        _io: &Arc<dyn EncodingsIo>,
842    ) -> Result<Vec<PageLoadTask>> {
843        let ranges = VecDeque::from_iter(ranges.iter().cloned());
844        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
845        let decoder = Box::new(ComplexAllNullPageDecoder {
846            ranges,
847            rep: self.repdef.as_ref().unwrap().rep.clone(),
848            def: self.repdef.as_ref().unwrap().def.clone(),
849            num_rows,
850            def_meaning: self.def_meaning.clone(),
851            max_visible_level: self.max_visible_level,
852        }) as Box<dyn StructuralPageDecoder>;
853        let page_load_task = PageLoadTask {
854            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
855            num_rows,
856        };
857        Ok(vec![page_load_task])
858    }
859}
860
861#[derive(Debug)]
862pub struct ComplexAllNullPageDecoder {
863    ranges: VecDeque<Range<u64>>,
864    rep: Option<ScalarBuffer<u16>>,
865    def: Option<ScalarBuffer<u16>>,
866    num_rows: u64,
867    def_meaning: Arc<[DefinitionInterpretation]>,
868    max_visible_level: u16,
869}
870
871impl ComplexAllNullPageDecoder {
872    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
873        let mut rows_desired = num_rows;
874        let mut ranges = Vec::with_capacity(self.ranges.len());
875        while rows_desired > 0 {
876            let front = self.ranges.front_mut().unwrap();
877            let avail = front.end - front.start;
878            if avail > rows_desired {
879                ranges.push(front.start..front.start + rows_desired);
880                front.start += rows_desired;
881                rows_desired = 0;
882            } else {
883                ranges.push(self.ranges.pop_front().unwrap());
884                rows_desired -= avail;
885            }
886        }
887        ranges
888    }
889}
890
891impl StructuralPageDecoder for ComplexAllNullPageDecoder {
892    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
893        let drained_ranges = self.drain_ranges(num_rows);
894        Ok(Box::new(DecodeComplexAllNullTask {
895            ranges: drained_ranges,
896            rep: self.rep.clone(),
897            def: self.def.clone(),
898            def_meaning: self.def_meaning.clone(),
899            max_visible_level: self.max_visible_level,
900        }))
901    }
902
903    fn num_rows(&self) -> u64 {
904        self.num_rows
905    }
906}
907
908/// We use `ranges` to slice into `rep` and `def` and create rep/def buffers
909/// for the null data.
910#[derive(Debug)]
911pub struct DecodeComplexAllNullTask {
912    ranges: Vec<Range<u64>>,
913    rep: Option<ScalarBuffer<u16>>,
914    def: Option<ScalarBuffer<u16>>,
915    def_meaning: Arc<[DefinitionInterpretation]>,
916    max_visible_level: u16,
917}
918
919impl DecodeComplexAllNullTask {
920    fn decode_level(
921        &self,
922        levels: &Option<ScalarBuffer<u16>>,
923        num_values: u64,
924    ) -> Option<Vec<u16>> {
925        levels.as_ref().map(|levels| {
926            let mut referenced_levels = Vec::with_capacity(num_values as usize);
927            for range in &self.ranges {
928                referenced_levels.extend(
929                    levels[range.start as usize..range.end as usize]
930                        .iter()
931                        .copied(),
932                );
933            }
934            referenced_levels
935        })
936    }
937}
938
939impl DecodePageTask for DecodeComplexAllNullTask {
940    fn decode(self: Box<Self>) -> Result<DecodedPage> {
941        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
942        let rep = self.decode_level(&self.rep, num_values);
943        let def = self.decode_level(&self.def, num_values);
944
945        // If there are definition levels there may be empty / null lists which are not visible
946        // in the items array.  We need to account for that here to figure out how many values
947        // should be in the items array.
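        // e.g. (hypothetically) def = [0, 2, 0, 1] with max_visible_level = 1 keeps 3 items;
        // the level-2 entry is an invisible empty/null list that contributes no item.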
948        let num_values = if let Some(def) = &def {
949            def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
950        } else {
951            num_values
952        };
953
954        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
955        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
956        Ok(DecodedPage {
957            data,
958            repdef: unraveler,
959        })
960    }
961}
962
963/// A scheduler for simple all-null data
964///
965/// "simple" all-null data is data that is all null and only has a single level of definition and
966/// no repetition.  We don't need to read any data at all in this case.
967#[derive(Debug, Default)]
968pub struct SimpleAllNullScheduler {}
969
970impl StructuralPageScheduler for SimpleAllNullScheduler {
971    fn initialize<'a>(
972        &'a mut self,
973        _io: &Arc<dyn EncodingsIo>,
974    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
975        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
976    }
977
978    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
979
980    fn schedule_ranges(
981        &self,
982        ranges: &[Range<u64>],
983        _io: &Arc<dyn EncodingsIo>,
984    ) -> Result<Vec<PageLoadTask>> {
985        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
986        let decoder =
987            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
988        let page_load_task = PageLoadTask {
989            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
990            num_rows,
991        };
992        Ok(vec![page_load_task])
993    }
994}
995
996/// A page decode task for all-null data without any
997/// repetition and only a single level of definition
998#[derive(Debug)]
999struct SimpleAllNullDecodePageTask {
1000    num_values: u64,
1001}
1002impl DecodePageTask for SimpleAllNullDecodePageTask {
1003    fn decode(self: Box<Self>) -> Result<DecodedPage> {
1004        let unraveler = RepDefUnraveler::new(
1005            None,
1006            Some(vec![1; self.num_values as usize]),
1007            Arc::new([DefinitionInterpretation::NullableItem]),
1008            self.num_values,
1009        );
1010        Ok(DecodedPage {
1011            data: DataBlock::AllNull(AllNullDataBlock {
1012                num_values: self.num_values,
1013            }),
1014            repdef: unraveler,
1015        })
1016    }
1017}
1018
1019#[derive(Debug)]
1020pub struct SimpleAllNullPageDecoder {
1021    num_rows: u64,
1022}
1023
1024impl StructuralPageDecoder for SimpleAllNullPageDecoder {
1025    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1026        Ok(Box::new(SimpleAllNullDecodePageTask {
1027            num_values: num_rows,
1028        }))
1029    }
1030
1031    fn num_rows(&self) -> u64 {
1032        self.num_rows
1033    }
1034}
1035
1036#[derive(Debug, Clone)]
1037struct MiniBlockSchedulerDictionary {
1038    // These come from the protobuf
1039    dictionary_decompressor: Arc<dyn BlockDecompressor>,
1040    dictionary_buf_position_and_size: (u64, u64),
1041    dictionary_data_alignment: u64,
1042    num_dictionary_items: u64,
1043}
1044
1045/// Individual block metadata within a MiniBlock repetition index.
1046#[derive(Debug)]
1047struct MiniBlockRepIndexBlock {
1048    // The index of the first row that starts after the beginning of this block.  If the block
1049    // has a preamble this will be the row after the preamble.  If the block is entirely preamble
1050    // then this will be a row that starts in some future block.
1051    first_row: u64,
1052    // The number of rows in the block, including the trailer but not the preamble.
1053    // Can be 0 if the block is entirely preamble
1054    starts_including_trailer: u64,
1055    // Whether the block has a preamble
1056    has_preamble: bool,
1057    // Whether the block has a trailer
1058    has_trailer: bool,
1059}
1060
1061impl DeepSizeOf for MiniBlockRepIndexBlock {
1062    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
1063        0
1064    }
1065}
1066
1067/// Repetition index for MiniBlock encoding.
1068///
1069/// Stores block-level offset information to enable efficient random
1070/// access to nested data structures within mini-blocks.
1071#[derive(Debug)]
1072struct MiniBlockRepIndex {
1073    blocks: Vec<MiniBlockRepIndexBlock>,
1074}
1075
1076impl DeepSizeOf for MiniBlockRepIndex {
1077    fn deep_size_of_children(&self, context: &mut Context) -> usize {
1078        self.blocks.deep_size_of_children(context)
1079    }
1080}
1081
1082impl MiniBlockRepIndex {
1083    /// Decode repetition index from chunk metadata using default values.
1084    ///
1085    /// This creates a repetition index where each chunk has no partial values
1086    /// and no trailers, suitable for simple sequential data layouts.
1087    pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
1088        let mut blocks = Vec::with_capacity(chunks.len());
1089        let mut offset: u64 = 0;
1090
1091        for c in chunks {
1092            blocks.push(MiniBlockRepIndexBlock {
1093                first_row: offset,
1094                starts_including_trailer: c.num_values,
1095                has_preamble: false,
1096                has_trailer: false,
1097            });
1098
1099            offset += c.num_values;
1100        }
1101
1102        Self { blocks }
1103    }
1104
1105    /// Decode repetition index from raw bytes in little-endian format.
1106    ///
1107    /// The bytes should contain u64 values arranged in groups of `stride` elements,
1108    /// where the first two values of each group represent ends_count and partial_count.
1109    /// Returns an empty index if no bytes are provided.
1110    pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
1111        // Convert bytes to u64 slice, handling alignment automatically
1112        let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
1113        let u64_slice = buffer.borrow_to_typed_slice::<u64>();
1114        let n = u64_slice.len() / stride;
1115
1116        let mut blocks = Vec::with_capacity(n);
1117        let mut chunk_has_preamble = false;
1118        let mut offset: u64 = 0;
1119
1120        // Extract first two values from each block: ends_count and partial_count
1121        for i in 0..n {
1122            let base_idx = i * stride;
1123            let ends = u64_slice[base_idx];
1124            let partial = u64_slice[base_idx + 1];
1125
1126            let has_trailer = partial > 0;
1127            // Convert branches to arithmetic for better compiler optimization
1128            let starts_including_trailer =
1129                ends + (has_trailer as u64) - (chunk_has_preamble as u64);
1130
1131            blocks.push(MiniBlockRepIndexBlock {
1132                first_row: offset,
1133                starts_including_trailer,
1134                has_preamble: chunk_has_preamble,
1135                has_trailer,
1136            });
1137
1138            chunk_has_preamble = has_trailer;
1139            offset += starts_including_trailer;
1140        }
1141
1142        Self { blocks }
1143    }
1144}
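// A minimal sketch (with hypothetical values) of how a stride-2 repetition index is decoded:
// each group of `stride` u64 words starts with (ends_count, partial_count), and a non-zero
// partial count means the chunk ends with a trailer that becomes the next chunk's preamble.
#[cfg(test)]
mod mini_block_rep_index_sketch {
    use super::*;

    #[test]
    fn decode_stride_two_rep_index() {
        // Chunk 0: 3 rows end here and 1 value spills into the next chunk (trailer).
        // Chunk 1: 2 rows end here (the first finishes chunk 0's trailer), no trailer.
        let words: [u64; 4] = [3, 1, 2, 0];
        let bytes = words
            .iter()
            .flat_map(|w| w.to_le_bytes())
            .collect::<Vec<u8>>();

        let index = MiniBlockRepIndex::decode_from_bytes(&bytes, 2);
        assert_eq!(index.blocks.len(), 2);

        // 3 complete rows plus the trailer row start in chunk 0
        assert_eq!(index.blocks[0].first_row, 0);
        assert_eq!(index.blocks[0].starts_including_trailer, 4);
        assert!(!index.blocks[0].has_preamble);
        assert!(index.blocks[0].has_trailer);

        // Chunk 1 begins with a preamble, so only one new row starts there
        assert_eq!(index.blocks[1].first_row, 4);
        assert_eq!(index.blocks[1].starts_including_trailer, 1);
        assert!(index.blocks[1].has_preamble);
        assert!(!index.blocks[1].has_trailer);
    }
}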
1145
1146/// State that is loaded once and cached for future lookups
1147#[derive(Debug)]
1148struct MiniBlockCacheableState {
1149    /// Metadata that describes each chunk in the page
1150    chunk_meta: Vec<ChunkMeta>,
1151    /// The decoded repetition index
1152    rep_index: MiniBlockRepIndex,
1153    /// The dictionary for the page, if any
1154    dictionary: Option<Arc<DataBlock>>,
1155}
1156
1157impl DeepSizeOf for MiniBlockCacheableState {
1158    fn deep_size_of_children(&self, context: &mut Context) -> usize {
1159        self.rep_index.deep_size_of_children(context)
1160            + self
1161                .dictionary
1162                .as_ref()
1163                .map(|dict| dict.data_size() as usize)
1164                .unwrap_or(0)
1165    }
1166}
1167
1168impl CachedPageData for MiniBlockCacheableState {
1169    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
1170        self
1171    }
1172}
1173
1174/// A scheduler for a page that has been encoded with the mini-block layout
1175///
1176/// Scheduling mini-block encoded data is simple in concept and somewhat complex
1177/// in practice.
1178///
1179/// First, during initialization, we load the chunk metadata, the repetition index,
1180/// and the dictionary (these last two may not be present)
1181///
1182/// Then, during scheduling, we use the user's requested row ranges and the repetition
1183/// index to determine which chunks we need and which rows we need from those chunks.
1184///
1185/// For example, if the repetition index is: [50, 3], [50, 0], [10, 0] and the range
1186/// from the user is 40..60 then we need to:
1187///
1188///  - Read the first chunk and skip the first 40 rows, then read 10 full rows, and
1189///    then read 3 items for the 11th row of our range.
1190///  - Read the second chunk and read the remaining items in our 11th row and then read
1191///    the remaining 9 full rows.
1192///
1193/// Then, if we are going to decode that in batches of 5, we need to make decode tasks.
1194/// The first two decode tasks will just need the first chunk.  The third decode task will
1195/// need the first chunk (for the trailer which has the 11th row in our range) and the second
1196/// chunk.  The final decode task will just need the second chunk.
1197///
1198/// The above prose descriptions are what are represented by [`ChunkInstructions`] and
1199/// [`ChunkDrainInstructions`].
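///
/// Roughly, the example above would be scheduled as two [`ChunkInstructions`] (a sketch, not
/// exact output):
///
///   chunk 0: preamble: Absent, rows_to_skip: 40, rows_to_take: 11, take_trailer: true
///   chunk 1: preamble: Take, rows_to_skip: 0, rows_to_take: 9, take_trailer: false
///
/// (the trailer counts as one of chunk 0's 11 rows, and chunk 1's preamble completes it)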
1200#[derive(Debug)]
1201pub struct MiniBlockScheduler {
1202    // These come from the protobuf
1203    buffer_offsets_and_sizes: Vec<(u64, u64)>,
1204    priority: u64,
1205    items_in_page: u64,
1206    repetition_index_depth: u16,
1207    num_buffers: u64,
1208    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
1209    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
1210    value_decompressor: Arc<dyn MiniBlockDecompressor>,
1211    def_meaning: Arc<[DefinitionInterpretation]>,
1212    dictionary: Option<MiniBlockSchedulerDictionary>,
1213    // This is set after initialization
1214    page_meta: Option<Arc<MiniBlockCacheableState>>,
1215}
1216
1217impl MiniBlockScheduler {
1218    fn try_new(
1219        buffer_offsets_and_sizes: &[(u64, u64)],
1220        priority: u64,
1221        items_in_page: u64,
1222        layout: &pb21::MiniBlockLayout,
1223        decompressors: &dyn DecompressionStrategy,
1224    ) -> Result<Self> {
1225        let rep_decompressor = layout
1226            .rep_compression
1227            .as_ref()
1228            .map(|rep_compression| {
1229                decompressors
1230                    .create_block_decompressor(rep_compression)
1231                    .map(Arc::from)
1232            })
1233            .transpose()?;
1234        let def_decompressor = layout
1235            .def_compression
1236            .as_ref()
1237            .map(|def_compression| {
1238                decompressors
1239                    .create_block_decompressor(def_compression)
1240                    .map(Arc::from)
1241            })
1242            .transpose()?;
1243        let def_meaning = layout
1244            .layers
1245            .iter()
1246            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
1247            .collect::<Vec<_>>();
1248        let value_decompressor = decompressors.create_miniblock_decompressor(
1249            layout.value_compression.as_ref().unwrap(),
1250            decompressors,
1251        )?;
1252
1253        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
1254            let num_dictionary_items = layout.num_dictionary_items;
1255            match dictionary_encoding.compression.as_ref().unwrap() {
1256                Compression::Variable(_) => Some(MiniBlockSchedulerDictionary {
1257                    dictionary_decompressor: decompressors
1258                        .create_block_decompressor(dictionary_encoding)?
1259                        .into(),
1260                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1261                    dictionary_data_alignment: 4,
1262                    num_dictionary_items,
1263                }),
1264                Compression::Flat(_) => Some(MiniBlockSchedulerDictionary {
1265                    dictionary_decompressor: decompressors
1266                        .create_block_decompressor(dictionary_encoding)?
1267                        .into(),
1268                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1269                    dictionary_data_alignment: 16,
1270                    num_dictionary_items,
1271                }),
1272                Compression::General(_) => Some(MiniBlockSchedulerDictionary {
1273                    dictionary_decompressor: decompressors
1274                        .create_block_decompressor(dictionary_encoding)?
1275                        .into(),
1276                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1277                    dictionary_data_alignment: 1,
1278                    num_dictionary_items,
1279                }),
1280                _ => unreachable!(
1281                    "Mini-block dictionary encoding must use Variable, Flat, or General compression"
1282                ),
1283            }
1284        } else {
1285            None
1286        };
1287
1288        Ok(Self {
1289            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
1290            rep_decompressor,
1291            def_decompressor,
1292            value_decompressor: value_decompressor.into(),
1293            repetition_index_depth: layout.repetition_index_depth as u16,
1294            num_buffers: layout.num_buffers,
1295            priority,
1296            items_in_page,
1297            dictionary,
1298            def_meaning: def_meaning.into(),
1299            page_meta: None,
1300        })
1301    }
1302
1303    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
1304        let page_meta = self.page_meta.as_ref().unwrap();
1305        chunk_indices
1306            .iter()
1307            .map(|&chunk_idx| {
1308                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
1309                let bytes_start = chunk_meta.offset_bytes;
1310                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
1311                LoadedChunk {
1312                    byte_range: bytes_start..bytes_end,
1313                    items_in_chunk: chunk_meta.num_values,
1314                    chunk_idx,
1315                    data: LanceBuffer::empty(),
1316                }
1317            })
1318            .collect()
1319    }
1320}
1321
1322#[derive(Debug, PartialEq, Eq, Clone, Copy)]
1323enum PreambleAction {
1324    Take,
1325    Skip,
1326    Absent,
1327}
1328
1329// When we schedule a chunk we use the repetition index (or, if none exists, just the # of items
1330// in each chunk) to map a user requested range into a set of ChunkInstruction objects which tell
1331// us how exactly to read from the chunk.
1332//
1333// Examples:
1334//
1335// | Chunk 0     | Chunk 1   | Chunk 2   | Chunk 3 |
1336// | xxxxyyyyzzz | zzzzzzzzz | zzzzzzzzz | aaabbcc |
1337//
1338// Full read (0..6)
1339//
1340// Chunk 0: (several rows, ends with trailer)
1341//   preamble: absent
1342//   rows_to_skip: 0
1343//   rows_to_take: 3 (x, y, z)
1344//   take_trailer: true
1345//
1346// Chunk 1: (all preamble, ends with trailer)
1347//   preamble: take
1348//   rows_to_skip: 0
1349//   rows_to_take: 0
1350//   take_trailer: true
1351//
1352// Chunk 2: (all preamble, no trailer)
1353//   preamble: take
1354//   rows_to_skip: 0
1355//   rows_to_take: 0
1356//   take_trailer: false
1357//
1358// Chunk 3: (several rows, no trailer or preamble)
1359//   preamble: absent
1360//   rows_to_skip: 0
1361//   rows_to_take: 3 (a, b, c)
1362//   take_trailer: false
1363#[derive(Clone, Debug, PartialEq, Eq)]
1364struct ChunkInstructions {
1365    // The index of the chunk to read
1366    chunk_idx: usize,
1367    // A "preamble" is when a chunk begins with a continuation of the previous chunk's list.  If there
1368    // is no repetition index there is never a preamble.
1369    //
    // It's possible for a chunk to be entirely preamble.  For example, if there is a really large list
1371    // that spans several chunks.
1372    preamble: PreambleAction,
1373    // How many complete rows (not including the preamble or trailer) to skip
1374    //
    // If this is non-zero then preamble must not be Take
1376    rows_to_skip: u64,
1377    // How many rows to take.  If a row splits across chunks then we will count the row in the first
1378    // chunk that contains the row.
1379    rows_to_take: u64,
1380    // A "trailer" is when a chunk ends with a partial list.  If there is no repetition index there is
1381    // never a trailer.
1382    //
1383    // A chunk that is all preamble may or may not have a trailer.
1384    //
1385    // If this is true then we want to include the trailer
1386    take_trailer: bool,
1387}
1388
// First, we schedule a bunch of [`ChunkInstructions`] based on the user's ranges.  Then we
1390// start decoding them, based on a batch size, which might not align with what we scheduled.
1391//
1392// This results in `ChunkDrainInstructions` which targets a contiguous slice of a `ChunkInstructions`
1393//
1394// So if `ChunkInstructions` is "skip preamble, skip 10, take 50, take trailer" and we are decoding in
1395// batches of size 10 we might have a `ChunkDrainInstructions` that targets that chunk and has its own
1396// skip of 17 and take of 10.  This would mean we decode the chunk, skip the preamble and 27 rows, and
1397// then take 10 rows.
1398//
1399// One very confusing bit is that `rows_to_take` includes the trailer.  So if we have two chunks:
1400//  -no preamble, skip 5, take 10, take trailer
1401//  -take preamble, skip 0, take 50, no trailer
1402//
1403// and we are draining 20 rows then the drain instructions for the first batch will be:
1404//  - no preamble, skip 0 (from chunk 0), take 11 (from chunk 0)
1405//  - take preamble (from chunk 1), skip 0 (from chunk 1), take 9 (from chunk 1)
1406#[derive(Debug, PartialEq, Eq)]
1407struct ChunkDrainInstructions {
1408    chunk_instructions: ChunkInstructions,
1409    rows_to_skip: u64,
1410    rows_to_take: u64,
1411    preamble_action: PreambleAction,
1412}
1413
1414impl ChunkInstructions {
1415    // Given a repetition index and a set of user ranges we need to figure out how to read from the chunks
1416    //
1417    // We assume that `user_ranges` are in sorted order and non-overlapping
1418    //
1419    // The output will be a set of `ChunkInstructions` which tell us how to read from the chunks
1420    fn schedule_instructions(
1421        rep_index: &MiniBlockRepIndex,
1422        user_ranges: &[Range<u64>],
1423    ) -> Vec<Self> {
1424        // This is an inexact capacity guess but a pretty good one.  The actual number of instructions
1425        // can be smaller if instructions are merged, or larger if there are multiple instructions
1426        // per row (which can happen with lists).
1427        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1428
1429        for user_range in user_ranges {
1430            let mut rows_needed = user_range.end - user_range.start;
1431            let mut need_preamble = false;
1432
1433            // Need to find the first chunk with a first row >= user_range.start.  If there are
1434            // multiple chunks with the same first row we need to take the first one.
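            // For example (hypothetical values): if the blocks' first_row values are
            // [0, 5, 5, 9] and user_range.start == 5, the binary search may land on
            // index 2; we walk back to index 1, the first block whose first_row is 5.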
1435            let mut block_index = match rep_index
1436                .blocks
1437                .binary_search_by_key(&user_range.start, |block| block.first_row)
1438            {
1439                Ok(idx) => {
1440                    // Slightly tricky case: we may need to walk backwards a bit to make sure we
1441                    // are grabbing the first eligible chunk
1442                    let mut idx = idx;
1443                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1444                        idx -= 1;
1445                    }
1446                    idx
1447                }
1448                // Easy case.  idx is greater, and idx - 1 is smaller, so idx - 1 contains the start
1449                Err(idx) => idx - 1,
1450            };
1451
1452            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1453
1454            while rows_needed > 0 || need_preamble {
1455                // Check if we've gone past the last block (should not happen)
1456                if block_index >= rep_index.blocks.len() {
1457                    log::warn!("schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early");
1458                    break;
1459                }
1460
1461                let chunk = &rep_index.blocks[block_index];
1462                let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);
1463
1464                // Handle blocks that are entirely preamble (rows_avail = 0)
1465                // These blocks have no rows to take but may have a preamble we need
1466                // We only look for preamble if to_skip == 0 (we're not skipping rows)
1467                if rows_avail == 0 && to_skip == 0 {
1468                    // Only process if this chunk has a preamble we need
1469                    if chunk.has_preamble && need_preamble {
1470                        chunk_instructions.push(Self {
1471                            chunk_idx: block_index,
1472                            preamble: PreambleAction::Take,
1473                            rows_to_skip: 0,
1474                            rows_to_take: 0,
1475                            // We still need to look at has_trailer to distinguish between "all preamble
1476                            // and row ends at end of chunk" and "all preamble and row bleeds into next
1477                            // chunk".  Both cases will have 0 rows available.
1478                            take_trailer: chunk.has_trailer,
1479                        });
1480                        // Only set need_preamble = false if the chunk has at least one row
1481                        // or this is the last block.
1482                        // Otherwise the chunk is entirely preamble and we need the next chunk's preamble too
1483                        if chunk.starts_including_trailer > 0
1484                            || block_index == rep_index.blocks.len() - 1
1485                        {
1486                            need_preamble = false;
1487                        }
1488                    }
1489                    // Move to next block
1490                    block_index += 1;
1491                    continue;
1492                }
1493
1494                // Edge case: if rows_avail == 0 but to_skip > 0
1495                // This theoretically shouldn't happen (binary search should avoid it)
1496                // but handle it for safety
1497                if rows_avail == 0 && to_skip > 0 {
1498                    // This block doesn't have enough rows to skip, move to next block
1499                    // Adjust to_skip by the number of rows in this block
1500                    to_skip -= chunk.starts_including_trailer;
1501                    block_index += 1;
1502                    continue;
1503                }
1504
1505                let rows_to_take = rows_avail.min(rows_needed);
1506                rows_needed -= rows_to_take;
1507
1508                let mut take_trailer = false;
1509                let preamble = if chunk.has_preamble {
1510                    if need_preamble {
1511                        PreambleAction::Take
1512                    } else {
1513                        PreambleAction::Skip
1514                    }
1515                } else {
1516                    PreambleAction::Absent
1517                };
1518
1519                // Are we taking the trailer?  If so, make sure we mark that we need the preamble
1520                if rows_to_take == rows_avail && chunk.has_trailer {
1521                    take_trailer = true;
1522                    need_preamble = true;
1523                } else {
1524                    need_preamble = false;
1525                };
1526
1527                chunk_instructions.push(Self {
1528                    preamble,
1529                    chunk_idx: block_index,
1530                    rows_to_skip: to_skip,
1531                    rows_to_take,
1532                    take_trailer,
1533                });
1534
1535                to_skip = 0;
1536                block_index += 1;
1537            }
1538        }
1539
1540        // If there were multiple ranges we may have multiple instructions for a single chunk.  Merge them now if they
1541        // are _adjacent_ (i.e. don't merge "take first row of chunk 0" and "take third row of chunk 0" into "take 2
1542        // rows of chunk 0 starting at 0")
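        //
        // For example (illustrative): {chunk 2, skip 0, take 3} followed by {chunk 2, skip 3, take 2}
        // merges into {chunk 2, skip 0, take 5}, but {chunk 2, skip 0, take 1} followed by
        // {chunk 2, skip 4, take 1} is left as two instructions because they are not adjacent.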
1543        if user_ranges.len() > 1 {
1544            // TODO: Could probably optimize this allocation away
1545            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1546            let mut instructions_iter = chunk_instructions.into_iter();
1547            merged_instructions.push(instructions_iter.next().unwrap());
1548            for instruction in instructions_iter {
1549                let last = merged_instructions.last_mut().unwrap();
1550                if last.chunk_idx == instruction.chunk_idx
1551                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1552                {
1553                    last.rows_to_take += instruction.rows_to_take;
1554                    last.take_trailer |= instruction.take_trailer;
1555                } else {
1556                    merged_instructions.push(instruction);
1557                }
1558            }
1559            merged_instructions
1560        } else {
1561            chunk_instructions
1562        }
1563    }
1564
1565    fn drain_from_instruction(
1566        &self,
1567        rows_desired: &mut u64,
1568        need_preamble: &mut bool,
1569        skip_in_chunk: &mut u64,
1570    ) -> (ChunkDrainInstructions, bool) {
1571        // If we need the preamble then we shouldn't be skipping anything
1572        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1573        let rows_avail = self.rows_to_take - *skip_in_chunk;
1574        let has_preamble = self.preamble != PreambleAction::Absent;
1575        let preamble_action = match (*need_preamble, has_preamble) {
1576            (true, true) => PreambleAction::Take,
1577            (true, false) => panic!("Need preamble but there isn't one"),
1578            (false, true) => PreambleAction::Skip,
1579            (false, false) => PreambleAction::Absent,
1580        };
1581
1582        // How many rows are we actually taking in this take step (including the preamble
1583        // and trailer both as individual rows)
1584        let rows_taking = if *rows_desired >= rows_avail {
1585            // We want all the rows.  If there is a trailer then we are grabbing it and will
1586            // need the preamble of the next chunk.
1589            //
1590            // Also, if this chunk is entirely preamble (rows_avail == 0 && !take_trailer) then we
1591            // need the preamble of the next chunk.
1592            *need_preamble = self.take_trailer;
1593            rows_avail
1594        } else {
1595            // We aren't taking all the rows.  Even if there is a trailer we aren't taking
1596            // it so we will not need the preamble
1597            *need_preamble = false;
1598            *rows_desired
1599        };
1600        let rows_skipped = *skip_in_chunk;
1601
1602        // Update the state for the next iteration
1603        let consumed_chunk = if *rows_desired >= rows_avail {
1604            *rows_desired -= rows_avail;
1605            *skip_in_chunk = 0;
1606            true
1607        } else {
1608            *skip_in_chunk += *rows_desired;
1609            *rows_desired = 0;
1610            false
1611        };
1612
1613        (
1614            ChunkDrainInstructions {
1615                chunk_instructions: self.clone(),
1616                rows_to_skip: rows_skipped,
1617                rows_to_take: rows_taking,
1618                preamble_action,
1619            },
1620            consumed_chunk,
1621        )
1622    }
1623}
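
// An illustrative (non-exhaustive) sketch of how `drain_from_instruction` walks across a chunk
// boundary, mirroring the batching example above.  The instructions here are hand-constructed
// rather than produced by `schedule_instructions`; note that `rows_to_take` counts the trailer
// row, so "take 10 full rows plus a trailer" is expressed as `rows_to_take: 11`.
#[cfg(test)]
mod drain_from_instruction_example {
    use super::*;

    #[test]
    fn drains_across_chunk_boundary() {
        // Chunk 0: no preamble, 10 full rows plus a trailer (11 row starts)
        let chunk0 = ChunkInstructions {
            chunk_idx: 0,
            preamble: PreambleAction::Absent,
            rows_to_skip: 5,
            rows_to_take: 11,
            take_trailer: true,
        };
        // Chunk 1: begins with the continuation of chunk 0's last row
        let chunk1 = ChunkInstructions {
            chunk_idx: 1,
            preamble: PreambleAction::Take,
            rows_to_skip: 0,
            rows_to_take: 50,
            take_trailer: false,
        };

        let mut rows_desired = 20;
        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        // The first drain takes everything chunk 0 has (including the trailer row) and
        // notes that the next chunk's preamble will be needed.
        let (drain0, consumed0) =
            chunk0.drain_from_instruction(&mut rows_desired, &mut need_preamble, &mut skip_in_chunk);
        assert!(consumed0);
        assert_eq!(drain0.rows_to_take, 11);
        assert_eq!(drain0.preamble_action, PreambleAction::Absent);
        assert!(need_preamble);

        // The remaining 9 rows come from chunk 1, taking its preamble along the way.
        let (drain1, consumed1) =
            chunk1.drain_from_instruction(&mut rows_desired, &mut need_preamble, &mut skip_in_chunk);
        assert!(!consumed1);
        assert_eq!(drain1.rows_to_take, 9);
        assert_eq!(drain1.preamble_action, PreambleAction::Take);
        assert_eq!(rows_desired, 0);
        assert_eq!(skip_in_chunk, 9);
    }
}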
1624
1625impl StructuralPageScheduler for MiniBlockScheduler {
1626    fn initialize<'a>(
1627        &'a mut self,
1628        io: &Arc<dyn EncodingsIo>,
1629    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1630        // We always need to fetch chunk metadata.  We may also need to fetch a dictionary and
1631        // we may also need to fetch the repetition index.  Here, we gather what buffers we
1632        // need.
1633        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
1634        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
1635        let mut bufs_needed = 1;
1636        if self.dictionary.is_some() {
1637            bufs_needed += 1;
1638        }
1639        if self.repetition_index_depth > 0 {
1640            bufs_needed += 1;
1641        }
1642        let mut required_ranges = Vec::with_capacity(bufs_needed);
1643        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
1644        if let Some(ref dictionary) = self.dictionary {
1645            required_ranges.push(
1646                dictionary.dictionary_buf_position_and_size.0
1647                    ..dictionary.dictionary_buf_position_and_size.0
1648                        + dictionary.dictionary_buf_position_and_size.1,
1649            );
1650        }
1651        if self.repetition_index_depth > 0 {
1652            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
1653            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
1654        }
1655        let io_req = io.submit_request(required_ranges, 0);
1656
1657        async move {
1658            let mut buffers = io_req.await?.into_iter().fuse();
1659            let meta_bytes = buffers.next().unwrap();
1660            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
1661            let rep_index_bytes = buffers.next();
1662
1663            // Parse the metadata and build the chunk meta
1664            assert!(meta_bytes.len() % 2 == 0);
1665            let bytes = LanceBuffer::from_bytes(meta_bytes, 2);
1666            let words = bytes.borrow_to_typed_slice::<u16>();
1667            let words = words.as_ref();
1668
1669            let mut chunk_meta = Vec::with_capacity(words.len());
1670
1671            let mut rows_counter = 0;
1672            let mut offset_bytes = value_buf_position;
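            // Each metadata word packs two values: the low 4 bits give log2(num_values) for the
            // chunk and the high 12 bits give (chunk_size_bytes / MINIBLOCK_ALIGNMENT) - 1.
            // For example (illustrative): a word of 0x002C means 1 << 0xC = 4096 values and a
            // chunk occupying 3 * MINIBLOCK_ALIGNMENT bytes.  The last chunk may hold fewer
            // values, which is handled below.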
1673            for (word_idx, word) in words.iter().enumerate() {
1674                let log_num_values = word & 0x0F;
1675                let divided_bytes = word >> 4;
1676                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
1677                debug_assert!(num_bytes > 0);
1678                let num_values = if word_idx < words.len() - 1 {
1679                    debug_assert!(log_num_values > 0);
1680                    1 << log_num_values
1681                } else {
1682                    debug_assert!(
1683                        log_num_values == 0
1684                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
1685                    );
1686                    self.items_in_page - rows_counter
1687                };
1688                rows_counter += num_values;
1689
1690                chunk_meta.push(ChunkMeta {
1691                    num_values,
1692                    chunk_size_bytes: num_bytes as u64,
1693                    offset_bytes,
1694                });
1695                offset_bytes += num_bytes as u64;
1696            }
1697
1698            // Build the repetition index
1699            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
1700                assert!(rep_index_data.len() % 8 == 0);
1701                let stride = self.repetition_index_depth as usize + 1;
1702                MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
1703            } else {
1704                MiniBlockRepIndex::default_from_chunks(&chunk_meta)
1705            };
1706
1707            let mut page_meta = MiniBlockCacheableState {
1708                chunk_meta,
1709                rep_index,
1710                dictionary: None,
1711            };
1712
1713            // decode dictionary
1714            if let Some(ref mut dictionary) = self.dictionary {
1715                let dictionary_data = dictionary_bytes.unwrap();
1716                page_meta.dictionary =
1717                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
1718                        LanceBuffer::from_bytes(
1719                            dictionary_data,
1720                            dictionary.dictionary_data_alignment,
1721                        ),
1722                        dictionary.num_dictionary_items,
1723                    )?));
1724            };
1725            let page_meta = Arc::new(page_meta);
1726            self.page_meta = Some(page_meta.clone());
1727            Ok(page_meta as Arc<dyn CachedPageData>)
1728        }
1729        .boxed()
1730    }
1731
1732    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1733        self.page_meta = Some(
1734            data.clone()
1735                .as_arc_any()
1736                .downcast::<MiniBlockCacheableState>()
1737                .unwrap(),
1738        );
1739    }
1740
1741    fn schedule_ranges(
1742        &self,
1743        ranges: &[Range<u64>],
1744        io: &Arc<dyn EncodingsIo>,
1745    ) -> Result<Vec<PageLoadTask>> {
1746        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1747
1748        let page_meta = self.page_meta.as_ref().unwrap();
1749
1750        let chunk_instructions =
1751            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
1752
1753        debug_assert_eq!(
1754            num_rows,
1755            chunk_instructions
1756                .iter()
1757                .map(|ci| ci.rows_to_take)
1758                .sum::<u64>()
1759        );
1760
1761        let chunks_needed = chunk_instructions
1762            .iter()
1763            .map(|ci| ci.chunk_idx)
1764            .unique()
1765            .collect::<Vec<_>>();
1766
1767        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1768        let chunk_ranges = loaded_chunks
1769            .iter()
1770            .map(|c| c.byte_range.clone())
1771            .collect::<Vec<_>>();
1772        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1773
1774        let rep_decompressor = self.rep_decompressor.clone();
1775        let def_decompressor = self.def_decompressor.clone();
1776        let value_decompressor = self.value_decompressor.clone();
1777        let num_buffers = self.num_buffers;
1778        let dictionary = page_meta.dictionary.clone();
1782        let def_meaning = self.def_meaning.clone();
1783
1784        let res = async move {
1785            let loaded_chunk_data = loaded_chunk_data.await?;
1786            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1787                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
1788            }
1789
1790            Ok(Box::new(MiniBlockDecoder {
1791                rep_decompressor,
1792                def_decompressor,
1793                value_decompressor,
1794                def_meaning,
1795                loaded_chunks: VecDeque::from_iter(loaded_chunks),
1796                instructions: VecDeque::from(chunk_instructions),
1797                offset_in_current_chunk: 0,
1798                dictionary,
1799                num_rows,
1800                num_buffers,
1801            }) as Box<dyn StructuralPageDecoder>)
1802        }
1803        .boxed();
1804        let page_load_task = PageLoadTask {
1805            decoder_fut: res,
1806            num_rows,
1807        };
1808        Ok(vec![page_load_task])
1809    }
1810}
1811
1812#[derive(Debug, Clone, Copy)]
1813struct FullZipRepIndexDetails {
1814    buf_position: u64,
1815    bytes_per_value: u64, // Will be 1, 2, 4, or 8
1816}
1817
1818#[derive(Debug)]
1819enum PerValueDecompressor {
1820    Fixed(Arc<dyn FixedPerValueDecompressor>),
1821    Variable(Arc<dyn VariablePerValueDecompressor>),
1822}
1823
1824#[derive(Debug)]
1825struct FullZipDecodeDetails {
1826    value_decompressor: PerValueDecompressor,
1827    def_meaning: Arc<[DefinitionInterpretation]>,
1828    ctrl_word_parser: ControlWordParser,
1829    max_rep: u16,
1830    max_visible_def: u16,
1831}
1832
1833/// A scheduler for full-zip encoded data
1834///
1835/// When the data type has a fixed-width then we simply need to map from
1836/// row ranges to byte ranges using the fixed-width of the data type.
1837///
1838/// When the data type is variable-width or has any repetition then a
1839/// repetition index is required.
1840#[derive(Debug)]
1841pub struct FullZipScheduler {
1842    data_buf_position: u64,
1843    rep_index: Option<FullZipRepIndexDetails>,
1844    priority: u64,
1845    rows_in_page: u64,
1846    bits_per_offset: u8,
1847    details: Arc<FullZipDecodeDetails>,
1848    /// Cached state containing the decoded repetition index
1849    cached_state: Option<Arc<FullZipCacheableState>>,
1850    /// Whether to enable caching of repetition indices
1851    enable_cache: bool,
1852}
1853
1854impl FullZipScheduler {
1855    fn try_new(
1856        buffer_offsets_and_sizes: &[(u64, u64)],
1857        priority: u64,
1858        rows_in_page: u64,
1859        layout: &pb21::FullZipLayout,
1860        decompressors: &dyn DecompressionStrategy,
1861    ) -> Result<Self> {
1862        // We don't need the data_buf_size because either the data type is
1863        // fixed-width (and we can tell size from rows_in_page) or it is not
1864        // and we have a repetition index.
1865        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
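        // The repetition index, if present, holds one entry per row plus a final end entry, so
        // the width of each entry can be inferred from the buffer size.  For example
        // (illustrative): with rows_in_page = 999 there are 1000 entries, and a 4000-byte
        // buffer implies 4 bytes per entry.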
1866        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
1867            let num_reps = rows_in_page + 1;
1868            let bytes_per_rep = len / num_reps;
1869            debug_assert_eq!(len % num_reps, 0);
1870            debug_assert!(
1871                bytes_per_rep == 1
1872                    || bytes_per_rep == 2
1873                    || bytes_per_rep == 4
1874                    || bytes_per_rep == 8
1875            );
1876            FullZipRepIndexDetails {
1877                buf_position: *pos,
1878                bytes_per_value: bytes_per_rep,
1879            }
1880        });
1881
1882        let value_decompressor = match layout.details {
1883            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
1884                let decompressor = decompressors.create_fixed_per_value_decompressor(
1885                    layout.value_compression.as_ref().unwrap(),
1886                )?;
1887                PerValueDecompressor::Fixed(decompressor.into())
1888            }
1889            Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
1890                let decompressor = decompressors.create_variable_per_value_decompressor(
1891                    layout.value_compression.as_ref().unwrap(),
1892                )?;
1893                PerValueDecompressor::Variable(decompressor.into())
1894            }
1895            None => {
1896                panic!("Full-zip layout must have a `details` field");
1897            }
1898        };
1899        let ctrl_word_parser = ControlWordParser::new(
1900            layout.bits_rep.try_into().unwrap(),
1901            layout.bits_def.try_into().unwrap(),
1902        );
1903        let def_meaning = layout
1904            .layers
1905            .iter()
1906            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
1907            .collect::<Vec<_>>();
1908
1909        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
1910        let max_visible_def = def_meaning
1911            .iter()
1912            .filter(|d| !d.is_list())
1913            .map(|d| d.num_def_levels())
1914            .sum();
1915
1916        let bits_per_offset = match layout.details {
1917            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
1918            Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
1919                bits_per_offset as u8
1920            }
1921            None => panic!("Full-zip layout must have a `details` field"),
1922        };
1923
1924        let details = Arc::new(FullZipDecodeDetails {
1925            value_decompressor,
1926            def_meaning: def_meaning.into(),
1927            ctrl_word_parser,
1928            max_rep,
1929            max_visible_def,
1930        });
1931        Ok(Self {
1932            data_buf_position,
1933            rep_index,
1934            details,
1935            priority,
1936            rows_in_page,
1937            bits_per_offset,
1938            cached_state: None,
1939            enable_cache: false, // Default to false, will be set later
1940        })
1941    }
1942
1943    /// Creates a decoder from the loaded data
1944    fn create_decoder(
1945        details: Arc<FullZipDecodeDetails>,
1946        data: VecDeque<LanceBuffer>,
1947        num_rows: u64,
1948        bits_per_offset: u8,
1949    ) -> Result<Box<dyn StructuralPageDecoder>> {
1950        match &details.value_decompressor {
1951            PerValueDecompressor::Fixed(decompressor) => {
1952                let bits_per_value = decompressor.bits_per_value();
1953                if bits_per_value == 0 {
1954                    return Err(lance_core::Error::Internal {
1955                        message: "Invalid encoding: bits_per_value must be greater than 0".into(),
1956                        location: location!(),
1957                    });
1958                }
1959                if bits_per_value % 8 != 0 {
1960                    return Err(lance_core::Error::NotSupported {
1961                        source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
1962                        location: location!(),
1963                    });
1964                }
1965                let bytes_per_value = bits_per_value / 8;
1966                let total_bytes_per_value =
1967                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
1968                Ok(Box::new(FixedFullZipDecoder {
1969                    details,
1970                    data,
1971                    num_rows,
1972                    offset_in_current: 0,
1973                    bytes_per_value: bytes_per_value as usize,
1974                    total_bytes_per_value,
1975                }) as Box<dyn StructuralPageDecoder>)
1976            }
1977            PerValueDecompressor::Variable(_decompressor) => {
1978                Ok(Box::new(VariableFullZipDecoder::new(
1979                    details,
1980                    data,
1981                    num_rows,
1982                    bits_per_offset,
1983                    bits_per_offset,
1984                )))
1985            }
1986        }
1987    }
1988
1989    /// Extracts byte ranges from a repetition index buffer
1990    /// The buffer contains pairs of (start, end) values for each range
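    ///
    /// For example (illustrative): with `bytes_per_value = 4`, a buffer packing the values
    /// `[10, 50, 80, 120]`, and `data_buf_position = 1000`, the result is
    /// `[1010..1050, 1080..1120]`.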
1991    fn extract_byte_ranges_from_pairs(
1992        buffer: LanceBuffer,
1993        bytes_per_value: u64,
1994        data_buf_position: u64,
1995    ) -> Vec<Range<u64>> {
1996        ByteUnpacker::new(buffer, bytes_per_value as usize)
1997            .chunks(2)
1998            .into_iter()
1999            .map(|mut c| {
2000                let start = c.next().unwrap() + data_buf_position;
2001                let end = c.next().unwrap() + data_buf_position;
2002                start..end
2003            })
2004            .collect::<Vec<_>>()
2005    }
2006
2007    /// Extracts byte ranges from a cached repetition index buffer
2008    /// The buffer contains all values and we need to extract specific ranges
2009    fn extract_byte_ranges_from_cached(
2010        buffer: &LanceBuffer,
2011        ranges: &[Range<u64>],
2012        bytes_per_value: u64,
2013        data_buf_position: u64,
2014    ) -> Vec<Range<u64>> {
2015        ranges
2016            .iter()
2017            .map(|r| {
2018                let start_offset = (r.start * bytes_per_value) as usize;
2019                let end_offset = (r.end * bytes_per_value) as usize;
2020
2021                let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
2022                let start_val =
2023                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
2024                        .next()
2025                        .unwrap();
2026
2027                let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
2028                let end_val =
2029                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
2030                        .next()
2031                        .unwrap();
2032
2033                (data_buf_position + start_val)..(data_buf_position + end_val)
2034            })
2035            .collect()
2036    }
2037
2038    /// Computes the ranges in the repetition index that need to be loaded
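    ///
    /// Only the first and last repetition-index entries of each row range are needed.  For
    /// example (illustrative): with `buf_position = 1000`, `bytes_per_value = 4`, and a row
    /// range of `3..7`, this reads the entries at `1012..1016` and `1028..1032`.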
2039    fn compute_rep_index_ranges(
2040        ranges: &[Range<u64>],
2041        rep_index: &FullZipRepIndexDetails,
2042    ) -> Vec<Range<u64>> {
2043        ranges
2044            .iter()
2045            .flat_map(|r| {
2046                let first_val_start =
2047                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
2048                let first_val_end = first_val_start + rep_index.bytes_per_value;
2049                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
2050                let last_val_end = last_val_start + rep_index.bytes_per_value;
2051                [first_val_start..first_val_end, last_val_start..last_val_end]
2052            })
2053            .collect()
2054    }
2055
2056    /// Resolves byte ranges from repetition index (either from cache or disk)
2057    async fn resolve_byte_ranges(
2058        data_buf_position: u64,
2059        ranges: &[Range<u64>],
2060        io: &Arc<dyn EncodingsIo>,
2061        rep_index: &FullZipRepIndexDetails,
2062        cached_state: Option<&Arc<FullZipCacheableState>>,
2063        priority: u64,
2064    ) -> Result<Vec<Range<u64>>> {
2065        if let Some(cached_state) = cached_state {
2066            // Use cached repetition index
2067            Ok(Self::extract_byte_ranges_from_cached(
2068                &cached_state.rep_index_buffer,
2069                ranges,
2070                rep_index.bytes_per_value,
2071                data_buf_position,
2072            ))
2073        } else {
2074            // Load from disk
2075            let rep_ranges = Self::compute_rep_index_ranges(ranges, rep_index);
2076            let rep_data = io.submit_request(rep_ranges, priority).await?;
2077            let rep_buffer = LanceBuffer::concat(
2078                &rep_data
2079                    .into_iter()
2080                    .map(|d| LanceBuffer::from_bytes(d, 1))
2081                    .collect::<Vec<_>>(),
2082            );
2083            Ok(Self::extract_byte_ranges_from_pairs(
2084                rep_buffer,
2085                rep_index.bytes_per_value,
2086                data_buf_position,
2087            ))
2088        }
2089    }
2090
2091    /// Schedules ranges in the presence of a repetition index
2092    fn schedule_ranges_rep(
2093        &self,
2094        ranges: &[Range<u64>],
2095        io: &Arc<dyn EncodingsIo>,
2096        rep_index: FullZipRepIndexDetails,
2097    ) -> Result<Vec<PageLoadTask>> {
2098        // Copy necessary fields to avoid lifetime issues
2099        let data_buf_position = self.data_buf_position;
2100        let cached_state = self.cached_state.clone();
2101        let priority = self.priority;
2102        let details = self.details.clone();
2103        let bits_per_offset = self.bits_per_offset;
2104        let ranges = ranges.to_vec();
2105        let io_clone = io.clone();
2106        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2107
2108        let load_task = async move {
2109            // Step 1: Resolve byte ranges from repetition index
2110            let byte_ranges = Self::resolve_byte_ranges(
2111                data_buf_position,
2112                &ranges,
2113                &io_clone,
2114                &rep_index,
2115                cached_state.as_ref(),
2116                priority,
2117            )
2118            .await?;
2119
2120            // Step 2: Load data
2121            let data = io_clone.submit_request(byte_ranges, priority).await?;
2122            let data = data
2123                .into_iter()
2124                .map(|d| LanceBuffer::from_bytes(d, 1))
2125                .collect::<VecDeque<_>>();
2126
2127            // Step 3: Create the decoder (num_rows was computed before this async block)
2128            Self::create_decoder(details, data, num_rows, bits_per_offset)
2132        }
2133        .boxed();
2134        let page_load_task = PageLoadTask {
2135            decoder_fut: load_task,
2136            num_rows,
2137        };
2138        Ok(vec![page_load_task])
2139    }
2140
2141    // In the simple case there is no repetition and we just have large fixed-width
2142    // rows of data.  We can just map row ranges to byte ranges directly using the
2143    // fixed-width of the data type.
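    //
    // For example (illustrative): with data starting at byte 100, 8-byte values, and a
    // 2-byte control word per value (10 bytes per row in total), the row range 4..6 maps
    // to the byte range 140..160.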
2144    fn schedule_ranges_simple(
2145        &self,
2146        ranges: &[Range<u64>],
2147        io: &dyn EncodingsIo,
2148    ) -> Result<Vec<PageLoadTask>> {
2149        // Count the total number of rows requested (in this simple case rows and items are 1:1)
2150        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2151
2152        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
2153            unreachable!()
2154        };
2155
2156        // Convert item ranges to byte ranges (i.e. multiply by bytes per item)
2157        let bits_per_value = decompressor.bits_per_value();
2158        assert_eq!(bits_per_value % 8, 0);
2159        let bytes_per_value = bits_per_value / 8;
2160        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
2161        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
2162        let byte_ranges = ranges.iter().map(|r| {
2163            debug_assert!(r.end <= self.rows_in_page);
2164            let start = self.data_buf_position + r.start * total_bytes_per_value;
2165            let end = self.data_buf_position + r.end * total_bytes_per_value;
2166            start..end
2167        });
2168
2169        // Request byte ranges
2170        let data = io.submit_request(byte_ranges.collect(), self.priority);
2171
2172        let details = self.details.clone();
2173
2174        let load_task = async move {
2175            let data = data.await?;
2176            let data = data
2177                .into_iter()
2178                .map(|d| LanceBuffer::from_bytes(d, 1))
2179                .collect();
2180            Ok(Box::new(FixedFullZipDecoder {
2181                details,
2182                data,
2183                num_rows,
2184                offset_in_current: 0,
2185                bytes_per_value: bytes_per_value as usize,
2186                total_bytes_per_value: total_bytes_per_value as usize,
2187            }) as Box<dyn StructuralPageDecoder>)
2188        }
2189        .boxed();
2190        let page_load_task = PageLoadTask {
2191            decoder_fut: load_task,
2192            num_rows,
2193        };
2194        Ok(vec![page_load_task])
2195    }
2196}
2197
2198/// Cacheable state for FullZip encoding, storing the decoded repetition index
2199#[derive(Debug)]
2200struct FullZipCacheableState {
2201    /// The raw repetition index buffer for future decoding
2202    rep_index_buffer: LanceBuffer,
2203}
2204
2205impl DeepSizeOf for FullZipCacheableState {
2206    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
2207        self.rep_index_buffer.len()
2208    }
2209}
2210
2211impl CachedPageData for FullZipCacheableState {
2212    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2213        self
2214    }
2215}
2216
2217impl StructuralPageScheduler for FullZipScheduler {
2218    /// Initializes the scheduler. If there's a repetition index, loads and caches it.
2219    /// Otherwise returns NoCachedPageData.
2220    fn initialize<'a>(
2221        &'a mut self,
2222        io: &Arc<dyn EncodingsIo>,
2223    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2224        // Check if caching is enabled and we have a repetition index
2225        if self.enable_cache && self.rep_index.is_some() {
2226            let rep_index = self.rep_index.as_ref().unwrap();
2227            // Calculate the total size of the repetition index
2228            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2229            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2230
2231            // Load the repetition index buffer
2232            let io_clone = io.clone();
2233            let future = async move {
2234                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2235                let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);
2236
2237                // Create and return the cacheable state
2238                Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
2239            };
2240
2241            future.boxed()
2242        } else {
2243            // Caching disabled or no repetition index, skip caching
2244            std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2245        }
2246    }
2247
2248    /// Loads previously cached repetition index data from the cache system.
2249    /// This method is called when a scheduler instance needs to use cached data
2250    /// that was initialized by another instance or in a previous operation.
2251    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2252        // Try to downcast to our specific cache type
2253        if let Ok(cached_state) = cache
2254            .clone()
2255            .as_arc_any()
2256            .downcast::<FullZipCacheableState>()
2257        {
2258            // Store the cached state for use in schedule_ranges
2259            self.cached_state = Some(cached_state);
2260        }
2261    }
2262
2263    fn schedule_ranges(
2264        &self,
2265        ranges: &[Range<u64>],
2266        io: &Arc<dyn EncodingsIo>,
2267    ) -> Result<Vec<PageLoadTask>> {
2268        if let Some(rep_index) = self.rep_index {
2269            self.schedule_ranges_rep(ranges, io, rep_index)
2270        } else {
2271            self.schedule_ranges_simple(ranges, io.as_ref())
2272        }
2273    }
2274}
2275
2276/// A decoder for full-zip encoded data when the data has a fixed-width
2277///
2278/// Here we need to unzip the control words from the values themselves and
2279/// then decompress the requested values.
2280///
2281/// We use a PerValueDecompressor because we will only be decompressing the
2282/// requested data.  This decoder / scheduler does not do any read amplification.
2283#[derive(Debug)]
2284struct FixedFullZipDecoder {
2285    details: Arc<FullZipDecodeDetails>,
2286    data: VecDeque<LanceBuffer>,
2287    offset_in_current: usize,
2288    bytes_per_value: usize,
2289    total_bytes_per_value: usize,
2290    num_rows: u64,
2291}
2292
2293impl FixedFullZipDecoder {
2294    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2295        debug_assert!(num_rows > 0);
2296        let cur_buf = self.data.front_mut().unwrap();
2297        let start = self.offset_in_current;
2298        if self.details.ctrl_word_parser.has_rep() {
2299            // This is a slightly slower path.  To figure out where to split we need to examine
2300            // the repetition levels in the control words to map requested rows (lists) to items
2301            let mut rows_started = 0;
2302            // We always need at least one value.  Now loop through the values until we have
2303            // seen the start of num_rows rows (stopping just before the next row begins)
2304            let mut num_items = 0;
2305            while self.offset_in_current < cur_buf.len() {
2306                let control = self.details.ctrl_word_parser.parse_desc(
2307                    &cur_buf[self.offset_in_current..],
2308                    self.details.max_rep,
2309                    self.details.max_visible_def,
2310                );
2311                if control.is_new_row {
2312                    if rows_started == num_rows {
2313                        break;
2314                    }
2315                    rows_started += 1;
2316                }
2317                num_items += 1;
2318                if control.is_visible {
2319                    self.offset_in_current += self.total_bytes_per_value;
2320                } else {
2321                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2322                }
2323            }
2324
2325            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2326            if self.offset_in_current == cur_buf.len() {
2327                self.data.pop_front();
2328                self.offset_in_current = 0;
2329            }
2330
2331            FullZipDecodeTaskItem {
2332                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2333                    data: task_slice,
2334                    bits_per_value: self.bytes_per_value as u64 * 8,
2335                    num_values: num_items,
2336                    block_info: BlockInfo::new(),
2337                }),
2338                rows_in_buf: rows_started,
2339            }
2340        } else {
2341            // If there's no repetition we can calculate the slicing point by just multiplying
2342            // the number of rows by the total bytes per value
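            // (e.g., illustratively, with an 8-byte value plus a 2-byte control word,
            // total_bytes_per_value is 10 and 4 rows need 40 bytes)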
2343            let cur_buf = self.data.front_mut().unwrap();
2344            let bytes_avail = cur_buf.len() - self.offset_in_current;
2345            let offset_in_cur = self.offset_in_current;
2346
2347            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2348            let mut rows_taken = num_rows;
2349            let task_slice = if bytes_needed >= bytes_avail {
2350                self.offset_in_current = 0;
2351                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2352                self.data
2353                    .pop_front()
2354                    .unwrap()
2355                    .slice_with_length(offset_in_cur, bytes_avail)
2356            } else {
2357                self.offset_in_current += bytes_needed;
2358                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2359            };
2360            FullZipDecodeTaskItem {
2361                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2362                    data: task_slice,
2363                    bits_per_value: self.bytes_per_value as u64 * 8,
2364                    num_values: rows_taken,
2365                    block_info: BlockInfo::new(),
2366                }),
2367                rows_in_buf: rows_taken,
2368            }
2369        }
2370    }
2371}
2372
2373impl StructuralPageDecoder for FixedFullZipDecoder {
2374    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2375        let mut task_data = Vec::with_capacity(self.data.len());
2376        let mut remaining = num_rows;
2377        while remaining > 0 {
2378            let task_item = self.slice_next_task(remaining);
2379            remaining -= task_item.rows_in_buf;
2380            task_data.push(task_item);
2381        }
2382        Ok(Box::new(FixedFullZipDecodeTask {
2383            details: self.details.clone(),
2384            data: task_data,
2385            bytes_per_value: self.bytes_per_value,
2386            num_rows: num_rows as usize,
2387        }))
2388    }
2389
2390    fn num_rows(&self) -> u64 {
2391        self.num_rows
2392    }
2393}
2394
2395/// A decoder for full-zip encoded data when the data has a variable-width
2396///
2397/// Here we need to unzip the control words AND lengths from the values and
2398/// then decompress the requested values.
2399#[derive(Debug)]
2400struct VariableFullZipDecoder {
2401    details: Arc<FullZipDecodeDetails>,
2402    decompressor: Arc<dyn VariablePerValueDecompressor>,
2403    data: LanceBuffer,
2404    offsets: LanceBuffer,
2405    rep: ScalarBuffer<u16>,
2406    def: ScalarBuffer<u16>,
2407    repdef_starts: Vec<usize>,
2408    data_starts: Vec<usize>,
2409    offset_starts: Vec<usize>,
2410    visible_item_counts: Vec<u64>,
2411    bits_per_offset: u8,
2412    current_idx: usize,
2413    num_rows: u64,
2414}
2415
2416impl VariableFullZipDecoder {
2417    fn new(
2418        details: Arc<FullZipDecodeDetails>,
2419        data: VecDeque<LanceBuffer>,
2420        num_rows: u64,
2421        in_bits_per_length: u8,
2422        out_bits_per_offset: u8,
2423    ) -> Self {
2424        let decompressor = match details.value_decompressor {
2425            PerValueDecompressor::Variable(ref d) => d.clone(),
2426            _ => unreachable!(),
2427        };
2428
2429        assert_eq!(in_bits_per_length % 8, 0);
2430        assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2431
2432        let mut decoder = Self {
2433            details,
2434            decompressor,
2435            data: LanceBuffer::empty(),
2436            offsets: LanceBuffer::empty(),
2437            rep: LanceBuffer::empty().borrow_to_typed_slice(),
2438            def: LanceBuffer::empty().borrow_to_typed_slice(),
2439            bits_per_offset: out_bits_per_offset,
2440            repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2441            data_starts: Vec::with_capacity(num_rows as usize + 1),
2442            offset_starts: Vec::with_capacity(num_rows as usize + 1),
2443            visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2444            current_idx: 0,
2445            num_rows,
2446        };
2447
2448        // There's no great time to do this and this is the least worst time.  If we don't unzip then
2449        // we can't slice the data during the decode phase.  This is because we need the offsets to be
2450        // unpacked to know where the values start and end.
2451        //
2452        // We don't want to unzip on the decode thread because that is a single-threaded path
2453        // We don't want to unzip on the scheduling thread because that is a single-threaded path
2454        //
2455        // Fortunately, we know variable length data will always be read indirectly and so we can do it
2456        // here, which should be on the indirect thread.  The primary disadvantage to doing it here is that
2457        // we load all the data into memory and then throw it away only to load it all into memory again during
2458        // the decode.
2459        //
2460        // There are some alternatives to investigate:
2461        //   - Instead of just reading the beginning and end of the rep index we could read the entire
2462        //     range in between.  This will give us the break points that we need for slicing and won't increase
2463        //     the number of IOPs but it will mean we are doing more total I/O and we need to load the rep index
2464        //     even when doing a full scan.
2465        //   - We could force each decode task to do a full unzip of all the data.  Each decode task now
2466        //     has to do more work but the work is all fused.
2467        //   - We could just try doing this work on the decode thread and see if it is a problem.
2468        decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2469
2470        decoder
2471    }
2472
2473    unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2474        match bits_per_offset {
2475            8 => *data.get_unchecked(0) as u64,
2476            16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2477            32 => u32::from_le_bytes([
2478                *data.get_unchecked(0),
2479                *data.get_unchecked(1),
2480                *data.get_unchecked(2),
2481                *data.get_unchecked(3),
2482            ]) as u64,
2483            64 => u64::from_le_bytes([
2484                *data.get_unchecked(0),
2485                *data.get_unchecked(1),
2486                *data.get_unchecked(2),
2487                *data.get_unchecked(3),
2488                *data.get_unchecked(4),
2489                *data.get_unchecked(5),
2490                *data.get_unchecked(6),
2491                *data.get_unchecked(7),
2492            ]),
2493            _ => unreachable!(),
2494        }
2495    }
2496
2497    fn unzip(
2498        &mut self,
2499        data: VecDeque<LanceBuffer>,
2500        in_bits_per_length: u8,
2501        out_bits_per_offset: u8,
2502        num_rows: u64,
2503    ) {
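        // Each zipped value is laid out as [control word][length][value bytes], where the
        // length field is `in_bits_per_length / 8` bytes wide; items that are invisible, or
        // visible but null, carry only a control word.  For example (illustrative): with a
        // 1-byte control word and 32-bit lengths, the string "hi" occupies
        // 1 + 4 + 2 = 7 bytes in the zipped stream.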
2504        // This undercounts if there are lists but, at this point, we don't really know how many items we have
2505        let mut rep = Vec::with_capacity(num_rows as usize);
2506        let mut def = Vec::with_capacity(num_rows as usize);
2507        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2508
2509        // This undercounts if there are lists
2510        // It can also overcount if there are invisible items
2511        let bytes_per_offset = out_bits_per_offset as usize / 8;
2512        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2513        let mut offsets_data = Vec::with_capacity(bytes_offsets);
2514
2515        let bytes_per_length = in_bits_per_length as usize / 8;
2516        let bytes_lengths = bytes_per_length * num_rows as usize;
2517
2518        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2519        // This overcounts since bytes_lengths and bytes_cw are undercounts
2520        // It can also undercount if there are invisible items (hence the saturating_sub)
2521        let mut unzipped_data =
2522            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2523
2524        let mut current_offset = 0_u64;
2525        let mut visible_item_count = 0_u64;
2526        for databuf in data.into_iter() {
2527            let mut databuf = databuf.as_ref();
2528            while !databuf.is_empty() {
2529                let data_start = unzipped_data.len();
2530                let offset_start = offsets_data.len();
2531                // We might have only-rep or only-def, neither, or both.  They move at the same
2532                // speed though so we only need one index into it
2533                let repdef_start = rep.len().max(def.len());
2534                // TODO: Kind of inefficient we parse the control word twice here
2535                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2536                    databuf,
2537                    self.details.max_rep,
2538                    self.details.max_visible_def,
2539                );
2540                self.details
2541                    .ctrl_word_parser
2542                    .parse(databuf, &mut rep, &mut def);
2543                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2544
2545                if ctrl_desc.is_new_row {
2546                    self.repdef_starts.push(repdef_start);
2547                    self.data_starts.push(data_start);
2548                    self.offset_starts.push(offset_start);
2549                    self.visible_item_counts.push(visible_item_count);
2550                }
2551                if ctrl_desc.is_visible {
2552                    visible_item_count += 1;
2553                    if ctrl_desc.is_valid_item {
2554                        // Safety: Data should have at least bytes_per_length bytes remaining
2555                        debug_assert!(databuf.len() >= bytes_per_length);
2556                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2557                        match out_bits_per_offset {
2558                            32 => offsets_data
2559                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2560                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2561                            _ => unreachable!(),
2562                        };
2563                        databuf = &databuf[bytes_per_length..]; // skip past the stored length field
2564                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
2565                        databuf = &databuf[length as usize..];
2566                        current_offset += length;
2567                    } else {
2568                        // Null items still get an offset
2569                        match out_bits_per_offset {
2570                            32 => offsets_data
2571                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2572                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2573                            _ => unreachable!(),
2574                        }
2575                    }
2576                }
2577            }
2578        }
2579        self.repdef_starts.push(rep.len().max(def.len()));
2580        self.data_starts.push(unzipped_data.len());
2581        self.offset_starts.push(offsets_data.len());
2582        self.visible_item_counts.push(visible_item_count);
2583        match out_bits_per_offset {
2584            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2585            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2586            _ => unreachable!(),
2587        };
2588        self.rep = ScalarBuffer::from(rep);
2589        self.def = ScalarBuffer::from(def);
2590        self.data = LanceBuffer::from(unzipped_data);
2591        self.offsets = LanceBuffer::from(offsets_data);
2592    }
2593}
2594
2595impl StructuralPageDecoder for VariableFullZipDecoder {
2596    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2597        let start = self.current_idx;
2598        let end = start + num_rows as usize;
2599
2600        // This might seem a little peculiar.  We are returning the entire data for every single
2601        // batch.  This is because the offsets are relative to the start of the data.  In other words
2602        // imagine we have a data buffer that is 100 bytes long and the offsets are [0, 10, 20, 30, 40]
2603        // and we return in batches of two.  The second set of offsets will be [20, 30, 40].
2604        //
2605        // So either we pay for a copy to normalize the offsets or we just return the entire data buffer
2606        // which is slightly cheaper.
2607        let data = self.data.clone();
2608
2609        let offset_start = self.offset_starts[start];
2610        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2611        let offsets = self
2612            .offsets
2613            .slice_with_length(offset_start, offset_end - offset_start);
2614
2615        let repdef_start = self.repdef_starts[start];
2616        let repdef_end = self.repdef_starts[end];
2617        let rep = if self.rep.is_empty() {
2618            self.rep.clone()
2619        } else {
2620            self.rep.slice(repdef_start, repdef_end - repdef_start)
2621        };
2622        let def = if self.def.is_empty() {
2623            self.def.clone()
2624        } else {
2625            self.def.slice(repdef_start, repdef_end - repdef_start)
2626        };
2627
2628        let visible_item_counts_start = self.visible_item_counts[start];
2629        let visible_item_counts_end = self.visible_item_counts[end];
2630        let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2631
2632        self.current_idx += num_rows as usize;
2633
2634        Ok(Box::new(VariableFullZipDecodeTask {
2635            details: self.details.clone(),
2636            decompressor: self.decompressor.clone(),
2637            data,
2638            offsets,
2639            bits_per_offset: self.bits_per_offset,
2640            num_visible_items,
2641            rep,
2642            def,
2643        }))
2644    }
2645
2646    fn num_rows(&self) -> u64 {
2647        self.num_rows
2648    }
2649}
2650
2651#[derive(Debug)]
2652struct VariableFullZipDecodeTask {
2653    details: Arc<FullZipDecodeDetails>,
2654    decompressor: Arc<dyn VariablePerValueDecompressor>,
2655    data: LanceBuffer,
2656    offsets: LanceBuffer,
2657    bits_per_offset: u8,
2658    num_visible_items: u64,
2659    rep: ScalarBuffer<u16>,
2660    def: ScalarBuffer<u16>,
2661}
2662
2663impl DecodePageTask for VariableFullZipDecodeTask {
2664    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2665        let block = VariableWidthBlock {
2666            data: self.data,
2667            offsets: self.offsets,
2668            bits_per_offset: self.bits_per_offset,
2669            num_values: self.num_visible_items,
2670            block_info: BlockInfo::new(),
2671        };
2672        let decompressed = self.decompressor.decompress(block)?;
2673        let rep = if self.rep.is_empty() {
2674            None
2675        } else {
2676            Some(self.rep.to_vec())
2677        };
2678        let def = if self.def.is_empty() {
2679            None
2680        } else {
2681            Some(self.def.to_vec())
2682        };
2683        let unraveler = RepDefUnraveler::new(
2684            rep,
2685            def,
2686            self.details.def_meaning.clone(),
2687            self.num_visible_items,
2688        );
2689        Ok(DecodedPage {
2690            data: decompressed,
2691            repdef: unraveler,
2692        })
2693    }
2694}
2695
2696#[derive(Debug)]
2697struct FullZipDecodeTaskItem {
2698    data: PerValueDataBlock,
2699    rows_in_buf: u64,
2700}
2701
2702/// A task to unzip and decompress full-zip encoded data when that data
2703/// has a fixed-width.
2704#[derive(Debug)]
2705struct FixedFullZipDecodeTask {
2706    details: Arc<FullZipDecodeDetails>,
2707    data: Vec<FullZipDecodeTaskItem>,
2708    num_rows: usize,
2709    bytes_per_value: usize,
2710}
2711
2712impl DecodePageTask for FixedFullZipDecodeTask {
2713    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2714        // Multiply by 2 to make a stab at the size of the output buffer (which will be decompressed and thus bigger)
2715        let estimated_size_bytes = self
2716            .data
2717            .iter()
2718            .map(|task_item| task_item.data.data_size() as usize)
2719            .sum::<usize>()
2720            * 2;
2721        let mut data_builder =
2722            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2723
2724        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
2725            // Fast path, no need to unzip because there is no rep/def
2726            //
2727            // We decompress each buffer and add it to our output buffer
2728            for task_item in self.data.into_iter() {
2729                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2730                    unreachable!()
2731                };
2732                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2733                else {
2734                    unreachable!()
2735                };
2736                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
2737                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
2738                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
2739            }
2740
2741            let unraveler = RepDefUnraveler::new(
2742                None,
2743                None,
2744                self.details.def_meaning.clone(),
2745                self.num_rows as u64,
2746            );
2747
2748            Ok(DecodedPage {
2749                data: data_builder.finish(),
2750                repdef: unraveler,
2751            })
2752        } else {
2753            // Slow path, unzipping needed
2754            let mut rep = Vec::with_capacity(self.num_rows);
2755            let mut def = Vec::with_capacity(self.num_rows);
2756
2757            for task_item in self.data.into_iter() {
2758                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2759                    unreachable!()
2760                };
2761                let mut buf_slice = fixed_data.data.as_ref();
2762                let num_values = fixed_data.num_values as usize;
2763                // We will be unzipping repdef into `rep` and `def` and the
2764                // values into `values` (which contains the compressed values)
2765                let mut values = Vec::with_capacity(
2766                    fixed_data.data.len()
2767                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
2768                );
2769                let mut visible_items = 0;
2770                for _ in 0..num_values {
2771                    // Extract rep/def
2772                    self.details
2773                        .ctrl_word_parser
2774                        .parse(buf_slice, &mut rep, &mut def);
2775                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
2776
2777                    let is_visible = def
2778                        .last()
2779                        .map(|d| *d <= self.details.max_visible_def)
2780                        .unwrap_or(true);
2781                    if is_visible {
2782                        // Extract value
2783                        values.extend_from_slice(&buf_slice[..self.bytes_per_value]);
2784                        buf_slice = &buf_slice[self.bytes_per_value..];
2785                        visible_items += 1;
2786                    }
2787                }
2788
2789                // Finally, we decompress the values and add them to our output buffer
2790                let values_buf = LanceBuffer::from(values);
2791                let fixed_data = FixedWidthDataBlock {
2792                    bits_per_value: self.bytes_per_value as u64 * 8,
2793                    block_info: BlockInfo::new(),
2794                    data: values_buf,
2795                    num_values: visible_items,
2796                };
2797                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2798                else {
2799                    unreachable!()
2800                };
2801                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
2802                data_builder.append(&decompressed, 0..visible_items);
2803            }
2804
2805            let repetition = if rep.is_empty() { None } else { Some(rep) };
2806            let definition = if def.is_empty() { None } else { Some(def) };
2807
2808            let unraveler = RepDefUnraveler::new(
2809                repetition,
2810                definition,
2811                self.details.def_meaning.clone(),
2812                self.num_rows as u64,
2813            );
2814            let data = data_builder.finish();
2815
2816            Ok(DecodedPage {
2817                data,
2818                repdef: unraveler,
2819            })
2820        }
2821    }
2822}
2823
2824#[derive(Debug)]
2825struct StructuralPrimitiveFieldSchedulingJob<'a> {
2826    scheduler: &'a StructuralPrimitiveFieldScheduler,
2827    ranges: Vec<Range<u64>>,
2828    page_idx: usize,
2829    range_idx: usize,
2830    global_row_offset: u64,
2831}
2832
2833impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
2834    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
2835        Self {
2836            scheduler,
2837            ranges,
2838            page_idx: 0,
2839            range_idx: 0,
2840            global_row_offset: 0,
2841        }
2842    }
2843}
2844
2845impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
2846    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
2847        if self.range_idx >= self.ranges.len() {
2848            return Ok(Vec::new());
2849        }
2850        // Get our current range
2851        let mut range = self.ranges[self.range_idx].clone();
2852        let priority = range.start;
2853
2854        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
2855        trace!(
2856            "Current range is {:?} and current page has {} rows",
2857            range,
2858            cur_page.num_rows
2859        );
2860        // Skip entire pages until we have some overlap with our next range
2861        while cur_page.num_rows + self.global_row_offset <= range.start {
2862            self.global_row_offset += cur_page.num_rows;
2863            self.page_idx += 1;
2864            trace!("Skipping entire page of {} rows", cur_page.num_rows);
2865            cur_page = &self.scheduler.page_schedulers[self.page_idx];
2866        }
2867
2868        // Now the cur_page has overlap with range.  Continue looping through ranges
2869        // until we find a range that exceeds the current page
2870
2871        let mut ranges_in_page = Vec::new();
2872        while cur_page.num_rows + self.global_row_offset > range.start {
2873            range.start = range.start.max(self.global_row_offset);
2874            let start_in_page = range.start - self.global_row_offset;
2875            let end_in_page = start_in_page + (range.end - range.start);
2876            let end_in_page = end_in_page.min(cur_page.num_rows);
2877            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
2878
2879            ranges_in_page.push(start_in_page..end_in_page);
2880            if last_in_range {
2881                self.range_idx += 1;
2882                if self.range_idx == self.ranges.len() {
2883                    break;
2884                }
2885                range = self.ranges[self.range_idx].clone();
2886            } else {
2887                break;
2888            }
2889        }
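        // Worked example (hypothetical numbers): suppose global_row_offset is 100, the
        // current page has 50 rows (rows 100..150), and the ranges are [120..140, 145..200].
        // The first pass pushes 20..40 (all of the first range) and advances to the next
        // range; the second pass pushes 45..50 and breaks because the range extends past
        // the end of the page.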
2890
2891        trace!(
2892            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
2893            ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
2894            ranges_in_page.len(),
2895            cur_page.num_rows,
2896            priority,
2897            self.scheduler.column_index,
2898            cur_page.page_index,
2899        );
2900
2901        self.global_row_offset += cur_page.num_rows;
2902        self.page_idx += 1;
2903
2904        let page_decoders = cur_page
2905            .scheduler
2906            .schedule_ranges(&ranges_in_page, context.io())?;
2907
2908        let cur_path = context.current_path();
2909        page_decoders
2910            .into_iter()
2911            .map(|page_load_task| {
2912                let cur_path = cur_path.clone();
2913                let page_decoder = page_load_task.decoder_fut;
2914                let unloaded_page = async move {
2915                    let page_decoder = page_decoder.await?;
2916                    Ok(LoadedPageShard {
2917                        decoder: page_decoder,
2918                        path: cur_path,
2919                    })
2920                }
2921                .boxed();
2922                Ok(ScheduledScanLine {
2923                    decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
2924                    rows_scheduled: page_load_task.num_rows,
2925                })
2926            })
2927            .collect::<Result<Vec<_>>>()
2928    }
2929}
2930
2931#[derive(Debug)]
2932struct PageInfoAndScheduler {
2933    page_index: usize,
2934    num_rows: u64,
2935    scheduler: Box<dyn StructuralPageScheduler>,
2936}
2937
2938/// A scheduler for a leaf node
2939///
2940/// Here we look at the layout of the various pages and delegate scheduling to a scheduler
2941/// appropriate for the layout of the page.
2942#[derive(Debug)]
2943pub struct StructuralPrimitiveFieldScheduler {
2944    page_schedulers: Vec<PageInfoAndScheduler>,
2945    column_index: u32,
2946}
2947
2948impl StructuralPrimitiveFieldScheduler {
2949    pub fn try_new(
2950        column_info: &ColumnInfo,
2951        decompressors: &dyn DecompressionStrategy,
2952        cache_repetition_index: bool,
2953        target_field: &Field,
2954    ) -> Result<Self> {
2955        let page_schedulers = column_info
2956            .page_infos
2957            .iter()
2958            .enumerate()
2959            .map(|(page_index, page_info)| {
2960                Self::page_info_to_scheduler(
2961                    page_info,
2962                    page_index,
2963                    decompressors,
2964                    cache_repetition_index,
2965                    target_field,
2966                )
2967            })
2968            .collect::<Result<Vec<_>>>()?;
2969        Ok(Self {
2970            page_schedulers,
2971            column_index: column_info.index,
2972        })
2973    }
2974
2975    fn page_layout_to_scheduler(
2976        page_info: &PageInfo,
2977        page_layout: &PageLayout,
2978        decompressors: &dyn DecompressionStrategy,
2979        cache_repetition_index: bool,
2980        target_field: &Field,
2981    ) -> Result<Box<dyn StructuralPageScheduler>> {
2982        use pb21::page_layout::Layout;
2983        Ok(match page_layout.layout.as_ref().expect_ok()? {
2984            Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
2985                &page_info.buffer_offsets_and_sizes,
2986                page_info.priority,
2987                mini_block.num_items,
2988                mini_block,
2989                decompressors,
2990            )?),
2991            Layout::FullZipLayout(full_zip) => {
2992                let mut scheduler = FullZipScheduler::try_new(
2993                    &page_info.buffer_offsets_and_sizes,
2994                    page_info.priority,
2995                    page_info.num_rows,
2996                    full_zip,
2997                    decompressors,
2998                )?;
2999                scheduler.enable_cache = cache_repetition_index;
3000                Box::new(scheduler)
3001            }
3002            Layout::AllNullLayout(all_null) => {
3003                let def_meaning = all_null
3004                    .layers
3005                    .iter()
3006                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3007                    .collect::<Vec<_>>();
3008                if def_meaning.len() == 1
3009                    && def_meaning[0] == DefinitionInterpretation::NullableItem
3010                {
3011                    Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
3012                } else {
3013                    Box::new(ComplexAllNullScheduler::new(
3014                        page_info.buffer_offsets_and_sizes.clone(),
3015                        def_meaning.into(),
3016                    )) as Box<dyn StructuralPageScheduler>
3017                }
3018            }
3019            Layout::BlobLayout(blob) => {
3020                let inner_scheduler = Self::page_layout_to_scheduler(
3021                    page_info,
3022                    blob.inner_layout.as_ref().expect_ok()?.as_ref(),
3023                    decompressors,
3024                    cache_repetition_index,
3025                    target_field,
3026                )?;
3027                let def_meaning = blob
3028                    .layers
3029                    .iter()
3030                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3031                    .collect::<Vec<_>>();
3032                if matches!(target_field.data_type(), DataType::Struct(_)) {
3033                    // User wants to decode blob into struct
3034                    Box::new(BlobDescriptionPageScheduler::new(
3035                        inner_scheduler,
3036                        def_meaning.into(),
3037                    ))
3038                } else {
3039                    // User wants to decode blob into binary data
3040                    Box::new(BlobPageScheduler::new(
3041                        inner_scheduler,
3042                        page_info.priority,
3043                        page_info.num_rows,
3044                        def_meaning.into(),
3045                    ))
3046                }
3047            }
3048        })
3049    }
3050
3051    fn page_info_to_scheduler(
3052        page_info: &PageInfo,
3053        page_index: usize,
3054        decompressors: &dyn DecompressionStrategy,
3055        cache_repetition_index: bool,
3056        target_field: &Field,
3057    ) -> Result<PageInfoAndScheduler> {
3058        let page_layout = page_info.encoding.as_structural();
3059        let scheduler = Self::page_layout_to_scheduler(
3060            page_info,
3061            page_layout,
3062            decompressors,
3063            cache_repetition_index,
3064            target_field,
3065        )?;
3066        Ok(PageInfoAndScheduler {
3067            page_index,
3068            num_rows: page_info.num_rows,
3069            scheduler,
3070        })
3071    }
3072}
3073
3074pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
3075    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
3076}
3077
3078pub struct NoCachedPageData;
3079
3080impl DeepSizeOf for NoCachedPageData {
3081    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
3082        0
3083    }
3084}
3085impl CachedPageData for NoCachedPageData {
3086    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
3087        self
3088    }
3089}
3090
3091pub struct CachedFieldData {
3092    pages: Vec<Arc<dyn CachedPageData>>,
3093}
3094
3095impl DeepSizeOf for CachedFieldData {
3096    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
3097        self.pages.deep_size_of_children(ctx)
3098    }
3099}
3100
3101// Cache key for field data
3102#[derive(Debug, Clone)]
3103pub struct FieldDataCacheKey {
3104    pub column_index: u32,
3105}
3106
3107impl CacheKey for FieldDataCacheKey {
3108    type ValueType = CachedFieldData;
3109
3110    fn key(&self) -> std::borrow::Cow<'_, str> {
3111        self.column_index.to_string().into()
3112    }
3113}
3114
3115impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
3116    fn initialize<'a>(
3117        &'a mut self,
3118        _filter: &'a FilterExpression,
3119        context: &'a SchedulerContext,
3120    ) -> BoxFuture<'a, Result<()>> {
3121        let cache_key = FieldDataCacheKey {
3122            column_index: self.column_index,
3123        };
3124        let cache = context.cache().clone();
3125
3126        async move {
3127            if let Some(cached_data) = cache.get_with_key(&cache_key).await {
3128                self.page_schedulers
3129                    .iter_mut()
3130                    .zip(cached_data.pages.iter())
3131                    .for_each(|(page_scheduler, cached_data)| {
3132                        page_scheduler.scheduler.load(cached_data);
3133                    });
3134                return Ok(());
3135            }
3136
3137            let page_data = self
3138                .page_schedulers
3139                .iter_mut()
3140                .map(|s| s.scheduler.initialize(context.io()))
3141                .collect::<FuturesOrdered<_>>();
3142
3143            let page_data = page_data.try_collect::<Vec<_>>().await?;
3144            let cached_data = Arc::new(CachedFieldData { pages: page_data });
3145            cache.insert_with_key(&cache_key, cached_data).await;
3146            Ok(())
3147        }
3148        .boxed()
3149    }
3150
3151    fn schedule_ranges<'a>(
3152        &'a self,
3153        ranges: &[Range<u64>],
3154        _filter: &FilterExpression,
3155    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
3156        let ranges = ranges.to_vec();
3157        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
3158            self, ranges,
3159        )))
3160    }
3161}
3162
3163/// Takes the output from several page decoders and
3164/// concatenates them.
3165#[derive(Debug)]
3166pub struct StructuralCompositeDecodeArrayTask {
3167    tasks: Vec<Box<dyn DecodePageTask>>,
3168    should_validate: bool,
3169    data_type: DataType,
3170}
3171
3172impl StructuralCompositeDecodeArrayTask {
3173    fn restore_validity(
3174        array: Arc<dyn Array>,
3175        unraveler: &mut CompositeRepDefUnraveler,
3176    ) -> Arc<dyn Array> {
3177        let validity = unraveler.unravel_validity(array.len());
3178        let Some(validity) = validity else {
3179            return array;
3180        };
3181        if array.data_type() == &DataType::Null {
3182            // We unravel from a null array but we don't add the null buffer because arrow-rs doesn't like it
3183            return array;
3184        }
3185        assert_eq!(validity.len(), array.len());
3186        // SAFETY: We should have already asserted the buffers are all valid; we are just
3187        // adding null buffers to the array here
3188        make_array(unsafe {
3189            array
3190                .to_data()
3191                .into_builder()
3192                .nulls(Some(validity))
3193                .build_unchecked()
3194        })
3195    }
3196}
3197
3198impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3199    fn decode(self: Box<Self>) -> Result<DecodedArray> {
3200        let mut arrays = Vec::with_capacity(self.tasks.len());
3201        let mut unravelers = Vec::with_capacity(self.tasks.len());
3202        for task in self.tasks {
3203            let decoded = task.decode()?;
3204            unravelers.push(decoded.repdef);
3205
3206            let array = make_array(
3207                decoded
3208                    .data
3209                    .into_arrow(self.data_type.clone(), self.should_validate)?,
3210            );
3211
3212            arrays.push(array);
3213        }
3214        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3215        let array = arrow_select::concat::concat(&array_refs)?;
3216        let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3217
3218        let array = Self::restore_validity(array, &mut repdef);
3219
3220        Ok(DecodedArray { array, repdef })
3221    }
3222}
3223
3224#[derive(Debug)]
3225pub struct StructuralPrimitiveFieldDecoder {
3226    field: Arc<ArrowField>,
3227    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
3228    should_validate: bool,
3229    rows_drained_in_current: u64,
3230}
3231
3232impl StructuralPrimitiveFieldDecoder {
3233    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3234        Self {
3235            field: field.clone(),
3236            page_decoders: VecDeque::new(),
3237            should_validate,
3238            rows_drained_in_current: 0,
3239        }
3240    }
3241}
3242
3243impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3244    fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
3245        assert!(child.path.is_empty());
3246        self.page_decoders.push_back(child.decoder);
3247        Ok(())
3248    }
3249
3250    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3251        let mut remaining = num_rows;
3252        let mut tasks = Vec::new();
3253        while remaining > 0 {
3254            let cur_page = self.page_decoders.front_mut().unwrap();
3255            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3256            let to_take = num_in_page.min(remaining);
3257
3258            let task = cur_page.drain(to_take)?;
3259            tasks.push(task);
3260
3261            if to_take == num_in_page {
3262                self.page_decoders.pop_front();
3263                self.rows_drained_in_current = 0;
3264            } else {
3265                self.rows_drained_in_current += to_take;
3266            }
3267
3268            remaining -= to_take;
3269        }
3270        Ok(Box::new(StructuralCompositeDecodeArrayTask {
3271            tasks,
3272            should_validate: self.should_validate,
3273            data_type: self.field.data_type().clone(),
3274        }))
3275    }
3276
3277    fn data_type(&self) -> &DataType {
3278        self.field.data_type()
3279    }
3280}
3281
3282/// The serialized representation of full-zip data
3283struct SerializedFullZip {
3284    /// The zipped values buffer
3285    values: LanceBuffer,
3286    /// The repetition index (only present if there is repetition)
3287    repetition_index: Option<LanceBuffer>,
3288}
3289
3290// We align and pad mini-blocks to 8 byte boundaries for two reasons.  First,
3291// to allow us to store a chunk size in 12 bits.
3292//
3293// If we directly record the size in bytes with 12 bits we would be limited to
3294// 4KiB which is too small.  Since we know each mini-block consists of 8 byte
3295// words we can store the # of words instead which gives us 32KiB.  We want
3296// at least 24KiB so we can handle even the worst case of
3297// - 4Ki values compressed into an 8186 byte buffer
3298// - 4 bytes to describe rep & def lengths
3299// - 16KiB of rep & def buffer (this will almost never happen but life is easier if we
3300//   plan for it)
3301//
3302// Second, each chunk in a mini-block is aligned to 8 bytes.  This allows multi-byte
3303// values like offsets to be stored in a mini-block and safely read back out.  It also
3304// helps ensure zero-copy reads in cases where zero-copy is possible (e.g. no decoding
3305// needed).
3306//
3307// Note: by "aligned to 8 bytes" we mean BOTH "aligned to 8 bytes from the start of
3308// the page" and "aligned to 8 bytes from the start of the file."
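//
// A quick sanity check on the arithmetic above: 12 bits can express 4096 distinct word
// counts (thanks to the minus-one encoding used when chunks are serialized), so a chunk
// can span up to 4096 * 8 = 32 KiB, which covers the worst case described above.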
3309const MINIBLOCK_ALIGNMENT: usize = 8;
3310
3311/// An encoder for primitive (leaf) arrays
3312///
3313/// This encoder is fairly complicated and follows a number of paths depending
3314/// on the data.
3315///
3316/// First, we convert the validity & offsets information into repetition and
3317/// definition levels.  Then we compress the data itself into a single buffer.
3318///
3319/// If the data is narrow then we encode the data in small chunks (each chunk
3320/// should be a few disk sectors and contains a buffer of repetition, a buffer
3321/// of definition, and a buffer of value data).  This approach is called
3322/// "mini-block".  These mini-blocks are stored into a single data buffer.
3323///
3324/// If the data is wide then we zip together the repetition and definition levels
3325/// with the value data into a single buffer.  This approach is called "zipped".
3326///
3327/// If there is any repetition information then we create a repetition index
3328///
3329/// In addition, the compression process may create zero or more metadata buffers.
3330/// For example, a dictionary compression will create dictionary metadata.  Any
3331/// mini-block approach has a metadata buffer of block sizes.  This metadata is
3332/// stored in a separate buffer on disk and read at initialization time.
3333///
3334/// TODO: We should concatenate metadata buffers from all pages into a single buffer
3335/// at (roughly) the end of the file so there is, at most, one read per column of
3336/// metadata per file.
3337pub struct PrimitiveStructuralEncoder {
3338    // Accumulates arrays until we have enough data to justify a disk page
3339    accumulation_queue: AccumulationQueue,
3340
3341    keep_original_array: bool,
3342    accumulated_repdefs: Vec<RepDefBuilder>,
3343    // The compression strategy we will use to compress the data
3344    compression_strategy: Arc<dyn CompressionStrategy>,
3345    column_index: u32,
3346    field: Field,
3347    encoding_metadata: Arc<HashMap<String, String>>,
3348}
3349
3350struct CompressedLevelsChunk {
3351    data: LanceBuffer,
3352    num_levels: u16,
3353}
3354
3355struct CompressedLevels {
3356    data: Vec<CompressedLevelsChunk>,
3357    compression: CompressiveEncoding,
3358    rep_index: Option<LanceBuffer>,
3359}
3360
3361struct SerializedMiniBlockPage {
3362    num_buffers: u64,
3363    data: LanceBuffer,
3364    metadata: LanceBuffer,
3365}
3366
3367impl PrimitiveStructuralEncoder {
3368    pub fn try_new(
3369        options: &EncodingOptions,
3370        compression_strategy: Arc<dyn CompressionStrategy>,
3371        column_index: u32,
3372        field: Field,
3373        encoding_metadata: Arc<HashMap<String, String>>,
3374    ) -> Result<Self> {
3375        Ok(Self {
3376            accumulation_queue: AccumulationQueue::new(
3377                options.cache_bytes_per_column,
3378                column_index,
3379                options.keep_original_array,
3380            ),
3381            keep_original_array: options.keep_original_array,
3382            accumulated_repdefs: Vec::new(),
3383            column_index,
3384            compression_strategy,
3385            field,
3386            encoding_metadata,
3387        })
3388    }
3389
3390    // TODO: This is a heuristic we may need to tune at some point
3391    //
3392    // As data gets narrow then the "zipping" process gets too expensive
3393    //   and we prefer mini-block
3394    // As data gets wide then the # of values per block shrinks (very wide
3395    //   data doesn't even fit in a mini-block), the block overhead gets
3396    //   too large, and we prefer zipped.
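    //
    // For example (hypothetical columns): an Int32 column (4 bytes per value) or a string
    // column whose longest value is ~100 bytes is treated as narrow, while a binary column
    // whose longest value is several KiB falls through to full-zip.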
3397    fn is_narrow(data_block: &DataBlock) -> bool {
3398        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3399
3400        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3401            let max_len_array = max_len_array
3402                .as_any()
3403                .downcast_ref::<PrimitiveArray<UInt64Type>>()
3404                .unwrap();
3405            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3406                return true;
3407            }
3408        }
3409        false
3410    }
3411
3412    fn prefers_miniblock(
3413        data_block: &DataBlock,
3414        encoding_metadata: &HashMap<String, String>,
3415    ) -> bool {
3416        // If the user specifically requested miniblock then use it
3417        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3418            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3419        }
3420        // Otherwise only use miniblock if it is narrow
3421        Self::is_narrow(data_block)
3422    }
3423
3424    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3425        // Fullzip is the backup option so the only reason we wouldn't use it is if the
3426        // user specifically requested not to use it (in which case we're probably going
3427        // to emit an error)
3428        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3429            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3430        }
3431        true
3432    }
3433
3434    // Converts value data, repetition levels, and definition levels into a single
3435    // buffer of mini-blocks.  In addition, creates a buffer of mini-block metadata
3436    // which tells us the size of each block.  Finally, if repetition is present then
3437    // we also create a buffer for the repetition index.
3438    //
3439    // Each chunk is serialized as:
3440    // | num_levels (2 bytes) | buf_lens (2 bytes per buffer) | P | buf0 | P | buf1 | ... | bufN | P |
3441    //
3442    // P - Padding inserted to ensure each buffer is 8-byte aligned and the buffer size is a multiple
3443    //     of 8 bytes (so that the next chunk is 8-byte aligned).
3444    //
3445    // Each block has a u16 word of metadata.  The upper 12 bits contain the
3446    // # of 8-byte words in the block, minus one (if the block does not fill the final
3447    // word then up to 7 bytes of padding are added).  The lower 4 bits describe the log_2
3448    // number of values (e.g. if there are 1024 then the lower 4 bits will be
3449    // 0xA).  All blocks except the last must have a power-of-two number of values.
3450    // This not only makes metadata smaller but it makes decoding easier since
3451    // batch sizes are typically a power of 2.  4 bits would allow us to express
3452    // up to 16Ki values but we restrict this further to 4Ki values.
3453    //
3454    // This means blocks can have 1 to 4Ki values and 8 - 32Ki bytes.
3455    //
3456    // All metadata words are serialized (as little endian) into a single buffer
3457    // of metadata values.
3458    //
3459    // If there is repetition then we also create a repetition index.  This is a
3460    // single buffer of integer vectors (stored in row major order).  There is one
3461    // entry for each chunk.  The size of the vector is based on the depth of random
3462    // access we want to support.
3463    //
3464    // A vector of size 2 is the minimum and will support row-based random access (e.g.
3465    // "take the 57th row").  A vector of size 3 will support 1 level of nested access
3466    // (e.g. "take the 3rd item in the 57th row").  A vector of size 4 will support 2
3467    // levels of nested access and so on.
3468    //
3469    // The first number in the vector is the number of top-level rows that complete in
3470    // the chunk.  The second number is the number of second-level rows that complete
3471    // after the final top-level row completed (or beginning of the chunk if no top-level
3472    // row completes in the chunk).  And so on.  The final number in the vector is always
3473    // the number of leftover items not covered by earlier entries in the vector.
3474    //
3475    // Currently we are limited to 0 levels of nested access but that will change in the
3476    // future.
3477    //
3478    // The repetition index and the chunk metadata are read at initialization time and
3479    // cached in memory.
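    //
    // As a concrete (hypothetical) example of the metadata word described above: a chunk
    // holding 1024 values in 4096 bytes of chunk data would be recorded as
    //   ((4096 / 8 - 1) << 4) | log2(1024) = (511 << 4) | 0xA = 0x1FFA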
3480    fn serialize_miniblocks(
3481        miniblocks: MiniBlockCompressed,
3482        rep: Option<Vec<CompressedLevelsChunk>>,
3483        def: Option<Vec<CompressedLevelsChunk>>,
3484    ) -> SerializedMiniBlockPage {
3485        let bytes_rep = rep
3486            .as_ref()
3487            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3488            .unwrap_or(0);
3489        let bytes_def = def
3490            .as_ref()
3491            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3492            .unwrap_or(0);
3493        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3494        let mut num_buffers = miniblocks.data.len();
3495        if rep.is_some() {
3496            num_buffers += 1;
3497        }
3498        if def.is_some() {
3499            num_buffers += 1;
3500        }
3501        // 2 bytes for the length of each buffer and up to 7 bytes of padding per buffer
3502        let max_extra = 9 * num_buffers;
3503        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3504        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2);
3505
3506        let mut rep_iter = rep.map(|r| r.into_iter());
3507        let mut def_iter = def.map(|d| d.into_iter());
3508
3509        let mut buffer_offsets = vec![0; miniblocks.data.len()];
3510        for chunk in miniblocks.chunks {
3511            let start_pos = data_buffer.len();
3512            // Start of chunk should be aligned
3513            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3514
3515            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3516            let def = def_iter.as_mut().map(|d| d.next().unwrap());
3517
3518            // Write the number of levels, or 0 if there is no rep/def
3519            let num_levels = rep
3520                .as_ref()
3521                .map(|r| r.num_levels)
3522                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3523            data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3524
3525            // Write the buffer lengths
3526            if let Some(rep) = rep.as_ref() {
3527                let bytes_rep = u16::try_from(rep.data.len()).unwrap();
3528                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3529            }
3530            if let Some(def) = def.as_ref() {
3531                let bytes_def = u16::try_from(def.data.len()).unwrap();
3532                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3533            }
3534
3535            for buffer_size in &chunk.buffer_sizes {
3536                let bytes = *buffer_size;
3537                data_buffer.extend_from_slice(&bytes.to_le_bytes());
3538            }
3539
3540            // Pad
3541            let add_padding = |data_buffer: &mut Vec<u8>| {
3542                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3543                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
3544            };
3545            add_padding(&mut data_buffer);
3546
3547            // Write the buffers themselves
3548            if let Some(rep) = rep.as_ref() {
3549                data_buffer.extend_from_slice(&rep.data);
3550                add_padding(&mut data_buffer);
3551            }
3552            if let Some(def) = def.as_ref() {
3553                data_buffer.extend_from_slice(&def.data);
3554                add_padding(&mut data_buffer);
3555            }
3556            for (buffer_size, (buffer, buffer_offset)) in chunk
3557                .buffer_sizes
3558                .iter()
3559                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
3560            {
3561                let start = *buffer_offset;
3562                let end = start + *buffer_size as usize;
3563                *buffer_offset += *buffer_size as usize;
3564                data_buffer.extend_from_slice(&buffer[start..end]);
3565                add_padding(&mut data_buffer);
3566            }
3567
3568            let chunk_bytes = data_buffer.len() - start_pos;
3569            assert!(chunk_bytes <= 32 * 1024);
3570            assert!(chunk_bytes > 0);
3571            assert_eq!(chunk_bytes % 8, 0);
3572            // We subtract 1 from the word count here because we want to be able to express
3573            // a size of 32KiB and not (32Ki - 8)B which is what we'd get otherwise with
3574            // 0xFFF
3575            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3576            let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3577
3578            let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
3579            meta_buffer.extend_from_slice(&metadata.to_le_bytes());
3580        }
3581
3582        let data_buffer = LanceBuffer::from(data_buffer);
3583        let metadata_buffer = LanceBuffer::from(meta_buffer);
3584
3585        SerializedMiniBlockPage {
3586            num_buffers: miniblocks.data.len() as u64,
3587            data: data_buffer,
3588            metadata: metadata_buffer,
3589        }
3590    }
3591
3592    /// Compresses a buffer of levels into chunks
3593    ///
3594    /// If these are repetition levels then we also calculate the repetition index here (it
3595    /// is returned in the `rep_index` field of the result)
3596    fn compress_levels(
3597        mut levels: RepDefSlicer<'_>,
3598        num_elements: u64,
3599        compression_strategy: &dyn CompressionStrategy,
3600        chunks: &[MiniBlockChunk],
3601        // This will be 0 if we are compressing def levels
3602        max_rep: u16,
3603    ) -> Result<CompressedLevels> {
3604        let mut rep_index = if max_rep > 0 {
3605            Vec::with_capacity(chunks.len())
3606        } else {
3607            vec![]
3608        };
3609        // Make the levels into a FixedWidth data block
3610        let num_levels = levels.num_levels() as u64;
3611        let levels_buf = levels.all_levels().clone();
3612
3613        let mut fixed_width_block = FixedWidthDataBlock {
3614            data: levels_buf,
3615            bits_per_value: 16,
3616            num_values: num_levels,
3617            block_info: BlockInfo::new(),
3618        };
3619        // Compute statistics to enable optimal compression for rep/def levels
3620        fixed_width_block.compute_stat();
3621
3622        let levels_block = DataBlock::FixedWidth(fixed_width_block);
3623        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
3624        // Pick a block compressor
3625        let (compressor, compressor_desc) =
3626            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
3627        // Compress blocks of levels (sized according to the chunks)
3628        let mut level_chunks = Vec::with_capacity(chunks.len());
3629        let mut values_counter = 0;
3630        for (chunk_idx, chunk) in chunks.iter().enumerate() {
3631            let chunk_num_values = chunk.num_values(values_counter, num_elements);
3632            debug_assert!(chunk_num_values > 0);
3633            values_counter += chunk_num_values;
3634            let chunk_levels = if chunk_idx < chunks.len() - 1 {
3635                levels.slice_next(chunk_num_values as usize)
3636            } else {
3637                levels.slice_rest()
3638            };
3639            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
3640            if max_rep > 0 {
3641                // If max_rep > 0 then we are working with rep levels and we need
3642                // to calculate the repetition index.  The repetition index for a
3643                // chunk is currently 2 values (in the future it may be more).
3644                //
3645                // The first value is the number of rows that _finish_ in the
3646                // chunk.
3647                //
3648                // The second value is the number of "leftovers" after the last
3649                // finished row in the chunk.
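                //
                // For example (hypothetical levels, max_rep == 2): if a non-final chunk
                // has rep levels [2, 0, 1, 2, 1, 0] then one row finishes in the chunk
                // (the `2` at index 3 closes the row that began at index 0) and there are
                // three leftover items (indices 3..=5) after the last finished row.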
3650                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
3651                let rep_values = rep_values.as_ref();
3652
3653                // We skip 1 here because a max_rep at spot 0 doesn't count as a finished list (we
3654                // will count it in the previous chunk)
3655                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
3656                let num_leftovers = if chunk_idx < chunks.len() - 1 {
3657                    rep_values
3658                        .iter()
3659                        .rev()
3660                        .position(|v| *v == max_rep)
3661                        // # of leftovers includes the max_rep spot
3662                        .map(|pos| pos + 1)
3663                        .unwrap_or(rep_values.len())
3664                } else {
3665                    // Last chunk can't have leftovers
3666                    0
3667                };
3668
3669                if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
3670                    // This chunk starts with a new row and so, if we thought we had leftovers
3671                    // in the previous chunk, we were mistaken
3672                    // TODO: Can use unchecked here
3673                    let rep_len = rep_index.len();
3674                    if rep_index[rep_len - 1] != 0 {
3675                        // We thought we had leftovers but that was actually a full row
3676                        rep_index[rep_len - 2] += 1;
3677                        rep_index[rep_len - 1] = 0;
3678                    }
3679                }
3680
3681                if chunk_idx == chunks.len() - 1 {
3682                    // The final list
3683                    num_rows += 1;
3684                }
3685                rep_index.push(num_rows as u64);
3686                rep_index.push(num_leftovers as u64);
3687            }
3688            let mut chunk_fixed_width = FixedWidthDataBlock {
3689                data: chunk_levels,
3690                bits_per_value: 16,
3691                num_values: num_chunk_levels,
3692                block_info: BlockInfo::new(),
3693            };
3694            chunk_fixed_width.compute_stat();
3695            let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
3696            let compressed_levels = compressor.compress(chunk_levels_block)?;
3697            level_chunks.push(CompressedLevelsChunk {
3698                data: compressed_levels,
3699                num_levels: num_chunk_levels as u16,
3700            });
3701        }
3702        debug_assert_eq!(levels.num_levels_remaining(), 0);
3703        let rep_index = if rep_index.is_empty() {
3704            None
3705        } else {
3706            Some(LanceBuffer::reinterpret_vec(rep_index))
3707        };
3708        Ok(CompressedLevels {
3709            data: level_chunks,
3710            compression: compressor_desc,
3711            rep_index,
3712        })
3713    }
3714
3715    fn encode_simple_all_null(
3716        column_idx: u32,
3717        num_rows: u64,
3718        row_number: u64,
3719    ) -> Result<EncodedPage> {
3720        let description = ProtobufUtils21::simple_all_null_layout();
3721        Ok(EncodedPage {
3722            column_idx,
3723            data: vec![],
3724            description: PageEncoding::Structural(description),
3725            num_rows,
3726            row_number,
3727        })
3728    }
3729
3730    // Encodes a page where all values are null but we have rep/def
3731    // information that we need to store (e.g. to distinguish between
3732    // different kinds of null)
3733    fn encode_complex_all_null(
3734        column_idx: u32,
3735        repdefs: Vec<RepDefBuilder>,
3736        row_number: u64,
3737        num_rows: u64,
3738    ) -> Result<EncodedPage> {
3739        let repdef = RepDefBuilder::serialize(repdefs);
3740
3741        // TODO: Actually compress repdef
3742        let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
3743            LanceBuffer::reinterpret_slice(rep.clone())
3744        } else {
3745            LanceBuffer::empty()
3746        };
3747
3748        let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
3749            LanceBuffer::reinterpret_slice(def.clone())
3750        } else {
3751            LanceBuffer::empty()
3752        };
3753
3754        let description = ProtobufUtils21::all_null_layout(&repdef.def_meaning);
3755        Ok(EncodedPage {
3756            column_idx,
3757            data: vec![rep_bytes, def_bytes],
3758            description: PageEncoding::Structural(description),
3759            num_rows,
3760            row_number,
3761        })
3762    }
3763
3764    #[allow(clippy::too_many_arguments)]
3765    fn encode_miniblock(
3766        column_idx: u32,
3767        field: &Field,
3768        compression_strategy: &dyn CompressionStrategy,
3769        data: DataBlock,
3770        repdefs: Vec<RepDefBuilder>,
3771        row_number: u64,
3772        dictionary_data: Option<DataBlock>,
3773        num_rows: u64,
3774    ) -> Result<EncodedPage> {
3775        let repdef = RepDefBuilder::serialize(repdefs);
3776
3777        if let DataBlock::AllNull(_null_block) = data {
3778            // We should not be using mini-block for all-null.  There are other structural
3779            // encodings for that.
3780            unreachable!()
3781        }
3782
3783        let num_items = data.num_values();
3784
3785        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
3786        let (compressed_data, value_encoding) = compressor.compress(data)?;
3787
3788        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
3789
3790        let mut compressed_rep = repdef
3791            .rep_slicer()
3792            .map(|rep_slicer| {
3793                Self::compress_levels(
3794                    rep_slicer,
3795                    num_items,
3796                    compression_strategy,
3797                    &compressed_data.chunks,
3798                    max_rep,
3799                )
3800            })
3801            .transpose()?;
3802
3803        let (rep_index, rep_index_depth) =
3804            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
3805                Some(rep_index) => (Some(rep_index.clone()), 1),
3806                None => (None, 0),
3807            };
3808
3809        let mut compressed_def = repdef
3810            .def_slicer()
3811            .map(|def_slicer| {
3812                Self::compress_levels(
3813                    def_slicer,
3814                    num_items,
3815                    compression_strategy,
3816                    &compressed_data.chunks,
3817                    /*max_rep=*/ 0,
3818                )
3819            })
3820            .transpose()?;
3821
3822        // TODO: Parquet sparsely encodes values here.  We could do the same but
3823        // then we won't have log2 values per chunk.  This means more metadata
3824        // and potentially more decoder asymmetry.  However, it may be worth
3825        // investigating at some point
3826
3827        let rep_data = compressed_rep
3828            .as_mut()
3829            .map(|cr| std::mem::take(&mut cr.data));
3830        let def_data = compressed_def
3831            .as_mut()
3832            .map(|cd| std::mem::take(&mut cd.data));
3833
3834        let serialized = Self::serialize_miniblocks(compressed_data, rep_data, def_data);
3835
3836        // Metadata, Data, Dictionary, (maybe) Repetition Index
3837        let mut data = Vec::with_capacity(4);
3838        data.push(serialized.metadata);
3839        data.push(serialized.data);
3840
3841        if let Some(dictionary_data) = dictionary_data {
3842            let num_dictionary_items = dictionary_data.num_values();
3843            // field in `create_block_compressor` is not used currently.
3844            let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;
3845
3846            let (compressor, dictionary_encoding) = compression_strategy
3847                .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
3848            let dictionary_buffer = compressor.compress(dictionary_data)?;
3849
3850            data.push(dictionary_buffer);
3851            if let Some(rep_index) = rep_index {
3852                data.push(rep_index);
3853            }
3854
3855            let description = ProtobufUtils21::miniblock_layout(
3856                compressed_rep.map(|cr| cr.compression),
3857                compressed_def.map(|cd| cd.compression),
3858                value_encoding,
3859                rep_index_depth,
3860                serialized.num_buffers,
3861                Some((dictionary_encoding, num_dictionary_items)),
3862                &repdef.def_meaning,
3863                num_items,
3864            );
3865            Ok(EncodedPage {
3866                num_rows,
3867                column_idx,
3868                data,
3869                description: PageEncoding::Structural(description),
3870                row_number,
3871            })
3872        } else {
3873            let description = ProtobufUtils21::miniblock_layout(
3874                compressed_rep.map(|cr| cr.compression),
3875                compressed_def.map(|cd| cd.compression),
3876                value_encoding,
3877                rep_index_depth,
3878                serialized.num_buffers,
3879                None,
3880                &repdef.def_meaning,
3881                num_items,
3882            );
3883
3884            if let Some(rep_index) = rep_index {
3885                let view = rep_index.borrow_to_typed_slice::<u64>();
3886                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
3887                debug_assert_eq!(total, num_rows);
3888
3889                data.push(rep_index);
3890            }
3891
3892            Ok(EncodedPage {
3893                num_rows,
3894                column_idx,
3895                data,
3896                description: PageEncoding::Structural(description),
3897                row_number,
3898            })
3899        }
3900    }
3901
3902    // For fixed-size data we encode < control word | data > for each value
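    //
    // For example (hypothetical page): with 1-byte control words and 4-byte values the
    // zipped buffer looks like [ctrl0][v0: 4 bytes][ctrl1][v1: 4 bytes]...; values that
    // are not visible (e.g. items masked out by a null or empty list ancestor) contribute
    // only their control word.  The repetition index, when present, records the byte
    // offset at which each new top-level row begins, plus one final entry for the end of
    // the buffer.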
3903    fn serialize_full_zip_fixed(
3904        fixed: FixedWidthDataBlock,
3905        mut repdef: ControlWordIterator,
3906        num_values: u64,
3907    ) -> SerializedFullZip {
3908        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
3909        let mut zipped_data = Vec::with_capacity(len);
3910
3911        let max_rep_index_val = if repdef.has_repetition() {
3912            len as u64
3913        } else {
3914            // Setting this to 0 means we won't write a repetition index
3915            0
3916        };
3917        let mut rep_index_builder =
3918            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
3919
3920        // We could pad to the nearest byte, but it is unlikely that compression of large values
3921        // will yield a result that is not byte aligned, so we don't worry about it for now
3922        assert_eq!(
3923            fixed.bits_per_value % 8,
3924            0,
3925            "Non-byte aligned full-zip compression not yet supported"
3926        );
3927
3928        let bytes_per_value = fixed.bits_per_value as usize / 8;
3929        let mut offset = 0;
3930
3931        if bytes_per_value == 0 {
3932            // No data, just dump the repdef into the buffer
3933            while let Some(control) = repdef.append_next(&mut zipped_data) {
3934                if control.is_new_row {
3935                    // We have finished a row
3936                    debug_assert!(offset <= len);
3937                    // SAFETY: We know that `offset <= len`
3938                    unsafe { rep_index_builder.append(offset as u64) };
3939                }
3940                offset = zipped_data.len();
3941            }
3942        } else {
3943            // We have data, zip it with the repdef
3944            let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
3945            while let Some(control) = repdef.append_next(&mut zipped_data) {
3946                if control.is_new_row {
3947                    // We have finished a row
3948                    debug_assert!(offset <= len);
3949                    // SAFETY: We know that `offset <= len`
3950                    unsafe { rep_index_builder.append(offset as u64) };
3951                }
3952                if control.is_visible {
3953                    let value = data_iter.next().unwrap();
3954                    zipped_data.extend_from_slice(value);
3955                }
3956                offset = zipped_data.len();
3957            }
3958        }
3959
3960        debug_assert_eq!(zipped_data.len(), len);
3961        // Put the final value in the rep index
3962        // SAFETY: `zipped_data.len() == len`
3963        unsafe {
3964            rep_index_builder.append(zipped_data.len() as u64);
3965        }
3966
3967        let zipped_data = LanceBuffer::from(zipped_data);
3968        let rep_index = rep_index_builder.into_data();
3969        let rep_index = if rep_index.is_empty() {
3970            None
3971        } else {
3972            Some(LanceBuffer::from(rep_index))
3973        };
3974        SerializedFullZip {
3975            values: zipped_data,
3976            repetition_index: rep_index,
3977        }
3978    }
3979
3980    // For variable-size data we encode < control word | length | data > for each value
3981    //
3982    // In addition, we create a second buffer, the repetition index
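    //
    // For example (hypothetical page, 1-byte control words, 32-bit offsets): the values
    // "ab" and "cde" are zipped as [ctrl][len=2][a b][ctrl][len=3][c d e], where each len
    // is a 4-byte little-endian length.  Null or invisible items store only their control
    // word, which is why the final buffer may be smaller than the capacity estimate.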
3983    fn serialize_full_zip_variable(
3984        variable: VariableWidthBlock,
3985        mut repdef: ControlWordIterator,
3986        num_items: u64,
3987    ) -> SerializedFullZip {
3988        let bytes_per_offset = variable.bits_per_offset as usize / 8;
3989        assert_eq!(
3990            variable.bits_per_offset % 8,
3991            0,
3992            "Only byte-aligned offsets supported"
3993        );
3994        let len = variable.data.len()
3995            + repdef.bytes_per_word() * num_items as usize
3996            + bytes_per_offset * variable.num_values as usize;
3997        let mut buf = Vec::with_capacity(len);
3998
3999        let max_rep_index_val = len as u64;
4000        let mut rep_index_builder =
4001            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
4002
4003        // TODO: byte pack the item lengths with varint encoding
4004        match bytes_per_offset {
4005            4 => {
4006                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
4007                let mut rep_offset = 0;
4008                let mut windows_iter = offs.as_ref().windows(2);
4009                while let Some(control) = repdef.append_next(&mut buf) {
4010                    if control.is_new_row {
4011                        // We have finished a row
4012                        debug_assert!(rep_offset <= len);
4013                        // SAFETY: We know that `rep_offset <= len`
4014                        unsafe { rep_index_builder.append(rep_offset as u64) };
4015                    }
4016                    if control.is_visible {
4017                        let window = windows_iter.next().unwrap();
4018                        if control.is_valid_item {
4019                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4020                            buf.extend_from_slice(
4021                                &variable.data[window[0] as usize..window[1] as usize],
4022                            );
4023                        }
4024                    }
4025                    rep_offset = buf.len();
4026                }
4027            }
4028            8 => {
4029                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
4030                let mut rep_offset = 0;
4031                let mut windows_iter = offs.as_ref().windows(2);
4032                while let Some(control) = repdef.append_next(&mut buf) {
4033                    if control.is_new_row {
4034                        // We have finished a row
4035                        debug_assert!(rep_offset <= len);
4036                        // SAFETY: We know that `rep_offset <= len`
4037                        unsafe { rep_index_builder.append(rep_offset as u64) };
4038                    }
4039                    if control.is_visible {
4040                        let window = windows_iter.next().unwrap();
4041                        if control.is_valid_item {
4042                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
4043                            buf.extend_from_slice(
4044                                &variable.data[window[0] as usize..window[1] as usize],
4045                            );
4046                        }
4047                    }
4048                    rep_offset = buf.len();
4049                }
4050            }
4051            _ => panic!("Unsupported offset size"),
4052        }
4053
4054        // We may have saved a few bytes by not writing length prefixes for null items.  However,
4055        // if we end up over `len` then we have a bug.
4056        debug_assert!(buf.len() <= len);
4057        // Put the final value in the rep index
4058        // SAFETY: `buf.len() <= len`
4059        unsafe {
4060            rep_index_builder.append(buf.len() as u64);
4061        }
4062
4063        let zipped_data = LanceBuffer::from(buf);
4064        let rep_index = rep_index_builder.into_data();
4065        debug_assert!(!rep_index.is_empty());
4066        let rep_index = Some(LanceBuffer::from(rep_index));
4067        SerializedFullZip {
4068            values: zipped_data,
4069            repetition_index: rep_index,
4070        }
4071    }
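    // Illustrative sketch only (not a format spec): for a hypothetical page of three string
    // values ["hi", NULL, "abc"] with one-byte control words c0, c1, c2 and 32-bit length
    // prefixes, the zipped buffer built above would look roughly like:
    //
    //   [ c0 | 02 00 00 00 | 'h' 'i' | c1 | c2 | 03 00 00 00 | 'a' 'b' 'c' ]
    //
    // The NULL value contributes only its control word (no length prefix, no data), which is
    // why the final buffer can be smaller than the `len` capacity estimate.  The repetition
    // index stores, for each row, the byte offset where that row begins in the zipped buffer,
    // plus one trailing entry equal to the final buffer length.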
4072
4073    /// Serializes data into a single buffer according to the full-zip format which zips
4074    /// together the repetition, definition, and value data into a single buffer.
4075    fn serialize_full_zip(
4076        compressed_data: PerValueDataBlock,
4077        repdef: ControlWordIterator,
4078        num_items: u64,
4079    ) -> SerializedFullZip {
4080        match compressed_data {
4081            PerValueDataBlock::Fixed(fixed) => {
4082                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4083            }
4084            PerValueDataBlock::Variable(var) => {
4085                Self::serialize_full_zip_variable(var, repdef, num_items)
4086            }
4087        }
4088    }
4089
4090    fn encode_full_zip(
4091        column_idx: u32,
4092        field: &Field,
4093        compression_strategy: &dyn CompressionStrategy,
4094        data: DataBlock,
4095        repdefs: Vec<RepDefBuilder>,
4096        row_number: u64,
4097        num_lists: u64,
4098    ) -> Result<EncodedPage> {
4099        let repdef = RepDefBuilder::serialize(repdefs);
4100        let max_rep = repdef
4101            .repetition_levels
4102            .as_ref()
4103            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4104        let max_def = repdef
4105            .definition_levels
4106            .as_ref()
4107            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4108
4109        // To handle FSL we just flatten
4110        // let data = data.flatten();
4111
4112        let (num_items, num_visible_items) =
4113            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4114                // If there are rep levels there may be "invisible" items and we need to encode
4115                // rep_levels.len() things which might be larger than data.num_values()
4116                (rep_levels.len() as u64, data.num_values())
4117            } else {
4118                // If there are no rep levels then we encode data.num_values() things
4119                (data.num_values(), data.num_values())
4120            };
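        // Illustrative example (assuming a simple list column): for [[1, 2], NULL] there are
        // three rep/def positions but only two stored values, so num_items (3) exceeds
        // num_visible_items (2); the NULL list is an "invisible" item that occupies a control
        // word but carries no value data.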
4121
4122        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4123
4124        let repdef_iter = build_control_word_iterator(
4125            repdef.repetition_levels.as_deref(),
4126            max_rep,
4127            repdef.definition_levels.as_deref(),
4128            max_def,
4129            max_visible_def,
4130            num_items as usize,
4131        );
4132        let bits_rep = repdef_iter.bits_rep();
4133        let bits_def = repdef_iter.bits_def();
4134
4135        let compressor = compression_strategy.create_per_value(field, &data)?;
4136        let (compressed_data, value_encoding) = compressor.compress(data)?;
4137
4138        let description = match &compressed_data {
4139            PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
4140                bits_rep,
4141                bits_def,
4142                fixed.bits_per_value as u32,
4143                value_encoding,
4144                &repdef.def_meaning,
4145                num_items as u32,
4146                num_visible_items as u32,
4147            ),
4148            PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
4149                bits_rep,
4150                bits_def,
4151                variable.bits_per_offset as u32,
4152                value_encoding,
4153                &repdef.def_meaning,
4154                num_items as u32,
4155                num_visible_items as u32,
4156            ),
4157        };
4158
4159        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
4160
4161        let data = if let Some(repindex) = zipped.repetition_index {
4162            vec![zipped.values, repindex]
4163        } else {
4164            vec![zipped.values]
4165        };
4166
4167        Ok(EncodedPage {
4168            num_rows: num_lists,
4169            column_idx,
4170            data,
4171            description: PageEncoding::Structural(description),
4172            row_number,
4173        })
4174    }
4175
4176    /// Estimates the total size of dictionary-encoded data
4177    ///
4178    /// Dictionary encoding splits data into two parts:
4179    /// 1. Dictionary: stores unique values
4180    /// 2. Indices: maps each value to a dictionary entry
4181    ///
4182    /// For FixedWidth (e.g., 128-bit Decimal):
4183    /// - Dictionary: cardinality × 16 bytes (128 bits per value)
4184    /// - Indices: num_values × 4 bytes (32-bit i32)
4185    ///
4186    /// For VariableWidth (strings/binary):
4187    /// - Dictionary values: cardinality × avg_value_size (actual data)
4188    /// - Dictionary offsets: cardinality × offset_size (32 or 64 bits)
4189    /// - Indices: num_values × offset_size (same as dictionary offsets)
4190    fn estimate_dict_size(data_block: &DataBlock) -> Option<u64> {
4191        let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
4192            cardinality_array.as_primitive::<UInt64Type>().value(0)
4193        } else {
4194            return None;
4195        };
4196
4197        let num_values = data_block.num_values();
4198
4199        match data_block {
4200            DataBlock::FixedWidth(_) => {
4201                // Dictionary: cardinality unique values at 128 bits each
4202                let dict_size = cardinality * (DICT_FIXED_WIDTH_BITS_PER_VALUE / 8);
4203                // Indices: num_values indices at 32 bits each
4204                let indices_size = num_values * (DICT_INDICES_BITS_PER_VALUE / 8);
4205                Some(dict_size + indices_size)
4206            }
4207            DataBlock::VariableWidth(var) => {
4208                // Only 32-bit and 64-bit offsets are supported
4209                if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
4210                    return None;
4211                }
4212                let bits_per_offset = var.bits_per_offset as u64;
4213
4214                let data_size = data_block.data_size();
4215                let avg_value_size = data_size / num_values;
4216
4217                // Dictionary values: actual bytes of unique strings/binary
4218                let dict_values_size = cardinality * avg_value_size;
4219                // Dictionary offsets: pointers into dictionary values
4220                let dict_offsets_size = cardinality * (bits_per_offset / 8);
4221                // Indices: map each row to dictionary entry
4222                let indices_size = num_values * (bits_per_offset / 8);
4223
4224                Some(dict_values_size + dict_offsets_size + indices_size)
4225            }
4226            _ => None,
4227        }
4228    }
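    // Worked example (illustrative numbers only): a VariableWidth block with
    // num_values = 1,000,000 strings, data_size = 20,000,000 bytes (avg_value_size = 20),
    // cardinality = 1,000, and 32-bit offsets gives:
    //
    //   dict_values_size  = 1,000     * 20 =    20,000 bytes
    //   dict_offsets_size = 1,000     *  4 =     4,000 bytes
    //   indices_size      = 1,000,000 *  4 = 4,000,000 bytes
    //   estimated total  ~= 4,024,000 bytes, well below the 20,000,000-byte raw size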
4229
4230    fn should_dictionary_encode(data_block: &DataBlock, field: &Field) -> bool {
4231        // We only dictionary encode FixedWidth and VariableWidth blocks for now, so for other
4232        // block types we skip the size estimate entirely
4233        if !matches!(
4234            data_block,
4235            DataBlock::FixedWidth(_) | DataBlock::VariableWidth(_)
4236        ) {
4237            return false;
4238        }
4239
4240        // Don't dictionary encode tiny arrays
4241        let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
4242            .ok()
4243            .and_then(|val| val.parse().ok())
4244            .unwrap_or(100);
4245        if data_block.num_values() < too_small {
4246            return false;
4247        }
4248
4249        // Get size ratio from metadata or env var, default to 0.8
4250        let threshold_ratio = field
4251            .metadata
4252            .get(DICT_SIZE_RATIO_META_KEY)
4253            .and_then(|val| val.parse::<f64>().ok())
4254            .or_else(|| {
4255                env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
4256                    .ok()
4257                    .and_then(|val| val.parse().ok())
4258            })
4259            .unwrap_or(0.8);
4260
4261        // Validate size ratio is in valid range
4262        if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
4263            panic!(
4264                "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
4265                threshold_ratio
4266            );
4267        }
4268
4269        // Get raw data size
4270        let data_size = data_block.data_size();
4271
4272        // Estimate dictionary-encoded size
4273        let Some(encoded_size) = Self::estimate_dict_size(data_block) else {
4274            return false;
4275        };
4276
4277        let size_ratio_actual = if data_size > 0 {
4278            encoded_size as f64 / data_size as f64
4279        } else {
4280            return false;
4281        };
4282        size_ratio_actual < threshold_ratio
4283    }
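    // Continuing the illustrative numbers above: encoded_size ~= 4,024,000 bytes against a
    // data_size of 20,000,000 bytes gives size_ratio_actual ~= 0.20, which is below the
    // default threshold of 0.8, so such a block would be dictionary encoded.  The threshold
    // can be tuned per-field via the DICT_SIZE_RATIO_META_KEY metadata key or globally via
    // the LANCE_ENCODING_DICT_SIZE_RATIO environment variable; both must parse to a value
    // in (0, 1].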
4284
4285    // Creates an encode task, consuming all buffered data
4286    fn do_flush(
4287        &mut self,
4288        arrays: Vec<ArrayRef>,
4289        repdefs: Vec<RepDefBuilder>,
4290        row_number: u64,
4291        num_rows: u64,
4292    ) -> Result<Vec<EncodeTask>> {
4293        let column_idx = self.column_index;
4294        let compression_strategy = self.compression_strategy.clone();
4295        let field = self.field.clone();
4296        let encoding_metadata = self.encoding_metadata.clone();
4297        let task = spawn_cpu(move || {
4298            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
4299
4300            if num_values == 0 {
4301                // We should not encode empty arrays.  So if we get here that should mean that we
4302                // either have all empty lists or all null lists (or a mix).  We still need to encode
4303                // the rep/def information but we can skip the data encoding.
4304                log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows);
4305                return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
4306            }
4307            let num_nulls = arrays
4308                .iter()
4309                .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
4310                .sum::<u64>();
4311
4312            if num_values == num_nulls {
4313                return if repdefs.iter().all(|rd| rd.is_simple_validity()) {
4314                    log::debug!(
4315                        "Encoding column {} with {} items ({} rows) using simple-null layout",
4316                        column_idx,
4317                        num_values,
4318                        num_rows
4319                    );
4320                    // Simple case, no rep/def and all nulls, we don't need to encode any data
4321                    Self::encode_simple_all_null(column_idx, num_values, row_number)
4322                } else {
4323                    log::debug!(
4324                        "Encoding column {} with {} items ({} rows) using complex-null layout",
4325                        column_idx,
4326                        num_values,
4327                        num_rows
4328                    );
4329                    // If we get here then we have definition levels and we need to store those
4330                    Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
4331                };
4332            }
4333
4334            if let DataType::Struct(fields) = &field.data_type() {
4335                if fields.is_empty() {
4336                    if repdefs.iter().any(|rd| !rd.is_empty()) {
4337                        return Err(Error::InvalidInput { source: format!("Empty structs with rep/def information are not yet supported.  The field {} is an empty struct that either has nulls or is in a list.", field.name).into(), location: location!() });
4338                    }
4339                    // This may be a little confusing, but the reader should never look at this anyway, and it
4340                    // seems like overkill to invent a new layout just for "empty structs".
4341                    return Self::encode_simple_all_null(column_idx, num_values, row_number);
4342                }
4343            }
4344
4345            let data_block = DataBlock::from_arrays(&arrays, num_values);
4346
4347            let requires_full_zip_packed_struct =
4348                if let DataBlock::Struct(ref struct_data_block) = data_block {
4349                    struct_data_block.has_variable_width_child()
4350                } else {
4351                    false
4352                };
4353
4354            if requires_full_zip_packed_struct {
4355                log::debug!(
4356                    "Encoding column {} with {} items using full-zip packed struct layout",
4357                    column_idx,
4358                    num_values
4359                );
4360                return Self::encode_full_zip(
4361                    column_idx,
4362                    &field,
4363                    compression_strategy.as_ref(),
4364                    data_block,
4365                    repdefs,
4366                    row_number,
4367                    num_rows,
4368                );
4369            }
4370
4371            if let DataBlock::Dictionary(dict) = data_block {
4372                log::debug!("Encoding column {} with {} items using dictionary encoding (already dictionary encoded)", column_idx, num_values);
4373                let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
4374                // TODO: https://github.com/lancedb/lance/issues/4809
4375                // If we compute stats on dictionary_data_block => panic.
4376                // If we don't compute stats on indices_data_block => panic.
4377                // This is messy.  Don't make me call compute_stat ever.
4378                indices_data_block.compute_stat();
4379                Self::encode_miniblock(
4380                    column_idx,
4381                    &field,
4382                    compression_strategy.as_ref(),
4383                    indices_data_block,
4384                    repdefs,
4385                    row_number,
4386                    Some(dictionary_data_block),
4387                    num_rows,
4388                )
4389            } else if Self::should_dictionary_encode(&data_block, &field) {
4390                log::debug!(
4391                    "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
4392                    column_idx,
4393                    num_values
4394                );
4395                let (indices_data_block, dictionary_data_block) =
4396                    dict::dictionary_encode(data_block);
4397                Self::encode_miniblock(
4398                    column_idx,
4399                    &field,
4400                    compression_strategy.as_ref(),
4401                    indices_data_block,
4402                    repdefs,
4403                    row_number,
4404                    Some(dictionary_data_block),
4405                    num_rows,
4406                )
4407            } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
4408                log::debug!(
4409                    "Encoding column {} with {} items using mini-block layout",
4410                    column_idx,
4411                    num_values
4412                );
4413                Self::encode_miniblock(
4414                    column_idx,
4415                    &field,
4416                    compression_strategy.as_ref(),
4417                    data_block,
4418                    repdefs,
4419                    row_number,
4420                    None,
4421                    num_rows,
4422                )
4423            } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
4424                log::debug!(
4425                    "Encoding column {} with {} items using full-zip layout",
4426                    column_idx,
4427                    num_values
4428                );
4429                Self::encode_full_zip(
4430                    column_idx,
4431                    &field,
4432                    compression_strategy.as_ref(),
4433                    data_block,
4434                    repdefs,
4435                    row_number,
4436                    num_rows,
4437                )
4438            } else {
4439                Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}.  This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() })
4440            }
4441        })
4442        .boxed();
4443        Ok(vec![task])
4444    }
4445
4446    fn extract_validity_buf(
4447        array: Arc<dyn Array>,
4448        repdef: &mut RepDefBuilder,
4449        keep_original_array: bool,
4450    ) -> Result<Arc<dyn Array>> {
4451        if let Some(validity) = array.nulls() {
4452            if keep_original_array {
4453                repdef.add_validity_bitmap(validity.clone());
4454            } else {
4455                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
4456            }
4457            let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
4458            Ok(make_array(data_no_nulls))
4459        } else {
4460            repdef.add_no_null(array.len());
4461            Ok(array)
4462        }
4463    }
4464
4465    fn extract_validity(
4466        mut array: Arc<dyn Array>,
4467        repdef: &mut RepDefBuilder,
4468        keep_original_array: bool,
4469    ) -> Result<Arc<dyn Array>> {
4470        match array.data_type() {
4471            DataType::Null => {
4472                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
4473                Ok(array)
4474            }
4475            DataType::Dictionary(_, _) => {
4476                array = dict::normalize_dict_nulls(array)?;
4477                Self::extract_validity_buf(array, repdef, keep_original_array)
4478            }
4479            // Extract our validity buf but NOT any child validity bufs (they will be encoded
4480            // as part of the values).  Note: for FSL we do not use repdef.add_fsl because we do
4481            // NOT want to increase the repdef depth.
4482            //
4483            // This would be quite catastrophic for something like vector embeddings.  Imagine we
4484            // had thousands of vectors and some were null but no vector contained null items.  If
4485            // we treated the vectors (primitive FSL) like we treat structural FSL we would end up
4486            // with a rep/def value for every single item in the vector.
4487            _ => Self::extract_validity_buf(array, repdef, keep_original_array),
4488        }
4489    }
4490}
4491
4492impl FieldEncoder for PrimitiveStructuralEncoder {
4493    // Buffers data, if there is enough to write a page then we create an encode task
4494    fn maybe_encode(
4495        &mut self,
4496        array: ArrayRef,
4497        _external_buffers: &mut OutOfLineBuffers,
4498        mut repdef: RepDefBuilder,
4499        row_number: u64,
4500        num_rows: u64,
4501    ) -> Result<Vec<EncodeTask>> {
4502        let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
4503        self.accumulated_repdefs.push(repdef);
4504
4505        if let Some((arrays, row_number, num_rows)) =
4506            self.accumulation_queue.insert(array, row_number, num_rows)
4507        {
4508            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4509            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4510        } else {
4511            Ok(vec![])
4512        }
4513    }
4514
4515    // If there is any data left in the buffer then create an encode task from it
4516    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
4517        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
4518            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4519            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4520        } else {
4521            Ok(vec![])
4522        }
4523    }
4524
4525    fn num_columns(&self) -> u32 {
4526        1
4527    }
4528
4529    fn finish(
4530        &mut self,
4531        _external_buffers: &mut OutOfLineBuffers,
4532    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
4533        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
4534    }
4535}
4536
4537#[cfg(test)]
4538#[allow(clippy::single_range_in_vec_init)]
4539mod tests {
4540    use super::{
4541        ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
4542        FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails,
4543        FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor, PreambleAction,
4544        StructuralPageScheduler,
4545    };
4546    use crate::constants::{STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK};
4547    use crate::data::BlockInfo;
4548    use crate::decoder::PageEncoding;
4549    use crate::encodings::logical::primitive::{
4550        ChunkDrainInstructions, PrimitiveStructuralEncoder,
4551    };
4552    use crate::format::pb21;
4553    use crate::format::pb21::compressive_encoding::Compression;
4554    use crate::testing::{check_round_trip_encoding_of_data, TestCases};
4555    use crate::version::LanceFileVersion;
4556    use arrow_array::{ArrayRef, Int8Array, StringArray, UInt64Array};
4557    use arrow_schema::DataType;
4558    use std::collections::HashMap;
4559    use std::{collections::VecDeque, sync::Arc};
4560
4561    #[test]
4562    fn test_is_narrow() {
4563        let int8_array = Int8Array::from(vec![1, 2, 3]);
4564        let array_ref: ArrayRef = Arc::new(int8_array);
4565        let block = DataBlock::from_array(array_ref);
4566
4567        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4568
4569        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
4570        let block = DataBlock::from_array(string_array);
4571        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4572
4573        let string_array = StringArray::from(vec![
4574            Some("hello world".repeat(100)),
4575            Some("world".to_string()),
4576        ]);
4577        let block = DataBlock::from_array(string_array);
4578        assert!((!PrimitiveStructuralEncoder::is_narrow(&block)));
4579    }
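    // A minimal, self-contained sketch of the `< length | data >` zipping used by the
    // variable-width full-zip layout (see serialize_full_zip_variable).  It does not call the
    // encoder and omits control words and the repetition index; the values and expected bytes
    // are made up purely for illustration.
    #[test]
    fn test_length_data_zip_sketch() {
        let values: [Option<&[u8]>; 3] = [Some(&b"hi"[..]), None, Some(&b"abc"[..])];
        let mut zipped = Vec::new();
        for value in values {
            if let Some(value) = value {
                // Write a little-endian u32 length prefix followed by the raw bytes
                zipped.extend_from_slice(&(value.len() as u32).to_le_bytes());
                zipped.extend_from_slice(value);
            }
            // A null value contributes no length prefix and no data here (in the real layout
            // it still contributes its control word)
        }
        assert_eq!(
            zipped,
            vec![2, 0, 0, 0, b'h', b'i', 3, 0, 0, 0, b'a', b'b', b'c']
        );
    }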
4580
4581    #[test]
4582    fn test_map_range() {
4583        // Null in the middle
4584        // [[A, B, C], [D, E], NULL, [F, G, H]]
4585        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
4586        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
4587        let max_visible_def = 0;
4588        let total_items = 8;
4589        let max_rep = 1;
4590
4591        let check = |range, expected_item_range, expected_level_range| {
4592            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4593                range,
4594                rep.as_ref(),
4595                def.as_ref(),
4596                max_rep,
4597                max_visible_def,
4598                total_items,
4599                PreambleAction::Absent,
4600            );
4601            assert_eq!(item_range, expected_item_range);
4602            assert_eq!(level_range, expected_level_range);
4603        };
4604
4605        check(0..1, 0..3, 0..3);
4606        check(1..2, 3..5, 3..5);
4607        check(2..3, 5..5, 5..6);
4608        check(3..4, 5..8, 6..9);
4609        check(0..2, 0..5, 0..5);
4610        check(1..3, 3..5, 3..6);
4611        check(2..4, 5..8, 5..9);
4612        check(0..3, 0..5, 0..6);
4613        check(1..4, 3..8, 3..9);
4614        check(0..4, 0..8, 0..9);
4615
4616        // Null at start
4617        // [NULL, [A, B], [C]]
4618        let rep = Some(vec![1, 1, 0, 1]);
4619        let def = Some(vec![1, 0, 0, 0]);
4620        let max_visible_def = 0;
4621        let total_items = 3;
4622
4623        let check = |range, expected_item_range, expected_level_range| {
4624            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4625                range,
4626                rep.as_ref(),
4627                def.as_ref(),
4628                max_rep,
4629                max_visible_def,
4630                total_items,
4631                PreambleAction::Absent,
4632            );
4633            assert_eq!(item_range, expected_item_range);
4634            assert_eq!(level_range, expected_level_range);
4635        };
4636
4637        check(0..1, 0..0, 0..1);
4638        check(1..2, 0..2, 1..3);
4639        check(2..3, 2..3, 3..4);
4640        check(0..2, 0..2, 0..3);
4641        check(1..3, 0..3, 1..4);
4642        check(0..3, 0..3, 0..4);
4643
4644        // Null at end
4645        // [[A], [B, C], NULL]
4646        let rep = Some(vec![1, 1, 0, 1]);
4647        let def = Some(vec![0, 0, 0, 1]);
4648        let max_visible_def = 0;
4649        let total_items = 3;
4650
4651        let check = |range, expected_item_range, expected_level_range| {
4652            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4653                range,
4654                rep.as_ref(),
4655                def.as_ref(),
4656                max_rep,
4657                max_visible_def,
4658                total_items,
4659                PreambleAction::Absent,
4660            );
4661            assert_eq!(item_range, expected_item_range);
4662            assert_eq!(level_range, expected_level_range);
4663        };
4664
4665        check(0..1, 0..1, 0..1);
4666        check(1..2, 1..3, 1..3);
4667        check(2..3, 3..3, 3..4);
4668        check(0..2, 0..3, 0..3);
4669        check(1..3, 1..3, 1..4);
4670        check(0..3, 0..3, 0..4);
4671
4672        // No nulls, with repetition
4673        // [[A, B], [C, D], [E, F]]
4674        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
4675        let def: Option<&[u16]> = None;
4676        let max_visible_def = 0;
4677        let total_items = 6;
4678
4679        let check = |range, expected_item_range, expected_level_range| {
4680            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4681                range,
4682                rep.as_ref(),
4683                def.as_ref(),
4684                max_rep,
4685                max_visible_def,
4686                total_items,
4687                PreambleAction::Absent,
4688            );
4689            assert_eq!(item_range, expected_item_range);
4690            assert_eq!(level_range, expected_level_range);
4691        };
4692
4693        check(0..1, 0..2, 0..2);
4694        check(1..2, 2..4, 2..4);
4695        check(2..3, 4..6, 4..6);
4696        check(0..2, 0..4, 0..4);
4697        check(1..3, 2..6, 2..6);
4698        check(0..3, 0..6, 0..6);
4699
4700        // No repetition, with nulls (this case is trivial)
4701        // [A, B, NULL, C]
4702        let rep: Option<&[u16]> = None;
4703        let def = Some(vec![0, 0, 1, 0]);
4704        let max_visible_def = 1;
4705        let total_items = 4;
4706
4707        let check = |range, expected_item_range, expected_level_range| {
4708            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4709                range,
4710                rep.as_ref(),
4711                def.as_ref(),
4712                max_rep,
4713                max_visible_def,
4714                total_items,
4715                PreambleAction::Absent,
4716            );
4717            assert_eq!(item_range, expected_item_range);
4718            assert_eq!(level_range, expected_level_range);
4719        };
4720
4721        check(0..1, 0..1, 0..1);
4722        check(1..2, 1..2, 1..2);
4723        check(2..3, 2..3, 2..3);
4724        check(0..2, 0..2, 0..2);
4725        check(1..3, 1..3, 1..3);
4726        check(0..3, 0..3, 0..3);
4727
4728        // Tricky case, this chunk is a continuation and starts with a rep-index = 0
4729        // [[..., A] [B, C], NULL]
4730        //
4731        // What we do will depend on the preamble action
4732        let rep = Some(vec![0, 1, 0, 1]);
4733        let def = Some(vec![0, 0, 0, 1]);
4734        let max_visible_def = 0;
4735        let total_items = 3;
4736
4737        let check = |range, expected_item_range, expected_level_range| {
4738            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4739                range,
4740                rep.as_ref(),
4741                def.as_ref(),
4742                max_rep,
4743                max_visible_def,
4744                total_items,
4745                PreambleAction::Take,
4746            );
4747            assert_eq!(item_range, expected_item_range);
4748            assert_eq!(level_range, expected_level_range);
4749        };
4750
4751        // If we are taking the preamble then the range must start at 0
4752        check(0..1, 0..3, 0..3);
4753        check(0..2, 0..3, 0..4);
4754
4755        let check = |range, expected_item_range, expected_level_range| {
4756            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4757                range,
4758                rep.as_ref(),
4759                def.as_ref(),
4760                max_rep,
4761                max_visible_def,
4762                total_items,
4763                PreambleAction::Skip,
4764            );
4765            assert_eq!(item_range, expected_item_range);
4766            assert_eq!(level_range, expected_level_range);
4767        };
4768
4769        check(0..1, 1..3, 1..3);
4770        check(1..2, 3..3, 3..4);
4771        check(0..2, 1..3, 1..4);
4772
4773        // Another preamble case but now it doesn't end with a new list
4774        // [[..., A], NULL, [D, E]]
4775        //
4776        // What we do will depend on the preamble action
4777        let rep = Some(vec![0, 1, 1, 0]);
4778        let def = Some(vec![0, 1, 0, 0]);
4779        let max_visible_def = 0;
4780        let total_items = 4;
4781
4782        let check = |range, expected_item_range, expected_level_range| {
4783            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4784                range,
4785                rep.as_ref(),
4786                def.as_ref(),
4787                max_rep,
4788                max_visible_def,
4789                total_items,
4790                PreambleAction::Take,
4791            );
4792            assert_eq!(item_range, expected_item_range);
4793            assert_eq!(level_range, expected_level_range);
4794        };
4795
4796        // If we are taking the preamble then the range must start at 0
4797        check(0..1, 0..1, 0..2);
4798        check(0..2, 0..3, 0..4);
4799
4800        let check = |range, expected_item_range, expected_level_range| {
4801            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4802                range,
4803                rep.as_ref(),
4804                def.as_ref(),
4805                max_rep,
4806                max_visible_def,
4807                total_items,
4808                PreambleAction::Skip,
4809            );
4810            assert_eq!(item_range, expected_item_range);
4811            assert_eq!(level_range, expected_level_range);
4812        };
4813
4814        // If we are skipping the preamble then its rows are excluded from the output
4815        check(0..1, 1..1, 1..2);
4816        check(1..2, 1..3, 2..4);
4817        check(0..2, 1..3, 1..4);
4818
4819        // Now a preamble case without any definition levels
4820        // [[..., A] [B, C], [D]]
4821        let rep = Some(vec![0, 1, 0, 1]);
4822        let def: Option<Vec<u16>> = None;
4823        let max_visible_def = 0;
4824        let total_items = 4;
4825
4826        let check = |range, expected_item_range, expected_level_range| {
4827            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4828                range,
4829                rep.as_ref(),
4830                def.as_ref(),
4831                max_rep,
4832                max_visible_def,
4833                total_items,
4834                PreambleAction::Take,
4835            );
4836            assert_eq!(item_range, expected_item_range);
4837            assert_eq!(level_range, expected_level_range);
4838        };
4839
4840        // If we are taking the preamble then the range must start at 0
4841        check(0..1, 0..3, 0..3);
4842        check(0..2, 0..4, 0..4);
4843
4844        let check = |range, expected_item_range, expected_level_range| {
4845            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4846                range,
4847                rep.as_ref(),
4848                def.as_ref(),
4849                max_rep,
4850                max_visible_def,
4851                total_items,
4852                PreambleAction::Skip,
4853            );
4854            assert_eq!(item_range, expected_item_range);
4855            assert_eq!(level_range, expected_level_range);
4856        };
4857
4858        check(0..1, 1..3, 1..3);
4859        check(1..2, 3..4, 3..4);
4860        check(0..2, 1..4, 1..4);
4861
4862        // If we have nested lists then non-top level lists may be empty/null
4863        // and we need to make sure we still handle them as invisible items (we
4864        // failed to do this previously)
4865        let rep = Some(vec![2, 1, 2, 0, 1, 2]);
4866        let def = Some(vec![0, 1, 2, 0, 0, 0]);
4867        let max_rep = 2;
4868        let max_visible_def = 0;
4869        let total_items = 4;
4870
4871        let check = |range, expected_item_range, expected_level_range| {
4872            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4873                range,
4874                rep.as_ref(),
4875                def.as_ref(),
4876                max_rep,
4877                max_visible_def,
4878                total_items,
4879                PreambleAction::Absent,
4880            );
4881            assert_eq!(item_range, expected_item_range);
4882            assert_eq!(level_range, expected_level_range);
4883        };
4884
4885        check(0..3, 0..4, 0..6);
4886        check(0..1, 0..1, 0..2);
4887        check(1..2, 1..3, 2..5);
4888        check(2..3, 3..4, 5..6);
4889
4890        // Invisible items in a preamble that we are taking (regression test for a previous failure)
4891        let rep = Some(vec![0, 0, 1, 0, 1, 1]);
4892        let def = Some(vec![0, 1, 0, 0, 0, 0]);
4893        let max_rep = 1;
4894        let max_visible_def = 0;
4895        let total_items = 5;
4896
4897        let check = |range, expected_item_range, expected_level_range| {
4898            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4899                range,
4900                rep.as_ref(),
4901                def.as_ref(),
4902                max_rep,
4903                max_visible_def,
4904                total_items,
4905                PreambleAction::Take,
4906            );
4907            assert_eq!(item_range, expected_item_range);
4908            assert_eq!(level_range, expected_level_range);
4909        };
4910
4911        check(0..0, 0..1, 0..2);
4912        check(0..1, 0..3, 0..4);
4913        check(0..2, 0..4, 0..5);
4914
4915        // Skip preamble (with invis items) and skip a few rows (with invis items)
4916        // and then take a few rows but not all the rows
4917        let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
4918        let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
4919        let max_rep = 1;
4920        let max_visible_def = 0;
4921        let total_items = 5;
4922
4923        let check = |range, expected_item_range, expected_level_range| {
4924            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4925                range,
4926                rep.as_ref(),
4927                def.as_ref(),
4928                max_rep,
4929                max_visible_def,
4930                total_items,
4931                PreambleAction::Skip,
4932            );
4933            assert_eq!(item_range, expected_item_range);
4934            assert_eq!(level_range, expected_level_range);
4935        };
4936
4937        check(2..3, 2..4, 5..7);
4938    }
4939
4940    #[test]
4941    fn test_schedule_instructions() {
4942        // Convert repetition index to bytes for testing
4943        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
4944        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
4945        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
4946
4947        let check = |user_ranges, expected_instructions| {
4948            let instructions =
4949                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
4950            assert_eq!(instructions, expected_instructions);
4951        };
4952
4953        // The instructions we expect if we're grabbing the whole range
4954        let expected_take_all = vec![
4955            ChunkInstructions {
4956                chunk_idx: 0,
4957                preamble: PreambleAction::Absent,
4958                rows_to_skip: 0,
4959                rows_to_take: 6,
4960                take_trailer: true,
4961            },
4962            ChunkInstructions {
4963                chunk_idx: 1,
4964                preamble: PreambleAction::Take,
4965                rows_to_skip: 0,
4966                rows_to_take: 2,
4967                take_trailer: false,
4968            },
4969            ChunkInstructions {
4970                chunk_idx: 2,
4971                preamble: PreambleAction::Absent,
4972                rows_to_skip: 0,
4973                rows_to_take: 5,
4974                take_trailer: true,
4975            },
4976            ChunkInstructions {
4977                chunk_idx: 3,
4978                preamble: PreambleAction::Take,
4979                rows_to_skip: 0,
4980                rows_to_take: 1,
4981                take_trailer: false,
4982            },
4983        ];
4984
4985        // Take all as 1 range
4986        check(&[0..14], expected_take_all.clone());
4987
4988        // Take all as individual rows
4989        check(
4990            &[
4991                0..1,
4992                1..2,
4993                2..3,
4994                3..4,
4995                4..5,
4996                5..6,
4997                6..7,
4998                7..8,
4999                8..9,
5000                9..10,
5001                10..11,
5002                11..12,
5003                12..13,
5004                13..14,
5005            ],
5006            expected_take_all,
5007        );
5008
5009        // Test some partial takes
5010
5011        // 2 rows in the same chunk but not contiguous
5012        check(
5013            &[0..1, 3..4],
5014            vec![
5015                ChunkInstructions {
5016                    chunk_idx: 0,
5017                    preamble: PreambleAction::Absent,
5018                    rows_to_skip: 0,
5019                    rows_to_take: 1,
5020                    take_trailer: false,
5021                },
5022                ChunkInstructions {
5023                    chunk_idx: 0,
5024                    preamble: PreambleAction::Absent,
5025                    rows_to_skip: 3,
5026                    rows_to_take: 1,
5027                    take_trailer: false,
5028                },
5029            ],
5030        );
5031
5032        // Taking just a trailer/preamble
5033        check(
5034            &[5..6],
5035            vec![
5036                ChunkInstructions {
5037                    chunk_idx: 0,
5038                    preamble: PreambleAction::Absent,
5039                    rows_to_skip: 5,
5040                    rows_to_take: 1,
5041                    take_trailer: true,
5042                },
5043                ChunkInstructions {
5044                    chunk_idx: 1,
5045                    preamble: PreambleAction::Take,
5046                    rows_to_skip: 0,
5047                    rows_to_take: 0,
5048                    take_trailer: false,
5049                },
5050            ],
5051        );
5052
5053        // Skipping an entire chunk
5054        check(
5055            &[7..10],
5056            vec![
5057                ChunkInstructions {
5058                    chunk_idx: 1,
5059                    preamble: PreambleAction::Skip,
5060                    rows_to_skip: 1,
5061                    rows_to_take: 1,
5062                    take_trailer: false,
5063                },
5064                ChunkInstructions {
5065                    chunk_idx: 2,
5066                    preamble: PreambleAction::Absent,
5067                    rows_to_skip: 0,
5068                    rows_to_take: 2,
5069                    take_trailer: false,
5070                },
5071            ],
5072        );
5073    }
5074
5075    #[test]
5076    fn test_drain_instructions() {
5077        fn drain_from_instructions(
5078            instructions: &mut VecDeque<ChunkInstructions>,
5079            mut rows_desired: u64,
5080            need_preamble: &mut bool,
5081            skip_in_chunk: &mut u64,
5082        ) -> Vec<ChunkDrainInstructions> {
5083            // Note: instructions.len() is an upper bound, we typically take much fewer
5084            let mut drain_instructions = Vec::with_capacity(instructions.len());
5085            while rows_desired > 0 || *need_preamble {
5086                let (next_instructions, consumed_chunk) = instructions
5087                    .front()
5088                    .unwrap()
5089                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
5090                if consumed_chunk {
5091                    instructions.pop_front();
5092                }
5093                drain_instructions.push(next_instructions);
5094            }
5095            drain_instructions
5096        }
5097
5098        // Convert repetition index to bytes for testing
5099        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
5100        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5101        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5102        let user_ranges = vec![1..7, 10..14];
5103
5104        // First, schedule the ranges
5105        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
5106
5107        let mut to_drain = VecDeque::from(scheduled.clone());
5108
5109        // Now we drain in batches of 4
5110
5111        let mut need_preamble = false;
5112        let mut skip_in_chunk = 0;
5113
5114        let next_batch =
5115            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
5116
5117        assert!(!need_preamble);
5118        assert_eq!(skip_in_chunk, 4);
5119        assert_eq!(
5120            next_batch,
5121            vec![ChunkDrainInstructions {
5122                chunk_instructions: scheduled[0].clone(),
5123                rows_to_take: 4,
5124                rows_to_skip: 0,
5125                preamble_action: PreambleAction::Absent,
5126            }]
5127        );
5128
5129        let next_batch =
5130            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
5131
5132        assert!(!need_preamble);
5133        assert_eq!(skip_in_chunk, 2);
5134
5135        assert_eq!(
5136            next_batch,
5137            vec![
5138                ChunkDrainInstructions {
5139                    chunk_instructions: scheduled[0].clone(),
5140                    rows_to_take: 1,
5141                    rows_to_skip: 4,
5142                    preamble_action: PreambleAction::Absent,
5143                },
5144                ChunkDrainInstructions {
5145                    chunk_instructions: scheduled[1].clone(),
5146                    rows_to_take: 1,
5147                    rows_to_skip: 0,
5148                    preamble_action: PreambleAction::Take,
5149                },
5150                ChunkDrainInstructions {
5151                    chunk_instructions: scheduled[2].clone(),
5152                    rows_to_take: 2,
5153                    rows_to_skip: 0,
5154                    preamble_action: PreambleAction::Absent,
5155                }
5156            ]
5157        );
5158
5159        let next_batch =
5160            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
5161
5162        assert!(!need_preamble);
5163        assert_eq!(skip_in_chunk, 0);
5164
5165        assert_eq!(
5166            next_batch,
5167            vec![
5168                ChunkDrainInstructions {
5169                    chunk_instructions: scheduled[2].clone(),
5170                    rows_to_take: 1,
5171                    rows_to_skip: 2,
5172                    preamble_action: PreambleAction::Absent,
5173                },
5174                ChunkDrainInstructions {
5175                    chunk_instructions: scheduled[3].clone(),
5176                    rows_to_take: 1,
5177                    rows_to_skip: 0,
5178                    preamble_action: PreambleAction::Take,
5179                },
5180            ]
5181        );
5182
5183        // Regression case.  Need a chunk with preamble, rows, and trailer (the middle chunk here)
5184        let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
5185        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
5186        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
5187        let user_ranges = vec![0..28];
5188
5189        // First, schedule the ranges
5190        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
5191
5192        let mut to_drain = VecDeque::from(scheduled.clone());
5193
5194        // Drain first chunk and some of second chunk
5195
5196        let mut need_preamble = false;
5197        let mut skip_in_chunk = 0;
5198
5199        let next_batch =
5200            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
5201
5202        assert_eq!(
5203            next_batch,
5204            vec![
5205                ChunkDrainInstructions {
5206                    chunk_instructions: scheduled[0].clone(),
5207                    rows_to_take: 6,
5208                    rows_to_skip: 0,
5209                    preamble_action: PreambleAction::Absent,
5210                },
5211                ChunkDrainInstructions {
5212                    chunk_instructions: scheduled[1].clone(),
5213                    rows_to_take: 1,
5214                    rows_to_skip: 0,
5215                    preamble_action: PreambleAction::Take,
5216                },
5217            ]
5218        );
5219
5220        assert!(!need_preamble);
5221        assert_eq!(skip_in_chunk, 1);
5222
5223        // Now, the tricky part.  We drain the second chunk, including the trailer, and need to make sure
5224        // we get a drain task to take the preamble of the third chunk (and nothing else)
5225        let next_batch =
5226            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
5227
5228        assert_eq!(
5229            next_batch,
5230            vec![
5231                ChunkDrainInstructions {
5232                    chunk_instructions: scheduled[1].clone(),
5233                    rows_to_take: 2,
5234                    rows_to_skip: 1,
5235                    preamble_action: PreambleAction::Skip,
5236                },
5237                ChunkDrainInstructions {
5238                    chunk_instructions: scheduled[2].clone(),
5239                    rows_to_take: 0,
5240                    rows_to_skip: 0,
5241                    preamble_action: PreambleAction::Take,
5242                },
5243            ]
5244        );
5245
5246        assert!(!need_preamble);
5247        assert_eq!(skip_in_chunk, 0);
5248    }
5249
5250    #[tokio::test]
5251    async fn test_fullzip_repetition_index_caching() {
5252        use crate::testing::SimulatedScheduler;
5253
5254        // Simplified FixedPerValueDecompressor for testing
5255        #[derive(Debug)]
5256        struct TestFixedDecompressor;
5257
5258        impl FixedPerValueDecompressor for TestFixedDecompressor {
5259            fn decompress(
5260                &self,
5261                _data: FixedWidthDataBlock,
5262                _num_rows: u64,
5263            ) -> crate::Result<DataBlock> {
5264                unimplemented!("Test decompressor")
5265            }
5266
5267            fn bits_per_value(&self) -> u64 {
5268                32
5269            }
5270        }
5271
5272        // Create test repetition index data
5273        let rows_in_page = 100u64;
5274        let bytes_per_value = 4u64;
5275        let _rep_index_size = (rows_in_page + 1) * bytes_per_value;
5276
5277        // Create mock repetition index data
5278        let mut rep_index_data = Vec::new();
5279        for i in 0..=rows_in_page {
5280            let offset = (i * 100) as u32; // Each row starts at i * 100 bytes
5281            rep_index_data.extend_from_slice(&offset.to_le_bytes());
5282        }
5283
5284        // Simulate storage with the repetition index at position 1000
5285        let mut full_data = vec![0u8; 1000];
5286        full_data.extend_from_slice(&rep_index_data);
5287        full_data.extend_from_slice(&vec![0u8; 10000]); // Add some data after
5288
5289        let data = bytes::Bytes::from(full_data);
5290        let io = Arc::new(SimulatedScheduler::new(data));
5291        let _cache = Arc::new(lance_core::cache::LanceCache::with_capacity(1024 * 1024));
5292
5293        // Create FullZipScheduler with repetition index
5294        let mut scheduler = FullZipScheduler {
5295            data_buf_position: 0,
5296            rep_index: Some(FullZipRepIndexDetails {
5297                buf_position: 1000,
5298                bytes_per_value,
5299            }),
5300            priority: 0,
5301            rows_in_page,
5302            bits_per_offset: 32,
5303            details: Arc::new(FullZipDecodeDetails {
5304                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
5305                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
5306                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
5307                max_rep: 0,
5308                max_visible_def: 0,
5309            }),
5310            cached_state: None,
5311            enable_cache: true, // Enable caching for test
5312        };
5313
5314        // First initialization should load and cache the repetition index
5315        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
5316        let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap();
5317
5318        // Verify that we got a FullZipCacheableState (not NoCachedPageData)
5319        let is_cached = cached_data1
5320            .clone()
5321            .as_arc_any()
5322            .downcast::<FullZipCacheableState>()
5323            .is_ok();
5324        assert!(
5325            is_cached,
5326            "Expected FullZipCacheableState, got NoCachedPageData"
5327        );
5328
5329        // Load the cached data into the scheduler
5330        scheduler.load(&cached_data1);
5331
5332        // Verify that cached_state is now populated
5333        assert!(
5334            scheduler.cached_state.is_some(),
5335            "cached_state should be populated after load"
5336        );
5337
5338        // Verify the cached data contains the repetition index
5339        let cached_state = scheduler.cached_state.as_ref().unwrap();
5340
5341        // Test that schedule_ranges_rep uses the cached data
5342        let ranges = vec![0..10, 20..30];
5343        let result = scheduler.schedule_ranges_rep(
5344            &ranges,
5345            &io_dyn,
5346            FullZipRepIndexDetails {
5347                buf_position: 1000,
5348                bytes_per_value,
5349            },
5350        );
5351
5352        // The result should be OK (not an error)
5353        assert!(
5354            result.is_ok(),
5355            "schedule_ranges_rep should succeed with cached data"
5356        );
5357
5358        // Second scheduler instance should be able to use the cached data
5359        let mut scheduler2 = FullZipScheduler {
5360            data_buf_position: 0,
5361            rep_index: Some(FullZipRepIndexDetails {
5362                buf_position: 1000,
5363                bytes_per_value,
5364            }),
5365            priority: 0,
5366            rows_in_page,
5367            bits_per_offset: 32,
5368            details: scheduler.details.clone(),
5369            cached_state: None,
5370            enable_cache: true, // Enable caching for test
5371        };
5372
5373        // Load cached data from the first scheduler
5374        scheduler2.load(&cached_data1);
5375        assert!(
5376            scheduler2.cached_state.is_some(),
5377            "Second scheduler should have cached_state after load"
5378        );
5379
5380        // Verify that both schedulers have the same cached data
5381        let cached_state2 = scheduler2.cached_state.as_ref().unwrap();
5382        assert!(
5383            Arc::ptr_eq(cached_state, cached_state2),
5384            "Both schedulers should share the same cached data"
5385        );
5386    }
5387
5388    #[tokio::test]
5389    async fn test_fullzip_cache_config_controls_caching() {
5390        use crate::testing::SimulatedScheduler;
5391
5392        // Simplified FixedPerValueDecompressor for testing
5393        #[derive(Debug)]
5394        struct TestFixedDecompressor;
5395
5396        impl FixedPerValueDecompressor for TestFixedDecompressor {
5397            fn decompress(
5398                &self,
5399                _data: FixedWidthDataBlock,
5400                _num_rows: u64,
5401            ) -> crate::Result<DataBlock> {
5402                unimplemented!("Test decompressor")
5403            }
5404
5405            fn bits_per_value(&self) -> u64 {
5406                32
5407            }
5408        }
5409
5410        // Test that enable_cache flag actually controls caching behavior
5411        let rows_in_page = 1000_u64;
5412        let bytes_per_value = 4_u64;
5413
5414        // Create simulated data
        let rep_index_data = vec![0u8; ((rows_in_page + 1) * bytes_per_value) as usize];
        let value_data = vec![0u8; 4000]; // Dummy value data
        let mut full_data = vec![0u8; 1000]; // Padding before rep index
        full_data.extend_from_slice(&rep_index_data);
        full_data.extend_from_slice(&value_data);

        let data = bytes::Bytes::from(full_data);
        let io = Arc::new(SimulatedScheduler::new(data));

        // Test 1: With caching disabled
        let mut scheduler_no_cache = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: false, // Caching disabled
        };

        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
        let cached_data = scheduler_no_cache.initialize(&io_dyn).await.unwrap();

        // Should return NoCachedPageData when caching is disabled
        assert!(
            cached_data
                .as_arc_any()
                .downcast_ref::<super::NoCachedPageData>()
                .is_some(),
            "With enable_cache=false, should return NoCachedPageData"
        );

        // Test 2: With caching enabled
        let mut scheduler_with_cache = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: true, // Caching enabled
        };

        let cached_data2 = scheduler_with_cache.initialize(&io_dyn).await.unwrap();

        // Should return FullZipCacheableState when caching is enabled
        assert!(
            cached_data2
                .as_arc_any()
                .downcast_ref::<super::FullZipCacheableState>()
                .is_some(),
            "With enable_cache=true, should return FullZipCacheableState"
        );
    }

    /// Reproduces the fuzz-test failure reported in https://github.com/lancedb/lance/issues/4492
    #[tokio::test]
    async fn test_fuzz_issue_4492_empty_rep_values() {
        use lance_datagen::{array, gen_batch, RowCount, Seed};

        let seed = 1823859942947654717u64;
        let num_rows = 2741usize;

        // Generate the exact same data that caused the failure
        let batch_gen = gen_batch().with_seed(Seed::from(seed));
        let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
        let list_generator = array::rand_list_any(base_generator, false);

        let batch = batch_gen
            .anon_col(list_generator)
            .into_batch_rows(RowCount::from(num_rows as u64))
            .unwrap();

        let list_array = batch.column(0).clone();

        // Force miniblock encoding
        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_batch_size(100)
            .with_range(0..num_rows.min(500) as u64)
            .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);

        check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
    }

    #[tokio::test]
    async fn test_large_dictionary_general_compression() {
        use arrow_array::{ArrayRef, StringArray};
        use std::collections::HashMap;
        use std::sync::Arc;

        // Create large string dictionary data (>32KiB) with low cardinality
        // Use 100 unique strings of ~500 bytes each, giving a ~50 KiB dictionary
        let unique_values: Vec<String> = (0..100)
            .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
            .collect();

        // Repeat these strings many times to create a large array
        let repeated_strings: Vec<_> = unique_values
            .iter()
            .cycle()
            .take(100_000)
            .map(|s| Some(s.as_str()))
            .collect();

        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;

        // Configure test to use V2_2 and verify encoding
        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
                assert_eq!(cols.len(), 1);
                let col = &cols[0];

                // Navigate to the dictionary encoding in the page layout
                if let Some(PageEncoding::Structural(page_layout)) = &col.final_pages.first().map(|p| &p.description) {
                    // Check that dictionary is wrapped with general compression
                    if let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) = &page_layout.layout {
                        if let Some(dictionary_encoding) = &mini_block.dictionary {
                            match dictionary_encoding.compression.as_ref() {
                                Some(Compression::General(general)) => {
                                    // Verify it's using LZ4 or Zstd
                                    let compression = general.compression.as_ref().unwrap();
                                    assert!(
                                        compression.scheme() == pb21::CompressionScheme::CompressionAlgorithmLz4
                                        || compression.scheme() == pb21::CompressionScheme::CompressionAlgorithmZstd,
                                        "Expected LZ4 or Zstd compression for large dictionary"
                                    );
                                }
                                _ => panic!("Expected General compression for large dictionary"),
                            }
                        }
                    }
                }
            }));

        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    // Dictionary encoding decision tests
    /// Helper to create FixedWidth test data block with exact cardinality stat injected
    /// to ensure consistent test behavior (avoids HLL estimation error)
    fn create_test_fixed_data_block(num_values: u64, cardinality: u64) -> DataBlock {
        use crate::statistics::Stat;

        let block_info = BlockInfo::default();

        // Manually inject exact cardinality stat for consistent test behavior
        let cardinality_array = Arc::new(UInt64Array::from(vec![cardinality]));
        block_info
            .0
            .write()
            .unwrap()
            .insert(Stat::Cardinality, cardinality_array);

        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 32,
            data: crate::buffer::LanceBuffer::from(vec![0u8; (num_values * 4) as usize]),
            num_values,
            block_info,
        })
    }

    /// Helper to create VariableWidth (string) test data block with exact cardinality
    fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
        use crate::statistics::Stat;
        use arrow_array::StringArray;

        assert!(cardinality <= num_values && cardinality > 0);

        let mut values = Vec::with_capacity(num_values as usize);
        for i in 0..num_values {
            values.push(format!("value_{:016}", i % cardinality));
        }

        let array = StringArray::from(values);
        let block = DataBlock::from_array(Arc::new(array) as ArrayRef);

        // Manually inject stats for consistent test behavior
        if let DataBlock::VariableWidth(ref var_block) = block {
            let mut info = var_block.block_info.0.write().unwrap();
            // Cardinality: exact value to avoid HLL estimation error
            info.insert(
                Stat::Cardinality,
                Arc::new(UInt64Array::from(vec![cardinality])),
            );
        }

        block
    }

    #[test]
    fn test_estimate_dict_size_fixed_width() {
        use crate::encodings::logical::primitive::dict::{
            DICT_FIXED_WIDTH_BITS_PER_VALUE, DICT_INDICES_BITS_PER_VALUE,
        };

        let block = create_test_fixed_data_block(1000, 400);
        let estimated_size = PrimitiveStructuralEncoder::estimate_dict_size(&block).unwrap();

        // Dictionary: 400 * 16 bytes (128-bit values)
        // Indices: 1000 * 4 bytes (32-bit i32)
        let expected_dict_size = 400 * (DICT_FIXED_WIDTH_BITS_PER_VALUE / 8);
        let expected_indices_size = 1000 * (DICT_INDICES_BITS_PER_VALUE / 8);
        let expected_total = expected_dict_size + expected_indices_size;

        assert_eq!(estimated_size, expected_total);
    }

    #[test]
    fn test_estimate_dict_size_variable_width() {
        let block = create_test_variable_width_block(1000, 400);
        let estimated_size = PrimitiveStructuralEncoder::estimate_dict_size(&block).unwrap();

        // Get actual data size
        let data_size = block.data_size();
        let avg_value_size = data_size / 1000;
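        // Expected estimate: 400 dictionary entries at the average value size,
        // plus 4 bytes per dictionary entry, plus 4 bytes of index per row.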
        let expected = 400 * avg_value_size + 400 * 4 + 1000 * 4;

        assert_eq!(estimated_size, expected);
    }

    #[test]
    fn test_should_dictionary_encode() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

        // Create data where dict encoding saves space
        let block = create_test_variable_width_block(1000, 10);

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(&block, &field);

        assert!(result, "Should use dictionary encode based on size");
    }

    #[test]
    fn test_should_not_dictionary_encode() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

        let block = create_test_fixed_data_block(1000, 10);

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(&block, &field);

        assert!(!result, "Should not use dictionary encode based on size");
    }
}