lance_encoding/encodings/logical/primitive.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    any::Any,
    collections::{HashMap, VecDeque},
    env,
    fmt::Debug,
    iter,
    ops::Range,
    sync::Arc,
    vec,
};

use arrow::array::AsArray;
use arrow_array::{make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
use itertools::Itertools;
use lance_arrow::deepcopy::deep_copy_nulls;
use lance_core::{
    cache::{Context, DeepSizeOf},
    datatypes::{
        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
    },
    error::Error,
    utils::bit::pad_bytes,
    utils::hash::U8SliceKey,
};
use log::trace;
use snafu::location;

use crate::{
    compression::{
        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
    },
    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
    utils::bytepack::BytepackedIntegerEncoder,
};
use crate::{
    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
    encodings::logical::primitive::fullzip::PerValueDataBlock,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockCompressed,
    statistics::{ComputeStat, GetStat, Stat},
};
use crate::{
    repdef::{
        build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator,
        ControlWordParser, DefinitionInterpretation, RepDefSlicer,
    },
    utils::accumulation::AccumulationQueue,
};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};

use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
    decoder::{
        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPage,
        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPage,
    },
    encoder::{
        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
    },
    format::{pb, ProtobufUtils},
    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
    EncodingsIo,
};

pub mod fullzip;
pub mod miniblock;

const FILL_BYTE: u8 = 0xFE;

/// A trait for figuring out how to schedule the data within
/// a single page.
trait StructuralPageScheduler: std::fmt::Debug + Send {
    /// Fetches any metadata required for the page
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    /// Loads metadata from a previous initialize call
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    /// Schedules the read of the given ranges in the page
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>>;
}

/// Metadata describing the decoded size of a mini-block
#[derive(Debug)]
struct ChunkMeta {
    num_values: u64,
    chunk_size_bytes: u64,
    offset_bytes: u64,
}

/// A mini-block chunk that has been decoded and decompressed
#[derive(Debug)]
struct DecodedMiniBlockChunk {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    values: DataBlock,
}

/// A task to decode one or more mini-blocks of data into an output batch
///
/// Note: Two batches might share the same mini-block of data.  When this happens
/// then each batch gets a copy of the block and each batch decodes the block independently.
///
/// This means we have duplicated work but it is necessary to avoid having to synchronize
/// the decoding of the block. (TODO: test this theory)
#[derive(Debug)]
struct DecodeMiniBlockTask {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    dictionary_data: Option<Arc<DataBlock>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    num_buffers: u64,
    max_visible_level: u16,
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
}

impl DecodeMiniBlockTask {
    fn decode_levels(
        rep_decompressor: &dyn BlockDecompressor,
        levels: LanceBuffer,
        num_levels: u16,
    ) -> Result<ScalarBuffer<u16>> {
        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
        let mut rep = rep.as_fixed_width().unwrap();
        debug_assert_eq!(rep.num_values, num_levels as u64);
        debug_assert_eq!(rep.bits_per_value, 16);
        Ok(rep.data.borrow_to_typed_slice::<u16>())
    }

    // We are building a LevelBuffer (`levels`) and want to copy into it the values in
    // `range` from `level_buf`, writing them starting at `dest_offset`.
    //
    // We need to handle both the case where `levels` is None (no nulls encountered
    // yet) and the case where `level_buf` is None (the input we are copying from has
    // no nulls)
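    //
    // For example (an illustrative sketch, not taken from a real dataset): with
    // `levels = None`, `level_buf = Some([0, 1, 0])`, `range = 1..3`, and
    // `dest_offset = 2`, we first backfill two zeros and then copy the sliced
    // values, leaving `levels = Some(vec![0, 0, 1, 0])`.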
    fn extend_levels(
        range: Range<u64>,
        levels: &mut Option<LevelBuffer>,
        level_buf: &Option<impl AsRef<[u16]>>,
        dest_offset: usize,
    ) {
        if let Some(level_buf) = level_buf {
            if levels.is_none() {
                // This is the first non-empty level buf we've hit, fill in the past
                // with 0 (valid)
                let mut new_levels_vec =
                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
                *levels = Some(new_levels_vec);
            }
            levels.as_mut().unwrap().extend(
                level_buf.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        } else if let Some(levels) = levels {
            let num_values = (range.end - range.start) as usize;
            // This is an all-valid level_buf but we had nulls earlier and so we
            // need to materialize it
            levels.extend(iter::repeat_n(0, num_values));
        }
    }

    /// Maps a range of rows to a range of items and a range of levels
    ///
    /// If there is no repetition information this just returns the range as-is.
    ///
    /// If there is repetition information then we need to do some work to figure out what
    /// range of items corresponds to the requested range of rows.
    ///
    /// For example, if the data is [[1, 2, 3], [4, 5], [6, 7]] and the range is 1..2 (i.e. just row
    /// 1) then the user actually wants items 3..5.  In the above case the rep levels would be:
    ///
    /// Idx: 0 1 2 3 4 5 6
    /// Rep: 1 0 0 1 0 1 0
    ///
    /// So the start (1) maps to the second 1 (idx=3) and the end (2) maps to the third 1 (idx=5)
    ///
    /// If there are invisible items then we don't count them when calculating the range of items we
    /// are interested in but we do count them when calculating the range of levels we are interested
    /// in.  As a result we have to return both the item range (first return value) and the level range
    /// (second return value).
    ///
    /// For example, if the data is [[1, 2, 3], [4, 5], NULL, [6, 7, 8]] and the range is 2..4 then the
    /// user wants items 5..8 but they want levels 5..9.  In the above case the rep/def levels would be:
    ///
    /// Idx: 0 1 2 3 4 5 6 7 8
    /// Rep: 1 0 0 1 0 1 1 0 0
    /// Def: 0 0 0 0 0 1 0 0 0
    /// Itm: 1 2 3 4 5 6 7 8
    ///
    /// Finally, we have to contend with the fact that chunks may or may not start with a "preamble" of
    /// trailing values that finish up a list from the previous chunk.  In this case the first item does
    /// not start at max_rep because it is a continuation of the previous chunk.  For our purposes we do
    /// not consider this a "row" and so the range 0..1 will refer to the first row AFTER the preamble.
    ///
    /// We have a separate parameter (`preamble_action`) to control whether we want the preamble or not.
    ///
    /// Note that the "trailer" is considered a "row" and if we want it we should include it in the range.
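    ///
    /// Tying those examples to the return value (a sketch hand-derived from the examples
    /// above, not generated output): the first example yields item range 3..5 and level
    /// range 3..5, while the second example yields item range 5..8 and level range 5..9.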
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        // The total number of items (not rows) in the chunk.  This is not quite the same as
        // rep.len() / def.len() because it doesn't count invisible items
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            // If there is a preamble and we need to skip it then do that first.  The work is the same
            // whether there is def information or not
            let mut items_in_preamble = 0;
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        let first_row_start = rep.iter().position(|&r| r == max_rep);
                        items_in_preamble = first_row_start.unwrap_or(rep.len());
                        first_row_start
                    };
                    // It is possible for a chunk to be entirely partial values but if it is then it
                    // should never show up as a preamble to skip
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap() as u64;
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            // We hit this case when all we needed was the preamble
            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                return (0..items_in_preamble as u64, 0..first_row_start);
            }
            assert!(range.start < range.end);

            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                let def = &def.as_ref()[first_row_start as usize..];

                // range.start == 0 always maps to 0 (even with invis items), otherwise we need to walk
                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                            if *def > max_visible_def {
                                lead_invis_seen += 1;
                            }
                        }
                    }
                }

                rows_seen += 1;

                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                        if *def > max_visible_def {
                            tail_invis_seen += 1;
                        }
                    }
                }

                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    // This is the total number of visible items (minus any items in the preamble)
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                // Adjust for any skipped preamble
                if preamble_action == PreambleAction::Skip {
                    // TODO: Should this be items_in_preamble?  If so, add a
                    // unit test for this case
                    new_start += first_row_start;
                    new_end += first_row_start;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += first_row_start;
                    new_levels_end += first_row_start;
                }

                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                // Easy case, there are no invisible items, so we don't need to check for them
                // The items range and levels range will be the same.  We do still need to walk
                // the rep levels to find the row boundaries

                // range.start == 0 always maps to 0, otherwise we need to walk
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                // range.end == max_items always maps to rep.len(), otherwise we need to walk
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                // Adjust for any skipped preamble
                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                (new_start..new_end, new_start..new_end)
            }
        } else {
            // No repetition info, easy case, just use the range as-is and the item
            // and level ranges are the same
            (range.clone(), range)
        }
    }

    // Deserialize a mini-block chunk into its rep/def levels and decompressed values
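    //
    // The chunk layout parsed below is (a reading of the code, kept here as a sketch):
    //   [num_levels: u16 LE]
    //   [rep_size: u16 LE, only present if there is a rep decompressor]
    //   [def_size: u16 LE, only present if there is a def decompressor]
    //   [num_buffers x buffer_size: u16 LE]
    //   <pad to MINIBLOCK_ALIGNMENT>
    //   [rep bytes][pad] [def bytes][pad] [value buffers, each padded]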
    fn decode_miniblock_chunk(
        &self,
        buf: &LanceBuffer,
        items_in_chunk: u64,
    ) -> Result<DecodedMiniBlockChunk> {
        let mut offset = 0;
        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
        offset += 2;

        let rep_size = if self.rep_decompressor.is_some() {
            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(rep_size)
        } else {
            None
        };
        let def_size = if self.def_decompressor.is_some() {
            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(def_size)
        } else {
            None
        };
        let buffer_sizes = (0..self.num_buffers)
            .map(|_| {
                let size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
                offset += 2;
                size
            })
            .collect::<Vec<_>>();

        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);

        let rep = rep_size.map(|rep_size| {
            let rep = buf.slice_with_length(offset, rep_size as usize);
            offset += rep_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            rep
        });

        let def = def_size.map(|def_size| {
            let def = buf.slice_with_length(offset, def_size as usize);
            offset += def_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            def
        });

        let buffers = buffer_sizes
            .into_iter()
            .map(|buf_size| {
                let buf = buf.slice_with_length(offset, buf_size as usize);
                offset += buf_size as usize;
                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
                buf
            })
            .collect::<Vec<_>>();

        let values = self
            .value_decompressor
            .decompress(buffers, items_in_chunk)?;

        let rep = rep
            .map(|rep| {
                Self::decode_levels(
                    self.rep_decompressor.as_ref().unwrap().as_ref(),
                    rep,
                    num_levels,
                )
            })
            .transpose()?;
        let def = def
            .map(|def| {
                Self::decode_levels(
                    self.def_decompressor.as_ref().unwrap().as_ref(),
                    def,
                    num_levels,
                )
            })
            .transpose()?;

        Ok(DecodedMiniBlockChunk { rep, def, values })
    }
}

impl DecodePageTask for DecodeMiniBlockTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        // First, we create output buffers for the rep and def and data
        let mut repbuf: Option<LevelBuffer> = None;
        let mut defbuf: Option<LevelBuffer> = None;

        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        // This is probably an over-estimate but it's quick and easy to calculate
        let estimated_size_bytes = self
            .instructions
            .iter()
            .map(|(_, chunk)| chunk.data.len())
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        // We need to keep track of the offset into repbuf/defbuf that we are building up
        let mut level_offset = 0;
        // Now we iterate through each instruction and process it
        for (instructions, chunk) in self.instructions.iter() {
            // TODO: It's very possible that we have duplicate `buf` in self.instructions and we
            // don't want to decode the buf again and again on the same thread.

            let DecodedMiniBlockChunk { rep, def, values } =
                self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;

            // Our instructions tell us which rows we want to take from this chunk
            let row_range_start =
                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
            let row_range_end = row_range_start + instructions.rows_to_take;

            // We use the rep info to map the row range to an item range / levels range
            let (item_range, level_range) = Self::map_range(
                row_range_start..row_range_end,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                self.max_visible_level,
                chunk.items_in_chunk,
                instructions.preamble_action,
            );

            // Now we append the data to the output buffers
            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
            level_offset += (level_range.end - level_range.start) as usize;
            data_builder.append(&values, item_range);
        }

        let data = data_builder.finish();

        let unraveler = RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone());

        // if dictionary encoding is applied, do dictionary decode here.
        if let Some(dictionary) = &self.dictionary_data {
            // assume the indices are uniformly distributed.
            let estimated_size_bytes = dictionary.data_size()
                * (data.num_values() + dictionary.num_values() - 1)
                / dictionary.num_values();
            let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);

            // if dictionary encoding is applied, indices are of type `Int32`
            if let DataBlock::FixedWidth(mut fixed_width_data_block) = data {
                let indices = fixed_width_data_block.data.borrow_to_typed_slice::<i32>();
                let indices = indices.as_ref();

                indices.iter().for_each(|&idx| {
                    data_builder.append(dictionary, idx as u64..idx as u64 + 1);
                });

                let data = data_builder.finish();
                return Ok(DecodedPage {
                    data,
                    repdef: unraveler,
                });
            }
        }

        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

/// A chunk that has been loaded by the miniblock scheduler (but not
/// yet decoded)
#[derive(Debug)]
struct LoadedChunk {
    data: LanceBuffer,
    items_in_chunk: u64,
    byte_range: Range<u64>,
    chunk_idx: usize,
}

impl Clone for LoadedChunk {
    fn clone(&self) -> Self {
        Self {
            // Safe as we always create borrowed buffers here
            data: self.data.try_clone().unwrap(),
            items_in_chunk: self.items_in_chunk,
            byte_range: self.byte_range.clone(),
            chunk_idx: self.chunk_idx,
        }
    }
}

/// Decodes mini-block formatted data.  See [`PrimitiveStructuralEncoder`] for more
/// details on the different layouts.
#[derive(Debug)]
struct MiniBlockDecoder {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    loaded_chunks: VecDeque<LoadedChunk>,
    instructions: VecDeque<ChunkInstructions>,
    offset_in_current_chunk: u64,
    num_rows: u64,
    num_buffers: u64,
    dictionary: Option<Arc<DataBlock>>,
}

/// See [`MiniBlockScheduler`] for more details on the scheduling and decoding
/// process for miniblock encoded data.
impl StructuralPageDecoder for MiniBlockDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut items_desired = num_rows;
        let mut need_preamble = false;
        let mut skip_in_chunk = self.offset_in_current_chunk;
        let mut drain_instructions = Vec::new();
        while items_desired > 0 || need_preamble {
            let (instructions, consumed) = self
                .instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);

            while self.loaded_chunks.front().unwrap().chunk_idx
                != instructions.chunk_instructions.chunk_idx
            {
                self.loaded_chunks.pop_front();
            }
            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
            if consumed {
                self.instructions.pop_front();
            }
        }
        // We can throw away need_preamble here because it must be false.  If it were true it would mean
        // we were still in the middle of loading rows.  We do need to latch skip_in_chunk though.
        self.offset_in_current_chunk = skip_in_chunk;

        let max_visible_level = self
            .def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();

        Ok(Box::new(DecodeMiniBlockTask {
            instructions: drain_instructions,
            def_decompressor: self.def_decompressor.clone(),
            rep_decompressor: self.rep_decompressor.clone(),
            value_decompressor: self.value_decompressor.clone(),
            dictionary_data: self.dictionary.clone(),
            def_meaning: self.def_meaning.clone(),
            num_buffers: self.num_buffers,
            max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
struct CachedComplexAllNullState {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
}

impl DeepSizeOf for CachedComplexAllNullState {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
    }
}

impl CachedPageData for CachedComplexAllNullState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

/// A scheduler for all-null data that has repetition and definition levels
///
/// We still need to do some I/O in this case because we need to figure out what kind of null we
/// are dealing with (null list, null struct, what level null struct, etc.)
///
/// TODO: Right now we just load the entire rep/def at initialization time and cache it.  This is a touch
/// RAM aggressive and maybe we want something more lazy in the future.  On the other hand, it's simple
/// and fast so...maybe not :)
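///
/// For example (an illustrative case, not tied to a specific dataset): with a nullable list
/// of nullable structs where every leaf value is null, the definition levels are what
/// distinguish a null list from a non-null list containing null structs, so we still have to
/// read them even though there are no values.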
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    // Set from protobuf
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    repdef: Option<Arc<CachedComplexAllNullState>>,
}

impl ComplexAllNullScheduler {
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Self {
        Self {
            buffer_offsets_and_sizes,
            def_meaning,
            repdef: None,
        }
    }
}

impl StructuralPageScheduler for ComplexAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        // Fully load the rep & def buffers, as needed
        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
        let has_rep = rep_size > 0;
        let has_def = def_size > 0;

        let mut reads = Vec::with_capacity(2);
        if has_rep {
            reads.push(rep_pos..rep_pos + rep_size);
        }
        if has_def {
            reads.push(def_pos..def_pos + def_size);
        }

        let data = io.submit_request(reads, 0);

        async move {
            let data = data.await?;
            let mut data_iter = data.into_iter();

            let rep = if has_rep {
                let rep = data_iter.next().unwrap();
                let mut rep = LanceBuffer::from_bytes(rep, 2);
                let rep = rep.borrow_to_typed_slice::<u16>();
                Some(rep)
            } else {
                None
            };

            let def = if has_def {
                let def = data_iter.next().unwrap();
                let mut def = LanceBuffer::from_bytes(def, 2);
                let def = def.borrow_to_typed_slice::<u16>();
                Some(def)
            } else {
                None
            };

            let repdef = Arc::new(CachedComplexAllNullState { rep, def });

            self.repdef = Some(repdef.clone());

            Ok(repdef as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.repdef = Some(
            data.clone()
                .as_arc_any()
                .downcast::<CachedComplexAllNullState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let ranges = VecDeque::from_iter(ranges.iter().cloned());
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(Box::new(ComplexAllNullPageDecoder {
            ranges,
            rep: self.repdef.as_ref().unwrap().rep.clone(),
            def: self.repdef.as_ref().unwrap().def.clone(),
            num_rows,
            def_meaning: self.def_meaning.clone(),
        }) as Box<dyn StructuralPageDecoder>))
        .boxed())
    }
}

#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    ranges: VecDeque<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    num_rows: u64,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl ComplexAllNullPageDecoder {
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut rows_desired = num_rows;
        let mut ranges = Vec::with_capacity(self.ranges.len());
        while rows_desired > 0 {
            let front = self.ranges.front_mut().unwrap();
            let avail = front.end - front.start;
            if avail > rows_desired {
                ranges.push(front.start..front.start + rows_desired);
                front.start += rows_desired;
                rows_desired = 0;
            } else {
                ranges.push(self.ranges.pop_front().unwrap());
                rows_desired -= avail;
            }
        }
        ranges
    }
}

impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let drained_ranges = self.drain_ranges(num_rows);
        Ok(Box::new(DecodeComplexAllNullTask {
            ranges: drained_ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

/// We use `ranges` to slice into `rep` and `def` and create rep/def buffers
/// for the null data.
#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    ranges: Vec<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl DecodeComplexAllNullTask {
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|levels| {
            let mut referenced_levels = Vec::with_capacity(num_values as usize);
            for range in &self.ranges {
                referenced_levels.extend(
                    levels[range.start as usize..range.end as usize]
                        .iter()
                        .copied(),
                );
            }
            referenced_levels
        })
    }
}

impl DecodePageTask for DecodeComplexAllNullTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        let rep = self.decode_level(&self.rep, num_values);
        let def = self.decode_level(&self.def, num_values);
        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning);
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

/// A scheduler for simple all-null data
///
/// "simple" all-null data is data that is all null and only has a single level of definition and
/// no repetition.  We don't need to read any data at all in this case.
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}

impl StructuralPageScheduler for SimpleAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(
            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>
        ))
        .boxed())
    }
}

/// A page decode task for all-null data without any
/// repetition and only a single level of definition
#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    num_values: u64,
}
impl DecodePageTask for SimpleAllNullDecodePageTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let unraveler = RepDefUnraveler::new(
            None,
            Some(vec![1; self.num_values as usize]),
            Arc::new([DefinitionInterpretation::NullableItem]),
        );
        Ok(DecodedPage {
            data: DataBlock::AllNull(AllNullDataBlock {
                num_values: self.num_values,
            }),
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    num_rows: u64,
}

impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        Ok(Box::new(SimpleAllNullDecodePageTask {
            num_values: num_rows,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    // These come from the protobuf
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    dictionary_buf_position_and_size: (u64, u64),
    dictionary_data_alignment: u64,
    num_dictionary_items: u64,
}

#[derive(Debug)]
struct RepIndexBlock {
    // The index of the first row that starts after the beginning of this block.  If the block
    // has a preamble this will be the row after the preamble.  If the block is entirely preamble
    // then this will be a row that starts in some future block.
    first_row: u64,
    // The number of rows in the block, including the trailer but not the preamble.
    // Can be 0 if the block is entirely preamble
    starts_including_trailer: u64,
    // Whether the block has a preamble
    has_preamble: bool,
    // Whether the block has a trailer
    has_trailer: bool,
}

impl DeepSizeOf for RepIndexBlock {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}

#[derive(Debug)]
struct RepetitionIndex {
    blocks: Vec<RepIndexBlock>,
}

impl DeepSizeOf for RepetitionIndex {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}

impl RepetitionIndex {
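    // Each entry in `rep_index` describes one chunk: element 0 is the number of rows that
    // end in the chunk and element 1 is the number of trailing items belonging to a row that
    // continues into the next chunk (a reading of the code below, kept here as a sketch).
    //
    // For example, with the repetition index [[50, 3], [50, 0], [10, 0]] used in the
    // `MiniBlockScheduler` docs below: chunk 0 has no preamble, a trailer, and 51 starts
    // (first_row 0); chunk 1 has a preamble, no trailer, and 49 starts (first_row 51);
    // chunk 2 has 10 starts (first_row 100).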
    fn decode(rep_index: &[Vec<u64>]) -> Self {
        let mut chunk_has_preamble = false;
        let mut offset = 0;
        let mut blocks = Vec::with_capacity(rep_index.len());
        for chunk_rep in rep_index {
            let ends_count = chunk_rep[0];
            let partial_count = chunk_rep[1];

            let chunk_has_trailer = partial_count > 0;
            let mut starts_including_trailer = ends_count;
            if chunk_has_trailer {
                starts_including_trailer += 1;
            }
            if chunk_has_preamble {
                starts_including_trailer -= 1;
            }

            blocks.push(RepIndexBlock {
                first_row: offset,
                starts_including_trailer,
                has_preamble: chunk_has_preamble,
                has_trailer: chunk_has_trailer,
            });

            chunk_has_preamble = chunk_has_trailer;
            offset += starts_including_trailer;
        }

        Self { blocks }
    }
}

/// State that is loaded once and cached for future lookups
#[derive(Debug)]
struct MiniBlockCacheableState {
    /// Metadata that describes each chunk in the page
    chunk_meta: Vec<ChunkMeta>,
    /// The decoded repetition index
    rep_index: RepetitionIndex,
    /// The dictionary for the page, if any
    dictionary: Option<Arc<DataBlock>>,
}

impl DeepSizeOf for MiniBlockCacheableState {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.rep_index.deep_size_of_children(context)
            + self
                .dictionary
                .as_ref()
                .map(|dict| dict.data_size() as usize)
                .unwrap_or(0)
    }
}

impl CachedPageData for MiniBlockCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

/// A scheduler for a page that has been encoded with the mini-block layout
///
/// Scheduling mini-block encoded data is simple in concept and somewhat complex
/// in practice.
///
/// First, during initialization, we load the chunk metadata, the repetition index,
/// and the dictionary (these last two may not be present)
///
/// Then, during scheduling, we use the user's requested row ranges and the repetition
/// index to determine which chunks we need and which rows we need from those chunks.
///
/// For example, if the repetition index is: [50, 3], [50, 0], [10, 0] and the range
/// from the user is 40..60 then we need to:
///
///  - Read the first chunk and skip the first 40 rows, then read 10 full rows, and
///    then read 3 items for the 11th row of our range.
///  - Read the second chunk and read the remaining items in our 11th row and then read
///    the remaining 9 full rows.
///
/// Then, if we are going to decode that in batches of 5, we need to make decode tasks.
/// The first two decode tasks will just need the first chunk.  The third decode task will
/// need the first chunk (for the trailer which has the 11th row in our range) and the second
/// chunk.  The final decode task will just need the second chunk.
///
/// The above prose descriptions are what are represented by [`ChunkInstructions`] and
/// [`ChunkDrainInstructions`].
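///
/// As a rough sketch (hand-derived from the example above, not generated output), the
/// 40..60 range schedules to:
///
///   ChunkInstructions { chunk_idx: 0, preamble: Absent, rows_to_skip: 40, rows_to_take: 10, take_trailer: true }
///   ChunkInstructions { chunk_idx: 1, preamble: Take, rows_to_skip: 0, rows_to_take: 9, take_trailer: false }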
1080#[derive(Debug)]
1081pub struct MiniBlockScheduler {
1082    // These come from the protobuf
1083    buffer_offsets_and_sizes: Vec<(u64, u64)>,
1084    priority: u64,
1085    items_in_page: u64,
1086    repetition_index_depth: u16,
1087    num_buffers: u64,
1088    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
1089    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
1090    value_decompressor: Arc<dyn MiniBlockDecompressor>,
1091    def_meaning: Arc<[DefinitionInterpretation]>,
1092    dictionary: Option<MiniBlockSchedulerDictionary>,
1093    // This is set after initialization
1094    page_meta: Option<Arc<MiniBlockCacheableState>>,
1095}
1096
1097impl MiniBlockScheduler {
1098    fn try_new(
1099        buffer_offsets_and_sizes: &[(u64, u64)],
1100        priority: u64,
1101        items_in_page: u64,
1102        layout: &pb::MiniBlockLayout,
1103        decompressors: &dyn DecompressionStrategy,
1104    ) -> Result<Self> {
1105        let rep_decompressor = layout
1106            .rep_compression
1107            .as_ref()
1108            .map(|rep_compression| {
1109                decompressors
1110                    .create_block_decompressor(rep_compression)
1111                    .map(Arc::from)
1112            })
1113            .transpose()?;
1114        let def_decompressor = layout
1115            .def_compression
1116            .as_ref()
1117            .map(|def_compression| {
1118                decompressors
1119                    .create_block_decompressor(def_compression)
1120                    .map(Arc::from)
1121            })
1122            .transpose()?;
1123        let def_meaning = layout
1124            .layers
1125            .iter()
1126            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
1127            .collect::<Vec<_>>();
1128        let value_decompressor = decompressors
1129            .create_miniblock_decompressor(layout.value_compression.as_ref().unwrap())?;
1130
1131        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
1132            let num_dictionary_items = layout.num_dictionary_items;
1133            match dictionary_encoding.array_encoding.as_ref().unwrap() {
1134                pb::array_encoding::ArrayEncoding::Variable(_) => {
1135                    Some(MiniBlockSchedulerDictionary {
1136                        dictionary_decompressor: decompressors
1137                            .create_block_decompressor(dictionary_encoding)?
1138                            .into(),
1139                        dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1140                        dictionary_data_alignment: 4,
1141                        num_dictionary_items,
1142                    })
1143                }
1144                pb::array_encoding::ArrayEncoding::Flat(_) => Some(MiniBlockSchedulerDictionary {
1145                    dictionary_decompressor: decompressors
1146                        .create_block_decompressor(dictionary_encoding)?
1147                        .into(),
1148                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1149                    dictionary_data_alignment: 16,
1150                    num_dictionary_items,
1151                }),
1152                _ => {
1153                    unreachable!("Currently only encodings `BinaryBlock` and `Flat` used for encoding MiniBlock dictionary.")
1154                }
1155            }
1156        } else {
1157            None
1158        };
1159
1160        Ok(Self {
1161            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
1162            rep_decompressor,
1163            def_decompressor,
1164            value_decompressor: value_decompressor.into(),
1165            repetition_index_depth: layout.repetition_index_depth as u16,
1166            num_buffers: layout.num_buffers,
1167            priority,
1168            items_in_page,
1169            dictionary,
1170            def_meaning: def_meaning.into(),
1171            page_meta: None,
1172        })
1173    }
1174
1175    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
1176        let page_meta = self.page_meta.as_ref().unwrap();
1177        chunk_indices
1178            .iter()
1179            .map(|&chunk_idx| {
1180                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
1181                let bytes_start = chunk_meta.offset_bytes;
1182                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
1183                LoadedChunk {
1184                    byte_range: bytes_start..bytes_end,
1185                    items_in_chunk: chunk_meta.num_values,
1186                    chunk_idx,
1187                    data: LanceBuffer::empty(),
1188                }
1189            })
1190            .collect()
1191    }
1192}
1193
1194#[derive(Debug, PartialEq, Eq, Clone, Copy)]
1195enum PreambleAction {
1196    Take,
1197    Skip,
1198    Absent,
1199}
1200
1201// TODO: Add test cases for the all-preamble and all-trailer cases
1202
1203// When we schedule a chunk we use the repetition index (or, if none exists, just the # of items
1204// in each chunk) to map a user requested range into a set of ChunkInstruction objects which tell
1205// us how exactly to read from the chunk.
1206#[derive(Clone, Debug, PartialEq, Eq)]
1207struct ChunkInstructions {
1208    // The index of the chunk to read
1209    chunk_idx: usize,
1210    // A "preamble" is when a chunk begins with a continuation of the previous chunk's list.  If there
1211    // is no repetition index there is never a preamble.
1212    //
1213    // It's possible for a chunk to be entirely premable.  For example, if there is a really large list
1214    // that spans several chunks.
1215    preamble: PreambleAction,
1216    // How many complete rows (not including the preamble or trailer) to skip
1217    //
1218    // If this is non-zero then premable must not be Take
1219    rows_to_skip: u64,
1220    // How many complete (non-preamble / non-trailer) rows to take
1221    rows_to_take: u64,
1222    // A "trailer" is when a chunk ends with a partial list.  If there is no repetition index there is
1223    // never a trailer.
1224    //
1225    // It's possible for a chunk to be entirely trailer.  This would mean the chunk starts with the beginning
1226    // of a list and that list is continued in the next chunk.
1227    //
1228    // If this is true then we want to include the trailer
1229    take_trailer: bool,
1230}
1231
1232// First, we schedule a bunch of [`ChunkInstructions`] based on the users ranges.  Then we
1233// start decoding them, based on a batch size, which might not align with what we scheduled.
1234//
1235// This results in `ChunkDrainInstructions` which targets a contiguous slice of a `ChunkInstructions`
1236//
1237// So if `ChunkInstructions` is "skip preamble, skip 10, take 50, take trailer" and we are decoding in
1238// batches of size 10 we might have a `ChunkDrainInstructions` that targets that chunk and has its own
1239// skip of 17 and take of 10.  This would mean we decode the chunk, skip the preamble and 27 rows, and
1240// then take 10 rows.
1241//
1242// One very confusing bit is that `rows_to_take` includes the trailer.  So if we have two chunks:
1243//  -no preamble, skip 5, take 10, take trailer
1244//  -take preamble, skip 0, take 50, no trailer
1245//
1246// and we are draining 20 rows then the drain instructions for the first batch will be:
1247//  - no preamble, skip 0 (from chunk 0), take 11 (from chunk 0)
1248//  - take preamble (from chunk 1), skip 0 (from chunk 1), take 9 (from chunk 1)
1249#[derive(Debug, PartialEq, Eq)]
1250struct ChunkDrainInstructions {
1251    chunk_instructions: ChunkInstructions,
1252    rows_to_skip: u64,
1253    rows_to_take: u64,
1254    preamble_action: PreambleAction,
1255}
1256
1257impl ChunkInstructions {
1258    // Given a repetition index and a set of user ranges we need to figure out how to read from the chunks
1259    //
1260    // We assume that `user_ranges` are in sorted order and non-overlapping
1261    //
1262    // The output will be a set of `ChunkInstructions` which tell us how to read from the chunks
1263    fn schedule_instructions(rep_index: &RepetitionIndex, user_ranges: &[Range<u64>]) -> Vec<Self> {
1264        // This is an in-exact capacity guess but pretty good.  The actual capacity can be
1265        // smaller if instructions are merged.  It can be larger if there are multiple instructions
1266        // per row which can happen with lists.
1267        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1268
1269        for user_range in user_ranges {
1270            let mut rows_needed = user_range.end - user_range.start;
1271            let mut need_preamble = false;
1272
1273            // Need to find the first chunk with a first row >= user_range.start.  If there are
1274            // multiple chunks with the same first row we need to take the first one.
1275            let mut block_index = match rep_index
1276                .blocks
1277                .binary_search_by_key(&user_range.start, |block| block.first_row)
1278            {
1279                Ok(idx) => {
1280                    // Slightly tricky case, we may need to walk backwards a bit to make sure we
1281                    // are grabbing first eligible chunk
1282                    let mut idx = idx;
1283                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1284                        idx -= 1;
1285                    }
1286                    idx
1287                }
1288                // Easy case.  idx is greater, and idx - 1 is smaller, so idx - 1 contains the start
1289                Err(idx) => idx - 1,
1290            };
1291
1292            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1293
1294            while rows_needed > 0 || need_preamble {
1295                let chunk = &rep_index.blocks[block_index];
1296                let rows_avail = chunk.starts_including_trailer - to_skip;
1297                debug_assert!(rows_avail > 0);
1298
1299                let rows_to_take = rows_avail.min(rows_needed);
1300                rows_needed -= rows_to_take;
1301
1302                let mut take_trailer = false;
1303                let preamble = if chunk.has_preamble {
1304                    if need_preamble {
1305                        PreambleAction::Take
1306                    } else {
1307                        PreambleAction::Skip
1308                    }
1309                } else {
1310                    PreambleAction::Absent
1311                };
1312                let mut rows_to_take_no_trailer = rows_to_take;
1313
1314                // Are we taking the trailer?  If so, make sure we mark that we need the preamble
1315                if rows_to_take == rows_avail && chunk.has_trailer {
1316                    take_trailer = true;
1317                    need_preamble = true;
1318                    rows_to_take_no_trailer -= 1;
1319                } else {
1320                    need_preamble = false;
1321                };
1322
1323                chunk_instructions.push(Self {
1324                    preamble,
1325                    chunk_idx: block_index,
1326                    rows_to_skip: to_skip,
1327                    rows_to_take: rows_to_take_no_trailer,
1328                    take_trailer,
1329                });
1330
1331                to_skip = 0;
1332                block_index += 1;
1333            }
1334        }
1335
1336        // If there were multiple ranges we may have multiple instructions for a single chunk.  Merge them now if they
1337        // are _adjacent_ (i.e. don't merge "take first row of chunk 0" and "take third row of chunk 0" into "take 2
1338        // rows of chunk 0 starting at 0")
1339        if user_ranges.len() > 1 {
1340            // TODO: Could probably optimize this allocation away
1341            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1342            let mut instructions_iter = chunk_instructions.into_iter();
1343            merged_instructions.push(instructions_iter.next().unwrap());
1344            for instruction in instructions_iter {
1345                let last = merged_instructions.last_mut().unwrap();
1346                if last.chunk_idx == instruction.chunk_idx
1347                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1348                {
1349                    last.rows_to_take += instruction.rows_to_take;
1350                    last.take_trailer |= instruction.take_trailer;
1351                } else {
1352                    merged_instructions.push(instruction);
1353                }
1354            }
1355            merged_instructions
1356        } else {
1357            chunk_instructions
1358        }
1359    }
1360
1361    fn drain_from_instruction(
1362        &self,
1363        rows_desired: &mut u64,
1364        need_preamble: &mut bool,
1365        skip_in_chunk: &mut u64,
1366    ) -> (ChunkDrainInstructions, bool) {
1367        // If we need the premable then we shouldn't be skipping anything
1368        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1369        let mut rows_avail = self.rows_to_take - *skip_in_chunk;
1370        let has_preamble = self.preamble != PreambleAction::Absent;
1371        let preamble_action = match (*need_preamble, has_preamble) {
1372            (true, true) => PreambleAction::Take,
1373            (true, false) => panic!("Need preamble but there isn't one"),
1374            (false, true) => PreambleAction::Skip,
1375            (false, false) => PreambleAction::Absent,
1376        };
1377
1378        // Did the scheduled chunk have a trailer?  If so, we have one extra row available
1379        if self.take_trailer {
1380            rows_avail += 1;
1381        }
1382
1383        // How many rows are we actually taking in this take step (including the preamble
1384        // and trailer both as individual rows)
1385        let rows_taking = if *rows_desired >= rows_avail {
1386            // We want all the rows.  If there is a trailer we are grabbing it and will need
1387            // the preamble of the next chunk
1388            *need_preamble = self.take_trailer;
1389            rows_avail
1390        } else {
1391            // We aren't taking all the rows.  Even if there is a trailer we aren't taking
1392            // it so we will not need the preamble
1393            *need_preamble = false;
1394            *rows_desired
1395        };
1396        let rows_skipped = *skip_in_chunk;
1397
1398        // Update the state for the next iteration
1399        let consumed_chunk = if *rows_desired >= rows_avail {
1400            *rows_desired -= rows_avail;
1401            *skip_in_chunk = 0;
1402            true
1403        } else {
1404            *skip_in_chunk += *rows_desired;
1405            *rows_desired = 0;
1406            false
1407        };
1408
1409        (
1410            ChunkDrainInstructions {
1411                chunk_instructions: self.clone(),
1412                rows_to_skip: rows_skipped,
1413                rows_to_take: rows_taking,
1414                preamble_action,
1415            },
1416            consumed_chunk,
1417        )
1418    }
1419}
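// A hedged sketch of how draining interacts with the instructions above (`instr` is an
// illustrative ChunkInstructions binding and the numbers are hypothetical).  Suppose a scheduled
// instruction has rows_to_take = 4 and take_trailer = true, so 5 "rows" are drainable, and the
// caller drains in two steps of 3 and 2:
//
//     let mut desired = 3;
//     let mut need_preamble = false;
//     let mut skip = 0;
//     let (step1, done1) = instr.drain_from_instruction(&mut desired, &mut need_preamble, &mut skip);
//     // step1.rows_to_take == 3, done1 == false, skip == 3, need_preamble == false
//
//     let mut desired = 2;
//     let (step2, done2) = instr.drain_from_instruction(&mut desired, &mut need_preamble, &mut skip);
//     // step2.rows_to_take == 2 (the last row is the trailer), done2 == true, and
//     // need_preamble is now true so the next chunk's preamble will be taken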
1420
1421impl StructuralPageScheduler for MiniBlockScheduler {
1422    fn initialize<'a>(
1423        &'a mut self,
1424        io: &Arc<dyn EncodingsIo>,
1425    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1426        // We always need to fetch chunk metadata.  We may also need to fetch a dictionary and
1427        // we may also need to fetch the repetition index.  Here, we gather what buffers we
1428        // need.
1429        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
1430        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
1431        let mut bufs_needed = 1;
1432        if self.dictionary.is_some() {
1433            bufs_needed += 1;
1434        }
1435        if self.repetition_index_depth > 0 {
1436            bufs_needed += 1;
1437        }
1438        let mut required_ranges = Vec::with_capacity(bufs_needed);
1439        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
1440        if let Some(ref dictionary) = self.dictionary {
1441            required_ranges.push(
1442                dictionary.dictionary_buf_position_and_size.0
1443                    ..dictionary.dictionary_buf_position_and_size.0
1444                        + dictionary.dictionary_buf_position_and_size.1,
1445            );
1446        }
1447        if self.repetition_index_depth > 0 {
1448            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
1449            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
1450        }
1451        let io_req = io.submit_request(required_ranges, 0);
1452
1453        async move {
1454            let mut buffers = io_req.await?.into_iter().fuse();
1455            let meta_bytes = buffers.next().unwrap();
1456            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
1457            let rep_index_bytes = buffers.next();
1458
1459            // Parse the metadata and build the chunk meta
1460            assert!(meta_bytes.len() % 2 == 0);
1461            let mut bytes = LanceBuffer::from_bytes(meta_bytes, 2);
1462            let words = bytes.borrow_to_typed_slice::<u16>();
1463            let words = words.as_ref();
1464
1465            let mut chunk_meta = Vec::with_capacity(words.len());
1466
1467            let mut rows_counter = 0;
1468            let mut offset_bytes = value_buf_position;
1469            for (word_idx, word) in words.iter().enumerate() {
1470                let log_num_values = word & 0x0F;
1471                let divided_bytes = word >> 4;
1472                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
1473                debug_assert!(num_bytes > 0);
1474                let num_values = if word_idx < words.len() - 1 {
1475                    debug_assert!(log_num_values > 0);
1476                    1 << log_num_values
1477                } else {
1478                    debug_assert!(
1479                        log_num_values == 0
1480                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
1481                    );
1482                    self.items_in_page - rows_counter
1483                };
1484                rows_counter += num_values;
1485
1486                chunk_meta.push(ChunkMeta {
1487                    num_values,
1488                    chunk_size_bytes: num_bytes as u64,
1489                    offset_bytes,
1490                });
1491                offset_bytes += num_bytes as u64;
1492            }
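            // For illustration: each metadata word packs the chunk's value count and size.  A
            // word of 0x003A has a low nibble of 0xA, so the chunk holds 1 << 10 = 1024 values,
            // and an upper portion of 0x3, so the chunk occupies (3 + 1) * MINIBLOCK_ALIGNMENT
            // bytes.  Only the final chunk may hold a non-power-of-two number of values, which
            // is why its count is derived from items_in_page instead of the word itself.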
1493
1494            // Build the repetition index
1495            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
1496                // If we have a repetition index then we use that
1497                // TODO: Compress the repetition index :)
1498                assert!(rep_index_data.len() % 8 == 0);
1499                let mut repetition_index_vals = LanceBuffer::from_bytes(rep_index_data, 8);
1500                let repetition_index_vals = repetition_index_vals.borrow_to_typed_slice::<u64>();
1501                // Unflatten
1502                repetition_index_vals
1503                    .as_ref()
1504                    .chunks_exact(self.repetition_index_depth as usize + 1)
1505                    .map(|c| c.to_vec())
1506                    .collect::<Vec<_>>()
1507            } else {
1508                // Default rep index is just the number of items in each chunk
1509                // with 0 partials/leftovers
1510                chunk_meta
1511                    .iter()
1512                    .map(|c| vec![c.num_values, 0])
1513                    .collect::<Vec<_>>()
1514            };
1515
1516            let mut page_meta = MiniBlockCacheableState {
1517                chunk_meta,
1518                rep_index: RepetitionIndex::decode(&rep_index),
1519                dictionary: None,
1520            };
1521
1522            // decode dictionary
1523            if let Some(ref mut dictionary) = self.dictionary {
1524                let dictionary_data = dictionary_bytes.unwrap();
1525                page_meta.dictionary =
1526                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
1527                        LanceBuffer::from_bytes(
1528                            dictionary_data,
1529                            dictionary.dictionary_data_alignment,
1530                        ),
1531                        dictionary.num_dictionary_items,
1532                    )?));
1533            };
1534            let page_meta = Arc::new(page_meta);
1535            self.page_meta = Some(page_meta.clone());
1536            Ok(page_meta as Arc<dyn CachedPageData>)
1537        }
1538        .boxed()
1539    }
1540
1541    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1542        self.page_meta = Some(
1543            data.clone()
1544                .as_arc_any()
1545                .downcast::<MiniBlockCacheableState>()
1546                .unwrap(),
1547        );
1548    }
1549
1550    fn schedule_ranges(
1551        &self,
1552        ranges: &[Range<u64>],
1553        io: &Arc<dyn EncodingsIo>,
1554    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1555        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1556
1557        let page_meta = self.page_meta.as_ref().unwrap();
1558
1559        let chunk_instructions =
1560            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
1561
1562        debug_assert_eq!(
1563            num_rows,
1564            chunk_instructions
1565                .iter()
1566                .map(|ci| {
1567                    let taken = ci.rows_to_take;
1568                    if ci.take_trailer {
1569                        taken + 1
1570                    } else {
1571                        taken
1572                    }
1573                })
1574                .sum::<u64>()
1575        );
1576
1577        let chunks_needed = chunk_instructions
1578            .iter()
1579            .map(|ci| ci.chunk_idx)
1580            .unique()
1581            .collect::<Vec<_>>();
1582        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1583        let chunk_ranges = loaded_chunks
1584            .iter()
1585            .map(|c| c.byte_range.clone())
1586            .collect::<Vec<_>>();
1587        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1588
1589        let rep_decompressor = self.rep_decompressor.clone();
1590        let def_decompressor = self.def_decompressor.clone();
1591        let value_decompressor = self.value_decompressor.clone();
1592        let num_buffers = self.num_buffers;
1593        let dictionary = page_meta
1594            .dictionary
1595            .as_ref()
1596            .map(|dictionary| dictionary.clone());
1597        let def_meaning = self.def_meaning.clone();
1598
1599        let res = async move {
1600            let loaded_chunk_data = loaded_chunk_data.await?;
1601            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1602                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
1603            }
1604
1605            Ok(Box::new(MiniBlockDecoder {
1606                rep_decompressor,
1607                def_decompressor,
1608                value_decompressor,
1609                def_meaning,
1610                loaded_chunks: VecDeque::from_iter(loaded_chunks),
1611                instructions: VecDeque::from(chunk_instructions),
1612                offset_in_current_chunk: 0,
1613                dictionary,
1614                num_rows,
1615                num_buffers,
1616            }) as Box<dyn StructuralPageDecoder>)
1617        }
1618        .boxed();
1619        Ok(res)
1620    }
1621}
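// A rough sketch of how a page scheduler like the one above is driven end to end (variable
// names are illustrative and error handling is omitted):
//
//     let cache = scheduler.initialize(&io).await?;   // or scheduler.load(&cache) on a warm start
//     let decoder_fut = scheduler.schedule_ranges(&[0..100], &io)?;
//     let mut decoder = decoder_fut.await?;            // Box<dyn StructuralPageDecoder>
//     let task = decoder.drain(100)?;                  // Box<dyn DecodePageTask>
//     let page = task.decode()?;                       // DecodedPage { data, repdef }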
1622
1623#[derive(Debug)]
1624struct FullZipRepIndexDetails {
1625    buf_position: u64,
1626    bytes_per_value: u64, // Will be 1, 2, 4, or 8
1627}
1628
1629#[derive(Debug)]
1630enum PerValueDecompressor {
1631    Fixed(Arc<dyn FixedPerValueDecompressor>),
1632    Variable(Arc<dyn VariablePerValueDecompressor>),
1633}
1634
1635#[derive(Debug)]
1636struct FullZipDecodeDetails {
1637    value_decompressor: PerValueDecompressor,
1638    def_meaning: Arc<[DefinitionInterpretation]>,
1639    ctrl_word_parser: ControlWordParser,
1640    max_rep: u16,
1641    max_visible_def: u16,
1642}
1643
1644/// A scheduler for full-zip encoded data
1645///
1646/// When the data type has a fixed width we simply need to map from
1647/// row ranges to byte ranges using that fixed width.
1648///
1649/// When the data type is variable-width, or has any repetition, a
1650/// repetition index is required.
1651#[derive(Debug)]
1652pub struct FullZipScheduler {
1653    data_buf_position: u64,
1654    rep_index: Option<FullZipRepIndexDetails>,
1655    priority: u64,
1656    rows_in_page: u64,
1657    bits_per_offset: u8,
1658    details: Arc<FullZipDecodeDetails>,
1659    /// Cached state containing the raw repetition index buffer
1660    cached_state: Option<Arc<FullZipCacheableState>>,
1661}
1662
1663impl FullZipScheduler {
1664    fn try_new(
1665        buffer_offsets_and_sizes: &[(u64, u64)],
1666        priority: u64,
1667        rows_in_page: u64,
1668        layout: &pb::FullZipLayout,
1669        decompressors: &dyn DecompressionStrategy,
1670    ) -> Result<Self> {
1671        // We don't need the data_buf_size because either the data type is
1672        // fixed-width (and we can tell size from rows_in_page) or it is not
1673        // and we have a repetition index.
1674        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
1675        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
1676            let num_reps = rows_in_page + 1;
1677            let bytes_per_rep = len / num_reps;
1678            debug_assert_eq!(len % num_reps, 0);
1679            debug_assert!(
1680                bytes_per_rep == 1
1681                    || bytes_per_rep == 2
1682                    || bytes_per_rep == 4
1683                    || bytes_per_rep == 8
1684            );
1685            FullZipRepIndexDetails {
1686                buf_position: *pos,
1687                bytes_per_value: bytes_per_rep,
1688            }
1689        });
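        // For example (hypothetical numbers): a page with rows_in_page = 999 has 1000 repetition
        // index entries, so a 4000-byte rep index buffer implies bytes_per_rep = 4.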
1690
1691        let value_decompressor = match layout.details {
1692            Some(pb::full_zip_layout::Details::BitsPerValue(_)) => {
1693                let decompressor = decompressors.create_fixed_per_value_decompressor(
1694                    layout.value_compression.as_ref().unwrap(),
1695                )?;
1696                PerValueDecompressor::Fixed(decompressor.into())
1697            }
1698            Some(pb::full_zip_layout::Details::BitsPerOffset(_)) => {
1699                let decompressor = decompressors.create_variable_per_value_decompressor(
1700                    layout.value_compression.as_ref().unwrap(),
1701                )?;
1702                PerValueDecompressor::Variable(decompressor.into())
1703            }
1704            None => {
1705                panic!("Full-zip layout must have a `details` field");
1706            }
1707        };
1708        let ctrl_word_parser = ControlWordParser::new(
1709            layout.bits_rep.try_into().unwrap(),
1710            layout.bits_def.try_into().unwrap(),
1711        );
1712        let def_meaning = layout
1713            .layers
1714            .iter()
1715            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
1716            .collect::<Vec<_>>();
1717
1718        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
1719        let max_visible_def = def_meaning
1720            .iter()
1721            .filter(|d| !d.is_list())
1722            .map(|d| d.num_def_levels())
1723            .sum();
1724
1725        let bits_per_offset = match layout.details {
1726            Some(pb::full_zip_layout::Details::BitsPerValue(_)) => 32,
1727            Some(pb::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
1728                bits_per_offset as u8
1729            }
1730            None => panic!("Full-zip layout must have a `details` field"),
1731        };
1732
1733        let details = Arc::new(FullZipDecodeDetails {
1734            value_decompressor,
1735            def_meaning: def_meaning.into(),
1736            ctrl_word_parser,
1737            max_rep,
1738            max_visible_def,
1739        });
1740        Ok(Self {
1741            data_buf_position,
1742            rep_index,
1743            details,
1744            priority,
1745            rows_in_page,
1746            bits_per_offset,
1747            cached_state: None,
1748        })
1749    }
1750
1751    /// Schedules indirectly by first fetching the data ranges from the
1752    /// repetition index and then fetching the data
1753    ///
1754    /// This approach is needed whenever we have a repetition index, since row
1755    /// boundaries can then no longer be derived from a fixed stride.
1756    #[allow(clippy::too_many_arguments)]
1757    async fn indirect_schedule_ranges(
1758        data_buffer_pos: u64,
1759        row_ranges: Vec<Range<u64>>,
1760        rep_index_ranges: Vec<Range<u64>>,
1761        bytes_per_rep: u64,
1762        io: Arc<dyn EncodingsIo>,
1763        priority: u64,
1764        bits_per_offset: u8,
1765        details: Arc<FullZipDecodeDetails>,
1766    ) -> Result<Box<dyn StructuralPageDecoder>> {
1767        let byte_ranges = io
1768            .submit_request(rep_index_ranges, priority)
1769            .await?
1770            .into_iter()
1771            .map(|d| LanceBuffer::from_bytes(d, 1))
1772            .collect::<Vec<_>>();
1773        let byte_ranges = LanceBuffer::concat(&byte_ranges);
1774        let byte_ranges = ByteUnpacker::new(byte_ranges, bytes_per_rep as usize)
1775            .chunks(2)
1776            .into_iter()
1777            .map(|mut c| {
1778                let start = c.next().unwrap() + data_buffer_pos;
1779                let end = c.next().unwrap() + data_buffer_pos;
1780                start..end
1781            })
1782            .collect::<Vec<_>>();
1783
1784        let data = io.submit_request(byte_ranges, priority);
1785
1786        let data = data.await?;
1787        let data = data
1788            .into_iter()
1789            .map(|d| LanceBuffer::from_bytes(d, 1))
1790            .collect();
1791        let num_rows = row_ranges.into_iter().map(|r| r.end - r.start).sum();
1792
1793        match &details.value_decompressor {
1794            PerValueDecompressor::Fixed(decompressor) => {
1795                let bits_per_value = decompressor.bits_per_value();
1796                if bits_per_value == 0 {
1797                    return Err(lance_core::Error::Internal {
1798                        message: "Invalid encoding: bits_per_value must be greater than 0".into(),
1799                        location: location!(),
1800                    });
1801                }
1802                if bits_per_value % 8 != 0 {
1803                    // Unlikely we will ever want this since full-zip values are so large the few bits we shave off don't
1804                    // make much difference.
1805                    return Err(lance_core::Error::NotSupported {
1806                        source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
1807                        location: location!(),
1808                    });
1809                }
1810                let bytes_per_value = bits_per_value / 8;
1811                let total_bytes_per_value =
1812                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
1813                Ok(Box::new(FixedFullZipDecoder {
1814                    details,
1815                    data,
1816                    num_rows,
1817                    offset_in_current: 0,
1818                    bytes_per_value: bytes_per_value as usize,
1819                    total_bytes_per_value,
1820                }) as Box<dyn StructuralPageDecoder>)
1821            }
1822            PerValueDecompressor::Variable(_decompressor) => {
1823                // Variable full-zip
1824
1825                Ok(Box::new(VariableFullZipDecoder::new(
1826                    details,
1827                    data,
1828                    num_rows,
1829                    bits_per_offset,
1830                    bits_per_offset,
1831                )))
1832            }
1833        }
1834    }
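    // To make the two-phase read concrete (hypothetical numbers): each repetition index entry is
    // the byte offset of a row boundary within the data buffer.  For row ranges 0..1 and 2..3 we
    // fetch entries 0, 1, 2, and 3; if they decode to 0, 40, 100, and 160 then pairing them up
    // (chunks of two) yields the data reads data_buffer_pos + 0..40 and data_buffer_pos + 100..160.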
1835
1836    /// Schedules ranges directly using cached repetition index data
1837    fn schedule_ranges_with_cached_rep(
1838        &self,
1839        ranges: &[Range<u64>],
1840        io: &Arc<dyn EncodingsIo>,
1841        cached_rep_data: &LanceBuffer,
1842        bytes_per_value: u64,
1843    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1844        use crate::utils::bytepack::ByteUnpacker;
1845
1846        // Extract byte ranges directly from the cached repetition index
1847        let byte_ranges: Vec<Range<u64>> = ranges
1848            .iter()
1849            .map(|r| {
1850                // Get start and end values from the cached buffer
1851                let start_offset = (r.start * bytes_per_value) as usize;
1852                let end_offset = (r.end * bytes_per_value) as usize;
1853
1854                // Use ByteUnpacker to read single values
1855                let start_slice =
1856                    &cached_rep_data[start_offset..start_offset + bytes_per_value as usize];
1857                let start_val =
1858                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
1859                        .next()
1860                        .unwrap();
1861
1862                let end_slice = &cached_rep_data[end_offset..end_offset + bytes_per_value as usize];
1863                let end_val =
1864                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
1865                        .next()
1866                        .unwrap();
1867
1868                (self.data_buf_position + start_val)..(self.data_buf_position + end_val)
1869            })
1870            .collect();
1871
1872        let data = io.submit_request(byte_ranges, self.priority);
1873        let row_ranges = ranges.to_vec();
1874        let details = self.details.clone();
1875        let bits_per_offset = self.bits_per_offset;
1876
1877        Ok(async move {
1878            let data = data.await?;
1879            let data = data
1880                .into_iter()
1881                .map(|d| LanceBuffer::from_bytes(d, 1))
1882                .collect();
1883            let num_rows = row_ranges.into_iter().map(|r| r.end - r.start).sum();
1884
1885            match &details.value_decompressor {
1886                PerValueDecompressor::Fixed(decompressor) => {
1887                    let bits_per_value = decompressor.bits_per_value();
1888                    if bits_per_value == 0 {
1889                        return Err(lance_core::Error::Internal {
1890                            message: "Invalid encoding: bits_per_value must be greater than 0".into(),
1891                            location: location!(),
1892                        });
1893                    }
1894                    if bits_per_value % 8 != 0 {
1895                        return Err(lance_core::Error::NotSupported {
1896                            source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
1897                            location: location!(),
1898                        });
1899                    }
1900                    let bytes_per_value = bits_per_value / 8;
1901                    let total_bytes_per_value =
1902                        bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
1903                    Ok(Box::new(FixedFullZipDecoder {
1904                        details,
1905                        data,
1906                        num_rows,
1907                        offset_in_current: 0,
1908                        bytes_per_value: bytes_per_value as usize,
1909                        total_bytes_per_value,
1910                    }) as Box<dyn StructuralPageDecoder>)
1911                }
1912                PerValueDecompressor::Variable(_decompressor) => {
1913                    Ok(Box::new(VariableFullZipDecoder::new(
1914                        details,
1915                        data,
1916                        num_rows,
1917                        bits_per_offset,
1918                        bits_per_offset,
1919                    )) as Box<dyn StructuralPageDecoder>)
1920                }
1921            }
1922        }
1923        .boxed())
1924    }
1925
1926    /// Schedules ranges in the presence of a repetition index
1927    fn schedule_ranges_rep(
1928        &self,
1929        ranges: &[Range<u64>],
1930        io: &Arc<dyn EncodingsIo>,
1931        rep_index: &FullZipRepIndexDetails,
1932    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1933        // Check if we have cached repetition index data
1934        if let Some(cached_state) = &self.cached_state {
1935            return self.schedule_ranges_with_cached_rep(
1936                ranges,
1937                io,
1938                &cached_state.rep_index_buffer,
1939                rep_index.bytes_per_value,
1940            );
1941        }
1942
1943        // Fall back to loading from disk
1944        let rep_index_ranges = ranges
1945            .iter()
1946            .flat_map(|r| {
1947                let first_val_start =
1948                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
1949                let first_val_end = first_val_start + rep_index.bytes_per_value;
1950                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
1951                let last_val_end = last_val_start + rep_index.bytes_per_value;
1952                [first_val_start..first_val_end, last_val_start..last_val_end]
1953            })
1954            .collect::<Vec<_>>();
1955
1956        // Create the decoder
1957
1958        Ok(Self::indirect_schedule_ranges(
1959            self.data_buf_position,
1960            ranges.to_vec(),
1961            rep_index_ranges,
1962            rep_index.bytes_per_value,
1963            io.clone(),
1964            self.priority,
1965            self.bits_per_offset,
1966            self.details.clone(),
1967        )
1968        .boxed())
1969    }
1970
1971    // In the simple case there is no repetition and we just have large fixed-width
1972    // rows of data.  We can just map row ranges to byte ranges directly using the
1973    // fixed-width of the data type.
1974    fn schedule_ranges_simple(
1975        &self,
1976        ranges: &[Range<u64>],
1977        io: &dyn EncodingsIo,
1978    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1979        // In the simple case rows and items are one and the same, so just count the rows requested
1980        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1981
1982        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
1983            unreachable!()
1984        };
1985
1986        // Convert item ranges to byte ranges (i.e. multiply by bytes per item)
1987        let bits_per_value = decompressor.bits_per_value();
1988        assert_eq!(bits_per_value % 8, 0);
1989        let bytes_per_value = bits_per_value / 8;
1990        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
1991        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
1992        let byte_ranges = ranges.iter().map(|r| {
1993            debug_assert!(r.end <= self.rows_in_page);
1994            let start = self.data_buf_position + r.start * total_bytes_per_value;
1995            let end = self.data_buf_position + r.end * total_bytes_per_value;
1996            start..end
1997        });
1998
1999        // Request byte ranges
2000        let data = io.submit_request(byte_ranges.collect(), self.priority);
2001
2002        let details = self.details.clone();
2003
2004        Ok(async move {
2005            let data = data.await?;
2006            let data = data
2007                .into_iter()
2008                .map(|d| LanceBuffer::from_bytes(d, 1))
2009                .collect();
2010            Ok(Box::new(FixedFullZipDecoder {
2011                details,
2012                data,
2013                num_rows,
2014                offset_in_current: 0,
2015                bytes_per_value: bytes_per_value as usize,
2016                total_bytes_per_value: total_bytes_per_value as usize,
2017            }) as Box<dyn StructuralPageDecoder>)
2018        }
2019        .boxed())
2020    }
2021}
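// A quick sketch of the simple (no repetition index) byte math above, with hypothetical numbers:
// if the decompressor reports 64 bits per value (8 bytes) and the control words are 2 bytes wide,
// every row occupies 10 bytes, so the row range 5..10 maps to the byte range
// data_buf_position + 50 .. data_buf_position + 100 and yields a 5-row FixedFullZipDecoder.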
2022
2023/// Cacheable state for FullZip encoding, storing the raw repetition index buffer
2024#[derive(Debug)]
2025struct FullZipCacheableState {
2026    /// The raw repetition index buffer for future decoding
2027    rep_index_buffer: LanceBuffer,
2028}
2029
2030impl DeepSizeOf for FullZipCacheableState {
2031    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
2032        self.rep_index_buffer.len()
2033    }
2034}
2035
2036impl CachedPageData for FullZipCacheableState {
2037    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2038        self
2039    }
2040}
2041
2042impl StructuralPageScheduler for FullZipScheduler {
2043    /// Initializes the scheduler. If there's a repetition index, loads and caches it.
2044    /// Otherwise returns NoCachedPageData.
2045    fn initialize<'a>(
2046        &'a mut self,
2047        io: &Arc<dyn EncodingsIo>,
2048    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2049        // Check if we have a repetition index
2050        if let Some(rep_index) = &self.rep_index {
2051            // Calculate the total size of the repetition index
2052            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2053            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2054
2055            // Load the repetition index buffer
2056            let io_clone = io.clone();
2057            let future = async move {
2058                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2059                let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);
2060
2061                // Create and return the cacheable state
2062                Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
2063            };
2064
2065            future.boxed()
2066        } else {
2067            // No repetition index, skip caching
2068            std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2069        }
2070    }
2071
2072    /// Loads previously cached repetition index data from the cache system.
2073    /// This method is called when a scheduler instance needs to use cached data
2074    /// that was initialized by another instance or in a previous operation.
2075    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2076        // Try to downcast to our specific cache type
2077        if let Ok(cached_state) = cache
2078            .clone()
2079            .as_arc_any()
2080            .downcast::<FullZipCacheableState>()
2081        {
2082            // Store the cached state for use in schedule_ranges
2083            self.cached_state = Some(cached_state);
2084        }
2085    }
2086
2087    fn schedule_ranges(
2088        &self,
2089        ranges: &[Range<u64>],
2090        io: &Arc<dyn EncodingsIo>,
2091    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
2092        if let Some(rep_index) = self.rep_index.as_ref() {
2093            self.schedule_ranges_rep(ranges, io, rep_index)
2094        } else {
2095            self.schedule_ranges_simple(ranges, io.as_ref())
2096        }
2097    }
2098}
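// With hypothetical numbers: a page with rows_in_page = 999 and a 4-byte-per-entry repetition
// index causes initialize() to read and cache a single 4000-byte buffer.  Once load() has
// installed that FullZipCacheableState, schedule_ranges() can compute data byte ranges without
// touching the repetition index on disk again.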
2099
2100/// A decoder for full-zip encoded data when the data has a fixed-width
2101///
2102/// Here we need to unzip the control words from the values themselves and
2103/// then decompress the requested values.
2104///
2105/// We use a PerValueDecompressor because we will only be decompressing the
2106/// requested data.  This decoder / scheduler does not do any read amplification.
2107#[derive(Debug)]
2108struct FixedFullZipDecoder {
2109    details: Arc<FullZipDecodeDetails>,
2110    data: VecDeque<LanceBuffer>,
2111    offset_in_current: usize,
2112    bytes_per_value: usize,
2113    total_bytes_per_value: usize,
2114    num_rows: u64,
2115}
2116
2117impl FixedFullZipDecoder {
2118    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2119        debug_assert!(num_rows > 0);
2120        let cur_buf = self.data.front_mut().unwrap();
2121        let start = self.offset_in_current;
2122        if self.details.ctrl_word_parser.has_rep() {
2123            // This is a slightly slower path.  In order to figure out where to split we need to
2124            // examine the repetition levels in the control words so we can map requested rows (lists) to items
2125            let mut rows_started = 0;
2126            // We always need at least one value.  Now loop through values until we have
2127            // passed num_rows rows
2128            let mut num_items = 0;
2129            while self.offset_in_current < cur_buf.len() {
2130                let control = self.details.ctrl_word_parser.parse_desc(
2131                    &cur_buf[self.offset_in_current..],
2132                    self.details.max_rep,
2133                    self.details.max_visible_def,
2134                );
2135                if control.is_new_row {
2136                    if rows_started == num_rows {
2137                        break;
2138                    }
2139                    rows_started += 1;
2140                }
2141                num_items += 1;
2142                if control.is_visible {
2143                    self.offset_in_current += self.total_bytes_per_value;
2144                } else {
2145                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2146                }
2147            }
2148
2149            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2150            if self.offset_in_current == cur_buf.len() {
2151                self.data.pop_front();
2152                self.offset_in_current = 0;
2153            }
2154
2155            FullZipDecodeTaskItem {
2156                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2157                    data: task_slice,
2158                    bits_per_value: self.bytes_per_value as u64 * 8,
2159                    num_values: num_items,
2160                    block_info: BlockInfo::new(),
2161                }),
2162                rows_in_buf: rows_started,
2163            }
2164        } else {
2165            // If there's no repetition we can calculate the slicing point by just multiplying
2166            // the number of rows by the total bytes per value
2167            let cur_buf = self.data.front_mut().unwrap();
2168            let bytes_avail = cur_buf.len() - self.offset_in_current;
2169            let offset_in_cur = self.offset_in_current;
2170
2171            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2172            let mut rows_taken = num_rows;
2173            let task_slice = if bytes_needed >= bytes_avail {
2174                self.offset_in_current = 0;
2175                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2176                self.data
2177                    .pop_front()
2178                    .unwrap()
2179                    .slice_with_length(offset_in_cur, bytes_avail)
2180            } else {
2181                self.offset_in_current += bytes_needed;
2182                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2183            };
2184            FullZipDecodeTaskItem {
2185                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2186                    data: task_slice,
2187                    bits_per_value: self.bytes_per_value as u64 * 8,
2188                    num_values: rows_taken,
2189                    block_info: BlockInfo::new(),
2190                }),
2191                rows_in_buf: rows_taken,
2192            }
2193        }
2194    }
2195}
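// A small worked example of the repetition-aware slicing above (hypothetical sizes): with a
// 2-byte control word and 8-byte values, total_bytes_per_value is 10.  If the control words mark
// new rows at items 0 and 3 (i.e. the first row is a list of three items), then slicing one row
// consumes items 0, 1, and 2 (30 bytes when all items are visible) and reports rows_in_buf = 1
// with num_values = 3.  Invisible (e.g. null-list) items contribute only their control word bytes.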
2196
2197impl StructuralPageDecoder for FixedFullZipDecoder {
2198    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2199        let mut task_data = Vec::with_capacity(self.data.len());
2200        let mut remaining = num_rows;
2201        while remaining > 0 {
2202            let task_item = self.slice_next_task(remaining);
2203            remaining -= task_item.rows_in_buf;
2204            task_data.push(task_item);
2205        }
2206        Ok(Box::new(FixedFullZipDecodeTask {
2207            details: self.details.clone(),
2208            data: task_data,
2209            bytes_per_value: self.bytes_per_value,
2210            num_rows: num_rows as usize,
2211        }))
2212    }
2213
2214    fn num_rows(&self) -> u64 {
2215        self.num_rows
2216    }
2217}
2218
2219/// A decoder for full-zip encoded data when the data has a variable-width
2220///
2221/// Here we need to unzip the control words AND lengths from the values and
2222/// then decompress the requested values.
2223#[derive(Debug)]
2224struct VariableFullZipDecoder {
2225    details: Arc<FullZipDecodeDetails>,
2226    decompressor: Arc<dyn VariablePerValueDecompressor>,
2227    data: LanceBuffer,
2228    offsets: LanceBuffer,
2229    rep: ScalarBuffer<u16>,
2230    def: ScalarBuffer<u16>,
2231    repdef_starts: Vec<usize>,
2232    data_starts: Vec<usize>,
2233    offset_starts: Vec<usize>,
2234    visible_item_counts: Vec<u64>,
2235    bits_per_offset: u8,
2236    current_idx: usize,
2237    num_rows: u64,
2238}
2239
2240impl VariableFullZipDecoder {
2241    fn new(
2242        details: Arc<FullZipDecodeDetails>,
2243        data: VecDeque<LanceBuffer>,
2244        num_rows: u64,
2245        in_bits_per_length: u8,
2246        out_bits_per_offset: u8,
2247    ) -> Self {
2248        let decompressor = match details.value_decompressor {
2249            PerValueDecompressor::Variable(ref d) => d.clone(),
2250            _ => unreachable!(),
2251        };
2252
2253        assert_eq!(in_bits_per_length % 8, 0);
2254        assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2255
2256        let mut decoder = Self {
2257            details,
2258            decompressor,
2259            data: LanceBuffer::empty(),
2260            offsets: LanceBuffer::empty(),
2261            rep: LanceBuffer::empty().borrow_to_typed_slice(),
2262            def: LanceBuffer::empty().borrow_to_typed_slice(),
2263            bits_per_offset: out_bits_per_offset,
2264            repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2265            data_starts: Vec::with_capacity(num_rows as usize + 1),
2266            offset_starts: Vec::with_capacity(num_rows as usize + 1),
2267            visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2268            current_idx: 0,
2269            num_rows,
2270        };
2271
2272        // There's no great time to do this and this is the least worst time.  If we don't unzip then
2273        // we can't slice the data during the decode phase.  This is because we need the offsets to be
2274        // unpacked to know where the values start and end.
2275        //
2276        // We don't want to unzip on the decode thread because that is a single-threaded path
2277        // We don't want to unzip on the scheduling thread because that is a single-threaded path
2278        //
2279        // Fortunately, we know variable length data will always be read indirectly and so we can do it
2280        // here, which should be on the indirect thread.  The primary disadvantage to doing it here is that
2281        // we load all the data into memory and then throw it away only to load it all into memory again during
2282        // the decode.
2283        //
2284        // There are some alternatives to investigate:
2285        //   - Instead of just reading the beginning and end of the rep index we could read the entire
2286        //     range in between.  This will give us the break points that we need for slicing and won't increase
2287        //     the number of IOPs but it will mean we are doing more total I/O and we need to load the rep index
2288        //     even when doing a full scan.
2289        //   - We could force each decode task to do a full unzip of all the data.  Each decode task now
2290        //     has to do more work but the work is all fused.
2291        //   - We could just try doing this work on the decode thread and see if it is a problem.
2292        decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2293
2294        decoder
2295    }
2296
2297    unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2298        match bits_per_offset {
2299            8 => *data.get_unchecked(0) as u64,
2300            16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2301            32 => u32::from_le_bytes([
2302                *data.get_unchecked(0),
2303                *data.get_unchecked(1),
2304                *data.get_unchecked(2),
2305                *data.get_unchecked(3),
2306            ]) as u64,
2307            64 => u64::from_le_bytes([
2308                *data.get_unchecked(0),
2309                *data.get_unchecked(1),
2310                *data.get_unchecked(2),
2311                *data.get_unchecked(3),
2312                *data.get_unchecked(4),
2313                *data.get_unchecked(5),
2314                *data.get_unchecked(6),
2315                *data.get_unchecked(7),
2316            ]),
2317            _ => unreachable!(),
2318        }
2319    }
2320
2321    fn unzip(
2322        &mut self,
2323        data: VecDeque<LanceBuffer>,
2324        in_bits_per_length: u8,
2325        out_bits_per_offset: u8,
2326        num_rows: u64,
2327    ) {
2328        // This undercounts if there are lists but, at this point, we don't really know how many items we have
2329        let mut rep = Vec::with_capacity(num_rows as usize);
2330        let mut def = Vec::with_capacity(num_rows as usize);
2331        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2332
2333        // This undercounts if there are lists
2334        // It can also overcount if there are invisible items
2335        let bytes_per_offset = out_bits_per_offset as usize / 8;
2336        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2337        let mut offsets_data = Vec::with_capacity(bytes_offsets);
2338
2339        let bytes_per_length = in_bits_per_length as usize / 8;
2340        let bytes_lengths = bytes_per_length * num_rows as usize;
2341
2342        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2343        // This overcounts since bytes_lengths and bytes_cw are undercounts
2344        // It can also undercount if there are invisible items (hence the saturating_sub)
2345        let mut unzipped_data =
2346            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2347
2348        let mut current_offset = 0_u64;
2349        let mut visible_item_count = 0_u64;
2350        for databuf in data.into_iter() {
2351            let mut databuf = databuf.as_ref();
2352            while !databuf.is_empty() {
2353                let data_start = unzipped_data.len();
2354                let offset_start = offsets_data.len();
2355                // We might have only-rep or only-def, neither, or both.  They move at the same
2356                // speed though so we only need one index into them
2357                let repdef_start = rep.len().max(def.len());
2358                // TODO: Kind of inefficient we parse the control word twice here
2359                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2360                    databuf,
2361                    self.details.max_rep,
2362                    self.details.max_visible_def,
2363                );
2364                self.details
2365                    .ctrl_word_parser
2366                    .parse(databuf, &mut rep, &mut def);
2367                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2368
2369                if ctrl_desc.is_new_row {
2370                    self.repdef_starts.push(repdef_start);
2371                    self.data_starts.push(data_start);
2372                    self.offset_starts.push(offset_start);
2373                    self.visible_item_counts.push(visible_item_count);
2374                }
2375                if ctrl_desc.is_visible {
2376                    visible_item_count += 1;
2377                    if ctrl_desc.is_valid_item {
2378                        // Safety: Data should have at least bytes_per_length bytes remaining
2379                        debug_assert!(databuf.len() >= bytes_per_length);
2380                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2381                        match out_bits_per_offset {
2382                            32 => offsets_data
2383                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2384                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2385                            _ => unreachable!(),
2386                        };
2387                        databuf = &databuf[bytes_per_length..]; // skip past the length field
2388                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
2389                        databuf = &databuf[length as usize..];
2390                        current_offset += length;
2391                    } else {
2392                        // Null items still get an offset
2393                        match out_bits_per_offset {
2394                            32 => offsets_data
2395                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2396                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2397                            _ => unreachable!(),
2398                        }
2399                    }
2400                }
2401            }
2402        }
2403        self.repdef_starts.push(rep.len().max(def.len()));
2404        self.data_starts.push(unzipped_data.len());
2405        self.offset_starts.push(offsets_data.len());
2406        self.visible_item_counts.push(visible_item_count);
2407        match out_bits_per_offset {
2408            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2409            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2410            _ => unreachable!(),
2411        };
2412        self.rep = ScalarBuffer::from(rep);
2413        self.def = ScalarBuffer::from(def);
2414        self.data = LanceBuffer::Owned(unzipped_data);
2415        self.offsets = LanceBuffer::Owned(offsets_data);
2416    }
2417}
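// A hedged sketch of what unzip produces (hypothetical values): suppose the zipped stream holds
// two visible, valid string values "ab" and "defg", each laid out as [control word][length][bytes].
// After unzipping, self.data is the contiguous bytes "abdefg", self.offsets holds [0, 2, 6]
// (one offset per visible item plus the trailing total), and the rep/def levels are split into
// self.rep / self.def.  The *_starts vectors record, for every top-level row, where that row
// begins in the repdef, data, and offsets buffers so that drain() can slice per row later.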
2418
2419impl StructuralPageDecoder for VariableFullZipDecoder {
2420    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2421        let start = self.current_idx;
2422        let end = start + num_rows as usize;
2423
2424        // This might seem a little peculiar.  We are returning the entire data for every single
2425        // batch.  This is because the offsets are relative to the start of the data.  In other words
2426        // imagine we have a data buffer that is 100 bytes long and the offsets are [0, 10, 20, 30, 40]
2427        // and we return in batches of two.  The second set of offsets will be [20, 30, 40].
2428        //
2429        // So either we pay for a copy to normalize the offsets or we just return the entire data buffer
2430        // which is slightly cheaper.
2431        let data = self.data.borrow_and_clone();
2432
2433        let offset_start = self.offset_starts[start];
2434        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2435        let offsets = self
2436            .offsets
2437            .slice_with_length(offset_start, offset_end - offset_start);
2438
2439        let repdef_start = self.repdef_starts[start];
2440        let repdef_end = self.repdef_starts[end];
2441        let rep = if self.rep.is_empty() {
2442            self.rep.clone()
2443        } else {
2444            self.rep.slice(repdef_start, repdef_end - repdef_start)
2445        };
2446        let def = if self.def.is_empty() {
2447            self.def.clone()
2448        } else {
2449            self.def.slice(repdef_start, repdef_end - repdef_start)
2450        };
2451
2452        let visible_item_counts_start = self.visible_item_counts[start];
2453        let visible_item_counts_end = self.visible_item_counts[end];
2454        let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2455
2456        self.current_idx += num_rows as usize;
2457
2458        Ok(Box::new(VariableFullZipDecodeTask {
2459            details: self.details.clone(),
2460            decompressor: self.decompressor.clone(),
2461            data,
2462            offsets,
2463            bits_per_offset: self.bits_per_offset,
2464            num_visible_items,
2465            rep,
2466            def,
2467        }))
2468    }
2469
2470    fn num_rows(&self) -> u64 {
2471        self.num_rows
2472    }
2473}
2474
2475#[derive(Debug)]
2476struct VariableFullZipDecodeTask {
2477    details: Arc<FullZipDecodeDetails>,
2478    decompressor: Arc<dyn VariablePerValueDecompressor>,
2479    data: LanceBuffer,
2480    offsets: LanceBuffer,
2481    bits_per_offset: u8,
2482    num_visible_items: u64,
2483    rep: ScalarBuffer<u16>,
2484    def: ScalarBuffer<u16>,
2485}
2486
2487impl DecodePageTask for VariableFullZipDecodeTask {
2488    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2489        let block = VariableWidthBlock {
2490            data: self.data,
2491            offsets: self.offsets,
2492            bits_per_offset: self.bits_per_offset,
2493            num_values: self.num_visible_items,
2494            block_info: BlockInfo::new(),
2495        };
2496        let decompressed = self.decompressor.decompress(block)?;
2497        let rep = self.rep.to_vec();
2498        let def = self.def.to_vec();
2499        let unraveler =
2500            RepDefUnraveler::new(Some(rep), Some(def), self.details.def_meaning.clone());
2501        Ok(DecodedPage {
2502            data: decompressed,
2503            repdef: unraveler,
2504        })
2505    }
2506}
2507
2508#[derive(Debug)]
2509struct FullZipDecodeTaskItem {
2510    data: PerValueDataBlock,
2511    rows_in_buf: u64,
2512}
2513
2514/// A task to unzip and decompress full-zip encoded data when that data
2515/// has a fixed-width.
2516#[derive(Debug)]
2517struct FixedFullZipDecodeTask {
2518    details: Arc<FullZipDecodeDetails>,
2519    data: Vec<FullZipDecodeTaskItem>,
2520    num_rows: usize,
2521    bytes_per_value: usize,
2522}
2523
2524impl DecodePageTask for FixedFullZipDecodeTask {
2525    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2526        // Multiply by 2 to make a stab at the size of the output buffer (which will be decompressed and thus bigger)
2527        let estimated_size_bytes = self
2528            .data
2529            .iter()
2530            .map(|task_item| task_item.data.data_size() as usize)
2531            .sum::<usize>()
2532            * 2;
2533        let mut data_builder =
2534            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2535
2536        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
2537            // Fast path, no need to unzip because there is no rep/def
2538            //
2539            // We decompress each buffer and add it to our output buffer
2540            for task_item in self.data.into_iter() {
2541                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2542                    unreachable!()
2543                };
2544                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2545                else {
2546                    unreachable!()
2547                };
2548                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
2549                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
2550                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
2551            }
2552
2553            let unraveler = RepDefUnraveler::new(None, None, self.details.def_meaning.clone());
2554
2555            Ok(DecodedPage {
2556                data: data_builder.finish(),
2557                repdef: unraveler,
2558            })
2559        } else {
2560            // Slow path, unzipping needed
2561            let mut rep = Vec::with_capacity(self.num_rows);
2562            let mut def = Vec::with_capacity(self.num_rows);
2563
2564            for task_item in self.data.into_iter() {
2565                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2566                    unreachable!()
2567                };
2568                let mut buf_slice = fixed_data.data.as_ref();
2569                let num_values = fixed_data.num_values as usize;
2570                // We will be unzipping repdef into `rep` and `def` and the
2571                // values into `values` (which contains the compressed values)
2572                let mut values = Vec::with_capacity(
2573                    fixed_data.data.len()
2574                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
2575                );
2576                let mut visible_items = 0;
2577                for _ in 0..num_values {
2578                    // Extract rep/def
2579                    self.details
2580                        .ctrl_word_parser
2581                        .parse(buf_slice, &mut rep, &mut def);
2582                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
2583
2584                    let is_visible = def
2585                        .last()
2586                        .map(|d| *d <= self.details.max_visible_def)
2587                        .unwrap_or(true);
2588                    if is_visible {
2589                        // Extract value
2590                        values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
2591                        buf_slice = &buf_slice[self.bytes_per_value..];
2592                        visible_items += 1;
2593                    }
2594                }
2595
2596                // Finally, we decompress the values and add them to our output buffer
2597                let values_buf = LanceBuffer::Owned(values);
2598                let fixed_data = FixedWidthDataBlock {
2599                    bits_per_value: self.bytes_per_value as u64 * 8,
2600                    block_info: BlockInfo::new(),
2601                    data: values_buf,
2602                    num_values: visible_items,
2603                };
2604                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2605                else {
2606                    unreachable!()
2607                };
2608                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
2609                data_builder.append(&decompressed, 0..visible_items);
2610            }
2611
2612            let repetition = if rep.is_empty() { None } else { Some(rep) };
2613            let definition = if def.is_empty() { None } else { Some(def) };
2614
2615            let unraveler =
2616                RepDefUnraveler::new(repetition, definition, self.details.def_meaning.clone());
2617            let data = data_builder.finish();
2618
2619            Ok(DecodedPage {
2620                data,
2621                repdef: unraveler,
2622            })
2623        }
2624    }
2625}
2626
2627#[derive(Debug)]
2628struct StructuralPrimitiveFieldSchedulingJob<'a> {
2629    scheduler: &'a StructuralPrimitiveFieldScheduler,
2630    ranges: Vec<Range<u64>>,
2631    page_idx: usize,
2632    range_idx: usize,
2633    global_row_offset: u64,
2634}
2635
2636impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
2637    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
2638        Self {
2639            scheduler,
2640            ranges,
2641            page_idx: 0,
2642            range_idx: 0,
2643            global_row_offset: 0,
2644        }
2645    }
2646}
2647
2648impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
2649    fn schedule_next(
2650        &mut self,
2651        context: &mut SchedulerContext,
2652    ) -> Result<Option<ScheduledScanLine>> {
2653        if self.range_idx >= self.ranges.len() {
2654            return Ok(None);
2655        }
2656        // Get our current range
2657        let mut range = self.ranges[self.range_idx].clone();
2658        let priority = range.start;
2659
2660        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
2661        trace!(
2662            "Current range is {:?} and current page has {} rows",
2663            range,
2664            cur_page.num_rows
2665        );
2666        // Skip entire pages until we have some overlap with our next range
2667        while cur_page.num_rows + self.global_row_offset <= range.start {
2668            self.global_row_offset += cur_page.num_rows;
2669            self.page_idx += 1;
2670            trace!("Skipping entire page of {} rows", cur_page.num_rows);
2671            cur_page = &self.scheduler.page_schedulers[self.page_idx];
2672        }
2673
2674        // Now cur_page overlaps with range.  Continue looping through ranges
2675        // until we find a range that extends beyond the current page
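        //
        // For example (illustrative numbers only): with global_row_offset = 100, a current page
        // of 50 rows, and a remaining range of 120..200, we schedule rows 20..50 of this page
        // and leave the tail 150..200 to be picked up from the next page on a later call.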
2676
2677        let mut ranges_in_page = Vec::new();
2678        while cur_page.num_rows + self.global_row_offset > range.start {
2679            range.start = range.start.max(self.global_row_offset);
2680            let start_in_page = range.start - self.global_row_offset;
2681            let end_in_page = start_in_page + (range.end - range.start);
2682            let end_in_page = end_in_page.min(cur_page.num_rows);
2683            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
2684
2685            ranges_in_page.push(start_in_page..end_in_page);
2686            if last_in_range {
2687                self.range_idx += 1;
2688                if self.range_idx == self.ranges.len() {
2689                    break;
2690                }
2691                range = self.ranges[self.range_idx].clone();
2692            } else {
2693                break;
2694            }
2695        }
2696
2697        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
2698        trace!(
2699            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
2700            num_rows_in_next,
2701            ranges_in_page.len(),
2702            cur_page.num_rows,
2703            priority,
2704            self.scheduler.column_index,
2705            cur_page.page_index,
2706        );
2707
2708        self.global_row_offset += cur_page.num_rows;
2709        self.page_idx += 1;
2710
2711        let page_decoder = cur_page
2712            .scheduler
2713            .schedule_ranges(&ranges_in_page, context.io())?;
2714
2715        let cur_path = context.current_path();
2716        let page_index = cur_page.page_index;
2717        let unloaded_page = async move {
2718            let page_decoder = page_decoder.await?;
2719            Ok(LoadedPage {
2720                decoder: page_decoder,
2721                path: cur_path,
2722                page_index,
2723            })
2724        }
2725        .boxed();
2726
2727        Ok(Some(ScheduledScanLine {
2728            decoders: vec![MessageType::UnloadedPage(UnloadedPage(unloaded_page))],
2729            rows_scheduled: num_rows_in_next,
2730        }))
2731    }
2732}
2733
2734#[derive(Debug)]
2735struct PageInfoAndScheduler {
2736    page_index: usize,
2737    num_rows: u64,
2738    scheduler: Box<dyn StructuralPageScheduler>,
2739}
2740
2741/// A scheduler for a leaf node
2742///
2743/// Here we look at the layout of the various pages and delegate scheduling to a scheduler
2744/// appropriate for the layout of the page.
2745#[derive(Debug)]
2746pub struct StructuralPrimitiveFieldScheduler {
2747    page_schedulers: Vec<PageInfoAndScheduler>,
2748    column_index: u32,
2749}
2750
2751impl StructuralPrimitiveFieldScheduler {
2752    pub fn try_new(
2753        column_info: &ColumnInfo,
2754        decompressors: &dyn DecompressionStrategy,
2755    ) -> Result<Self> {
2756        let page_schedulers = column_info
2757            .page_infos
2758            .iter()
2759            .enumerate()
2760            .map(|(page_index, page_info)| {
2761                Self::page_info_to_scheduler(
2762                    page_info,
2763                    page_index,
2764                    column_info.index as usize,
2765                    decompressors,
2766                )
2767            })
2768            .collect::<Result<Vec<_>>>()?;
2769        Ok(Self {
2770            page_schedulers,
2771            column_index: column_info.index,
2772        })
2773    }
2774
2775    fn page_info_to_scheduler(
2776        page_info: &PageInfo,
2777        page_index: usize,
2778        _column_index: usize,
2779        decompressors: &dyn DecompressionStrategy,
2780    ) -> Result<PageInfoAndScheduler> {
2781        let scheduler: Box<dyn StructuralPageScheduler> =
2782            match page_info.encoding.as_structural().layout.as_ref() {
2783                Some(pb::page_layout::Layout::MiniBlockLayout(mini_block)) => {
2784                    Box::new(MiniBlockScheduler::try_new(
2785                        &page_info.buffer_offsets_and_sizes,
2786                        page_info.priority,
2787                        mini_block.num_items,
2788                        mini_block,
2789                        decompressors,
2790                    )?)
2791                }
2792                Some(pb::page_layout::Layout::FullZipLayout(full_zip)) => {
2793                    Box::new(FullZipScheduler::try_new(
2794                        &page_info.buffer_offsets_and_sizes,
2795                        page_info.priority,
2796                        page_info.num_rows,
2797                        full_zip,
2798                        decompressors,
2799                    )?)
2800                }
2801                Some(pb::page_layout::Layout::AllNullLayout(all_null)) => {
2802                    let def_meaning = all_null
2803                        .layers
2804                        .iter()
2805                        .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
2806                        .collect::<Vec<_>>();
2807                    if def_meaning.len() == 1
2808                        && def_meaning[0] == DefinitionInterpretation::NullableItem
2809                    {
2810                        Box::new(SimpleAllNullScheduler::default())
2811                            as Box<dyn StructuralPageScheduler>
2812                    } else {
2813                        Box::new(ComplexAllNullScheduler::new(
2814                            page_info.buffer_offsets_and_sizes.clone(),
2815                            def_meaning.into(),
2816                        )) as Box<dyn StructuralPageScheduler>
2817                    }
2818                }
2819                _ => todo!(),
2820            };
2821        Ok(PageInfoAndScheduler {
2822            page_index,
2823            num_rows: page_info.num_rows,
2824            scheduler,
2825        })
2826    }
2827}
2828
2829pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
2830    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
2831}
2832
2833pub struct NoCachedPageData;
2834
2835impl DeepSizeOf for NoCachedPageData {
2836    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
2837        0
2838    }
2839}
2840impl CachedPageData for NoCachedPageData {
2841    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2842        self
2843    }
2844}
2845
2846pub struct CachedFieldData {
2847    pages: Vec<Arc<dyn CachedPageData>>,
2848}
2849
2850impl DeepSizeOf for CachedFieldData {
2851    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
2852        self.pages.deep_size_of_children(ctx)
2853    }
2854}
2855
2856impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
2857    fn initialize<'a>(
2858        &'a mut self,
2859        _filter: &'a FilterExpression,
2860        context: &'a SchedulerContext,
2861    ) -> BoxFuture<'a, Result<()>> {
2862        let cache_key = self.column_index.to_string();
2863        if let Some(cached_data) = context.cache().get::<CachedFieldData>(&cache_key) {
2864            self.page_schedulers
2865                .iter_mut()
2866                .zip(cached_data.pages.iter())
2867                .for_each(|(page_scheduler, cached_data)| {
2868                    page_scheduler.scheduler.load(cached_data);
2869                });
2870            return std::future::ready(Ok(())).boxed();
2871        };
2872
2873        let cache = context.cache().clone();
2874        let page_data = self
2875            .page_schedulers
2876            .iter_mut()
2877            .map(|s| s.scheduler.initialize(context.io()))
2878            .collect::<FuturesOrdered<_>>();
2879
2880        async move {
2881            let page_data = page_data.try_collect::<Vec<_>>().await?;
2882            let cached_data = Arc::new(CachedFieldData { pages: page_data });
2883            cache.insert::<CachedFieldData>(&cache_key, cached_data);
2884            Ok(())
2885        }
2886        .boxed()
2887    }
2888
2889    fn schedule_ranges<'a>(
2890        &'a self,
2891        ranges: &[Range<u64>],
2892        _filter: &FilterExpression,
2893    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
2894        let ranges = ranges.to_vec();
2895        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
2896            self, ranges,
2897        )))
2898    }
2899}
2900
2901/// Takes the output from several page decoders and
2902/// concatenates them.
2903#[derive(Debug)]
2904pub struct StructuralCompositeDecodeArrayTask {
2905    tasks: Vec<Box<dyn DecodePageTask>>,
2906    should_validate: bool,
2907    data_type: DataType,
2908}
2909
2910impl StructuralCompositeDecodeArrayTask {
2911    fn restore_validity(
2912        array: Arc<dyn Array>,
2913        unraveler: &mut CompositeRepDefUnraveler,
2914    ) -> Arc<dyn Array> {
2915        let validity = unraveler.unravel_validity(array.len());
2916        let Some(validity) = validity else {
2917            return array;
2918        };
2919        if array.data_type() == &DataType::Null {
2920            // We unravel from a null array but we don't add the null buffer because arrow-rs doesn't like it
2921            return array;
2922        }
2923        assert_eq!(validity.len(), array.len());
2924        // SAFETY: We should have already asserted that the buffers are all valid; we are just
2925        // adding a null buffer to the array here
2926        make_array(unsafe {
2927            array
2928                .to_data()
2929                .into_builder()
2930                .nulls(Some(validity))
2931                .build_unchecked()
2932        })
2933    }
2934}
2935
2936impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
2937    fn decode(self: Box<Self>) -> Result<DecodedArray> {
2938        let mut arrays = Vec::with_capacity(self.tasks.len());
2939        let mut unravelers = Vec::with_capacity(self.tasks.len());
2940        for task in self.tasks {
2941            let decoded = task.decode()?;
2942            unravelers.push(decoded.repdef);
2943
2944            let array = make_array(
2945                decoded
2946                    .data
2947                    .into_arrow(self.data_type.clone(), self.should_validate)?,
2948            );
2949
2950            arrays.push(array);
2951        }
2952        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
2953        let array = arrow_select::concat::concat(&array_refs)?;
2954        let mut repdef = CompositeRepDefUnraveler::new(unravelers);
2955
2956        let array = Self::restore_validity(array, &mut repdef);
2957
2958        Ok(DecodedArray { array, repdef })
2959    }
2960}
2961
2962#[derive(Debug)]
2963pub struct StructuralPrimitiveFieldDecoder {
2964    field: Arc<ArrowField>,
2965    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
2966    should_validate: bool,
2967    rows_drained_in_current: u64,
2968}
2969
2970impl StructuralPrimitiveFieldDecoder {
2971    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
2972        Self {
2973            field: field.clone(),
2974            page_decoders: VecDeque::new(),
2975            should_validate,
2976            rows_drained_in_current: 0,
2977        }
2978    }
2979}
2980
2981impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
2982    fn accept_page(&mut self, child: LoadedPage) -> Result<()> {
2983        assert!(child.path.is_empty());
2984        self.page_decoders.push_back(child.decoder);
2985        Ok(())
2986    }
2987
2988    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
2989        let mut remaining = num_rows;
2990        let mut tasks = Vec::new();
2991        while remaining > 0 {
2992            let cur_page = self.page_decoders.front_mut().unwrap();
2993            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
2994            let to_take = num_in_page.min(remaining);
2995
2996            let task = cur_page.drain(to_take)?;
2997            tasks.push(task);
2998
2999            if to_take == num_in_page {
3000                self.page_decoders.pop_front();
3001                self.rows_drained_in_current = 0;
3002            } else {
3003                self.rows_drained_in_current += to_take;
3004            }
3005
3006            remaining -= to_take;
3007        }
3008        Ok(Box::new(StructuralCompositeDecodeArrayTask {
3009            tasks,
3010            should_validate: self.should_validate,
3011            data_type: self.field.data_type().clone(),
3012        }))
3013    }
3014
3015    fn data_type(&self) -> &DataType {
3016        self.field.data_type()
3017    }
3018}
3019
3020/// The serialized representation of full-zip data
3021struct SerializedFullZip {
3022    /// The zipped values buffer
3023    values: LanceBuffer,
3024    /// The repetition index (only present if there is repetition)
3025    repetition_index: Option<LanceBuffer>,
3026}
3027
3028// We align and pad mini-blocks to 8 byte boundaries for two reasons.  First,
3029// to allow us to store a chunk size in 12 bits.
3030//
3031// If we directly record the size in bytes with 12 bits we would be limited to
3032// 4KiB which is too small.  Since we know each mini-block consists of 8 byte
3033// words we can store the # of words instead which gives us 32KiB.  We want
3034// at least 24KiB so we can handle even the worst case of
3035// - 4Ki values compressed into an 8186 byte buffer
3036// - 4 bytes to describe rep & def lengths
3037// - 16KiB of rep & def buffer (this will almost never happen but life is easier if we
3038//   plan for it)
3039//
3040// Second, each chunk in a mini-block is aligned to 8 bytes.  This allows multi-byte
3041// values like offsets to be stored in a mini-block and safely read back out.  It also
3042// helps ensure zero-copy reads in cases where zero-copy is possible (e.g. no decoding
3043// needed).
3044//
3045// Note: by "aligned to 8 bytes" we mean BOTH "aligned to 8 bytes from the start of
3046// the page" and "aligned to 8 bytes from the start of the file."
3047const MINIBLOCK_ALIGNMENT: usize = 8;
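
// A minimal illustrative sketch (not part of the encoding logic): recording chunk sizes as a
// 12-bit count of 8-byte words lets a single u16 metadata word describe chunks of up to 32KiB,
// which comfortably covers the ~24KiB worst case sketched above.
#[cfg(test)]
mod miniblock_alignment_arithmetic_sketch {
    use super::MINIBLOCK_ALIGNMENT;

    #[test]
    fn twelve_bit_word_count_covers_worst_case() {
        // The chunk metadata word stores (chunk_bytes / 8) - 1 in its upper 12 bits, so the
        // largest expressible chunk is (0xFFF + 1) * 8 bytes
        let max_chunk_bytes = (0xFFFu64 + 1) * MINIBLOCK_ALIGNMENT as u64;
        assert_eq!(max_chunk_bytes, 32 * 1024);

        // Worst case from the comment above: ~8KiB of compressed values, 4 bytes of rep & def
        // lengths, and 16KiB of rep & def levels
        let worst_case: u64 = 8186 + 4 + 16 * 1024;
        assert!(worst_case <= max_chunk_bytes);
    }
}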
3048
3049/// An encoder for primitive (leaf) arrays
3050///
3051/// This encoder is fairly complicated and follows a number of paths depending
3052/// on the data.
3053///
3054/// First, we convert the validity & offsets information into repetition and
3055/// definition levels.  Then we compress the data itself into a single buffer.
3056///
3057/// If the data is narrow then we encode the data in small chunks (each chunk
3058/// should be a few disk sectors and contains a buffer of repetition, a buffer
3059/// of definition, and a buffer of value data).  This approach is called
3060/// "mini-block".  These mini-blocks are stored into a single data buffer.
3061///
3062/// If the data is wide then we zip together the repetition and definition levels
3063/// with the value data into a single buffer.  This approach is called "zipped".
3064///
3065/// If there is any repetition information then we create a repetition index (TODO)
3066///
3067/// In addition, the compression process may create zero or more metadata buffers.
3068/// For example, a dictionary compression will create dictionary metadata.  Any
3069/// mini-block approach has a metadata buffer of block sizes.  This metadata is
3070/// stored in a separate buffer on disk and read at initialization time.
3071///
3072/// TODO: We should concatenate metadata buffers from all pages into a single buffer
3073/// at (roughly) the end of the file so there is, at most, one read per column of
3074/// metadata per file.
3075pub struct PrimitiveStructuralEncoder {
3076    // Accumulates arrays until we have enough data to justify a disk page
3077    accumulation_queue: AccumulationQueue,
3078
3079    keep_original_array: bool,
3080    accumulated_repdefs: Vec<RepDefBuilder>,
3081    // The compression strategy we will use to compress the data
3082    compression_strategy: Arc<dyn CompressionStrategy>,
3083    column_index: u32,
3084    field: Field,
3085    encoding_metadata: Arc<HashMap<String, String>>,
3086}
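
// A minimal illustrative sketch (not part of the encoder itself) of how the `encoding_metadata`
// consulted by `prefers_miniblock` / `prefers_fullzip` below selects a structural layout.  It
// assumes the STRUCTURAL_ENCODING_* constants are plain string slices, as they are used by those
// functions.
#[cfg(test)]
mod structural_encoding_preference_sketch {
    use super::*;

    #[test]
    fn metadata_overrides_the_fullzip_fallback() {
        // With no user preference, full-zip remains the fallback choice
        assert!(PrimitiveStructuralEncoder::prefers_fullzip(&HashMap::new()));

        // An explicit mini-block request turns the full-zip preference off
        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );
        assert!(!PrimitiveStructuralEncoder::prefers_fullzip(&metadata));
    }
}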
3087
3088struct CompressedLevelsChunk {
3089    data: LanceBuffer,
3090    num_levels: u16,
3091}
3092
3093struct CompressedLevels {
3094    data: Vec<CompressedLevelsChunk>,
3095    compression: pb::ArrayEncoding,
3096    rep_index: Option<LanceBuffer>,
3097}
3098
3099struct SerializedMiniBlockPage {
3100    num_buffers: u64,
3101    data: LanceBuffer,
3102    metadata: LanceBuffer,
3103}
3104
3105impl PrimitiveStructuralEncoder {
3106    pub fn try_new(
3107        options: &EncodingOptions,
3108        compression_strategy: Arc<dyn CompressionStrategy>,
3109        column_index: u32,
3110        field: Field,
3111        encoding_metadata: Arc<HashMap<String, String>>,
3112    ) -> Result<Self> {
3113        Ok(Self {
3114            accumulation_queue: AccumulationQueue::new(
3115                options.cache_bytes_per_column,
3116                column_index,
3117                options.keep_original_array,
3118            ),
3119            keep_original_array: options.keep_original_array,
3120            accumulated_repdefs: Vec::new(),
3121            column_index,
3122            compression_strategy,
3123            field,
3124            encoding_metadata,
3125        })
3126    }
3127
3128    // TODO: This is a heuristic we may need to tune at some point
3129    //
3130    // As data gets narrower the "zipping" process gets too expensive
3131    //   and we prefer mini-block
3132    // As data gets wider the # of values per block shrinks (very wide
3133    //   data doesn't even fit in a mini-block), the block overhead gets
3134    //   too large, and we prefer zipped.
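    //
    // For example, an int64 column (8 bytes per value) or a column of short strings counts as
    // narrow, while a column of ~1KiB binary blobs exceeds the 256 byte per-value threshold
    // below and falls through to full-zip.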
3135    fn is_narrow(data_block: &DataBlock) -> bool {
3136        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3137
3138        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3139            let max_len_array = max_len_array
3140                .as_any()
3141                .downcast_ref::<PrimitiveArray<UInt64Type>>()
3142                .unwrap();
3143            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3144                return true;
3145            }
3146        }
3147        false
3148    }
3149
3150    fn prefers_miniblock(
3151        data_block: &DataBlock,
3152        encoding_metadata: &HashMap<String, String>,
3153    ) -> bool {
3154        // If the user specifically requested miniblock then use it
3155        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3156            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3157        }
3158        // Otherwise only use miniblock if it is narrow
3159        Self::is_narrow(data_block)
3160    }
3161
3162    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3163        // Fullzip is the backup option so the only reason we wouldn't use it is if the
3164        // user specifically requested not to use it (in which case we're probably going
3165        // to emit an error)
3166        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3167            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3168        }
3169        true
3170    }
3171
3172    // Converts value data, repetition levels, and definition levels into a single
3173    // buffer of mini-blocks.  In addition, creates a buffer of mini-block metadata
3174    // which tells us the size of each block.  Finally, if repetition is present then
3175    // we also create a buffer for the repetition index.
3176    //
3177    // Each chunk is serialized as:
3178    // | num_levels (2 bytes) | rep_len (2 bytes, if rep) | def_len (2 bytes, if def) | buf_lens (2 bytes per buffer) | P | rep | P | def | P | buf0 | P | ... | bufN | P |
3179    //
3180    // P - Padding inserted to ensure each buffer is 8-byte aligned and the buffer size is a multiple
3181    //     of 8 bytes (so that the next chunk is 8-byte aligned).
3182    //
3183    // Each block has a u16 word of metadata.  The upper 12 bits contain 1/8 the
3184    // # of bytes in the block (if the block is not an even multiple of 8 bytes
3185    // then up to 7 bytes of padding are added).  The lower 4 bits describe the log_2
3186    // number of values (e.g. if there are 1024 then the lower 4 bits will be
3187    // 0xA).  All blocks except the last must have a power-of-two number of values.
3188    // This not only makes the metadata smaller but it makes decoding easier since
3189    // batch sizes are typically a power of 2.  4 bits would allow us to express
3190    // up to 16Ki values but we restrict this further to 4Ki values.
3191    //
3192    // This means blocks can have 1 to 4Ki values and 8 - 32Ki bytes.
3193    //
3194    // All metadata words are serialized (as little endian) into a single buffer
3195    // of metadata values.
3196    //
3197    // If there is repetition then we also create a repetition index.  This is a
3198    // single buffer of integer vectors (stored in row major order).  There is one
3199    // entry for each chunk.  The size of the vector is based on the depth of random
3200    // access we want to support.
3201    //
3202    // A vector of size 2 is the minimum and will support row-based random access (e.g.
3203    // "take the 57th row").  A vector of size 3 will support 1 level of nested access
3204    // (e.g. "take the 3rd item in the 57th row").  A vector of size 4 will support 2
3205    // levels of nested access and so on.
3206    //
3207    // The first number in the vector is the number of top-level rows that complete in
3208    // the chunk.  The second number is the number of second-level rows that complete
3209    // after the final top-level row completed (or beginning of the chunk if no top-level
3210    // row completes in the chunk).  And so on.  The final number in the vector is always
3211    // the number of leftover items not covered by earlier entries in the vector.
3212    //
3213    // Currently we are limited to 0 levels of nested access but that will change in the
3214    // future.
3215    //
3216    // The repetition index and the chunk metadata are read at initialization time and
3217    // cached in memory.
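    //
    // Worked example (illustrative values only): suppose a non-final chunk has repetition
    // levels [1, 0, 0, 1, 0, 1, 0, 0] with max_rep = 1.  Two rows finish inside the chunk (a
    // row finishes when the next one starts) and the row that starts at the final max_rep has
    // three leftover items, so the size-2 index vector for that chunk is [2, 3].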
3218    fn serialize_miniblocks(
3219        miniblocks: MiniBlockCompressed,
3220        rep: Option<Vec<CompressedLevelsChunk>>,
3221        def: Option<Vec<CompressedLevelsChunk>>,
3222    ) -> SerializedMiniBlockPage {
3223        let bytes_rep = rep
3224            .as_ref()
3225            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3226            .unwrap_or(0);
3227        let bytes_def = def
3228            .as_ref()
3229            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3230            .unwrap_or(0);
3231        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3232        let mut num_buffers = miniblocks.data.len();
3233        if rep.is_some() {
3234            num_buffers += 1;
3235        }
3236        if def.is_some() {
3237            num_buffers += 1;
3238        }
3239        // 2 bytes for the length of each buffer and up to 7 bytes of padding per buffer
3240        let max_extra = 9 * num_buffers;
3241        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3242        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2);
3243
3244        let mut rep_iter = rep.map(|r| r.into_iter());
3245        let mut def_iter = def.map(|d| d.into_iter());
3246
3247        let mut buffer_offsets = vec![0; miniblocks.data.len()];
3248        for chunk in miniblocks.chunks {
3249            let start_pos = data_buffer.len();
3250            // Start of chunk should be aligned
3251            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3252
3253            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3254            let def = def_iter.as_mut().map(|d| d.next().unwrap());
3255
3256            // Write the number of levels, or 0 if there is no rep/def
3257            let num_levels = rep
3258                .as_ref()
3259                .map(|r| r.num_levels)
3260                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3261            data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3262
3263            // Write the buffer lengths
3264            if let Some(rep) = rep.as_ref() {
3265                let bytes_rep = u16::try_from(rep.data.len()).unwrap();
3266                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3267            }
3268            if let Some(def) = def.as_ref() {
3269                let bytes_def = u16::try_from(def.data.len()).unwrap();
3270                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3271            }
3272
3273            for buffer_size in &chunk.buffer_sizes {
3274                let bytes = *buffer_size;
3275                data_buffer.extend_from_slice(&bytes.to_le_bytes());
3276            }
3277
3278            // Pad
3279            let add_padding = |data_buffer: &mut Vec<u8>| {
3280                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3281                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
3282            };
3283            add_padding(&mut data_buffer);
3284
3285            // Write the buffers themselves
3286            if let Some(rep) = rep.as_ref() {
3287                data_buffer.extend_from_slice(&rep.data);
3288                add_padding(&mut data_buffer);
3289            }
3290            if let Some(def) = def.as_ref() {
3291                data_buffer.extend_from_slice(&def.data);
3292                add_padding(&mut data_buffer);
3293            }
3294            for (buffer_size, (buffer, buffer_offset)) in chunk
3295                .buffer_sizes
3296                .iter()
3297                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
3298            {
3299                let start = *buffer_offset;
3300                let end = start + *buffer_size as usize;
3301                *buffer_offset += *buffer_size as usize;
3302                data_buffer.extend_from_slice(&buffer[start..end]);
3303                add_padding(&mut data_buffer);
3304            }
3305
3306            let chunk_bytes = data_buffer.len() - start_pos;
3307            assert!(chunk_bytes <= 16 * 1024);
3308            assert!(chunk_bytes > 0);
3309            assert_eq!(chunk_bytes % 8, 0);
3310            // We subtract 1 from the word count because we want to be able to express
3311            // a size of 32KiB and not (32Ki - 8)B which is what we'd get otherwise with
3312            // 0xFFF
3313            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3314            let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3315
3316            let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
3317            meta_buffer.extend_from_slice(&metadata.to_le_bytes());
3318        }
3319
3320        let data_buffer = LanceBuffer::Owned(data_buffer);
3321        let metadata_buffer = LanceBuffer::Owned(meta_buffer);
3322
3323        SerializedMiniBlockPage {
3324            num_buffers: miniblocks.data.len() as u64,
3325            data: data_buffer,
3326            metadata: metadata_buffer,
3327        }
3328    }
3329
3330    /// Compresses a buffer of levels into chunks
3331    ///
3332    /// If these are repetition levels then we also calculate the repetition index here
3333    /// (returned as the `rep_index` field of the `CompressedLevels`)
3334    fn compress_levels(
3335        mut levels: RepDefSlicer<'_>,
3336        num_elements: u64,
3337        compression_strategy: &dyn CompressionStrategy,
3338        chunks: &[MiniBlockChunk],
3339        // This will be 0 if we are compressing def levels
3340        max_rep: u16,
3341    ) -> Result<CompressedLevels> {
3342        let mut rep_index = if max_rep > 0 {
3343            Vec::with_capacity(chunks.len())
3344        } else {
3345            vec![]
3346        };
3347        // Make the levels into a FixedWidth data block
3348        let num_levels = levels.num_levels() as u64;
3349        let mut levels_buf = levels.all_levels().try_clone().unwrap();
3350        let levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3351            data: levels_buf.borrow_and_clone(),
3352            bits_per_value: 16,
3353            num_values: num_levels,
3354            block_info: BlockInfo::new(),
3355        });
3356        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
3357        // Pick a block compressor
3358        let (compressor, compressor_desc) =
3359            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
3360        // Compress blocks of levels (sized according to the chunks)
3361        let mut level_chunks = Vec::with_capacity(chunks.len());
3362        let mut values_counter = 0;
3363        for (chunk_idx, chunk) in chunks.iter().enumerate() {
3364            let chunk_num_values = chunk.num_values(values_counter, num_elements);
3365            values_counter += chunk_num_values;
3366            let mut chunk_levels = if chunk_idx < chunks.len() - 1 {
3367                levels.slice_next(chunk_num_values as usize)
3368            } else {
3369                levels.slice_rest()
3370            };
3371            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
3372            if max_rep > 0 {
3373                // If max_rep > 0 then we are working with rep levels and we need
3374                // to calculate the repetition index.  The repetition index for a
3375                // chunk is currently 2 values (in the future it may be more).
3376                //
3377                // The first value is the number of rows that _finish_ in the
3378                // chunk.
3379                //
3380                // The second value is the number of "leftovers" after the last
3381                // finished row in the chunk.
3382                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
3383                let rep_values = rep_values.as_ref();
3384
3385                // We skip 1 here because a max_rep at spot 0 doesn't count as a finished list (we
3386                // will count it in the previous chunk)
3387                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
3388                let num_leftovers = if chunk_idx < chunks.len() - 1 {
3389                    rep_values
3390                        .iter()
3391                        .rev()
3392                        .position(|v| *v == max_rep)
3393                        // # of leftovers includes the max_rep spot
3394                        .map(|pos| pos + 1)
3395                        .unwrap_or(rep_values.len())
3396                } else {
3397                    // Last chunk can't have leftovers
3398                    0
3399                };
3400
3401                if chunk_idx != 0 && rep_values[0] == max_rep {
3402                    // This chunk starts with a new row and so, if we thought we had leftovers
3403                    // in the previous chunk, we were mistaken
3404                    // TODO: Can use unchecked here
3405                    let rep_len = rep_index.len();
3406                    if rep_index[rep_len - 1] != 0 {
3407                        // We thought we had leftovers but that was actually a full row
3408                        rep_index[rep_len - 2] += 1;
3409                        rep_index[rep_len - 1] = 0;
3410                    }
3411                }
3412
3413                if chunk_idx == chunks.len() - 1 {
3414                    // The final list
3415                    num_rows += 1;
3416                }
3417                rep_index.push(num_rows as u64);
3418                rep_index.push(num_leftovers as u64);
3419            }
3420            let chunk_levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3421                data: chunk_levels,
3422                bits_per_value: 16,
3423                num_values: num_chunk_levels,
3424                block_info: BlockInfo::new(),
3425            });
3426            let compressed_levels = compressor.compress(chunk_levels_block)?;
3427            level_chunks.push(CompressedLevelsChunk {
3428                data: compressed_levels,
3429                num_levels: num_chunk_levels as u16,
3430            });
3431        }
3432        debug_assert_eq!(levels.num_levels_remaining(), 0);
3433        let rep_index = if rep_index.is_empty() {
3434            None
3435        } else {
3436            Some(LanceBuffer::reinterpret_vec(rep_index))
3437        };
3438        Ok(CompressedLevels {
3439            data: level_chunks,
3440            compression: compressor_desc,
3441            rep_index,
3442        })
3443    }
3444
3445    fn encode_simple_all_null(
3446        column_idx: u32,
3447        num_rows: u64,
3448        row_number: u64,
3449    ) -> Result<EncodedPage> {
3450        let description = ProtobufUtils::simple_all_null_layout();
3451        Ok(EncodedPage {
3452            column_idx,
3453            data: vec![],
3454            description: PageEncoding::Structural(description),
3455            num_rows,
3456            row_number,
3457        })
3458    }
3459
3460    // Encodes a page where all values are null but we have rep/def
3461    // information that we need to store (e.g. to distinguish between
3462    // different kinds of null)
3463    fn encode_complex_all_null(
3464        column_idx: u32,
3465        repdefs: Vec<RepDefBuilder>,
3466        row_number: u64,
3467        num_rows: u64,
3468    ) -> Result<EncodedPage> {
3469        let repdef = RepDefBuilder::serialize(repdefs);
3470
3471        // TODO: Actually compress repdef
3472        let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
3473            LanceBuffer::reinterpret_slice(rep.clone())
3474        } else {
3475            LanceBuffer::empty()
3476        };
3477
3478        let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
3479            LanceBuffer::reinterpret_slice(def.clone())
3480        } else {
3481            LanceBuffer::empty()
3482        };
3483
3484        let description = ProtobufUtils::all_null_layout(&repdef.def_meaning);
3485        Ok(EncodedPage {
3486            column_idx,
3487            data: vec![rep_bytes, def_bytes],
3488            description: PageEncoding::Structural(description),
3489            num_rows,
3490            row_number,
3491        })
3492    }
3493
3494    #[allow(clippy::too_many_arguments)]
3495    fn encode_miniblock(
3496        column_idx: u32,
3497        field: &Field,
3498        compression_strategy: &dyn CompressionStrategy,
3499        data: DataBlock,
3500        repdefs: Vec<RepDefBuilder>,
3501        row_number: u64,
3502        dictionary_data: Option<DataBlock>,
3503        num_rows: u64,
3504    ) -> Result<EncodedPage> {
3505        let repdef = RepDefBuilder::serialize(repdefs);
3506
3507        if let DataBlock::AllNull(_null_block) = data {
3508            // If we got here then all the data is null but we have rep/def information that
3509            // we need to store.
3510            todo!()
3511        }
3512
3513        let num_items = data.num_values();
3514
3515        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
3516        let (compressed_data, value_encoding) = compressor.compress(data)?;
3517
3518        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
3519
3520        let mut compressed_rep = repdef
3521            .rep_slicer()
3522            .map(|rep_slicer| {
3523                Self::compress_levels(
3524                    rep_slicer,
3525                    num_items,
3526                    compression_strategy,
3527                    &compressed_data.chunks,
3528                    max_rep,
3529                )
3530            })
3531            .transpose()?;
3532
3533        let (rep_index, rep_index_depth) =
3534            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
3535                Some(rep_index) => (Some(rep_index.borrow_and_clone()), 1),
3536                None => (None, 0),
3537            };
3538
3539        let mut compressed_def = repdef
3540            .def_slicer()
3541            .map(|def_slicer| {
3542                Self::compress_levels(
3543                    def_slicer,
3544                    num_items,
3545                    compression_strategy,
3546                    &compressed_data.chunks,
3547                    /*max_rep=*/ 0,
3548                )
3549            })
3550            .transpose()?;
3551
3552        // TODO: Parquet sparsely encodes values here.  We could do the same but
3553        // then we won't have a power-of-two number of values per chunk.  This means more metadata
3554        // and potentially more decoder asymmetry.  However, it may be worth
3555        // investigating at some point
3556
3557        let rep_data = compressed_rep
3558            .as_mut()
3559            .map(|cr| std::mem::take(&mut cr.data));
3560        let def_data = compressed_def
3561            .as_mut()
3562            .map(|cd| std::mem::take(&mut cd.data));
3563
3564        let serialized = Self::serialize_miniblocks(compressed_data, rep_data, def_data);
3565
3566        // Metadata, Data, Dictionary, (maybe) Repetition Index
3567        let mut data = Vec::with_capacity(4);
3568        data.push(serialized.metadata);
3569        data.push(serialized.data);
3570
3571        if let Some(dictionary_data) = dictionary_data {
3572            let num_dictionary_items = dictionary_data.num_values();
3573            // field in `create_block_compressor` is not used currently.
3574            let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;
3575
3576            let (compressor, dictionary_encoding) = compression_strategy
3577                .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
3578            let dictionary_buffer = compressor.compress(dictionary_data)?;
3579
3580            data.push(dictionary_buffer);
3581            if let Some(rep_index) = rep_index {
3582                data.push(rep_index);
3583            }
3584
3585            let description = ProtobufUtils::miniblock_layout(
3586                compressed_rep.map(|cr| cr.compression),
3587                compressed_def.map(|cd| cd.compression),
3588                value_encoding,
3589                rep_index_depth,
3590                serialized.num_buffers,
3591                Some((dictionary_encoding, num_dictionary_items)),
3592                &repdef.def_meaning,
3593                num_items,
3594            );
3595            Ok(EncodedPage {
3596                num_rows,
3597                column_idx,
3598                data,
3599                description: PageEncoding::Structural(description),
3600                row_number,
3601            })
3602        } else {
3603            let description = ProtobufUtils::miniblock_layout(
3604                compressed_rep.map(|cr| cr.compression),
3605                compressed_def.map(|cd| cd.compression),
3606                value_encoding,
3607                rep_index_depth,
3608                serialized.num_buffers,
3609                None,
3610                &repdef.def_meaning,
3611                num_items,
3612            );
3613
3614            if let Some(mut rep_index) = rep_index {
3615                let view = rep_index.borrow_to_typed_slice::<u64>();
3616                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
3617                debug_assert_eq!(total, num_rows);
3618
3619                data.push(rep_index);
3620            }
3621
3622            Ok(EncodedPage {
3623                num_rows,
3624                column_idx,
3625                data,
3626                description: PageEncoding::Structural(description),
3627                row_number,
3628            })
3629        }
3630    }
3631
3632    // For fixed-size data we encode < control word | data > for each value
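    //
    // For example (illustrative, assuming a 1-byte control word and 4-byte values) the zipped
    // buffer looks like | cw | vvvv | cw | vvvv | ... and, when repetition is present, the
    // repetition index records the byte offset at which each new row begins plus a final
    // end-of-buffer entry.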
3633    fn serialize_full_zip_fixed(
3634        fixed: FixedWidthDataBlock,
3635        mut repdef: ControlWordIterator,
3636        num_values: u64,
3637    ) -> SerializedFullZip {
3638        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
3639        let mut zipped_data = Vec::with_capacity(len);
3640
3641        let max_rep_index_val = if repdef.has_repetition() {
3642            len as u64
3643        } else {
3644            // Setting this to 0 means we won't write a repetition index
3645            0
3646        };
3647        let mut rep_index_builder =
3648            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
3649
3650        // I suppose we can just pad to the nearest byte but I'm not sure we need to worry about this anytime soon
3651        // because it is unlikely compression of large values is going to yield a result that is not byte aligned
3652        assert_eq!(
3653            fixed.bits_per_value % 8,
3654            0,
3655            "Non-byte aligned full-zip compression not yet supported"
3656        );
3657
3658        let bytes_per_value = fixed.bits_per_value as usize / 8;
3659
3660        let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
3661        let mut offset = 0;
3662        while let Some(control) = repdef.append_next(&mut zipped_data) {
3663            if control.is_new_row {
3664                // We have finished a row
3665                debug_assert!(offset <= len);
3666                // SAFETY: We know that `offset <= len`
3667                unsafe { rep_index_builder.append(offset as u64) };
3668            }
3669            if control.is_visible {
3670                let value = data_iter.next().unwrap();
3671                zipped_data.extend_from_slice(value);
3672            }
3673            offset = zipped_data.len();
3674        }
3675
3676        debug_assert_eq!(zipped_data.len(), len);
3677        // Put the final value in the rep index
3678        // SAFETY: `zipped_data.len() == len`
3679        unsafe {
3680            rep_index_builder.append(zipped_data.len() as u64);
3681        }
3682
3683        let zipped_data = LanceBuffer::Owned(zipped_data);
3684        let rep_index = rep_index_builder.into_data();
3685        let rep_index = if rep_index.is_empty() {
3686            None
3687        } else {
3688            Some(LanceBuffer::Owned(rep_index))
3689        };
3690        SerializedFullZip {
3691            values: zipped_data,
3692            repetition_index: rep_index,
3693        }
3694    }
3695
3696    // For variable-size data we encode < control word | length | data > for each value
3697    //
3698    // In addition, we create a second buffer, the repetition index
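    //
    // For example (illustrative, assuming a 1-byte control word and 4-byte offsets) a valid
    // 3-byte value "abc" contributes | cw | 03 00 00 00 | 61 62 63 | while a null item
    // contributes only its control word.  The repetition index again records the byte offset
    // at which each new row begins plus a final end-of-buffer entry.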
3699    fn serialize_full_zip_variable(
3700        mut variable: VariableWidthBlock,
3701        mut repdef: ControlWordIterator,
3702        num_items: u64,
3703    ) -> SerializedFullZip {
3704        let bytes_per_offset = variable.bits_per_offset as usize / 8;
3705        assert_eq!(
3706            variable.bits_per_offset % 8,
3707            0,
3708            "Only byte-aligned offsets supported"
3709        );
3710        let len = variable.data.len()
3711            + repdef.bytes_per_word() * num_items as usize
3712            + bytes_per_offset * variable.num_values as usize;
3713        let mut buf = Vec::with_capacity(len);
3714
3715        let max_rep_index_val = len as u64;
3716        let mut rep_index_builder =
3717            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
3718
3719        // TODO: byte pack the item lengths with varint encoding
3720        match bytes_per_offset {
3721            4 => {
3722                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
3723                let mut rep_offset = 0;
3724                let mut windows_iter = offs.as_ref().windows(2);
3725                while let Some(control) = repdef.append_next(&mut buf) {
3726                    if control.is_new_row {
3727                        // We have finished a row
3728                        debug_assert!(rep_offset <= len);
3729                        // SAFETY: We know that `rep_offset <= len`
3730                        unsafe { rep_index_builder.append(rep_offset as u64) };
3731                    }
3732                    if control.is_visible {
3733                        let window = windows_iter.next().unwrap();
3734                        if control.is_valid_item {
3735                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
3736                            buf.extend_from_slice(
3737                                &variable.data[window[0] as usize..window[1] as usize],
3738                            );
3739                        }
3740                    }
3741                    rep_offset = buf.len();
3742                }
3743            }
3744            8 => {
3745                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
3746                let mut rep_offset = 0;
3747                let mut windows_iter = offs.as_ref().windows(2);
3748                while let Some(control) = repdef.append_next(&mut buf) {
3749                    if control.is_new_row {
3750                        // We have finished a row
3751                        debug_assert!(rep_offset <= len);
3752                        // SAFETY: We know that `rep_offset <= len`
3753                        unsafe { rep_index_builder.append(rep_offset as u64) };
3754                    }
3755                    if control.is_visible {
3756                        let window = windows_iter.next().unwrap();
3757                        if control.is_valid_item {
3758                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
3759                            buf.extend_from_slice(
3760                                &variable.data[window[0] as usize..window[1] as usize],
3761                            );
3762                        }
3763                    }
3764                    rep_offset = buf.len();
3765                }
3766            }
3767            _ => panic!("Unsupported offset size"),
3768        }
3769
3770        // We might have saved a few bytes by not copying lengths and data for null items.  However,
3771        // if we are over `len` then we have a bug.
3772        debug_assert!(buf.len() <= len);
3773        // Put the final value in the rep index
3774        // SAFETY: `buf.len() <= len`
3775        unsafe {
3776            rep_index_builder.append(buf.len() as u64);
3777        }
3778
3779        let zipped_data = LanceBuffer::Owned(buf);
3780        let rep_index = rep_index_builder.into_data();
3781        debug_assert!(!rep_index.is_empty());
3782        let rep_index = Some(LanceBuffer::Owned(rep_index));
3783        SerializedFullZip {
3784            values: zipped_data,
3785            repetition_index: rep_index,
3786        }
3787    }
3788
3789    /// Serializes data into a single buffer according to the full-zip format which zips
3790    /// together the repetition, definition, and value data into a single buffer.
3791    fn serialize_full_zip(
3792        compressed_data: PerValueDataBlock,
3793        repdef: ControlWordIterator,
3794        num_items: u64,
3795    ) -> SerializedFullZip {
3796        match compressed_data {
3797            PerValueDataBlock::Fixed(fixed) => {
3798                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
3799            }
3800            PerValueDataBlock::Variable(var) => {
3801                Self::serialize_full_zip_variable(var, repdef, num_items)
3802            }
3803        }
3804    }
3805
3806    fn encode_full_zip(
3807        column_idx: u32,
3808        field: &Field,
3809        compression_strategy: &dyn CompressionStrategy,
3810        data: DataBlock,
3811        repdefs: Vec<RepDefBuilder>,
3812        row_number: u64,
3813        num_lists: u64,
3814    ) -> Result<EncodedPage> {
3815        let repdef = RepDefBuilder::serialize(repdefs);
3816        let max_rep = repdef
3817            .repetition_levels
3818            .as_ref()
3819            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
3820        let max_def = repdef
3821            .definition_levels
3822            .as_ref()
3823            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
3824
3825        // To handle FSL we just flatten
3826        // let data = data.flatten();
3827
3828        let (num_items, num_visible_items) =
3829            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
3830                // If there are rep levels there may be "invisible" items and we need to encode
3831                // rep_levels.len() things which might be larger than data.num_values()
3832                (rep_levels.len() as u64, data.num_values())
3833            } else {
3834                // If there are no rep levels then we encode data.num_values() things
3835                (data.num_values(), data.num_values())
3836            };
3837
3838        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
3839
3840        let repdef_iter = build_control_word_iterator(
3841            repdef.repetition_levels.as_deref(),
3842            max_rep,
3843            repdef.definition_levels.as_deref(),
3844            max_def,
3845            max_visible_def,
3846            num_items as usize,
3847        );
3848        let bits_rep = repdef_iter.bits_rep();
3849        let bits_def = repdef_iter.bits_def();
3850
3851        let compressor = compression_strategy.create_per_value(field, &data)?;
3852        let (compressed_data, value_encoding) = compressor.compress(data)?;
3853
3854        let description = match &compressed_data {
3855            PerValueDataBlock::Fixed(fixed) => ProtobufUtils::fixed_full_zip_layout(
3856                bits_rep,
3857                bits_def,
3858                fixed.bits_per_value as u32,
3859                value_encoding,
3860                &repdef.def_meaning,
3861                num_items as u32,
3862                num_visible_items as u32,
3863            ),
3864            PerValueDataBlock::Variable(variable) => ProtobufUtils::variable_full_zip_layout(
3865                bits_rep,
3866                bits_def,
3867                variable.bits_per_offset as u32,
3868                value_encoding,
3869                &repdef.def_meaning,
3870                num_items as u32,
3871                num_visible_items as u32,
3872            ),
3873        };
3874
3875        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
3876
3877        let data = if let Some(repindex) = zipped.repetition_index {
3878            vec![zipped.values, repindex]
3879        } else {
3880            vec![zipped.values]
3881        };
3882
3883        Ok(EncodedPage {
3884            num_rows: num_lists,
3885            column_idx,
3886            data,
3887            description: PageEncoding::Structural(description),
3888            row_number,
3889        })
3890    }
3891
3892    fn dictionary_encode(mut data_block: DataBlock) -> (DataBlock, DataBlock) {
3893        let cardinality = data_block
3894            .get_stat(Stat::Cardinality)
3895            .unwrap()
3896            .as_primitive::<UInt64Type>()
3897            .value(0);
3898        match data_block {
3899            DataBlock::FixedWidth(ref mut fixed_width_data_block) => {
3900                // Currently, only a FixedWidth DataBlock with bits_per_value == 128 has a cardinality statistic here
3901                // TODO: a follow-up PR to support `FixedWidth DataBlock` with bits_per_value == 256.
3902                let mut map = HashMap::new();
3903                let u128_slice = fixed_width_data_block.data.borrow_to_typed_slice::<u128>();
3904                let u128_slice = u128_slice.as_ref();
3905                let mut dictionary_buffer = Vec::with_capacity(cardinality as usize);
3906                let mut indices_buffer =
3907                    Vec::with_capacity(fixed_width_data_block.num_values as usize);
3908                let mut curr_idx: i32 = 0;
3909                u128_slice.iter().for_each(|&value| {
3910                    let idx = *map.entry(value).or_insert_with(|| {
3911                        dictionary_buffer.push(value);
3912                        curr_idx += 1;
3913                        curr_idx - 1
3914                    });
3915                    indices_buffer.push(idx);
3916                });
3917                let dictionary_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3918                    data: LanceBuffer::reinterpret_vec(dictionary_buffer),
3919                    bits_per_value: 128,
3920                    num_values: curr_idx as u64,
3921                    block_info: BlockInfo::default(),
3922                });
3923                let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3924                    data: LanceBuffer::reinterpret_vec(indices_buffer),
3925                    bits_per_value: 32,
3926                    num_values: fixed_width_data_block.num_values,
3927                    block_info: BlockInfo::default(),
3928                });
3929                // TODO: if we decide to do eager statistics computation, wrap the statistics
3930                // computation in the DataBlock constructor.
3931                indices_data_block.compute_stat();
3932
3933                (indices_data_block, dictionary_data_block)
3934            }
3935            DataBlock::VariableWidth(ref mut variable_width_data_block) => {
3936                match variable_width_data_block.bits_per_offset {
3937                    32 => {
3938                        let mut map = HashMap::new();
3939                        let offsets = variable_width_data_block
3940                            .offsets
3941                            .borrow_to_typed_slice::<u32>();
3942                        let offsets = offsets.as_ref();
3943
3944                        let max_len = variable_width_data_block.get_stat(Stat::MaxLength).expect(
3945                            "VariableWidth DataBlock should have valid `Stat::MaxLength` statistics",
3946                        );
3947                        let max_len = max_len.as_primitive::<UInt64Type>().value(0);
3948
3949                        let mut dictionary_buffer: Vec<u8> =
3950                            Vec::with_capacity((max_len * cardinality) as usize);
3951                        let mut dictionary_offsets_buffer = vec![0];
3952                        let mut curr_idx = 0;
3953                        let mut indices_buffer =
3954                            Vec::with_capacity(variable_width_data_block.num_values as usize);
3955
3956                        offsets
3957                            .iter()
3958                            .zip(offsets.iter().skip(1))
3959                            .for_each(|(&start, &end)| {
3960                                let key =
3961                                    &variable_width_data_block.data[start as usize..end as usize];
3962                                let idx: i32 = *map.entry(U8SliceKey(key)).or_insert_with(|| {
3963                                    dictionary_buffer.extend_from_slice(key);
3964                                    dictionary_offsets_buffer.push(dictionary_buffer.len() as u32);
3965                                    curr_idx += 1;
3966                                    curr_idx - 1
3967                                });
3968                                indices_buffer.push(idx);
3969                            });
3970
3971                        let dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock {
3972                            data: LanceBuffer::reinterpret_vec(dictionary_buffer),
3973                            offsets: LanceBuffer::reinterpret_vec(dictionary_offsets_buffer),
3974                            bits_per_offset: 32,
3975                            num_values: curr_idx as u64,
3976                            block_info: BlockInfo::default(),
3977                        });
3978
3979                        let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3980                            data: LanceBuffer::reinterpret_vec(indices_buffer),
3981                            bits_per_value: 32,
3982                            num_values: variable_width_data_block.num_values,
3983                            block_info: BlockInfo::default(),
3984                        });
3985                        // TODO: if we decide to compute statistics eagerly, move this computation
3986                        // into the DataBlock constructor.
3987                        indices_data_block.compute_stat();
3988
3989                        (indices_data_block, dictionary_data_block)
3990                    }
3991                    64 => {
3992                        todo!("Support dictionary encoding where the dictionary is a `VariableWidth DataBlock` with bits_per_offset 64 (follow-up PR)");
3993                    }
3994                    _ => {
3995                        unreachable!()
3996                    }
3997                }
3998            }
3999            _ => {
4000                unreachable!("dictionary encode called with data block {:?}", data_block)
4001            }
4002        }
4003    }
4004
4005    fn should_dictionary_encode(data_block: &DataBlock) -> bool {
4006        // Don't dictionary encode tiny arrays
4007        let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
4008            .ok()
4009            .and_then(|val| val.parse().ok())
4010            .unwrap_or(100);
4011        if data_block.num_values() < too_small {
4012            return false;
4013        }
4014
4015        // Somewhat arbitrary threshold rule.  Apply dictionary encoding if the number of unique
4016        // values is less than num_values / divisor (half the total number of values by default).
4017        let divisor = env::var("LANCE_ENCODING_DICT_DIVISOR")
4018            .ok()
4019            .and_then(|val| val.parse().ok())
4020            .unwrap_or(2);
4021
4022        // Cap on cardinality.  This should be pushed into the cardinality estimation to avoid
4023        // spending too much time estimating cardinality.
4024        let max_cardinality = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY")
4025            .ok()
4026            .and_then(|val| val.parse().ok())
4027            .unwrap_or(100000);
4028
4029        let threshold = (data_block.num_values() / divisor).min(max_cardinality);
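        // For example, with the defaults (divisor = 2, max_cardinality = 100_000) a block of
        // 10_000 values is dictionary encoded only if its estimated cardinality is below
        // min(10_000 / 2, 100_000) = 5_000.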
4030
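        // If no cardinality estimate is available, assume the worst case so that dictionary
        // encoding is not applied.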
4031        let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
4032            cardinality_array.as_primitive::<UInt64Type>().value(0)
4033        } else {
4034            u64::MAX
4035        };
4036
4037        cardinality < threshold
4038    }
4039
4040    // Creates an encode task, consuming all buffered data
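    // The task (run on a CPU worker via spawn_cpu) picks a layout in this order:
    //   1. all-null encoding (simple or complex, depending on rep/def) when there are no values
    //      or every value is null
    //   2. dictionary encoding (stored with the mini-block layout) when cardinality is low enough
    //   3. the mini-block layout when the data/metadata prefers it
    //   4. the full-zip layout when the metadata prefers it (otherwise an error)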
4041    fn do_flush(
4042        &mut self,
4043        arrays: Vec<ArrayRef>,
4044        repdefs: Vec<RepDefBuilder>,
4045        row_number: u64,
4046        num_rows: u64,
4047    ) -> Result<Vec<EncodeTask>> {
4048        let column_idx = self.column_index;
4049        let compression_strategy = self.compression_strategy.clone();
4050        let field = self.field.clone();
4051        let encoding_metadata = self.encoding_metadata.clone();
4052        let task = spawn_cpu(move || {
4053            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
4054
4055            if num_values == 0 {
4056                // We should not encode empty arrays.  So if we get here that should mean that we
4057                // either have all empty lists or all null lists (or a mix).  We still need to encode
4058                // the rep/def information but we can skip the data encoding.
4059                return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
4060            }
4061            let num_nulls = arrays
4062                .iter()
4063                .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
4064                .sum::<u64>();
4065
4066            if num_values == num_nulls {
4067                return if repdefs.iter().all(|rd| rd.is_simple_validity()) {
4068                    log::debug!(
4069                        "Encoding column {} with {} items using simple-null layout",
4070                        column_idx,
4071                        num_values
4072                    );
4073                    // Simple case, no rep/def and all nulls, we don't need to encode any data
4074                    Self::encode_simple_all_null(column_idx, num_values, row_number)
4075                } else {
4076                    // If we get here then we have definition levels and we need to store those
4077                    Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
4078                };
4079            }
4080
4081            let data_block = DataBlock::from_arrays(&arrays, num_values);
4082
4083            // if the `data_block` is a `StructDataBlock`, then this is a struct with packed struct encoding.
4084            if let DataBlock::Struct(ref struct_data_block) = data_block {
4085                if struct_data_block
4086                    .children
4087                    .iter()
4088                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
4089                {
4090                    panic!("packed struct encoding currently only supports fixed-width fields.")
4091                }
4092            }
4093
4094            // The top-level validity is encoded in repdef so we can remove it.
4095            let data_block = data_block.remove_outer_validity();
4096
4097
4098            if Self::should_dictionary_encode(&data_block) {
4099                log::debug!(
4100                    "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
4101                    column_idx,
4102                    num_values
4103                );
4104                let (indices_data_block, dictionary_data_block) =
4105                    Self::dictionary_encode(data_block);
4106                Self::encode_miniblock(
4107                    column_idx,
4108                    &field,
4109                    compression_strategy.as_ref(),
4110                    indices_data_block,
4111                    repdefs,
4112                    row_number,
4113                    Some(dictionary_data_block),
4114                    num_rows,
4115                )
4116            } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
4117                log::debug!(
4118                    "Encoding column {} with {} items using mini-block layout",
4119                    column_idx,
4120                    num_values
4121                );
4122                Self::encode_miniblock(
4123                    column_idx,
4124                    &field,
4125                    compression_strategy.as_ref(),
4126                    data_block,
4127                    repdefs,
4128                    row_number,
4129                    None,
4130                    num_rows,
4131                )
4132            } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
4133                log::debug!(
4134                    "Encoding column {} with {} items using full-zip layout",
4135                    column_idx,
4136                    num_values
4137                );
4138                Self::encode_full_zip(
4139                    column_idx,
4140                    &field,
4141                    compression_strategy.as_ref(),
4142                    data_block,
4143                    repdefs,
4144                    row_number,
4145                    num_rows,
4146                )
4147            } else {
4148                Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}.  This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() })
4149            }
4150        })
4151        .boxed();
4152        Ok(vec![task])
4153    }
4154
4155    fn extract_validity_buf(
4156        array: &dyn Array,
4157        repdef: &mut RepDefBuilder,
4158        keep_original_array: bool,
4159    ) {
4160        if let Some(validity) = array.nulls() {
4161            if keep_original_array {
4162                repdef.add_validity_bitmap(validity.clone());
4163            } else {
4164                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
4165            }
4166        } else {
4167            repdef.add_no_null(array.len());
4168        }
4169    }
4170
4171    fn extract_validity(array: &dyn Array, repdef: &mut RepDefBuilder, keep_original_array: bool) {
4172        match array.data_type() {
4173            DataType::Null => {
4174                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
4175            }
4176            DataType::Dictionary(_, _) => {
4177                unreachable!()
4178            }
4179            // Extract our validity buf but NOT any child validity bufs (they will be encoded
4180            // as part of the values).  Note: for FSL we do not use repdef.add_fsl because we do
4181            // NOT want to increase the repdef depth.
4182            //
4183            // This would be quite catastrophic for something like vector embeddings.  Imagine we
4184            // had thousands of vectors and some were null but no vector contained null items.  If
4185            // we treated the vectors (primitive FSL) like we treat structural FSL we would end up
4186            // with a rep/def value for every single item in the vector.
4187            _ => Self::extract_validity_buf(array, repdef, keep_original_array),
4188        }
4189    }
4190}
4191
4192impl FieldEncoder for PrimitiveStructuralEncoder {
4193    // Buffers data; if there is enough to write a page then we create an encode task
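    // Top-level validity is recorded in the repdef builder here; do_flush later strips it from
    // the data block so it is not encoded twice.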
4194    fn maybe_encode(
4195        &mut self,
4196        array: ArrayRef,
4197        _external_buffers: &mut OutOfLineBuffers,
4198        mut repdef: RepDefBuilder,
4199        row_number: u64,
4200        num_rows: u64,
4201    ) -> Result<Vec<EncodeTask>> {
4202        Self::extract_validity(array.as_ref(), &mut repdef, self.keep_original_array);
4203        self.accumulated_repdefs.push(repdef);
4204
4205        if let Some((arrays, row_number, num_rows)) =
4206            self.accumulation_queue.insert(array, row_number, num_rows)
4207        {
4208            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4209            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4210        } else {
4211            Ok(vec![])
4212        }
4213    }
4214
4215    // If there is any data left in the buffer then create an encode task from it
4216    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
4217        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
4218            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4219            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4220        } else {
4221            Ok(vec![])
4222        }
4223    }
4224
4225    fn num_columns(&self) -> u32 {
4226        1
4227    }
4228
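    // All buffered data is emitted through encode tasks (pages); there is no trailing
    // column-level buffer to write, so finishing simply yields a default (empty) column.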
4229    fn finish(
4230        &mut self,
4231        _external_buffers: &mut OutOfLineBuffers,
4232    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
4233        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
4234    }
4235}
4236
4237#[cfg(test)]
4238#[allow(clippy::single_range_in_vec_init)]
4239mod tests {
4240    use std::{collections::VecDeque, sync::Arc};
4241
4242    use crate::encodings::logical::primitive::{
4243        ChunkDrainInstructions, PrimitiveStructuralEncoder,
4244    };
4245    use arrow_array::{ArrayRef, Int8Array, StringArray};
4246
4247    use super::{
4248        ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
4249        FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails,
4250        FullZipScheduler, PerValueDecompressor, PreambleAction, RepetitionIndex,
4251        StructuralPageScheduler,
4252    };
4253
4254    #[test]
4255    fn test_is_narrow() {
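        // is_narrow checks whether values are small on average; narrow data (like these Int8 and
        // short string arrays) prefers the mini-block layout, while a block dominated by a single
        // large value does not qualify.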
4256        let int8_array = Int8Array::from(vec![1, 2, 3]);
4257        let array_ref: ArrayRef = Arc::new(int8_array);
4258        let block = DataBlock::from_array(array_ref);
4259
4260        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4261
4262        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
4263        let block = DataBlock::from_array(string_array);
4264        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4265
4266        let string_array = StringArray::from(vec![
4267            Some("hello world".repeat(100)),
4268            Some("world".to_string()),
4269        ]);
4270        let block = DataBlock::from_array(string_array);
4271        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
4272    }
4273
4274    #[test]
4275    fn test_map_range() {
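        // map_range converts a range of rows into the corresponding range of items and the
        // corresponding range of rep/def levels within a single chunk, accounting for nulls
        // (which consume a level but no items) and for how the preamble is handled.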
4276        // Null in the middle
4277        // [[A, B, C], [D, E], NULL, [F, G, H]]
4278        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
4279        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
4280        let max_visible_def = 0;
4281        let total_items = 8;
4282        let max_rep = 1;
4283
4284        let check = |range, expected_item_range, expected_level_range| {
4285            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4286                range,
4287                rep.as_ref(),
4288                def.as_ref(),
4289                max_rep,
4290                max_visible_def,
4291                total_items,
4292                PreambleAction::Absent,
4293            );
4294            assert_eq!(item_range, expected_item_range);
4295            assert_eq!(level_range, expected_level_range);
4296        };
4297
4298        check(0..1, 0..3, 0..3);
4299        check(1..2, 3..5, 3..5);
4300        check(2..3, 5..5, 5..6);
4301        check(3..4, 5..8, 6..9);
4302        check(0..2, 0..5, 0..5);
4303        check(1..3, 3..5, 3..6);
4304        check(2..4, 5..8, 5..9);
4305        check(0..3, 0..5, 0..6);
4306        check(1..4, 3..8, 3..9);
4307        check(0..4, 0..8, 0..9);
4308
4309        // Null at start
4310        // [NULL, [A, B], [C]]
4311        let rep = Some(vec![1, 1, 0, 1]);
4312        let def = Some(vec![1, 0, 0, 0]);
4313        let max_visible_def = 0;
4314        let total_items = 3;
4315
4316        let check = |range, expected_item_range, expected_level_range| {
4317            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4318                range,
4319                rep.as_ref(),
4320                def.as_ref(),
4321                max_rep,
4322                max_visible_def,
4323                total_items,
4324                PreambleAction::Absent,
4325            );
4326            assert_eq!(item_range, expected_item_range);
4327            assert_eq!(level_range, expected_level_range);
4328        };
4329
4330        check(0..1, 0..0, 0..1);
4331        check(1..2, 0..2, 1..3);
4332        check(2..3, 2..3, 3..4);
4333        check(0..2, 0..2, 0..3);
4334        check(1..3, 0..3, 1..4);
4335        check(0..3, 0..3, 0..4);
4336
4337        // Null at end
4338        // [[A], [B, C], NULL]
4339        let rep = Some(vec![1, 1, 0, 1]);
4340        let def = Some(vec![0, 0, 0, 1]);
4341        let max_visible_def = 0;
4342        let total_items = 3;
4343
4344        let check = |range, expected_item_range, expected_level_range| {
4345            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4346                range,
4347                rep.as_ref(),
4348                def.as_ref(),
4349                max_rep,
4350                max_visible_def,
4351                total_items,
4352                PreambleAction::Absent,
4353            );
4354            assert_eq!(item_range, expected_item_range);
4355            assert_eq!(level_range, expected_level_range);
4356        };
4357
4358        check(0..1, 0..1, 0..1);
4359        check(1..2, 1..3, 1..3);
4360        check(2..3, 3..3, 3..4);
4361        check(0..2, 0..3, 0..3);
4362        check(1..3, 1..3, 1..4);
4363        check(0..3, 0..3, 0..4);
4364
4365        // No nulls, with repetition
4366        // [[A, B], [C, D], [E, F]]
4367        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
4368        let def: Option<&[u16]> = None;
4369        let max_visible_def = 0;
4370        let total_items = 6;
4371
4372        let check = |range, expected_item_range, expected_level_range| {
4373            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4374                range,
4375                rep.as_ref(),
4376                def.as_ref(),
4377                max_rep,
4378                max_visible_def,
4379                total_items,
4380                PreambleAction::Absent,
4381            );
4382            assert_eq!(item_range, expected_item_range);
4383            assert_eq!(level_range, expected_level_range);
4384        };
4385
4386        check(0..1, 0..2, 0..2);
4387        check(1..2, 2..4, 2..4);
4388        check(2..3, 4..6, 4..6);
4389        check(0..2, 0..4, 0..4);
4390        check(1..3, 2..6, 2..6);
4391        check(0..3, 0..6, 0..6);
4392
4393        // No repetition, with nulls (this case is trivial)
4394        // [A, B, NULL, C]
4395        let rep: Option<&[u16]> = None;
4396        let def = Some(vec![0, 0, 1, 0]);
4397        let max_visible_def = 1;
4398        let total_items = 4;
4399
4400        let check = |range, expected_item_range, expected_level_range| {
4401            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4402                range,
4403                rep.as_ref(),
4404                def.as_ref(),
4405                max_rep,
4406                max_visible_def,
4407                total_items,
4408                PreambleAction::Absent,
4409            );
4410            assert_eq!(item_range, expected_item_range);
4411            assert_eq!(level_range, expected_level_range);
4412        };
4413
4414        check(0..1, 0..1, 0..1);
4415        check(1..2, 1..2, 1..2);
4416        check(2..3, 2..3, 2..3);
4417        check(0..2, 0..2, 0..2);
4418        check(1..3, 1..3, 1..3);
4419        check(0..3, 0..3, 0..3);
4420
4421        // Tricky case: this chunk is a continuation and starts with a rep level of 0
4422        // [[..., A] [B, C], NULL]
4423        //
4424        // What we do will depend on the preamble action
4425        let rep = Some(vec![0, 1, 0, 1]);
4426        let def = Some(vec![0, 0, 0, 1]);
4427        let max_visible_def = 0;
4428        let total_items = 3;
4429
4430        let check = |range, expected_item_range, expected_level_range| {
4431            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4432                range,
4433                rep.as_ref(),
4434                def.as_ref(),
4435                max_rep,
4436                max_visible_def,
4437                total_items,
4438                PreambleAction::Take,
4439            );
4440            assert_eq!(item_range, expected_item_range);
4441            assert_eq!(level_range, expected_level_range);
4442        };
4443
4444        // If we are taking the preamble then the range must start at 0
4445        check(0..1, 0..3, 0..3);
4446        check(0..2, 0..3, 0..4);
4447
4448        let check = |range, expected_item_range, expected_level_range| {
4449            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4450                range,
4451                rep.as_ref(),
4452                def.as_ref(),
4453                max_rep,
4454                max_visible_def,
4455                total_items,
4456                PreambleAction::Skip,
4457            );
4458            assert_eq!(item_range, expected_item_range);
4459            assert_eq!(level_range, expected_level_range);
4460        };
4461
4462        check(0..1, 1..3, 1..3);
4463        check(1..2, 3..3, 3..4);
4464        check(0..2, 1..3, 1..4);
4465
4466        // Another preamble case but now it doesn't end with a new list
4467        // [[..., A], NULL, [D, E]]
4468        //
4469        // What we do will depend on the preamble action
4470        let rep = Some(vec![0, 1, 1, 0]);
4471        let def = Some(vec![0, 1, 0, 0]);
4472        let max_visible_def = 0;
4473        let total_items = 4;
4474
4475        let check = |range, expected_item_range, expected_level_range| {
4476            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4477                range,
4478                rep.as_ref(),
4479                def.as_ref(),
4480                max_rep,
4481                max_visible_def,
4482                total_items,
4483                PreambleAction::Take,
4484            );
4485            assert_eq!(item_range, expected_item_range);
4486            assert_eq!(level_range, expected_level_range);
4487        };
4488
4489        // If we are taking the preamble then the range must start at 0
4490        check(0..1, 0..1, 0..2);
4491        check(0..2, 0..3, 0..4);
4492
4493        let check = |range, expected_item_range, expected_level_range| {
4494            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4495                range,
4496                rep.as_ref(),
4497                def.as_ref(),
4498                max_rep,
4499                max_visible_def,
4500                total_items,
4501                PreambleAction::Skip,
4502            );
4503            assert_eq!(item_range, expected_item_range);
4504            assert_eq!(level_range, expected_level_range);
4505        };
4506
4507        // When skipping the preamble, the preamble's item and level are excluded from the ranges
4508        check(0..1, 1..1, 1..2);
4509        check(1..2, 1..3, 2..4);
4510        check(0..2, 1..3, 1..4);
4511
4512        // Now a preamble case without any definition levels
4513        // [[..., A] [B, C], [D]]
4514        let rep = Some(vec![0, 1, 0, 1]);
4515        let def: Option<Vec<u16>> = None;
4516        let max_visible_def = 0;
4517        let total_items = 4;
4518
4519        let check = |range, expected_item_range, expected_level_range| {
4520            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4521                range,
4522                rep.as_ref(),
4523                def.as_ref(),
4524                max_rep,
4525                max_visible_def,
4526                total_items,
4527                PreambleAction::Take,
4528            );
4529            assert_eq!(item_range, expected_item_range);
4530            assert_eq!(level_range, expected_level_range);
4531        };
4532
4533        // If we are taking the preamble then the range must start at 0
4534        check(0..1, 0..3, 0..3);
4535        check(0..2, 0..4, 0..4);
4536
4537        let check = |range, expected_item_range, expected_level_range| {
4538            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4539                range,
4540                rep.as_ref(),
4541                def.as_ref(),
4542                max_rep,
4543                max_visible_def,
4544                total_items,
4545                PreambleAction::Skip,
4546            );
4547            assert_eq!(item_range, expected_item_range);
4548            assert_eq!(level_range, expected_level_range);
4549        };
4550
4551        check(0..1, 1..3, 1..3);
4552        check(1..2, 3..4, 3..4);
4553        check(0..2, 1..4, 1..4);
4554    }
4555
4556    #[test]
4557    fn test_schedule_instructions() {
4558        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
4559        let repetition_index = RepetitionIndex::decode(&repetition_index);
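        // In this index, chunks 0 and 2 end mid-row (non-zero second value), so the expected
        // schedule takes a trailer there and takes the preamble of the following chunk to
        // finish that row.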
4560
4561        let check = |user_ranges, expected_instructions| {
4562            let instructions =
4563                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
4564            assert_eq!(instructions, expected_instructions);
4565        };
4566
4567        // The instructions we expect if we're grabbing the whole range
4568        let expected_take_all = vec![
4569            ChunkInstructions {
4570                chunk_idx: 0,
4571                preamble: PreambleAction::Absent,
4572                rows_to_skip: 0,
4573                rows_to_take: 5,
4574                take_trailer: true,
4575            },
4576            ChunkInstructions {
4577                chunk_idx: 1,
4578                preamble: PreambleAction::Take,
4579                rows_to_skip: 0,
4580                rows_to_take: 2,
4581                take_trailer: false,
4582            },
4583            ChunkInstructions {
4584                chunk_idx: 2,
4585                preamble: PreambleAction::Absent,
4586                rows_to_skip: 0,
4587                rows_to_take: 4,
4588                take_trailer: true,
4589            },
4590            ChunkInstructions {
4591                chunk_idx: 3,
4592                preamble: PreambleAction::Take,
4593                rows_to_skip: 0,
4594                rows_to_take: 1,
4595                take_trailer: false,
4596            },
4597        ];
4598
4599        // Take all as 1 range
4600        check(&[0..14], expected_take_all.clone());
4601
4602        // Take all as individual rows
4603        check(
4604            &[
4605                0..1,
4606                1..2,
4607                2..3,
4608                3..4,
4609                4..5,
4610                5..6,
4611                6..7,
4612                7..8,
4613                8..9,
4614                9..10,
4615                10..11,
4616                11..12,
4617                12..13,
4618                13..14,
4619            ],
4620            expected_take_all,
4621        );
4622
4623        // Test some partial takes
4624
4625        // 2 rows in the same chunk but not contiguous
4626        check(
4627            &[0..1, 3..4],
4628            vec![
4629                ChunkInstructions {
4630                    chunk_idx: 0,
4631                    preamble: PreambleAction::Absent,
4632                    rows_to_skip: 0,
4633                    rows_to_take: 1,
4634                    take_trailer: false,
4635                },
4636                ChunkInstructions {
4637                    chunk_idx: 0,
4638                    preamble: PreambleAction::Absent,
4639                    rows_to_skip: 3,
4640                    rows_to_take: 1,
4641                    take_trailer: false,
4642                },
4643            ],
4644        );
4645
4646        // Taking just a trailer/preamble
4647        check(
4648            &[5..6],
4649            vec![
4650                ChunkInstructions {
4651                    chunk_idx: 0,
4652                    preamble: PreambleAction::Absent,
4653                    rows_to_skip: 5,
4654                    rows_to_take: 0,
4655                    take_trailer: true,
4656                },
4657                ChunkInstructions {
4658                    chunk_idx: 1,
4659                    preamble: PreambleAction::Take,
4660                    rows_to_skip: 0,
4661                    rows_to_take: 0,
4662                    take_trailer: false,
4663                },
4664            ],
4665        );
4666
4667        // Skipping an entire chunk
4668        check(
4669            &[7..10],
4670            vec![
4671                ChunkInstructions {
4672                    chunk_idx: 1,
4673                    preamble: PreambleAction::Skip,
4674                    rows_to_skip: 1,
4675                    rows_to_take: 1,
4676                    take_trailer: false,
4677                },
4678                ChunkInstructions {
4679                    chunk_idx: 2,
4680                    preamble: PreambleAction::Absent,
4681                    rows_to_skip: 0,
4682                    rows_to_take: 2,
4683                    take_trailer: false,
4684                },
4685            ],
4686        );
4687    }
4688
4689    #[test]
4690    fn test_drain_instructions() {
4691        fn drain_from_instructions(
4692            instructions: &mut VecDeque<ChunkInstructions>,
4693            mut rows_desired: u64,
4694            need_preamble: &mut bool,
4695            skip_in_chunk: &mut u64,
4696        ) -> Vec<ChunkDrainInstructions> {
4697            // Note: instructions.len() is an upper bound; we typically take far fewer
4698            let mut drain_instructions = Vec::with_capacity(instructions.len());
4699            while rows_desired > 0 || *need_preamble {
4700                let (next_instructions, consumed_chunk) = instructions
4701                    .front()
4702                    .unwrap()
4703                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
4704                if consumed_chunk {
4705                    instructions.pop_front();
4706                }
4707                drain_instructions.push(next_instructions);
4708            }
4709            drain_instructions
4710        }
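        // drain_from_instruction mutates rows_desired, need_preamble, and skip_in_chunk so the
        // caller can pull fixed-size batches across chunk boundaries; when it reports the chunk
        // as consumed we pop it from the queue.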
4711
4712        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
4713        let repetition_index = RepetitionIndex::decode(&repetition_index);
4714        let user_ranges = vec![1..7, 10..14];
4715
4716        // First, schedule the ranges
4717        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
4718
4719        let mut to_drain = VecDeque::from(scheduled.clone());
4720
4721        // Now we drain in batches of 4
4722
4723        let mut need_preamble = false;
4724        let mut skip_in_chunk = 0;
4725
4726        let next_batch =
4727            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
4728
4729        assert!(!need_preamble);
4730        assert_eq!(skip_in_chunk, 4);
4731        assert_eq!(
4732            next_batch,
4733            vec![ChunkDrainInstructions {
4734                chunk_instructions: scheduled[0].clone(),
4735                rows_to_take: 4,
4736                rows_to_skip: 0,
4737                preamble_action: PreambleAction::Absent,
4738            }]
4739        );
4740
4741        let next_batch =
4742            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
4743
4744        assert!(!need_preamble);
4745        assert_eq!(skip_in_chunk, 2);
4746
4747        assert_eq!(
4748            next_batch,
4749            vec![
4750                ChunkDrainInstructions {
4751                    chunk_instructions: scheduled[0].clone(),
4752                    rows_to_take: 1,
4753                    rows_to_skip: 4,
4754                    preamble_action: PreambleAction::Absent,
4755                },
4756                ChunkDrainInstructions {
4757                    chunk_instructions: scheduled[1].clone(),
4758                    rows_to_take: 1,
4759                    rows_to_skip: 0,
4760                    preamble_action: PreambleAction::Take,
4761                },
4762                ChunkDrainInstructions {
4763                    chunk_instructions: scheduled[2].clone(),
4764                    rows_to_take: 2,
4765                    rows_to_skip: 0,
4766                    preamble_action: PreambleAction::Absent,
4767                }
4768            ]
4769        );
4770
4771        let next_batch =
4772            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
4773
4774        assert!(!need_preamble);
4775        assert_eq!(skip_in_chunk, 0);
4776
4777        assert_eq!(
4778            next_batch,
4779            vec![
4780                ChunkDrainInstructions {
4781                    chunk_instructions: scheduled[2].clone(),
4782                    rows_to_take: 1,
4783                    rows_to_skip: 2,
4784                    preamble_action: PreambleAction::Absent,
4785                },
4786                ChunkDrainInstructions {
4787                    chunk_instructions: scheduled[3].clone(),
4788                    rows_to_take: 1,
4789                    rows_to_skip: 0,
4790                    preamble_action: PreambleAction::Take,
4791                },
4792            ]
4793        );
4794
4795        // Regression case.  Need a chunk with preamble, rows, and trailer (the middle chunk here)
4796        let repetition_index = vec![vec![5, 2], vec![3, 3], vec![20, 0]];
4797        let repetition_index = RepetitionIndex::decode(&repetition_index);
4798        let user_ranges = vec![0..28];
4799
4800        // First, schedule the ranges
4801        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
4802
4803        let mut to_drain = VecDeque::from(scheduled.clone());
4804
4805        // Drain first chunk and some of second chunk
4806
4807        let mut need_preamble = false;
4808        let mut skip_in_chunk = 0;
4809
4810        let next_batch =
4811            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
4812
4813        assert_eq!(
4814            next_batch,
4815            vec![
4816                ChunkDrainInstructions {
4817                    chunk_instructions: scheduled[0].clone(),
4818                    rows_to_take: 6,
4819                    rows_to_skip: 0,
4820                    preamble_action: PreambleAction::Absent,
4821                },
4822                ChunkDrainInstructions {
4823                    chunk_instructions: scheduled[1].clone(),
4824                    rows_to_take: 1,
4825                    rows_to_skip: 0,
4826                    preamble_action: PreambleAction::Take,
4827                },
4828            ]
4829        );
4830
4831        assert!(!need_preamble);
4832        assert_eq!(skip_in_chunk, 1);
4833
4834        // Now, the tricky part.  We drain the second chunk, including the trailer, and need to make sure
4835        // we get a drain task to take the preamble of the third chunk (and nothing else)
4836        let next_batch =
4837            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
4838
4839        assert_eq!(
4840            next_batch,
4841            vec![
4842                ChunkDrainInstructions {
4843                    chunk_instructions: scheduled[1].clone(),
4844                    rows_to_take: 2,
4845                    rows_to_skip: 1,
4846                    preamble_action: PreambleAction::Skip,
4847                },
4848                ChunkDrainInstructions {
4849                    chunk_instructions: scheduled[2].clone(),
4850                    rows_to_take: 0,
4851                    rows_to_skip: 0,
4852                    preamble_action: PreambleAction::Take,
4853                },
4854            ]
4855        );
4856
4857        assert!(!need_preamble);
4858        assert_eq!(skip_in_chunk, 0);
4859    }
4860
4861    #[tokio::test]
4862    async fn test_fullzip_repetition_index_caching() {
4863        use crate::testing::SimulatedScheduler;
4864        use lance_core::cache::LanceCache;
4865
4866        // Simplified FixedPerValueDecompressor for testing
4867        #[derive(Debug)]
4868        struct TestFixedDecompressor;
4869
4870        impl FixedPerValueDecompressor for TestFixedDecompressor {
4871            fn decompress(
4872                &self,
4873                _data: FixedWidthDataBlock,
4874                _num_rows: u64,
4875            ) -> crate::Result<DataBlock> {
4876                unimplemented!("Test decompressor")
4877            }
4878
4879            fn bits_per_value(&self) -> u64 {
4880                32
4881            }
4882        }
4883
4884        // Create test repetition index data
4885        let rows_in_page = 100u64;
4886        let bytes_per_value = 4u64;
4887        let _rep_index_size = (rows_in_page + 1) * bytes_per_value;
4888
4889        // Create mock repetition index data
4890        let mut rep_index_data = Vec::new();
4891        for i in 0..=rows_in_page {
4892            let offset = (i * 100) as u32; // Each row starts at i * 100 bytes
4893            rep_index_data.extend_from_slice(&offset.to_le_bytes());
4894        }
4895
4896        // Simulate storage with the repetition index at position 1000
4897        let mut full_data = vec![0u8; 1000];
4898        full_data.extend_from_slice(&rep_index_data);
4899        full_data.extend_from_slice(&vec![0u8; 10000]); // Add some data after
4900
4901        let data = bytes::Bytes::from(full_data);
4902        let io = Arc::new(SimulatedScheduler::new(data));
4903        let _cache = Arc::new(LanceCache::with_capacity(1024 * 1024));
4904
4905        // Create FullZipScheduler with repetition index
4906        let mut scheduler = FullZipScheduler {
4907            data_buf_position: 0,
4908            rep_index: Some(FullZipRepIndexDetails {
4909                buf_position: 1000,
4910                bytes_per_value,
4911            }),
4912            priority: 0,
4913            rows_in_page,
4914            bits_per_offset: 32,
4915            details: Arc::new(FullZipDecodeDetails {
4916                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
4917                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
4918                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
4919                max_rep: 0,
4920                max_visible_def: 0,
4921            }),
4922            cached_state: None,
4923        };
4924
4925        // First initialization should load and cache the repetition index
4926        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
4927        let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap();
4928
4929        // Verify that we got a FullZipCacheableState (not NoCachedPageData)
4930        let is_cached = cached_data1
4931            .clone()
4932            .as_arc_any()
4933            .downcast::<FullZipCacheableState>()
4934            .is_ok();
4935        assert!(
4936            is_cached,
4937            "Expected FullZipCacheableState, got NoCachedPageData"
4938        );
4939
4940        // Load the cached data into the scheduler
4941        scheduler.load(&cached_data1);
4942
4943        // Verify that cached_state is now populated
4944        assert!(
4945            scheduler.cached_state.is_some(),
4946            "cached_state should be populated after load"
4947        );
4948
4949        // Hold a reference to the cached state so we can later verify it is shared
4950        let cached_state = scheduler.cached_state.as_ref().unwrap();
4951
4952        // Test that schedule_ranges_rep uses the cached data
4953        let ranges = vec![0..10, 20..30];
4954        let result = scheduler.schedule_ranges_rep(
4955            &ranges,
4956            &io_dyn,
4957            &FullZipRepIndexDetails {
4958                buf_position: 1000,
4959                bytes_per_value,
4960            },
4961        );
4962
4963        // The result should be OK (not an error)
4964        assert!(
4965            result.is_ok(),
4966            "schedule_ranges_rep should succeed with cached data"
4967        );
4968
4969        // Second scheduler instance should be able to use the cached data
4970        let mut scheduler2 = FullZipScheduler {
4971            data_buf_position: 0,
4972            rep_index: Some(FullZipRepIndexDetails {
4973                buf_position: 1000,
4974                bytes_per_value,
4975            }),
4976            priority: 0,
4977            rows_in_page,
4978            bits_per_offset: 32,
4979            details: scheduler.details.clone(),
4980            cached_state: None,
4981        };
4982
4983        // Load cached data from the first scheduler
4984        scheduler2.load(&cached_data1);
4985        assert!(
4986            scheduler2.cached_state.is_some(),
4987            "Second scheduler should have cached_state after load"
4988        );
4989
4990        // Verify that both schedulers have the same cached data
4991        let cached_state2 = scheduler2.cached_state.as_ref().unwrap();
4992        assert!(
4993            Arc::ptr_eq(cached_state, cached_state2),
4994            "Both schedulers should share the same cached data"
4995        );
4996    }
4997}