lance_encoding/encodings/logical/primitive.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    any::Any,
    collections::{HashMap, VecDeque},
    env,
    fmt::Debug,
    iter,
    ops::Range,
    sync::Arc,
    vec,
};

use arrow::array::AsArray;
use arrow_array::{make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
use itertools::Itertools;
use lance_arrow::deepcopy::deep_copy_nulls;
use lance_core::{
    cache::{CacheKey, Context, DeepSizeOf},
    datatypes::{
        DICT_DIVISOR_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
        STRUCTURAL_ENCODING_MINIBLOCK,
    },
    error::Error,
    utils::{bit::pad_bytes, hash::U8SliceKey},
};
use log::trace;
use snafu::location;

use crate::{
    compression::{
        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
    },
    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
    utils::bytepack::BytepackedIntegerEncoder,
};
use crate::{
    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
    encodings::logical::primitive::fullzip::PerValueDataBlock,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockCompressed,
    statistics::{ComputeStat, GetStat, Stat},
};
use crate::{
    repdef::{
        build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator,
        ControlWordParser, DefinitionInterpretation, RepDefSlicer,
    },
    utils::accumulation::AccumulationQueue,
};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};

use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
    decoder::{
        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPage,
        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPage,
    },
    encoder::{
        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
    },
    format::{pb, ProtobufUtils},
    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
    EncodingsIo,
};

pub mod fullzip;
pub mod miniblock;

const FILL_BYTE: u8 = 0xFE;

/// A trait for figuring out how to schedule the data within
/// a single page.
trait StructuralPageScheduler: std::fmt::Debug + Send {
    /// Fetches any metadata required for the page
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    /// Loads metadata from a previous initialize call
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    /// Schedules the read of the given ranges in the page
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>>;
}

/// Metadata describing the decoded size of a mini-block
#[derive(Debug)]
struct ChunkMeta {
    num_values: u64,
    chunk_size_bytes: u64,
    offset_bytes: u64,
}

/// A mini-block chunk that has been decoded and decompressed
#[derive(Debug)]
struct DecodedMiniBlockChunk {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    values: DataBlock,
}

/// A task to decode one or more mini-blocks of data into an output batch
///
/// Note: Two batches might share the same mini-block of data.  When this happens,
/// each batch gets a copy of the block and each batch decodes the block independently.
///
/// This means we have duplicated work but it is necessary to avoid having to synchronize
/// the decoding of the block. (TODO: test this theory)
#[derive(Debug)]
struct DecodeMiniBlockTask {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    dictionary_data: Option<Arc<DataBlock>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    num_buffers: u64,
    max_visible_level: u16,
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
}

impl DecodeMiniBlockTask {
    fn decode_levels(
        rep_decompressor: &dyn BlockDecompressor,
        levels: LanceBuffer,
        num_levels: u16,
    ) -> Result<ScalarBuffer<u16>> {
        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
        let mut rep = rep.as_fixed_width().unwrap();
        debug_assert_eq!(rep.num_values, num_levels as u64);
        debug_assert_eq!(rep.bits_per_value, 16);
        Ok(rep.data.borrow_to_typed_slice::<u16>())
    }

    // We are building a LevelBuffer (levels) and want to copy into it the values of
    // `level_buf` selected by `range`, writing them starting at `dest_offset`.
    //
    // We need to handle both the case where `levels` is None (no nulls encountered
    // yet) and the case where `level_buf` is None (the input we are copying from has
    // no nulls)
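    //
    // A hedged illustrative sketch (the values are made up, not taken from a real page):
    //
    //   let mut levels: Option<LevelBuffer> = None;
    //   // Copy 3 levels from a buffer that has nulls, starting at dest_offset 2.
    //   Self::extend_levels(0..3, &mut levels, &Some(vec![0u16, 1, 0]), 2);
    //   // The first 2 positions are back-filled with 0 (valid):
    //   assert_eq!(levels, Some(vec![0, 0, 0, 1, 0]));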
    fn extend_levels(
        range: Range<u64>,
        levels: &mut Option<LevelBuffer>,
        level_buf: &Option<impl AsRef<[u16]>>,
        dest_offset: usize,
    ) {
        if let Some(level_buf) = level_buf {
            if levels.is_none() {
                // This is the first non-empty def buf we've hit, fill in the past
                // with 0 (valid)
                let mut new_levels_vec =
                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
                *levels = Some(new_levels_vec);
            }
            levels.as_mut().unwrap().extend(
                level_buf.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        } else if let Some(levels) = levels {
            let num_values = (range.end - range.start) as usize;
            // This is an all-valid level_buf but we had nulls earlier and so we
            // need to materialize it
            levels.extend(iter::repeat_n(0, num_values));
        }
    }

    /// Maps a range of rows to a range of items and a range of levels
    ///
    /// If there is no repetition information this just returns the range as-is.
    ///
    /// If there is repetition information then we need to do some work to figure out what
    /// range of items corresponds to the requested range of rows.
    ///
    /// For example, if the data is [[1, 2, 3], [4, 5], [6, 7]] and the range is 1..2 (i.e. just row
    /// 1) then the user actually wants items 3..5.  In the above case the rep levels would be:
    ///
    /// Idx: 0 1 2 3 4 5 6
    /// Rep: 1 0 0 1 0 1 0
    ///
    /// So the start (1) maps to the second 1 (idx=3) and the end (2) maps to the third 1 (idx=5)
    ///
    /// If there are invisible items then we don't count them when calculating the range of items we
    /// are interested in but we do count them when calculating the range of levels we are interested
    /// in.  As a result we have to return both the item range (first return value) and the level range
    /// (second return value).
    ///
    /// For example, if the data is [[1, 2, 3], [4, 5], NULL, [6, 7, 8]] and the range is 2..4 then the
    /// user wants items 5..8 but they want levels 5..9.  In the above case the rep/def levels would be:
    ///
    /// Idx: 0 1 2 3 4 5 6 7 8
    /// Rep: 1 0 0 1 0 1 1 0 0
    /// Def: 0 0 0 0 0 1 0 0 0
    /// Itm: 1 2 3 4 5 6 7 8
    ///
    /// Finally, we have to contend with the fact that chunks may or may not start with a "preamble" of
    /// trailing values that finish up a list from the previous chunk.  In this case the first item does
    /// not start at max_rep because it is a continuation of the previous chunk.  For our purposes we do
    /// not consider this a "row" and so the range 0..1 will refer to the first row AFTER the preamble.
    ///
    /// We have a separate parameter (`preamble_action`) to control whether we want the preamble or not.
    ///
    /// Note that the "trailer" is considered a "row" and if we want it we should include it in the range.
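    ///
    /// A hedged, illustrative call (not a doctest; the function is private and the values
    /// below simply restate the second example above):
    ///
    /// ```text
    /// // data = [[1, 2, 3], [4, 5], NULL, [6, 7, 8]], requesting rows 2..4
    /// let rep = vec![1u16, 0, 0, 1, 0, 1, 1, 0, 0];
    /// let def = vec![0u16, 0, 0, 0, 0, 1, 0, 0, 0];
    /// let (items, levels) =
    ///     Self::map_range(2..4, Some(&rep), Some(&def), 1, 0, 8, PreambleAction::Absent);
    /// assert_eq!(items, 5..8);
    /// assert_eq!(levels, 5..9);
    /// ```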
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        // The total number of items (not rows) in the chunk.  This is not quite the same as
        // rep.len() / def.len() because it doesn't count invisible items
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            // If there is a preamble and we need to skip it then do that first.  The work is the same
            // whether there is def information or not
            let mut items_in_preamble = 0;
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        let first_row_start = rep.iter().position(|&r| r == max_rep);
                        items_in_preamble = first_row_start.unwrap_or(rep.len());
                        first_row_start
                    };
                    // It is possible for a chunk to be entirely partial values but if it is then it
                    // should never show up as a preamble to skip
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap() as u64;
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            // We hit this case when all we needed was the preamble
            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                return (0..items_in_preamble as u64, 0..first_row_start);
            }
            assert!(range.start < range.end);

            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                let def = &def.as_ref()[first_row_start as usize..];

                // range.start == 0 always maps to 0 (even with invis items), otherwise we need to walk
                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                            if *def > max_visible_def {
                                lead_invis_seen += 1;
                            }
                        }
                    }
                }

                rows_seen += 1;

                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                        if *def > max_visible_def {
                            tail_invis_seen += 1;
                        }
                    }
                }

                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    // This is the total number of visible items (minus any items in the preamble)
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                // Adjust for any skipped preamble
                if preamble_action == PreambleAction::Skip {
                    // TODO: Should this be items_in_preamble?  If so, add a
                    // unit test for this case
                    new_start += first_row_start;
                    new_end += first_row_start;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += first_row_start;
                    new_levels_end += first_row_start;
                }

                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                // Easy case, there are no invisible items, so we don't need to check for them
                // The items range and levels range will be the same.  We do still need to walk
                // the rep levels to find the row boundaries

                // range.start == 0 always maps to 0, otherwise we need to walk
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                // range.end == total_items always maps to rep.len(), otherwise we need to walk
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                // Adjust for any skipped preamble
                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                (new_start..new_end, new_start..new_end)
            }
        } else {
            // No repetition info, easy case, just use the range as-is and the item
            // and level ranges are the same
            (range.clone(), range)
        }
    }

    // Deserialize a mini-block into a collection of vectors
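    //
    // A sketch of the chunk layout this parses (inferred from the reads below; each size
    // is a little-endian u16 and each section is padded to MINIBLOCK_ALIGNMENT):
    //
    //   [num_levels: u16]
    //   [rep_size: u16]                 (only if a rep decompressor is present)
    //   [def_size: u16]                 (only if a def decompressor is present)
    //   [buffer_size: u16] * num_buffers
    //   <padding>
    //   [rep bytes]   <padding>         (rep_size bytes, only if present)
    //   [def bytes]   <padding>         (def_size bytes, only if present)
    //   [value buffer bytes] <padding>, repeated num_buffers times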
    fn decode_miniblock_chunk(
        &self,
        buf: &LanceBuffer,
        items_in_chunk: u64,
    ) -> Result<DecodedMiniBlockChunk> {
        let mut offset = 0;
        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
        offset += 2;

        let rep_size = if self.rep_decompressor.is_some() {
            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(rep_size)
        } else {
            None
        };
        let def_size = if self.def_decompressor.is_some() {
            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(def_size)
        } else {
            None
        };
        let buffer_sizes = (0..self.num_buffers)
            .map(|_| {
                let size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
                offset += 2;
                size
            })
            .collect::<Vec<_>>();

        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);

        let rep = rep_size.map(|rep_size| {
            let rep = buf.slice_with_length(offset, rep_size as usize);
            offset += rep_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            rep
        });

        let def = def_size.map(|def_size| {
            let def = buf.slice_with_length(offset, def_size as usize);
            offset += def_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            def
        });

        let buffers = buffer_sizes
            .into_iter()
            .map(|buf_size| {
                let buf = buf.slice_with_length(offset, buf_size as usize);
                offset += buf_size as usize;
                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
                buf
            })
            .collect::<Vec<_>>();

        let values = self
            .value_decompressor
            .decompress(buffers, items_in_chunk)?;

        let rep = rep
            .map(|rep| {
                Self::decode_levels(
                    self.rep_decompressor.as_ref().unwrap().as_ref(),
                    rep,
                    num_levels,
                )
            })
            .transpose()?;
        let def = def
            .map(|def| {
                Self::decode_levels(
                    self.def_decompressor.as_ref().unwrap().as_ref(),
                    def,
                    num_levels,
                )
            })
            .transpose()?;

        Ok(DecodedMiniBlockChunk { rep, def, values })
    }
}

impl DecodePageTask for DecodeMiniBlockTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        // First, we create output buffers for the rep and def and data
        let mut repbuf: Option<LevelBuffer> = None;
        let mut defbuf: Option<LevelBuffer> = None;

        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        // This is probably an over-estimate but it's quick and easy to calculate
        let estimated_size_bytes = self
            .instructions
            .iter()
            .map(|(_, chunk)| chunk.data.len())
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        // We need to keep track of the offset into repbuf/defbuf that we are building up
        let mut level_offset = 0;
        // Now we iterate through each instruction and process it
        for (instructions, chunk) in self.instructions.iter() {
            // TODO: It's very possible that we have duplicate `buf` in self.instructions and we
            // don't want to decode the buf again and again on the same thread.

            let DecodedMiniBlockChunk { rep, def, values } =
                self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;

            // Our instructions tell us which rows we want to take from this chunk
            let row_range_start =
                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
            let row_range_end = row_range_start + instructions.rows_to_take;

            // We use the rep info to map the row range to an item range / levels range
            let (item_range, level_range) = Self::map_range(
                row_range_start..row_range_end,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                self.max_visible_level,
                chunk.items_in_chunk,
                instructions.preamble_action,
            );

            // Now we append the data to the output buffers
            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
            level_offset += (level_range.end - level_range.start) as usize;
            data_builder.append(&values, item_range);
        }

        let data = data_builder.finish();

        let unraveler = RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone());

        // if dictionary encoding is applied, do dictionary decode here.
        if let Some(dictionary) = &self.dictionary_data {
            // assume the indices are uniformly distributed.
            let estimated_size_bytes = dictionary.data_size()
                * (data.num_values() + dictionary.num_values() - 1)
                / dictionary.num_values();
            let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);

            // if dictionary encoding is applied, decode indices based on their actual bit width
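            //
            // Illustrative only: with dictionary = ["a", "b", "c"] and 32-bit indices
            // [2, 0, 1], the appends below produce ["c", "a", "b"].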
            if let DataBlock::FixedWidth(mut fixed_width_data_block) = data {
                match fixed_width_data_block.bits_per_value {
                    32 => {
                        let indices = fixed_width_data_block.data.borrow_to_typed_slice::<i32>();
                        let indices = indices.as_ref();

                        indices.iter().for_each(|&idx| {
                            data_builder.append(dictionary, idx as u64..idx as u64 + 1);
                        });
                    }
                    64 => {
                        let indices = fixed_width_data_block.data.borrow_to_typed_slice::<i64>();
                        let indices = indices.as_ref();

                        indices.iter().for_each(|&idx| {
                            data_builder.append(dictionary, idx as u64..idx as u64 + 1);
                        });
                    }
                    _ => {
                        return Err(lance_core::Error::Internal {
                            message: format!(
                                "Unsupported dictionary index bit width: {} bits",
                                fixed_width_data_block.bits_per_value
                            ),
                            location: location!(),
                        });
                    }
                }

                let data = data_builder.finish();
                return Ok(DecodedPage {
                    data,
                    repdef: unraveler,
                });
            }
        }

        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

/// A chunk that has been loaded by the miniblock scheduler (but not
/// yet decoded)
#[derive(Debug)]
struct LoadedChunk {
    data: LanceBuffer,
    items_in_chunk: u64,
    byte_range: Range<u64>,
    chunk_idx: usize,
}

impl Clone for LoadedChunk {
    fn clone(&self) -> Self {
        Self {
            // Safe as we always create borrowed buffers here
            data: self.data.try_clone().unwrap(),
            items_in_chunk: self.items_in_chunk,
            byte_range: self.byte_range.clone(),
            chunk_idx: self.chunk_idx,
        }
    }
}

/// Decodes mini-block formatted data.  See [`PrimitiveStructuralEncoder`] for more
/// details on the different layouts.
#[derive(Debug)]
struct MiniBlockDecoder {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    loaded_chunks: VecDeque<LoadedChunk>,
    instructions: VecDeque<ChunkInstructions>,
    offset_in_current_chunk: u64,
    num_rows: u64,
    num_buffers: u64,
    dictionary: Option<Arc<DataBlock>>,
}

/// See [`MiniBlockScheduler`] for more details on the scheduling and decoding
/// process for miniblock encoded data.
impl StructuralPageDecoder for MiniBlockDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut items_desired = num_rows;
        let mut need_preamble = false;
        let mut skip_in_chunk = self.offset_in_current_chunk;
        let mut drain_instructions = Vec::new();
        while items_desired > 0 || need_preamble {
            let (instructions, consumed) = self
                .instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);

            while self.loaded_chunks.front().unwrap().chunk_idx
                != instructions.chunk_instructions.chunk_idx
            {
                self.loaded_chunks.pop_front();
            }
            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
            if consumed {
                self.instructions.pop_front();
            }
        }
        // We can throw away need_preamble here because it must be false.  If it were true it would mean
        // we were still in the middle of loading rows.  We do need to latch skip_in_chunk though.
        self.offset_in_current_chunk = skip_in_chunk;

        let max_visible_level = self
            .def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();

        Ok(Box::new(DecodeMiniBlockTask {
            instructions: drain_instructions,
            def_decompressor: self.def_decompressor.clone(),
            rep_decompressor: self.rep_decompressor.clone(),
            value_decompressor: self.value_decompressor.clone(),
            dictionary_data: self.dictionary.clone(),
            def_meaning: self.def_meaning.clone(),
            num_buffers: self.num_buffers,
            max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
struct CachedComplexAllNullState {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
}

impl DeepSizeOf for CachedComplexAllNullState {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
    }
}

impl CachedPageData for CachedComplexAllNullState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

/// A scheduler for all-null data that has repetition and definition levels
///
/// We still need to do some I/O in this case because we need to figure out what kind of null we
/// are dealing with (null list, null struct, which level of struct is null, etc.)
///
/// TODO: Right now we just load the entire rep/def at initialization time and cache it.  This is a touch
/// RAM aggressive and maybe we want something more lazy in the future.  On the other hand, it's simple
/// and fast so...maybe not :)
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    // Set from protobuf
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    repdef: Option<Arc<CachedComplexAllNullState>>,
}

impl ComplexAllNullScheduler {
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Self {
        Self {
            buffer_offsets_and_sizes,
            def_meaning,
            repdef: None,
        }
    }
}

impl StructuralPageScheduler for ComplexAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        // Fully load the rep & def buffers, as needed
        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
        let has_rep = rep_size > 0;
        let has_def = def_size > 0;

        let mut reads = Vec::with_capacity(2);
        if has_rep {
            reads.push(rep_pos..rep_pos + rep_size);
        }
        if has_def {
            reads.push(def_pos..def_pos + def_size);
        }

        let data = io.submit_request(reads, 0);

        async move {
            let data = data.await?;
            let mut data_iter = data.into_iter();

            let rep = if has_rep {
                let rep = data_iter.next().unwrap();
                let mut rep = LanceBuffer::from_bytes(rep, 2);
                let rep = rep.borrow_to_typed_slice::<u16>();
                Some(rep)
            } else {
                None
            };

            let def = if has_def {
                let def = data_iter.next().unwrap();
                let mut def = LanceBuffer::from_bytes(def, 2);
                let def = def.borrow_to_typed_slice::<u16>();
                Some(def)
            } else {
                None
            };

            let repdef = Arc::new(CachedComplexAllNullState { rep, def });

            self.repdef = Some(repdef.clone());

            Ok(repdef as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.repdef = Some(
            data.clone()
                .as_arc_any()
                .downcast::<CachedComplexAllNullState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let ranges = VecDeque::from_iter(ranges.iter().cloned());
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(Box::new(ComplexAllNullPageDecoder {
            ranges,
            rep: self.repdef.as_ref().unwrap().rep.clone(),
            def: self.repdef.as_ref().unwrap().def.clone(),
            num_rows,
            def_meaning: self.def_meaning.clone(),
        }) as Box<dyn StructuralPageDecoder>))
        .boxed())
    }
}

#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    ranges: VecDeque<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    num_rows: u64,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl ComplexAllNullPageDecoder {
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut rows_desired = num_rows;
        let mut ranges = Vec::with_capacity(self.ranges.len());
        while rows_desired > 0 {
            let front = self.ranges.front_mut().unwrap();
            let avail = front.end - front.start;
            if avail > rows_desired {
                ranges.push(front.start..front.start + rows_desired);
                front.start += rows_desired;
                rows_desired = 0;
            } else {
                ranges.push(self.ranges.pop_front().unwrap());
                rows_desired -= avail;
            }
        }
        ranges
    }
}

impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let drained_ranges = self.drain_ranges(num_rows);
        Ok(Box::new(DecodeComplexAllNullTask {
            ranges: drained_ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

/// We use `ranges` to slice into `rep` and `def` and create rep/def buffers
/// for the null data.
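///
/// A hedged sketch of the level slicing (illustrative values):
///
/// ```text
/// // ranges = [1..3], def = [0, 1, 2, 1]
/// // decode_level keeps def[1..3], so the task emits def levels [1, 2]
/// // and an AllNullDataBlock with num_values = 2.
/// ```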
#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    ranges: Vec<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl DecodeComplexAllNullTask {
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|levels| {
            let mut referenced_levels = Vec::with_capacity(num_values as usize);
            for range in &self.ranges {
                referenced_levels.extend(
                    levels[range.start as usize..range.end as usize]
                        .iter()
                        .copied(),
                );
            }
            referenced_levels
        })
    }
}

impl DecodePageTask for DecodeComplexAllNullTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        let rep = self.decode_level(&self.rep, num_values);
        let def = self.decode_level(&self.def, num_values);
        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning);
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

/// A scheduler for simple all-null data
///
/// "simple" all-null data is data that is all null and only has a single level of definition and
/// no repetition.  We don't need to read any data at all in this case.
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}

impl StructuralPageScheduler for SimpleAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(
            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>
        ))
        .boxed())
    }
}

/// A page decode task for all-null data without any
/// repetition and only a single level of definition
#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    num_values: u64,
}
impl DecodePageTask for SimpleAllNullDecodePageTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let unraveler = RepDefUnraveler::new(
            None,
            Some(vec![1; self.num_values as usize]),
            Arc::new([DefinitionInterpretation::NullableItem]),
        );
        Ok(DecodedPage {
            data: DataBlock::AllNull(AllNullDataBlock {
                num_values: self.num_values,
            }),
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    num_rows: u64,
}

impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        Ok(Box::new(SimpleAllNullDecodePageTask {
            num_values: num_rows,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    // These come from the protobuf
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    dictionary_buf_position_and_size: (u64, u64),
    dictionary_data_alignment: u64,
    num_dictionary_items: u64,
}

/// Individual block metadata within a MiniBlock repetition index.
#[derive(Debug)]
struct MiniBlockRepIndexBlock {
    // The index of the first row that starts after the beginning of this block.  If the block
    // has a preamble this will be the row after the preamble.  If the block is entirely preamble
    // then this will be a row that starts in some future block.
    first_row: u64,
    // The number of rows in the block, including the trailer but not the preamble.
    // Can be 0 if the block is entirely preamble
    starts_including_trailer: u64,
    // Whether the block has a preamble
    has_preamble: bool,
    // Whether the block has a trailer
    has_trailer: bool,
}

impl DeepSizeOf for MiniBlockRepIndexBlock {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}

/// Repetition index for MiniBlock encoding.
///
/// Stores block-level offset information to enable efficient random
/// access to nested data structures within mini-blocks.
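///
/// A hedged example of `decode` (each per-chunk entry is `[ends_count, partial_count]`,
/// matching how `decode` reads `chunk_rep[0]` and `chunk_rep[1]`; values are illustrative):
///
/// ```text
/// let index = MiniBlockRepIndex::decode(&[vec![50, 3], vec![50, 0], vec![10, 0]]);
/// // block 0: first_row 0,   51 starts (incl. trailer), no preamble,  has trailer
/// // block 1: first_row 51,  49 starts,                 has preamble, no trailer
/// // block 2: first_row 100, 10 starts,                 no preamble,  no trailer
/// ```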
#[derive(Debug)]
struct MiniBlockRepIndex {
    blocks: Vec<MiniBlockRepIndexBlock>,
}

impl DeepSizeOf for MiniBlockRepIndex {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}

impl MiniBlockRepIndex {
    fn decode(rep_index: &[Vec<u64>]) -> Self {
        let mut chunk_has_preamble = false;
        let mut offset = 0;
        let mut blocks = Vec::with_capacity(rep_index.len());
        for chunk_rep in rep_index {
            let ends_count = chunk_rep[0];
            let partial_count = chunk_rep[1];

            let chunk_has_trailer = partial_count > 0;
            let mut starts_including_trailer = ends_count;
            if chunk_has_trailer {
                starts_including_trailer += 1;
            }
            if chunk_has_preamble {
                starts_including_trailer -= 1;
            }

            blocks.push(MiniBlockRepIndexBlock {
                first_row: offset,
                starts_including_trailer,
                has_preamble: chunk_has_preamble,
                has_trailer: chunk_has_trailer,
            });

            chunk_has_preamble = chunk_has_trailer;
            offset += starts_including_trailer;
        }

        Self { blocks }
    }
}

/// State that is loaded once and cached for future lookups
#[derive(Debug)]
struct MiniBlockCacheableState {
    /// Metadata that describes each chunk in the page
    chunk_meta: Vec<ChunkMeta>,
    /// The decoded repetition index
    rep_index: MiniBlockRepIndex,
    /// The dictionary for the page, if any
    dictionary: Option<Arc<DataBlock>>,
}

impl DeepSizeOf for MiniBlockCacheableState {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.rep_index.deep_size_of_children(context)
            + self
                .dictionary
                .as_ref()
                .map(|dict| dict.data_size() as usize)
                .unwrap_or(0)
    }
}

impl CachedPageData for MiniBlockCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

/// A scheduler for a page that has been encoded with the mini-block layout
///
/// Scheduling mini-block encoded data is simple in concept and somewhat complex
/// in practice.
///
/// First, during initialization, we load the chunk metadata, the repetition index,
/// and the dictionary (these last two may not be present)
///
/// Then, during scheduling, we use the user's requested row ranges and the repetition
/// index to determine which chunks we need and which rows we need from those chunks.
///
/// For example, if the repetition index is: [50, 3], [50, 0], [10, 0] and the range
/// from the user is 40..60 then we need to:
///
///  - Read the first chunk and skip the first 40 rows, then read 10 full rows, and
///    then read 3 items for the 11th row of our range.
///  - Read the second chunk and read the remaining items in our 11th row and then read
///    the remaining 9 full rows.
///
/// Then, if we are going to decode that in batches of 5, we need to make decode tasks.
/// The first two decode tasks will just need the first chunk.  The third decode task will
/// need the first chunk (for the trailer which has the 11th row in our range) and the second
/// chunk.  The final decode task will just need the second chunk.
///
/// The above prose descriptions are what are represented by [`ChunkInstructions`] and
/// [`ChunkDrainInstructions`].
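///
/// A hedged sketch of the scheduling step for that example (illustrative, not a doctest):
///
/// ```text
/// // rep index [50, 3], [50, 0], [10, 0] decodes to blocks with first rows 0, 51, 100
/// let instructions = ChunkInstructions::schedule_instructions(&rep_index, &[40..60]);
/// // -> [ ChunkInstructions { chunk_idx: 0, preamble: Absent, rows_to_skip: 40,
/// //                          rows_to_take: 10, take_trailer: true },
/// //      ChunkInstructions { chunk_idx: 1, preamble: Take,   rows_to_skip: 0,
/// //                          rows_to_take: 9,  take_trailer: false } ]
/// ```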
#[derive(Debug)]
pub struct MiniBlockScheduler {
    // These come from the protobuf
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    priority: u64,
    items_in_page: u64,
    repetition_index_depth: u16,
    num_buffers: u64,
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    dictionary: Option<MiniBlockSchedulerDictionary>,
    // This is set after initialization
    page_meta: Option<Arc<MiniBlockCacheableState>>,
}

impl MiniBlockScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        items_in_page: u64,
        layout: &pb::MiniBlockLayout,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<Self> {
        let rep_decompressor = layout
            .rep_compression
            .as_ref()
            .map(|rep_compression| {
                decompressors
                    .create_block_decompressor(rep_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_decompressor = layout
            .def_compression
            .as_ref()
            .map(|def_compression| {
                decompressors
                    .create_block_decompressor(def_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();
        let value_decompressor = decompressors.create_miniblock_decompressor(
            layout.value_compression.as_ref().unwrap(),
            decompressors,
        )?;

        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
            let num_dictionary_items = layout.num_dictionary_items;
            match dictionary_encoding.array_encoding.as_ref().unwrap() {
                pb::array_encoding::ArrayEncoding::Variable(_) => {
                    Some(MiniBlockSchedulerDictionary {
                        dictionary_decompressor: decompressors
                            .create_block_decompressor(dictionary_encoding)?
                            .into(),
                        dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                        dictionary_data_alignment: 4,
                        num_dictionary_items,
                    })
                }
                pb::array_encoding::ArrayEncoding::Flat(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 16,
                    num_dictionary_items,
                }),
                _ => {
                    unreachable!("Currently only encodings `BinaryBlock` and `Flat` used for encoding MiniBlock dictionary.")
                }
            }
        } else {
            None
        };

        Ok(Self {
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
            rep_decompressor,
            def_decompressor,
            value_decompressor: value_decompressor.into(),
            repetition_index_depth: layout.repetition_index_depth as u16,
            num_buffers: layout.num_buffers,
            priority,
            items_in_page,
            dictionary,
            def_meaning: def_meaning.into(),
            page_meta: None,
        })
    }

    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
        let page_meta = self.page_meta.as_ref().unwrap();
        chunk_indices
            .iter()
            .map(|&chunk_idx| {
                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
                let bytes_start = chunk_meta.offset_bytes;
                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
                LoadedChunk {
                    byte_range: bytes_start..bytes_end,
                    items_in_chunk: chunk_meta.num_values,
                    chunk_idx,
                    data: LanceBuffer::empty(),
                }
            })
            .collect()
    }
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    Take,
    Skip,
    Absent,
}

// TODO: Add test cases for the all-preamble and all-trailer cases

// When we schedule a chunk we use the repetition index (or, if none exists, just the # of items
// in each chunk) to map a user requested range into a set of ChunkInstruction objects which tell
// us how exactly to read from the chunk.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    // The index of the chunk to read
    chunk_idx: usize,
    // A "preamble" is when a chunk begins with a continuation of the previous chunk's list.  If there
    // is no repetition index there is never a preamble.
    //
    // It's possible for a chunk to be entirely preamble.  For example, if there is a really large list
    // that spans several chunks.
    preamble: PreambleAction,
    // How many complete rows (not including the preamble or trailer) to skip
    //
    // If this is non-zero then preamble must not be Take
    rows_to_skip: u64,
    // How many complete (non-preamble / non-trailer) rows to take
    rows_to_take: u64,
    // A "trailer" is when a chunk ends with a partial list.  If there is no repetition index there is
    // never a trailer.
    //
    // It's possible for a chunk to be entirely trailer.  This would mean the chunk starts with the beginning
    // of a list and that list is continued in the next chunk.
    //
    // If this is true then we want to include the trailer
    take_trailer: bool,
}

// First, we schedule a bunch of [`ChunkInstructions`] based on the user's ranges.  Then we
// start decoding them, based on a batch size, which might not align with what we scheduled.
//
// This results in `ChunkDrainInstructions` which targets a contiguous slice of a `ChunkInstructions`
//
// So if `ChunkInstructions` is "skip preamble, skip 10, take 50, take trailer" and we are decoding in
// batches of size 10 we might have a `ChunkDrainInstructions` that targets that chunk and has its own
// skip of 17 and take of 10.  This would mean we decode the chunk, skip the preamble and 27 rows, and
// then take 10 rows.
//
// One very confusing bit is that `rows_to_take` includes the trailer.  So if we have two chunks:
//  -no preamble, skip 5, take 10, take trailer
//  -take preamble, skip 0, take 50, no trailer
//
// and we are draining 20 rows then the drain instructions for the first batch will be:
//  - no preamble, skip 0 (from chunk 0), take 11 (from chunk 0)
//  - take preamble (from chunk 1), skip 0 (from chunk 1), take 9 (from chunk 1)
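//
// A hedged trace of that first drain step through `drain_from_instruction` (values illustrative):
//
//   let chunk0 = ChunkInstructions { chunk_idx: 0, preamble: PreambleAction::Absent,
//       rows_to_skip: 5, rows_to_take: 10, take_trailer: true };
//   let mut desired: u64 = 20; let mut need_preamble = false; let mut skip: u64 = 0;
//   let (drain0, consumed) =
//       chunk0.drain_from_instruction(&mut desired, &mut need_preamble, &mut skip);
//   // drain0: skip 0, take 11 (the trailer counts as a row), preamble Absent
//   // consumed == true, desired == 9, need_preamble == true for the next chunk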
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    chunk_instructions: ChunkInstructions,
    rows_to_skip: u64,
    rows_to_take: u64,
    preamble_action: PreambleAction,
}

impl ChunkInstructions {
    // Given a repetition index and a set of user ranges we need to figure out how to read from the chunks
    //
    // We assume that `user_ranges` are in sorted order and non-overlapping
    //
    // The output will be a set of `ChunkInstructions` which tell us how to read from the chunks
    fn schedule_instructions(
        rep_index: &MiniBlockRepIndex,
        user_ranges: &[Range<u64>],
    ) -> Vec<Self> {
        // This is an inexact capacity guess but pretty good.  The actual capacity can be
        // smaller if instructions are merged.  It can be larger if there are multiple instructions
        // per row which can happen with lists.
        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());

        for user_range in user_ranges {
            let mut rows_needed = user_range.end - user_range.start;
            let mut need_preamble = false;

            // Need to find the first chunk with a first row >= user_range.start.  If there are
            // multiple chunks with the same first row we need to take the first one.
            let mut block_index = match rep_index
                .blocks
                .binary_search_by_key(&user_range.start, |block| block.first_row)
            {
                Ok(idx) => {
                    // Slightly tricky case, we may need to walk backwards a bit to make sure we
                    // are grabbing the first eligible chunk
                    let mut idx = idx;
                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
                        idx -= 1;
                    }
                    idx
                }
                // Easy case.  idx is greater, and idx - 1 is smaller, so idx - 1 contains the start
                Err(idx) => idx - 1,
            };

            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;

            while rows_needed > 0 || need_preamble {
                let chunk = &rep_index.blocks[block_index];
                let rows_avail = chunk.starts_including_trailer - to_skip;
                debug_assert!(rows_avail > 0);

                let rows_to_take = rows_avail.min(rows_needed);
                rows_needed -= rows_to_take;

                let mut take_trailer = false;
                let preamble = if chunk.has_preamble {
                    if need_preamble {
                        PreambleAction::Take
                    } else {
                        PreambleAction::Skip
                    }
                } else {
                    PreambleAction::Absent
                };
                let mut rows_to_take_no_trailer = rows_to_take;

                // Are we taking the trailer?  If so, make sure we mark that we need the preamble
                if rows_to_take == rows_avail && chunk.has_trailer {
                    take_trailer = true;
                    need_preamble = true;
                    rows_to_take_no_trailer -= 1;
                } else {
                    need_preamble = false;
                };

                chunk_instructions.push(Self {
                    preamble,
                    chunk_idx: block_index,
                    rows_to_skip: to_skip,
                    rows_to_take: rows_to_take_no_trailer,
                    take_trailer,
                });

                to_skip = 0;
                block_index += 1;
            }
        }

        // If there were multiple ranges we may have multiple instructions for a single chunk.  Merge them now if they
        // are _adjacent_ (i.e. don't merge "take first row of chunk 0" and "take third row of chunk 0" into "take 2
        // rows of chunk 0 starting at 0")
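        //
        // Illustrative: "skip 0, take 1" followed by "skip 1, take 1" on the same chunk merges
        // into "skip 0, take 2", while "skip 0, take 1" followed by "skip 2, take 1" stays as
        // two instructions because the rows are not adjacent.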
        if user_ranges.len() > 1 {
            // TODO: Could probably optimize this allocation away
            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
            let mut instructions_iter = chunk_instructions.into_iter();
            merged_instructions.push(instructions_iter.next().unwrap());
            for instruction in instructions_iter {
                let last = merged_instructions.last_mut().unwrap();
                if last.chunk_idx == instruction.chunk_idx
                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
                {
                    last.rows_to_take += instruction.rows_to_take;
                    last.take_trailer |= instruction.take_trailer;
                } else {
                    merged_instructions.push(instruction);
                }
            }
            merged_instructions
        } else {
            chunk_instructions
        }
    }

    fn drain_from_instruction(
        &self,
        rows_desired: &mut u64,
        need_preamble: &mut bool,
        skip_in_chunk: &mut u64,
    ) -> (ChunkDrainInstructions, bool) {
1398        // If we need the preamble then we shouldn't be skipping anything
1399        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1400        let mut rows_avail = self.rows_to_take - *skip_in_chunk;
1401        let has_preamble = self.preamble != PreambleAction::Absent;
1402        let preamble_action = match (*need_preamble, has_preamble) {
1403            (true, true) => PreambleAction::Take,
1404            (true, false) => panic!("Need preamble but there isn't one"),
1405            (false, true) => PreambleAction::Skip,
1406            (false, false) => PreambleAction::Absent,
1407        };
1408
1409        // Did the scheduled chunk have a trailer?  If so, we have one extra row available
1410        if self.take_trailer {
1411            rows_avail += 1;
1412        }
1413
1414        // How many rows are we actually taking in this take step (including the preamble
1415        // and trailer both as individual rows)
1416        let rows_taking = if *rows_desired >= rows_avail {
1417            // We want all the rows.  If there is a trailer we are grabbing it and will need
1418            // the preamble of the next chunk
1419            *need_preamble = self.take_trailer;
1420            rows_avail
1421        } else {
1422            // We aren't taking all the rows.  Even if there is a trailer we aren't taking
1423            // it so we will not need the preamble
1424            *need_preamble = false;
1425            *rows_desired
1426        };
1427        let rows_skipped = *skip_in_chunk;
1428
1429        // Update the state for the next iteration
1430        let consumed_chunk = if *rows_desired >= rows_avail {
1431            *rows_desired -= rows_avail;
1432            *skip_in_chunk = 0;
1433            true
1434        } else {
1435            *skip_in_chunk += *rows_desired;
1436            *rows_desired = 0;
1437            false
1438        };
1439
1440        (
1441            ChunkDrainInstructions {
1442                chunk_instructions: self.clone(),
1443                rows_to_skip: rows_skipped,
1444                rows_to_take: rows_taking,
1445                preamble_action,
1446            },
1447            consumed_chunk,
1448        )
1449    }
1450}
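// Illustrative walk-through of schedule_instructions + drain_from_instruction above
// (chunk layout assumed): suppose chunk 0 starts rows 0..10 and its last row spills
// into chunk 1 (chunk 0 has a trailer, chunk 1 a preamble).  Scheduling the user range
// 5..12 first produces {chunk 0, skip 5, take 4, take_trailer} (the trailer counts as
// the fifth row taken and forces the preamble of the next chunk) and then
// {chunk 1, preamble: Take, skip 0, take 2}.  Draining later walks these instructions,
// treating the preamble and trailer as extra rows when deciding how many rows each
// decode task consumes.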
1451
1452impl StructuralPageScheduler for MiniBlockScheduler {
1453    fn initialize<'a>(
1454        &'a mut self,
1455        io: &Arc<dyn EncodingsIo>,
1456    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1457        // We always need to fetch chunk metadata.  We may also need to fetch a dictionary and
1458        // we may also need to fetch the repetition index.  Here, we gather what buffers we
1459        // need.
1460        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
1461        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
1462        let mut bufs_needed = 1;
1463        if self.dictionary.is_some() {
1464            bufs_needed += 1;
1465        }
1466        if self.repetition_index_depth > 0 {
1467            bufs_needed += 1;
1468        }
1469        let mut required_ranges = Vec::with_capacity(bufs_needed);
1470        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
1471        if let Some(ref dictionary) = self.dictionary {
1472            required_ranges.push(
1473                dictionary.dictionary_buf_position_and_size.0
1474                    ..dictionary.dictionary_buf_position_and_size.0
1475                        + dictionary.dictionary_buf_position_and_size.1,
1476            );
1477        }
1478        if self.repetition_index_depth > 0 {
1479            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
1480            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
1481        }
1482        let io_req = io.submit_request(required_ranges, 0);
1483
1484        async move {
1485            let mut buffers = io_req.await?.into_iter().fuse();
1486            let meta_bytes = buffers.next().unwrap();
1487            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
1488            let rep_index_bytes = buffers.next();
1489
1490            // Parse the metadata and build the chunk meta
1491            assert!(meta_bytes.len() % 2 == 0);
1492            let mut bytes = LanceBuffer::from_bytes(meta_bytes, 2);
1493            let words = bytes.borrow_to_typed_slice::<u16>();
1494            let words = words.as_ref();
1495
1496            let mut chunk_meta = Vec::with_capacity(words.len());
1497
1498            let mut rows_counter = 0;
1499            let mut offset_bytes = value_buf_position;
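            // Illustrative example of the word format parsed below: a word of 0x002A
            // has log_num_values = 0xA (a chunk of 1024 values) and divided_bytes = 2,
            // i.e. the chunk occupies 3 * MINIBLOCK_ALIGNMENT bytes.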
1500            for (word_idx, word) in words.iter().enumerate() {
1501                let log_num_values = word & 0x0F;
1502                let divided_bytes = word >> 4;
1503                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
1504                debug_assert!(num_bytes > 0);
1505                let num_values = if word_idx < words.len() - 1 {
1506                    debug_assert!(log_num_values > 0);
1507                    1 << log_num_values
1508                } else {
1509                    debug_assert!(
1510                        log_num_values == 0
1511                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
1512                    );
1513                    self.items_in_page - rows_counter
1514                };
1515                rows_counter += num_values;
1516
1517                chunk_meta.push(ChunkMeta {
1518                    num_values,
1519                    chunk_size_bytes: num_bytes as u64,
1520                    offset_bytes,
1521                });
1522                offset_bytes += num_bytes as u64;
1523            }
1524
1525            // Build the repetition index
1526            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
1527                // If we have a repetition index then we use that
1528                // TODO: Compress the repetition index :)
1529                assert!(rep_index_data.len() % 8 == 0);
1530                let mut repetition_index_vals = LanceBuffer::from_bytes(rep_index_data, 8);
1531                let repetition_index_vals = repetition_index_vals.borrow_to_typed_slice::<u64>();
1532                // Unflatten
1533                repetition_index_vals
1534                    .as_ref()
1535                    .chunks_exact(self.repetition_index_depth as usize + 1)
1536                    .map(|c| c.to_vec())
1537                    .collect::<Vec<_>>()
1538            } else {
1539                // Default rep index is just the number of items in each chunk
1540                // with 0 partials/leftovers
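                // e.g. (illustrative) chunks holding [1024, 1024, 100] values produce
                // [[1024, 0], [1024, 0], [100, 0]]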
1541                chunk_meta
1542                    .iter()
1543                    .map(|c| vec![c.num_values, 0])
1544                    .collect::<Vec<_>>()
1545            };
1546
1547            let mut page_meta = MiniBlockCacheableState {
1548                chunk_meta,
1549                rep_index: MiniBlockRepIndex::decode(&rep_index),
1550                dictionary: None,
1551            };
1552
1553            // decode dictionary
1554            if let Some(ref mut dictionary) = self.dictionary {
1555                let dictionary_data = dictionary_bytes.unwrap();
1556                page_meta.dictionary =
1557                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
1558                        LanceBuffer::from_bytes(
1559                            dictionary_data,
1560                            dictionary.dictionary_data_alignment,
1561                        ),
1562                        dictionary.num_dictionary_items,
1563                    )?));
1564            };
1565            let page_meta = Arc::new(page_meta);
1566            self.page_meta = Some(page_meta.clone());
1567            Ok(page_meta as Arc<dyn CachedPageData>)
1568        }
1569        .boxed()
1570    }
1571
1572    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1573        self.page_meta = Some(
1574            data.clone()
1575                .as_arc_any()
1576                .downcast::<MiniBlockCacheableState>()
1577                .unwrap(),
1578        );
1579    }
1580
1581    fn schedule_ranges(
1582        &self,
1583        ranges: &[Range<u64>],
1584        io: &Arc<dyn EncodingsIo>,
1585    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1586        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1587
1588        let page_meta = self.page_meta.as_ref().unwrap();
1589
1590        let chunk_instructions =
1591            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
1592
1593        debug_assert_eq!(
1594            num_rows,
1595            chunk_instructions
1596                .iter()
1597                .map(|ci| {
1598                    let taken = ci.rows_to_take;
1599                    if ci.take_trailer {
1600                        taken + 1
1601                    } else {
1602                        taken
1603                    }
1604                })
1605                .sum::<u64>()
1606        );
1607
1608        let chunks_needed = chunk_instructions
1609            .iter()
1610            .map(|ci| ci.chunk_idx)
1611            .unique()
1612            .collect::<Vec<_>>();
1613        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1614        let chunk_ranges = loaded_chunks
1615            .iter()
1616            .map(|c| c.byte_range.clone())
1617            .collect::<Vec<_>>();
1618        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1619
1620        let rep_decompressor = self.rep_decompressor.clone();
1621        let def_decompressor = self.def_decompressor.clone();
1622        let value_decompressor = self.value_decompressor.clone();
1623        let num_buffers = self.num_buffers;
1624        let dictionary = page_meta
1625            .dictionary
1626            .as_ref()
1627            .map(|dictionary| dictionary.clone());
1628        let def_meaning = self.def_meaning.clone();
1629
1630        let res = async move {
1631            let loaded_chunk_data = loaded_chunk_data.await?;
1632            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1633                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
1634            }
1635
1636            Ok(Box::new(MiniBlockDecoder {
1637                rep_decompressor,
1638                def_decompressor,
1639                value_decompressor,
1640                def_meaning,
1641                loaded_chunks: VecDeque::from_iter(loaded_chunks),
1642                instructions: VecDeque::from(chunk_instructions),
1643                offset_in_current_chunk: 0,
1644                dictionary,
1645                num_rows,
1646                num_buffers,
1647            }) as Box<dyn StructuralPageDecoder>)
1648        }
1649        .boxed();
1650        Ok(res)
1651    }
1652}
1653
1654#[derive(Debug, Clone, Copy)]
1655struct FullZipRepIndexDetails {
1656    buf_position: u64,
1657    bytes_per_value: u64, // Will be 1, 2, 4, or 8
1658}
1659
1660#[derive(Debug)]
1661enum PerValueDecompressor {
1662    Fixed(Arc<dyn FixedPerValueDecompressor>),
1663    Variable(Arc<dyn VariablePerValueDecompressor>),
1664}
1665
1666#[derive(Debug)]
1667struct FullZipDecodeDetails {
1668    value_decompressor: PerValueDecompressor,
1669    def_meaning: Arc<[DefinitionInterpretation]>,
1670    ctrl_word_parser: ControlWordParser,
1671    max_rep: u16,
1672    max_visible_def: u16,
1673}
1674
1675/// A scheduler for full-zip encoded data
1676///
1677/// When the data type has a fixed-width then we simply need to map from
1678/// row ranges to byte ranges using the fixed-width of the data type.
1679///
1680/// When the data type is variable-width or has any repetition then a
1681/// repetition index is required.
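///
/// As a rough illustration of the fixed-width case (sizes assumed): with 4-byte values
/// and a 2-byte control word zipped in front of each value, the row range `10..20`
/// maps to the byte range `data_buf_position + 60..data_buf_position + 120`.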
1682#[derive(Debug)]
1683pub struct FullZipScheduler {
1684    data_buf_position: u64,
1685    rep_index: Option<FullZipRepIndexDetails>,
1686    priority: u64,
1687    rows_in_page: u64,
1688    bits_per_offset: u8,
1689    details: Arc<FullZipDecodeDetails>,
1690    /// Cached state containing the decoded repetition index
1691    cached_state: Option<Arc<FullZipCacheableState>>,
1692    /// Whether to enable caching of repetition indices
1693    enable_cache: bool,
1694}
1695
1696impl FullZipScheduler {
1697    fn try_new(
1698        buffer_offsets_and_sizes: &[(u64, u64)],
1699        priority: u64,
1700        rows_in_page: u64,
1701        layout: &pb::FullZipLayout,
1702        decompressors: &dyn DecompressionStrategy,
1703    ) -> Result<Self> {
1704        // We don't need the data_buf_size because either the data type is
1705        // fixed-width (and we can tell size from rows_in_page) or it is not
1706        // and we have a repetition index.
1707        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
1708        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
1709            let num_reps = rows_in_page + 1;
1710            let bytes_per_rep = len / num_reps;
1711            debug_assert_eq!(len % num_reps, 0);
1712            debug_assert!(
1713                bytes_per_rep == 1
1714                    || bytes_per_rep == 2
1715                    || bytes_per_rep == 4
1716                    || bytes_per_rep == 8
1717            );
1718            FullZipRepIndexDetails {
1719                buf_position: *pos,
1720                bytes_per_value: bytes_per_rep,
1721            }
1722        });
1723
1724        let value_decompressor = match layout.details {
1725            Some(pb::full_zip_layout::Details::BitsPerValue(_)) => {
1726                let decompressor = decompressors.create_fixed_per_value_decompressor(
1727                    layout.value_compression.as_ref().unwrap(),
1728                )?;
1729                PerValueDecompressor::Fixed(decompressor.into())
1730            }
1731            Some(pb::full_zip_layout::Details::BitsPerOffset(_)) => {
1732                let decompressor = decompressors.create_variable_per_value_decompressor(
1733                    layout.value_compression.as_ref().unwrap(),
1734                )?;
1735                PerValueDecompressor::Variable(decompressor.into())
1736            }
1737            None => {
1738                panic!("Full-zip layout must have a `details` field");
1739            }
1740        };
1741        let ctrl_word_parser = ControlWordParser::new(
1742            layout.bits_rep.try_into().unwrap(),
1743            layout.bits_def.try_into().unwrap(),
1744        );
1745        let def_meaning = layout
1746            .layers
1747            .iter()
1748            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
1749            .collect::<Vec<_>>();
1750
1751        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
1752        let max_visible_def = def_meaning
1753            .iter()
1754            .filter(|d| !d.is_list())
1755            .map(|d| d.num_def_levels())
1756            .sum();
1757
1758        let bits_per_offset = match layout.details {
1759            Some(pb::full_zip_layout::Details::BitsPerValue(_)) => 32,
1760            Some(pb::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
1761                bits_per_offset as u8
1762            }
1763            None => panic!("Full-zip layout must have a `details` field"),
1764        };
1765
1766        let details = Arc::new(FullZipDecodeDetails {
1767            value_decompressor,
1768            def_meaning: def_meaning.into(),
1769            ctrl_word_parser,
1770            max_rep,
1771            max_visible_def,
1772        });
1773        Ok(Self {
1774            data_buf_position,
1775            rep_index,
1776            details,
1777            priority,
1778            rows_in_page,
1779            bits_per_offset,
1780            cached_state: None,
1781            enable_cache: false, // Default to false, will be set later
1782        })
1783    }
1784
1785    /// Creates a decoder from the loaded data
1786    fn create_decoder(
1787        details: Arc<FullZipDecodeDetails>,
1788        data: VecDeque<LanceBuffer>,
1789        num_rows: u64,
1790        bits_per_offset: u8,
1791    ) -> Result<Box<dyn StructuralPageDecoder>> {
1792        match &details.value_decompressor {
1793            PerValueDecompressor::Fixed(decompressor) => {
1794                let bits_per_value = decompressor.bits_per_value();
1795                if bits_per_value == 0 {
1796                    return Err(lance_core::Error::Internal {
1797                        message: "Invalid encoding: bits_per_value must be greater than 0".into(),
1798                        location: location!(),
1799                    });
1800                }
1801                if bits_per_value % 8 != 0 {
1802                    return Err(lance_core::Error::NotSupported {
1803                        source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
1804                        location: location!(),
1805                    });
1806                }
1807                let bytes_per_value = bits_per_value / 8;
1808                let total_bytes_per_value =
1809                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
1810                Ok(Box::new(FixedFullZipDecoder {
1811                    details,
1812                    data,
1813                    num_rows,
1814                    offset_in_current: 0,
1815                    bytes_per_value: bytes_per_value as usize,
1816                    total_bytes_per_value,
1817                }) as Box<dyn StructuralPageDecoder>)
1818            }
1819            PerValueDecompressor::Variable(_decompressor) => {
1820                Ok(Box::new(VariableFullZipDecoder::new(
1821                    details,
1822                    data,
1823                    num_rows,
1824                    bits_per_offset,
1825                    bits_per_offset,
1826                )))
1827            }
1828        }
1829    }
1830
1831    /// Extracts byte ranges from a repetition index buffer
1832    /// The buffer contains pairs of (start, end) values for each range
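    ///
    /// Illustrative example (values assumed): a buffer holding the little-endian u32
    /// values `[100, 250, 400, 600]` with `bytes_per_value = 4` and
    /// `data_buf_position = 5000` yields the ranges `[5100..5250, 5400..5600]`.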
1833    fn extract_byte_ranges_from_pairs(
1834        buffer: LanceBuffer,
1835        bytes_per_value: u64,
1836        data_buf_position: u64,
1837    ) -> Vec<Range<u64>> {
1838        ByteUnpacker::new(buffer, bytes_per_value as usize)
1839            .chunks(2)
1840            .into_iter()
1841            .map(|mut c| {
1842                let start = c.next().unwrap() + data_buf_position;
1843                let end = c.next().unwrap() + data_buf_position;
1844                start..end
1845            })
1846            .collect::<Vec<_>>()
1847    }
1848
1849    /// Extracts byte ranges from a cached repetition index buffer
1850    /// The buffer contains all values and we need to extract specific ranges
1851    fn extract_byte_ranges_from_cached(
1852        buffer: &LanceBuffer,
1853        ranges: &[Range<u64>],
1854        bytes_per_value: u64,
1855        data_buf_position: u64,
1856    ) -> Vec<Range<u64>> {
1857        ranges
1858            .iter()
1859            .map(|r| {
1860                let start_offset = (r.start * bytes_per_value) as usize;
1861                let end_offset = (r.end * bytes_per_value) as usize;
1862
1863                let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
1864                let start_val =
1865                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
1866                        .next()
1867                        .unwrap();
1868
1869                let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
1870                let end_val =
1871                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
1872                        .next()
1873                        .unwrap();
1874
1875                (data_buf_position + start_val)..(data_buf_position + end_val)
1876            })
1877            .collect()
1878    }
1879
1880    /// Computes the ranges in the repetition index that need to be loaded
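    ///
    /// For each row range only the index entries at `r.start` and `r.end` are needed,
    /// since those give the start and end byte offsets of that slice of the data.
    /// Illustrative example (values assumed): with `buf_position = 1000` and
    /// `bytes_per_value = 4`, the row range `10..20` requires the index bytes
    /// `1040..1044` and `1080..1084`.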
1881    fn compute_rep_index_ranges(
1882        ranges: &[Range<u64>],
1883        rep_index: &FullZipRepIndexDetails,
1884    ) -> Vec<Range<u64>> {
1885        ranges
1886            .iter()
1887            .flat_map(|r| {
1888                let first_val_start =
1889                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
1890                let first_val_end = first_val_start + rep_index.bytes_per_value;
1891                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
1892                let last_val_end = last_val_start + rep_index.bytes_per_value;
1893                [first_val_start..first_val_end, last_val_start..last_val_end]
1894            })
1895            .collect()
1896    }
1897
1898    /// Resolves byte ranges from repetition index (either from cache or disk)
1899    async fn resolve_byte_ranges(
1900        data_buf_position: u64,
1901        ranges: &[Range<u64>],
1902        io: &Arc<dyn EncodingsIo>,
1903        rep_index: &FullZipRepIndexDetails,
1904        cached_state: Option<&Arc<FullZipCacheableState>>,
1905        priority: u64,
1906    ) -> Result<Vec<Range<u64>>> {
1907        if let Some(cached_state) = cached_state {
1908            // Use cached repetition index
1909            Ok(Self::extract_byte_ranges_from_cached(
1910                &cached_state.rep_index_buffer,
1911                ranges,
1912                rep_index.bytes_per_value,
1913                data_buf_position,
1914            ))
1915        } else {
1916            // Load from disk
1917            let rep_ranges = Self::compute_rep_index_ranges(ranges, rep_index);
1918            let rep_data = io.submit_request(rep_ranges, priority).await?;
1919            let rep_buffer = LanceBuffer::concat(
1920                &rep_data
1921                    .into_iter()
1922                    .map(|d| LanceBuffer::from_bytes(d, 1))
1923                    .collect::<Vec<_>>(),
1924            );
1925            Ok(Self::extract_byte_ranges_from_pairs(
1926                rep_buffer,
1927                rep_index.bytes_per_value,
1928                data_buf_position,
1929            ))
1930        }
1931    }
1932
1933    /// Schedules ranges in the presence of a repetition index
1934    fn schedule_ranges_rep(
1935        &self,
1936        ranges: &[Range<u64>],
1937        io: &Arc<dyn EncodingsIo>,
1938        rep_index: FullZipRepIndexDetails,
1939    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1940        // Copy necessary fields to avoid lifetime issues
1941        let data_buf_position = self.data_buf_position;
1942        let cached_state = self.cached_state.clone();
1943        let priority = self.priority;
1944        let details = self.details.clone();
1945        let bits_per_offset = self.bits_per_offset;
1946        let ranges = ranges.to_vec();
1947        let io_clone = io.clone();
1948
1949        Ok(async move {
1950            // Step 1: Resolve byte ranges from repetition index
1951            let byte_ranges = Self::resolve_byte_ranges(
1952                data_buf_position,
1953                &ranges,
1954                &io_clone,
1955                &rep_index,
1956                cached_state.as_ref(),
1957                priority,
1958            )
1959            .await?;
1960
1961            // Step 2: Load data
1962            let data = io_clone.submit_request(byte_ranges, priority).await?;
1963            let data = data
1964                .into_iter()
1965                .map(|d| LanceBuffer::from_bytes(d, 1))
1966                .collect::<VecDeque<_>>();
1967
1968            // Step 3: Calculate total rows
1969            let num_rows: u64 = ranges.iter().map(|r| r.end - r.start).sum();
1970
1971            // Step 4: Create decoder
1972            Self::create_decoder(details, data, num_rows, bits_per_offset)
1973        }
1974        .boxed())
1975    }
1976
1977    // In the simple case there is no repetition and we just have large fixed-width
1978    // rows of data.  We can just map row ranges to byte ranges directly using the
1979    // fixed-width of the data type.
1980    fn schedule_ranges_simple(
1981        &self,
1982        ranges: &[Range<u64>],
1983        io: &dyn EncodingsIo,
1984    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1985        // Convert row ranges to item ranges (i.e. multiply by items per row)
1986        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1987
1988        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
1989            unreachable!()
1990        };
1991
1992        // Convert item ranges to byte ranges (i.e. multiply by bytes per item)
1993        let bits_per_value = decompressor.bits_per_value();
1994        assert_eq!(bits_per_value % 8, 0);
1995        let bytes_per_value = bits_per_value / 8;
1996        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
1997        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
1998        let byte_ranges = ranges.iter().map(|r| {
1999            debug_assert!(r.end <= self.rows_in_page);
2000            let start = self.data_buf_position + r.start * total_bytes_per_value;
2001            let end = self.data_buf_position + r.end * total_bytes_per_value;
2002            start..end
2003        });
2004
2005        // Request byte ranges
2006        let data = io.submit_request(byte_ranges.collect(), self.priority);
2007
2008        let details = self.details.clone();
2009
2010        Ok(async move {
2011            let data = data.await?;
2012            let data = data
2013                .into_iter()
2014                .map(|d| LanceBuffer::from_bytes(d, 1))
2015                .collect();
2016            Ok(Box::new(FixedFullZipDecoder {
2017                details,
2018                data,
2019                num_rows,
2020                offset_in_current: 0,
2021                bytes_per_value: bytes_per_value as usize,
2022                total_bytes_per_value: total_bytes_per_value as usize,
2023            }) as Box<dyn StructuralPageDecoder>)
2024        }
2025        .boxed())
2026    }
2027}
2028
2029/// Cacheable state for FullZip encoding, storing the decoded repetition index
2030#[derive(Debug)]
2031struct FullZipCacheableState {
2032    /// The raw repetition index buffer for future decoding
2033    rep_index_buffer: LanceBuffer,
2034}
2035
2036impl DeepSizeOf for FullZipCacheableState {
2037    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
2038        self.rep_index_buffer.len()
2039    }
2040}
2041
2042impl CachedPageData for FullZipCacheableState {
2043    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2044        self
2045    }
2046}
2047
2048impl StructuralPageScheduler for FullZipScheduler {
2049    /// Initializes the scheduler. If there's a repetition index, loads and caches it.
2050    /// Otherwise returns NoCachedPageData.
2051    fn initialize<'a>(
2052        &'a mut self,
2053        io: &Arc<dyn EncodingsIo>,
2054    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2055        // Check if caching is enabled and we have a repetition index
2056        if self.enable_cache && self.rep_index.is_some() {
2057            let rep_index = self.rep_index.as_ref().unwrap();
2058            // Calculate the total size of the repetition index
2059            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2060            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2061
2062            // Load the repetition index buffer
2063            let io_clone = io.clone();
2064            let future = async move {
2065                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2066                let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);
2067
2068                // Create and return the cacheable state
2069                Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
2070            };
2071
2072            future.boxed()
2073        } else {
2074            // Caching disabled or no repetition index, skip caching
2075            std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2076        }
2077    }
2078
2079    /// Loads previously cached repetition index data from the cache system.
2080    /// This method is called when a scheduler instance needs to use cached data
2081    /// that was initialized by another instance or in a previous operation.
2082    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2083        // Try to downcast to our specific cache type
2084        if let Ok(cached_state) = cache
2085            .clone()
2086            .as_arc_any()
2087            .downcast::<FullZipCacheableState>()
2088        {
2089            // Store the cached state for use in schedule_ranges
2090            self.cached_state = Some(cached_state);
2091        }
2092    }
2093
2094    fn schedule_ranges(
2095        &self,
2096        ranges: &[Range<u64>],
2097        io: &Arc<dyn EncodingsIo>,
2098    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
2099        if let Some(rep_index) = self.rep_index {
2100            self.schedule_ranges_rep(ranges, io, rep_index)
2101        } else {
2102            self.schedule_ranges_simple(ranges, io.as_ref())
2103        }
2104    }
2105}
2106
2107/// A decoder for full-zip encoded data when the data has a fixed-width
2108///
2109/// Here we need to unzip the control words from the values themselves and
2110/// then decompress the requested values.
2111///
2112/// We use a PerValueDecompressor because we will only be decompressing the
2113/// requested data.  This decoder / scheduler does not do any read amplification.
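///
/// The zipped stream is a repeating `[control word][value bytes]` pattern, so (sizes
/// assumed for illustration) with a 2-byte control word and 4-byte values each visible
/// value occupies 6 bytes, while invisible values contribute only their control word.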
2114#[derive(Debug)]
2115struct FixedFullZipDecoder {
2116    details: Arc<FullZipDecodeDetails>,
2117    data: VecDeque<LanceBuffer>,
2118    offset_in_current: usize,
2119    bytes_per_value: usize,
2120    total_bytes_per_value: usize,
2121    num_rows: u64,
2122}
2123
2124impl FixedFullZipDecoder {
2125    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2126        debug_assert!(num_rows > 0);
2127        let cur_buf = self.data.front_mut().unwrap();
2128        let start = self.offset_in_current;
2129        if self.details.ctrl_word_parser.has_rep() {
2130            // This is a slightly slower path.  In order to figure out where to split we need to
2131            // examine the repetition info in the control words so we can convert the requested
            // number of rows into a number of zipped items
2132            let mut rows_started = 0;
2133            // We always need at least one value.  Now loop through the items until we have
2134            // passed num_rows rows
2135            let mut num_items = 0;
2136            while self.offset_in_current < cur_buf.len() {
2137                let control = self.details.ctrl_word_parser.parse_desc(
2138                    &cur_buf[self.offset_in_current..],
2139                    self.details.max_rep,
2140                    self.details.max_visible_def,
2141                );
2142                if control.is_new_row {
2143                    if rows_started == num_rows {
2144                        break;
2145                    }
2146                    rows_started += 1;
2147                }
2148                num_items += 1;
2149                if control.is_visible {
2150                    self.offset_in_current += self.total_bytes_per_value;
2151                } else {
2152                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2153                }
2154            }
2155
2156            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2157            if self.offset_in_current == cur_buf.len() {
2158                self.data.pop_front();
2159                self.offset_in_current = 0;
2160            }
2161
2162            FullZipDecodeTaskItem {
2163                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2164                    data: task_slice,
2165                    bits_per_value: self.bytes_per_value as u64 * 8,
2166                    num_values: num_items,
2167                    block_info: BlockInfo::new(),
2168                }),
2169                rows_in_buf: rows_started,
2170            }
2171        } else {
2172            // If there's no repetition we can calculate the slicing point by just multiplying
2173            // the number of rows by the total bytes per value
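            // e.g. (assumed sizes) taking 10 rows with total_bytes_per_value = 6 means
            // slicing off the next 60 bytes of the current buffer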
2174            let cur_buf = self.data.front_mut().unwrap();
2175            let bytes_avail = cur_buf.len() - self.offset_in_current;
2176            let offset_in_cur = self.offset_in_current;
2177
2178            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2179            let mut rows_taken = num_rows;
2180            let task_slice = if bytes_needed >= bytes_avail {
2181                self.offset_in_current = 0;
2182                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2183                self.data
2184                    .pop_front()
2185                    .unwrap()
2186                    .slice_with_length(offset_in_cur, bytes_avail)
2187            } else {
2188                self.offset_in_current += bytes_needed;
2189                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2190            };
2191            FullZipDecodeTaskItem {
2192                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2193                    data: task_slice,
2194                    bits_per_value: self.bytes_per_value as u64 * 8,
2195                    num_values: rows_taken,
2196                    block_info: BlockInfo::new(),
2197                }),
2198                rows_in_buf: rows_taken,
2199            }
2200        }
2201    }
2202}
2203
2204impl StructuralPageDecoder for FixedFullZipDecoder {
2205    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2206        let mut task_data = Vec::with_capacity(self.data.len());
2207        let mut remaining = num_rows;
2208        while remaining > 0 {
2209            let task_item = self.slice_next_task(remaining);
2210            remaining -= task_item.rows_in_buf;
2211            task_data.push(task_item);
2212        }
2213        Ok(Box::new(FixedFullZipDecodeTask {
2214            details: self.details.clone(),
2215            data: task_data,
2216            bytes_per_value: self.bytes_per_value,
2217            num_rows: num_rows as usize,
2218        }))
2219    }
2220
2221    fn num_rows(&self) -> u64 {
2222        self.num_rows
2223    }
2224}
2225
2226/// A decoder for full-zip encoded data when the data has a variable-width
2227///
2228/// Here we need to unzip the control words AND lengths from the values and
2229/// then decompress the requested values.
2230#[derive(Debug)]
2231struct VariableFullZipDecoder {
2232    details: Arc<FullZipDecodeDetails>,
2233    decompressor: Arc<dyn VariablePerValueDecompressor>,
2234    data: LanceBuffer,
2235    offsets: LanceBuffer,
2236    rep: ScalarBuffer<u16>,
2237    def: ScalarBuffer<u16>,
2238    repdef_starts: Vec<usize>,
2239    data_starts: Vec<usize>,
2240    offset_starts: Vec<usize>,
2241    visible_item_counts: Vec<u64>,
2242    bits_per_offset: u8,
2243    current_idx: usize,
2244    num_rows: u64,
2245}
2246
2247impl VariableFullZipDecoder {
2248    fn new(
2249        details: Arc<FullZipDecodeDetails>,
2250        data: VecDeque<LanceBuffer>,
2251        num_rows: u64,
2252        in_bits_per_length: u8,
2253        out_bits_per_offset: u8,
2254    ) -> Self {
2255        let decompressor = match details.value_decompressor {
2256            PerValueDecompressor::Variable(ref d) => d.clone(),
2257            _ => unreachable!(),
2258        };
2259
2260        assert_eq!(in_bits_per_length % 8, 0);
2261        assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2262
2263        let mut decoder = Self {
2264            details,
2265            decompressor,
2266            data: LanceBuffer::empty(),
2267            offsets: LanceBuffer::empty(),
2268            rep: LanceBuffer::empty().borrow_to_typed_slice(),
2269            def: LanceBuffer::empty().borrow_to_typed_slice(),
2270            bits_per_offset: out_bits_per_offset,
2271            repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2272            data_starts: Vec::with_capacity(num_rows as usize + 1),
2273            offset_starts: Vec::with_capacity(num_rows as usize + 1),
2274            visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2275            current_idx: 0,
2276            num_rows,
2277        };
2278
2279        // There's no great time to do this and this is the least worst time.  If we don't unzip then
2280        // we can't slice the data during the decode phase.  This is because we need the offsets to be
2281        // unpacked to know where the values start and end.
2282        //
2283        // We don't want to unzip on the decode thread because that is a single-threaded path
2284        // We don't want to unzip on the scheduling thread because that is a single-threaded path
2285        //
2286        // Fortunately, we know variable length data will always be read indirectly and so we can do it
2287        // here, which should be on the indirect thread.  The primary disadvantage to doing it here is that
2288        // we load all the data into memory and then throw it away only to load it all into memory again during
2289        // the decode.
2290        //
2291        // There are some alternatives to investigate:
2292        //   - Instead of just reading the beginning and end of the rep index we could read the entire
2293        //     range in between.  This will give us the break points that we need for slicing and won't increase
2294        //     the number of IOPs but it will mean we are doing more total I/O and we need to load the rep index
2295        //     even when doing a full scan.
2296        //   - We could force each decode task to do a full unzip of all the data.  Each decode task now
2297        //     has to do more work but the work is all fused.
2298        //   - We could just try doing this work on the decode thread and see if it is a problem.
2299        decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2300
2301        decoder
2302    }
2303
2304    unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2305        match bits_per_offset {
2306            8 => *data.get_unchecked(0) as u64,
2307            16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2308            32 => u32::from_le_bytes([
2309                *data.get_unchecked(0),
2310                *data.get_unchecked(1),
2311                *data.get_unchecked(2),
2312                *data.get_unchecked(3),
2313            ]) as u64,
2314            64 => u64::from_le_bytes([
2315                *data.get_unchecked(0),
2316                *data.get_unchecked(1),
2317                *data.get_unchecked(2),
2318                *data.get_unchecked(3),
2319                *data.get_unchecked(4),
2320                *data.get_unchecked(5),
2321                *data.get_unchecked(6),
2322                *data.get_unchecked(7),
2323            ]),
2324            _ => unreachable!(),
2325        }
2326    }
2327
2328    fn unzip(
2329        &mut self,
2330        data: VecDeque<LanceBuffer>,
2331        in_bits_per_length: u8,
2332        out_bits_per_offset: u8,
2333        num_rows: u64,
2334    ) {
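        // The zipped input is a repeating `[control word][length][value bytes]` pattern
        // for valid items (null and invisible items carry only their control word).  We
        // unzip it into parallel outputs: rep levels, def levels, an offsets buffer
        // re-encoded at `out_bits_per_offset`, and the raw value bytes.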
2335        // This undercounts if there are lists but, at this point, we don't really know how many items we have
2336        let mut rep = Vec::with_capacity(num_rows as usize);
2337        let mut def = Vec::with_capacity(num_rows as usize);
2338        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2339
2340        // This undercounts if there are lists
2341        // It can also overcount if there are invisible items
2342        let bytes_per_offset = out_bits_per_offset as usize / 8;
2343        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2344        let mut offsets_data = Vec::with_capacity(bytes_offsets);
2345
2346        let bytes_per_length = in_bits_per_length as usize / 8;
2347        let bytes_lengths = bytes_per_length * num_rows as usize;
2348
2349        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2350        // This overcounts since bytes_lengths and bytes_cw are undercounts
2351        // It can also undercount if there are invisible items (hence the saturating_sub)
2352        let mut unzipped_data =
2353            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2354
2355        let mut current_offset = 0_u64;
2356        let mut visible_item_count = 0_u64;
2357        for databuf in data.into_iter() {
2358            let mut databuf = databuf.as_ref();
2359            while !databuf.is_empty() {
2360                let data_start = unzipped_data.len();
2361                let offset_start = offsets_data.len();
2362                // We might have only-rep or only-def, neither, or both.  They move at the same
2363                // speed though so we only need one index into it
2364                let repdef_start = rep.len().max(def.len());
2365                // TODO: Kind of inefficient we parse the control word twice here
2366                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2367                    databuf,
2368                    self.details.max_rep,
2369                    self.details.max_visible_def,
2370                );
2371                self.details
2372                    .ctrl_word_parser
2373                    .parse(databuf, &mut rep, &mut def);
2374                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2375
2376                if ctrl_desc.is_new_row {
2377                    self.repdef_starts.push(repdef_start);
2378                    self.data_starts.push(data_start);
2379                    self.offset_starts.push(offset_start);
2380                    self.visible_item_counts.push(visible_item_count);
2381                }
2382                if ctrl_desc.is_visible {
2383                    visible_item_count += 1;
2384                    if ctrl_desc.is_valid_item {
2385                        // Safety: Data should have at least bytes_per_length bytes remaining
2386                        debug_assert!(databuf.len() >= bytes_per_length);
2387                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2388                        match out_bits_per_offset {
2389                            32 => offsets_data
2390                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2391                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2392                            _ => unreachable!(),
2393                        };
2394                        databuf = &databuf[bytes_per_length..]; // skip the length field we just parsed
2395                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
2396                        databuf = &databuf[length as usize..];
2397                        current_offset += length;
2398                    } else {
2399                        // Null items still get an offset
2400                        match out_bits_per_offset {
2401                            32 => offsets_data
2402                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2403                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2404                            _ => unreachable!(),
2405                        }
2406                    }
2407                }
2408            }
2409        }
2410        self.repdef_starts.push(rep.len().max(def.len()));
2411        self.data_starts.push(unzipped_data.len());
2412        self.offset_starts.push(offsets_data.len());
2413        self.visible_item_counts.push(visible_item_count);
2414        match out_bits_per_offset {
2415            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2416            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
2417            _ => unreachable!(),
2418        };
2419        self.rep = ScalarBuffer::from(rep);
2420        self.def = ScalarBuffer::from(def);
2421        self.data = LanceBuffer::Owned(unzipped_data);
2422        self.offsets = LanceBuffer::Owned(offsets_data);
2423    }
2424}
2425
2426impl StructuralPageDecoder for VariableFullZipDecoder {
2427    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2428        let start = self.current_idx;
2429        let end = start + num_rows as usize;
2430
2431        // This might seem a little peculiar.  We are returning the entire data for every single
2432        // batch.  This is because the offsets are relative to the start of the data.  In other words
2433        // imagine we have a data buffer that is 100 bytes long and the offsets are [0, 10, 20, 30, 40]
2434        // and we return in batches of two.  The second set of offsets will be [20, 30, 40].
2435        //
2436        // So either we pay for a copy to normalize the offsets or we just return the entire data buffer
2437        // which is slightly cheaper.
2438        let data = self.data.borrow_and_clone();
2439
2440        let offset_start = self.offset_starts[start];
2441        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2442        let offsets = self
2443            .offsets
2444            .slice_with_length(offset_start, offset_end - offset_start);
2445
2446        let repdef_start = self.repdef_starts[start];
2447        let repdef_end = self.repdef_starts[end];
2448        let rep = if self.rep.is_empty() {
2449            self.rep.clone()
2450        } else {
2451            self.rep.slice(repdef_start, repdef_end - repdef_start)
2452        };
2453        let def = if self.def.is_empty() {
2454            self.def.clone()
2455        } else {
2456            self.def.slice(repdef_start, repdef_end - repdef_start)
2457        };
2458
2459        let visible_item_counts_start = self.visible_item_counts[start];
2460        let visible_item_counts_end = self.visible_item_counts[end];
2461        let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2462
2463        self.current_idx += num_rows as usize;
2464
2465        Ok(Box::new(VariableFullZipDecodeTask {
2466            details: self.details.clone(),
2467            decompressor: self.decompressor.clone(),
2468            data,
2469            offsets,
2470            bits_per_offset: self.bits_per_offset,
2471            num_visible_items,
2472            rep,
2473            def,
2474        }))
2475    }
2476
2477    fn num_rows(&self) -> u64 {
2478        self.num_rows
2479    }
2480}
2481
2482#[derive(Debug)]
2483struct VariableFullZipDecodeTask {
2484    details: Arc<FullZipDecodeDetails>,
2485    decompressor: Arc<dyn VariablePerValueDecompressor>,
2486    data: LanceBuffer,
2487    offsets: LanceBuffer,
2488    bits_per_offset: u8,
2489    num_visible_items: u64,
2490    rep: ScalarBuffer<u16>,
2491    def: ScalarBuffer<u16>,
2492}
2493
2494impl DecodePageTask for VariableFullZipDecodeTask {
2495    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2496        let block = VariableWidthBlock {
2497            data: self.data,
2498            offsets: self.offsets,
2499            bits_per_offset: self.bits_per_offset,
2500            num_values: self.num_visible_items,
2501            block_info: BlockInfo::new(),
2502        };
2503        let decompressed = self.decompressor.decompress(block)?;
2504        let rep = self.rep.to_vec();
2505        let def = self.def.to_vec();
2506        let unraveler =
2507            RepDefUnraveler::new(Some(rep), Some(def), self.details.def_meaning.clone());
2508        Ok(DecodedPage {
2509            data: decompressed,
2510            repdef: unraveler,
2511        })
2512    }
2513}
2514
2515#[derive(Debug)]
2516struct FullZipDecodeTaskItem {
2517    data: PerValueDataBlock,
2518    rows_in_buf: u64,
2519}
2520
2521/// A task to unzip and decompress full-zip encoded data when that data
2522/// has a fixed-width.
2523#[derive(Debug)]
2524struct FixedFullZipDecodeTask {
2525    details: Arc<FullZipDecodeDetails>,
2526    data: Vec<FullZipDecodeTaskItem>,
2527    num_rows: usize,
2528    bytes_per_value: usize,
2529}
2530
2531impl DecodePageTask for FixedFullZipDecodeTask {
2532    fn decode(self: Box<Self>) -> Result<DecodedPage> {
2533        // Multiply by 2 to make a stab at the size of the output buffer (which will be decompressed and thus bigger)
2534        let estimated_size_bytes = self
2535            .data
2536            .iter()
2537            .map(|task_item| task_item.data.data_size() as usize)
2538            .sum::<usize>()
2539            * 2;
2540        let mut data_builder =
2541            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2542
2543        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
2544            // Fast path, no need to unzip because there is no rep/def
2545            //
2546            // We decompress each buffer and add it to our output buffer
2547            for task_item in self.data.into_iter() {
2548                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2549                    unreachable!()
2550                };
2551                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2552                else {
2553                    unreachable!()
2554                };
2555                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
2556                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
2557                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
2558            }
2559
2560            let unraveler = RepDefUnraveler::new(None, None, self.details.def_meaning.clone());
2561
2562            Ok(DecodedPage {
2563                data: data_builder.finish(),
2564                repdef: unraveler,
2565            })
2566        } else {
2567            // Slow path, unzipping needed
2568            let mut rep = Vec::with_capacity(self.num_rows);
2569            let mut def = Vec::with_capacity(self.num_rows);
2570
2571            for task_item in self.data.into_iter() {
2572                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2573                    unreachable!()
2574                };
2575                let mut buf_slice = fixed_data.data.as_ref();
2576                let num_values = fixed_data.num_values as usize;
2577                // We will be unzipping repdef into `rep` and `def` and the
2578                // values into `values` (which contains the compressed values)
2579                let mut values = Vec::with_capacity(
2580                    fixed_data.data.len()
2581                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
2582                );
2583                let mut visible_items = 0;
2584                for _ in 0..num_values {
2585                    // Extract rep/def
2586                    self.details
2587                        .ctrl_word_parser
2588                        .parse(buf_slice, &mut rep, &mut def);
2589                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
2590
2591                    let is_visible = def
2592                        .last()
2593                        .map(|d| *d <= self.details.max_visible_def)
2594                        .unwrap_or(true);
2595                    if is_visible {
2596                        // Extract value
2597                        values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
2598                        buf_slice = &buf_slice[self.bytes_per_value..];
2599                        visible_items += 1;
2600                    }
2601                }
2602
2603                // Finally, we decompress the values and add them to our output buffer
2604                let values_buf = LanceBuffer::Owned(values);
2605                let fixed_data = FixedWidthDataBlock {
2606                    bits_per_value: self.bytes_per_value as u64 * 8,
2607                    block_info: BlockInfo::new(),
2608                    data: values_buf,
2609                    num_values: visible_items,
2610                };
2611                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2612                else {
2613                    unreachable!()
2614                };
2615                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
2616                data_builder.append(&decompressed, 0..visible_items);
2617            }
2618
2619            let repetition = if rep.is_empty() { None } else { Some(rep) };
2620            let definition = if def.is_empty() { None } else { Some(def) };
2621
2622            let unraveler =
2623                RepDefUnraveler::new(repetition, definition, self.details.def_meaning.clone());
2624            let data = data_builder.finish();
2625
2626            Ok(DecodedPage {
2627                data,
2628                repdef: unraveler,
2629            })
2630        }
2631    }
2632}
2633
2634#[derive(Debug)]
2635struct StructuralPrimitiveFieldSchedulingJob<'a> {
2636    scheduler: &'a StructuralPrimitiveFieldScheduler,
2637    ranges: Vec<Range<u64>>,
2638    page_idx: usize,
2639    range_idx: usize,
2640    global_row_offset: u64,
2641}
2642
2643impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
2644    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
2645        Self {
2646            scheduler,
2647            ranges,
2648            page_idx: 0,
2649            range_idx: 0,
2650            global_row_offset: 0,
2651        }
2652    }
2653}
2654
2655impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
2656    fn schedule_next(
2657        &mut self,
2658        context: &mut SchedulerContext,
2659    ) -> Result<Option<ScheduledScanLine>> {
2660        if self.range_idx >= self.ranges.len() {
2661            return Ok(None);
2662        }
2663        // Get our current range
2664        let mut range = self.ranges[self.range_idx].clone();
2665        let priority = range.start;
2666
2667        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
2668        trace!(
2669            "Current range is {:?} and current page has {} rows",
2670            range,
2671            cur_page.num_rows
2672        );
2673        // Skip entire pages until we have some overlap with our next range
2674        while cur_page.num_rows + self.global_row_offset <= range.start {
2675            self.global_row_offset += cur_page.num_rows;
2676            self.page_idx += 1;
2677            trace!("Skipping entire page of {} rows", cur_page.num_rows);
2678            cur_page = &self.scheduler.page_schedulers[self.page_idx];
2679        }
2680
2681        // Now the cur_page has overlap with range.  Continue looping through ranges
2682        // until we find a range that exceeds the current page
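        // For example (illustrative numbers only): with 100-row pages and user ranges
        // [150..250, 260..280], the first call skips page 0 and schedules rows 50..100 of
        // page 1; the next call schedules rows [0..50, 60..80] of page 2, finishing both ranges.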
2683
2684        let mut ranges_in_page = Vec::new();
2685        while cur_page.num_rows + self.global_row_offset > range.start {
2686            range.start = range.start.max(self.global_row_offset);
2687            let start_in_page = range.start - self.global_row_offset;
2688            let end_in_page = start_in_page + (range.end - range.start);
2689            let end_in_page = end_in_page.min(cur_page.num_rows);
2690            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
2691
2692            ranges_in_page.push(start_in_page..end_in_page);
2693            if last_in_range {
2694                self.range_idx += 1;
2695                if self.range_idx == self.ranges.len() {
2696                    break;
2697                }
2698                range = self.ranges[self.range_idx].clone();
2699            } else {
2700                break;
2701            }
2702        }
2703
2704        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
2705        trace!(
2706            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
2707            num_rows_in_next,
2708            ranges_in_page.len(),
2709            cur_page.num_rows,
2710            priority,
2711            self.scheduler.column_index,
2712            cur_page.page_index,
2713        );
2714
2715        self.global_row_offset += cur_page.num_rows;
2716        self.page_idx += 1;
2717
2718        let page_decoder = cur_page
2719            .scheduler
2720            .schedule_ranges(&ranges_in_page, context.io())?;
2721
2722        let cur_path = context.current_path();
2723        let page_index = cur_page.page_index;
2724        let unloaded_page = async move {
2725            let page_decoder = page_decoder.await?;
2726            Ok(LoadedPage {
2727                decoder: page_decoder,
2728                path: cur_path,
2729                page_index,
2730            })
2731        }
2732        .boxed();
2733
2734        Ok(Some(ScheduledScanLine {
2735            decoders: vec![MessageType::UnloadedPage(UnloadedPage(unloaded_page))],
2736            rows_scheduled: num_rows_in_next,
2737        }))
2738    }
2739}
2740
2741#[derive(Debug)]
2742struct PageInfoAndScheduler {
2743    page_index: usize,
2744    num_rows: u64,
2745    scheduler: Box<dyn StructuralPageScheduler>,
2746}
2747
2748/// A scheduler for a leaf node
2749///
2750/// Here we look at the layout of the various pages and delegate scheduling to a scheduler
2751/// appropriate for the layout of the page.
2752#[derive(Debug)]
2753pub struct StructuralPrimitiveFieldScheduler {
2754    page_schedulers: Vec<PageInfoAndScheduler>,
2755    column_index: u32,
2756}
2757
2758impl StructuralPrimitiveFieldScheduler {
2759    pub fn try_new(
2760        column_info: &ColumnInfo,
2761        decompressors: &dyn DecompressionStrategy,
2762        cache_repetition_index: bool,
2763    ) -> Result<Self> {
2764        let page_schedulers = column_info
2765            .page_infos
2766            .iter()
2767            .enumerate()
2768            .map(|(page_index, page_info)| {
2769                Self::page_info_to_scheduler(
2770                    page_info,
2771                    page_index,
2772                    column_info.index as usize,
2773                    decompressors,
2774                    cache_repetition_index,
2775                )
2776            })
2777            .collect::<Result<Vec<_>>>()?;
2778        Ok(Self {
2779            page_schedulers,
2780            column_index: column_info.index,
2781        })
2782    }
2783
2784    fn page_info_to_scheduler(
2785        page_info: &PageInfo,
2786        page_index: usize,
2787        _column_index: usize,
2788        decompressors: &dyn DecompressionStrategy,
2789        cache_repetition_index: bool,
2790    ) -> Result<PageInfoAndScheduler> {
2791        let scheduler: Box<dyn StructuralPageScheduler> =
2792            match page_info.encoding.as_structural().layout.as_ref() {
2793                Some(pb::page_layout::Layout::MiniBlockLayout(mini_block)) => {
2794                    Box::new(MiniBlockScheduler::try_new(
2795                        &page_info.buffer_offsets_and_sizes,
2796                        page_info.priority,
2797                        mini_block.num_items,
2798                        mini_block,
2799                        decompressors,
2800                    )?)
2801                }
2802                Some(pb::page_layout::Layout::FullZipLayout(full_zip)) => {
2803                    let mut scheduler = FullZipScheduler::try_new(
2804                        &page_info.buffer_offsets_and_sizes,
2805                        page_info.priority,
2806                        page_info.num_rows,
2807                        full_zip,
2808                        decompressors,
2809                    )?;
2810                    scheduler.enable_cache = cache_repetition_index;
2811                    Box::new(scheduler)
2812                }
2813                Some(pb::page_layout::Layout::AllNullLayout(all_null)) => {
2814                    let def_meaning = all_null
2815                        .layers
2816                        .iter()
2817                        .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
2818                        .collect::<Vec<_>>();
2819                    if def_meaning.len() == 1
2820                        && def_meaning[0] == DefinitionInterpretation::NullableItem
2821                    {
2822                        Box::new(SimpleAllNullScheduler::default())
2823                            as Box<dyn StructuralPageScheduler>
2824                    } else {
2825                        Box::new(ComplexAllNullScheduler::new(
2826                            page_info.buffer_offsets_and_sizes.clone(),
2827                            def_meaning.into(),
2828                        )) as Box<dyn StructuralPageScheduler>
2829                    }
2830                }
2831                _ => todo!(),
2832            };
2833        Ok(PageInfoAndScheduler {
2834            page_index,
2835            num_rows: page_info.num_rows,
2836            scheduler,
2837        })
2838    }
2839}
2840
2841pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
2842    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
2843}
2844
2845pub struct NoCachedPageData;
2846
2847impl DeepSizeOf for NoCachedPageData {
2848    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
2849        0
2850    }
2851}
2852impl CachedPageData for NoCachedPageData {
2853    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2854        self
2855    }
2856}
2857
2858pub struct CachedFieldData {
2859    pages: Vec<Arc<dyn CachedPageData>>,
2860}
2861
2862impl DeepSizeOf for CachedFieldData {
2863    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
2864        self.pages.deep_size_of_children(ctx)
2865    }
2866}
2867
2868// Cache key for field data
2869#[derive(Debug, Clone)]
2870pub struct FieldDataCacheKey {
2871    pub column_index: u32,
2872}
2873
2874impl CacheKey for FieldDataCacheKey {
2875    type ValueType = CachedFieldData;
2876
2877    fn key(&self) -> std::borrow::Cow<'_, str> {
2878        self.column_index.to_string().into()
2879    }
2880}
2881
2882impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
2883    fn initialize<'a>(
2884        &'a mut self,
2885        _filter: &'a FilterExpression,
2886        context: &'a SchedulerContext,
2887    ) -> BoxFuture<'a, Result<()>> {
2888        let cache_key = FieldDataCacheKey {
2889            column_index: self.column_index,
2890        };
2891        let cache = context.cache().clone();
2892
2893        async move {
2894            if let Some(cached_data) = cache.get_with_key(&cache_key).await {
2895                self.page_schedulers
2896                    .iter_mut()
2897                    .zip(cached_data.pages.iter())
2898                    .for_each(|(page_scheduler, cached_data)| {
2899                        page_scheduler.scheduler.load(cached_data);
2900                    });
2901                return Ok(());
2902            }
2903
2904            let page_data = self
2905                .page_schedulers
2906                .iter_mut()
2907                .map(|s| s.scheduler.initialize(context.io()))
2908                .collect::<FuturesOrdered<_>>();
2909
2910            let page_data = page_data.try_collect::<Vec<_>>().await?;
2911            let cached_data = Arc::new(CachedFieldData { pages: page_data });
2912            cache.insert_with_key(&cache_key, cached_data).await;
2913            Ok(())
2914        }
2915        .boxed()
2916    }
2917
2918    fn schedule_ranges<'a>(
2919        &'a self,
2920        ranges: &[Range<u64>],
2921        _filter: &FilterExpression,
2922    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
2923        let ranges = ranges.to_vec();
2924        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
2925            self, ranges,
2926        )))
2927    }
2928}
2929
2930/// Takes the outputs from several page decoders and
2931/// concatenates them.
2932#[derive(Debug)]
2933pub struct StructuralCompositeDecodeArrayTask {
2934    tasks: Vec<Box<dyn DecodePageTask>>,
2935    should_validate: bool,
2936    data_type: DataType,
2937}
2938
2939impl StructuralCompositeDecodeArrayTask {
2940    fn restore_validity(
2941        array: Arc<dyn Array>,
2942        unraveler: &mut CompositeRepDefUnraveler,
2943    ) -> Arc<dyn Array> {
2944        let validity = unraveler.unravel_validity(array.len());
2945        let Some(validity) = validity else {
2946            return array;
2947        };
2948        if array.data_type() == &DataType::Null {
2949            // We unravel from a null array but we don't add the null buffer because arrow-rs doesn't like it
2950            return array;
2951        }
2952        assert_eq!(validity.len(), array.len());
2953        // SAFETY: We should have already asserted that the buffers are all valid; we are just
2954        // adding null buffers to the array here
2955        make_array(unsafe {
2956            array
2957                .to_data()
2958                .into_builder()
2959                .nulls(Some(validity))
2960                .build_unchecked()
2961        })
2962    }
2963}
2964
2965impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
2966    fn decode(self: Box<Self>) -> Result<DecodedArray> {
2967        let mut arrays = Vec::with_capacity(self.tasks.len());
2968        let mut unravelers = Vec::with_capacity(self.tasks.len());
2969        for task in self.tasks {
2970            let decoded = task.decode()?;
2971            unravelers.push(decoded.repdef);
2972
2973            let array = make_array(
2974                decoded
2975                    .data
2976                    .into_arrow(self.data_type.clone(), self.should_validate)?,
2977            );
2978
2979            arrays.push(array);
2980        }
2981        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
2982        let array = arrow_select::concat::concat(&array_refs)?;
2983        let mut repdef = CompositeRepDefUnraveler::new(unravelers);
2984
2985        let array = Self::restore_validity(array, &mut repdef);
2986
2987        Ok(DecodedArray { array, repdef })
2988    }
2989}
2990
2991#[derive(Debug)]
2992pub struct StructuralPrimitiveFieldDecoder {
2993    field: Arc<ArrowField>,
2994    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
2995    should_validate: bool,
2996    rows_drained_in_current: u64,
2997}
2998
2999impl StructuralPrimitiveFieldDecoder {
3000    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3001        Self {
3002            field: field.clone(),
3003            page_decoders: VecDeque::new(),
3004            should_validate,
3005            rows_drained_in_current: 0,
3006        }
3007    }
3008}
3009
3010impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3011    fn accept_page(&mut self, child: LoadedPage) -> Result<()> {
3012        assert!(child.path.is_empty());
3013        self.page_decoders.push_back(child.decoder);
3014        Ok(())
3015    }
3016
3017    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3018        let mut remaining = num_rows;
3019        let mut tasks = Vec::new();
3020        while remaining > 0 {
3021            let cur_page = self.page_decoders.front_mut().unwrap();
3022            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3023            let to_take = num_in_page.min(remaining);
3024
3025            let task = cur_page.drain(to_take)?;
3026            tasks.push(task);
3027
3028            if to_take == num_in_page {
3029                self.page_decoders.pop_front();
3030                self.rows_drained_in_current = 0;
3031            } else {
3032                self.rows_drained_in_current += to_take;
3033            }
3034
3035            remaining -= to_take;
3036        }
3037        Ok(Box::new(StructuralCompositeDecodeArrayTask {
3038            tasks,
3039            should_validate: self.should_validate,
3040            data_type: self.field.data_type().clone(),
3041        }))
3042    }
3043
3044    fn data_type(&self) -> &DataType {
3045        self.field.data_type()
3046    }
3047}
3048
3049/// The serialized representation of full-zip data
3050struct SerializedFullZip {
3051    /// The zipped values buffer
3052    values: LanceBuffer,
3053    /// The repetition index (only present if there is repetition)
3054    repetition_index: Option<LanceBuffer>,
3055}
3056
3057// We align and pad mini-blocks to 8 byte boundaries for two reasons.  First,
3058// to allow us to store a chunk size in 12 bits.
3059//
3060// If we directly record the size in bytes with 12 bits we would be limited to
3061// 4KiB which is too small.  Since we know each mini-block consists of 8 byte
3062// words we can store the # of words instead which gives us 32KiB.  We want
3063// at least 24KiB so we can handle even the worst case of
3064// - 4Ki values compressed into an 8186 byte buffer
3065// - 4 bytes to describe rep & def lengths
3066// - 16KiB of rep & def buffer (this will almost never happen but life is easier if we
3067//   plan for it)
3068//
3069// Second, each chunk in a mini-block is aligned to 8 bytes.  This allows multi-byte
3070// values like offsets to be stored in a mini-block and safely read back out.  It also
3071// helps ensure zero-copy reads in cases where zero-copy is possible (e.g. no decoding
3072// needed).
3073//
3074// Note: by "aligned to 8 bytes" we mean BOTH "aligned to 8 bytes from the start of
3075// the page" and "aligned to 8 bytes from the start of the file."
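//
// For example (illustrative arithmetic): the ~24KiB worst case above is 3Ki 8-byte
// words, which fits comfortably in a 12-bit word count (4Ki words = 32KiB).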
3076const MINIBLOCK_ALIGNMENT: usize = 8;
3077
3078/// An encoder for primitive (leaf) arrays
3079///
3080/// This encoder is fairly complicated and follows a number of paths depending
3081/// on the data.
3082///
3083/// First, we convert the validity & offsets information into repetition and
3084/// definition levels.  Then we compress the data itself into a single buffer.
3085///
3086/// If the data is narrow then we encode the data in small chunks (each chunk
3087/// should be a few disk sectors and contains a buffer of repetition, a buffer
3088/// of definition, and a buffer of value data).  This approach is called
3089/// "mini-block".  These mini-blocks are stored into a single data buffer.
3090///
3091/// If the data is wide then we zip together the repetition and definition values
3092/// with the value data into a single buffer.  This approach is called "zipped".
3093///
3094/// If there is any repetition information then we create a repetition index (TODO)
3095///
3096/// In addition, the compression process may create zero or more metadata buffers.
3097/// For example, a dictionary compression will create dictionary metadata.  Any
3098/// mini-block approach has a metadata buffer of block sizes.  This metadata is
3099/// stored in a separate buffer on disk and read at initialization time.
3100///
3101/// TODO: We should concatenate metadata buffers from all pages into a single buffer
3102/// at (roughly) the end of the file so there is, at most, one read per column of
3103/// metadata per file.
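///
/// For example (illustrative): a column whose values are all under 256 bytes is
/// considered "narrow" and takes the mini-block path, while a column with larger
/// values (e.g. big binary blobs) takes the zipped path (see `is_narrow` below).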
3104pub struct PrimitiveStructuralEncoder {
3105    // Accumulates arrays until we have enough data to justify a disk page
3106    accumulation_queue: AccumulationQueue,
3107
3108    keep_original_array: bool,
3109    accumulated_repdefs: Vec<RepDefBuilder>,
3110    // The compression strategy we will use to compress the data
3111    compression_strategy: Arc<dyn CompressionStrategy>,
3112    column_index: u32,
3113    field: Field,
3114    encoding_metadata: Arc<HashMap<String, String>>,
3115}
3116
3117struct CompressedLevelsChunk {
3118    data: LanceBuffer,
3119    num_levels: u16,
3120}
3121
3122struct CompressedLevels {
3123    data: Vec<CompressedLevelsChunk>,
3124    compression: pb::ArrayEncoding,
3125    rep_index: Option<LanceBuffer>,
3126}
3127
3128struct SerializedMiniBlockPage {
3129    num_buffers: u64,
3130    data: LanceBuffer,
3131    metadata: LanceBuffer,
3132}
3133
3134impl PrimitiveStructuralEncoder {
3135    pub fn try_new(
3136        options: &EncodingOptions,
3137        compression_strategy: Arc<dyn CompressionStrategy>,
3138        column_index: u32,
3139        field: Field,
3140        encoding_metadata: Arc<HashMap<String, String>>,
3141    ) -> Result<Self> {
3142        Ok(Self {
3143            accumulation_queue: AccumulationQueue::new(
3144                options.cache_bytes_per_column,
3145                column_index,
3146                options.keep_original_array,
3147            ),
3148            keep_original_array: options.keep_original_array,
3149            accumulated_repdefs: Vec::new(),
3150            column_index,
3151            compression_strategy,
3152            field,
3153            encoding_metadata,
3154        })
3155    }
3156
3157    // TODO: This is a heuristic we may need to tune at some point
3158    //
3159    // As data gets narrower the "zipping" process gets too expensive
3160    //   and we prefer mini-block
3161    // As data gets wider the # of values per block shrinks (very wide
3162    //   data doesn't even fit in a mini-block), the per-block overhead gets
3163    //   too large, and we prefer zipped.
3164    fn is_narrow(data_block: &DataBlock) -> bool {
3165        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3166
3167        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3168            let max_len_array = max_len_array
3169                .as_any()
3170                .downcast_ref::<PrimitiveArray<UInt64Type>>()
3171                .unwrap();
3172            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3173                return true;
3174            }
3175        }
3176        false
3177    }
3178
3179    fn prefers_miniblock(
3180        data_block: &DataBlock,
3181        encoding_metadata: &HashMap<String, String>,
3182    ) -> bool {
3183        // If the user specifically requested miniblock then use it
3184        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3185            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3186        }
3187        // Otherwise only use miniblock if it is narrow
3188        Self::is_narrow(data_block)
3189    }
3190
3191    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3192        // Fullzip is the backup option so the only reason we wouldn't use it is if the
3193        // user specifically requested not to use it (in which case we're probably going
3194        // to emit an error)
3195        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3196            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3197        }
3198        true
3199    }
3200
3201    // Converts value data, repetition levels, and definition levels into a single
3202    // buffer of mini-blocks.  In addition, creates a buffer of mini-block metadata
3203    // which tells us the size of each block.  Finally, if repetition is present then
3204    // we also create a buffer for the repetition index.
3205    //
3206    // Each chunk is serialized as:
3207    // | num_levels (2 bytes) | rep_len (2 bytes, if rep) | def_len (2 bytes, if def) | buf_lens (2 bytes per buffer) | P | rep | P | def | P | buf0 | P | ... | bufN | P |
3208    //
3209    // P - Padding inserted to ensure each buffer is 8-byte aligned and the chunk size is a multiple
3210    //     of 8 bytes (so that the next chunk is 8-byte aligned).
3211    //
3212    // Each block has a u16 word of metadata.  The upper 12 bits contain 1/8 the
3213    // # of bytes in the block (if the block's size is not a multiple of 8 bytes
3214    // then up to 7 bytes of padding are added).  The lower 4 bits describe the log_2
3215    // number of values (e.g. if there are 1024 then the lower 4 bits will be
3216    // 0xA).  All blocks except the last must have a power-of-two number of values.
3217    // This not only makes metadata smaller but it makes decoding easier since
3218    // batch sizes are typically a power of 2.  4 bits would allow us to express
3219    // up to 16Ki values but we restrict this further to 4Ki values.
3220    //
3221    // This means blocks can have 1 to 4Ki values and 8 - 32Ki bytes.
3222    //
3223    // All metadata words are serialized (as little endian) into a single buffer
3224    // of metadata values.
3225    //
3226    // If there is repetition then we also create a repetition index.  This is a
3227    // single buffer of integer vectors (stored in row major order).  There is one
3228    // entry for each chunk.  The size of the vector is based on the depth of random
3229    // access we want to support.
3230    //
3231    // A vector of size 2 is the minimum and will support row-based random access (e.g.
3232    // "take the 57th row").  A vector of size 3 will support 1 level of nested access
3233    // (e.g. "take the 3rd item in the 57th row").  A vector of size 4 will support 2
3234    // levels of nested access and so on.
3235    //
3236    // The first number in the vector is the number of top-level rows that complete in
3237    // the chunk.  The second number is the number of second-level rows that complete
3238    // after the final top-level row completed (or beginning of the chunk if no top-level
3239    // row completes in the chunk).  And so on.  The final number in the vector is always
3240    // the number of leftover items not covered by earlier entries in the vector.
3241    //
3242    // Currently we are limited to 0 levels of nested access but that will change in the
3243    // future.
3244    //
3245    // The repetition index and the chunk metadata are read at initialization time and
3246    // cached in memory.
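    //
    // A worked example of the metadata word computed below (illustrative numbers only):
    // a chunk holding 1024 values in 4096 bytes gives divided_bytes = 4096 / 8 = 512,
    // so its metadata word is ((512 - 1) << 4) | log2(1024) = (511 << 4) | 0xA = 0x1FFA.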
3247    fn serialize_miniblocks(
3248        miniblocks: MiniBlockCompressed,
3249        rep: Option<Vec<CompressedLevelsChunk>>,
3250        def: Option<Vec<CompressedLevelsChunk>>,
3251    ) -> SerializedMiniBlockPage {
3252        let bytes_rep = rep
3253            .as_ref()
3254            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3255            .unwrap_or(0);
3256        let bytes_def = def
3257            .as_ref()
3258            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3259            .unwrap_or(0);
3260        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3261        let mut num_buffers = miniblocks.data.len();
3262        if rep.is_some() {
3263            num_buffers += 1;
3264        }
3265        if def.is_some() {
3266            num_buffers += 1;
3267        }
3268        // 2 bytes for the length of each buffer and up to 7 bytes of padding per buffer
3269        let max_extra = 9 * num_buffers;
3270        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3271        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2);
3272
3273        let mut rep_iter = rep.map(|r| r.into_iter());
3274        let mut def_iter = def.map(|d| d.into_iter());
3275
3276        let mut buffer_offsets = vec![0; miniblocks.data.len()];
3277        for chunk in miniblocks.chunks {
3278            let start_pos = data_buffer.len();
3279            // Start of chunk should be aligned
3280            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3281
3282            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3283            let def = def_iter.as_mut().map(|d| d.next().unwrap());
3284
3285            // Write the number of levels, or 0 if there is no rep/def
3286            let num_levels = rep
3287                .as_ref()
3288                .map(|r| r.num_levels)
3289                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3290            data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3291
3292            // Write the buffer lengths
3293            if let Some(rep) = rep.as_ref() {
3294                let bytes_rep = u16::try_from(rep.data.len()).unwrap();
3295                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3296            }
3297            if let Some(def) = def.as_ref() {
3298                let bytes_def = u16::try_from(def.data.len()).unwrap();
3299                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3300            }
3301
3302            for buffer_size in &chunk.buffer_sizes {
3303                let bytes = *buffer_size;
3304                data_buffer.extend_from_slice(&bytes.to_le_bytes());
3305            }
3306
3307            // Pad
3308            let add_padding = |data_buffer: &mut Vec<u8>| {
3309                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3310                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
3311            };
3312            add_padding(&mut data_buffer);
3313
3314            // Write the buffers themselves
3315            if let Some(rep) = rep.as_ref() {
3316                data_buffer.extend_from_slice(&rep.data);
3317                add_padding(&mut data_buffer);
3318            }
3319            if let Some(def) = def.as_ref() {
3320                data_buffer.extend_from_slice(&def.data);
3321                add_padding(&mut data_buffer);
3322            }
3323            for (buffer_size, (buffer, buffer_offset)) in chunk
3324                .buffer_sizes
3325                .iter()
3326                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
3327            {
3328                let start = *buffer_offset;
3329                let end = start + *buffer_size as usize;
3330                *buffer_offset += *buffer_size as usize;
3331                data_buffer.extend_from_slice(&buffer[start..end]);
3332                add_padding(&mut data_buffer);
3333            }
3334
3335            let chunk_bytes = data_buffer.len() - start_pos;
3336            assert!(chunk_bytes <= 16 * 1024);
3337            assert!(chunk_bytes > 0);
3338            assert_eq!(chunk_bytes % 8, 0);
3339            // We subtract 1 here from the word count because we want to be able to express
3340            // a size of 32KiB and not (32Ki - 8)B which is what we'd get otherwise with
3341            // 0xFFF
3342            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3343            let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3344
3345            let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
3346            meta_buffer.extend_from_slice(&metadata.to_le_bytes());
3347        }
3348
3349        let data_buffer = LanceBuffer::Owned(data_buffer);
3350        let metadata_buffer = LanceBuffer::Owned(meta_buffer);
3351
3352        SerializedMiniBlockPage {
3353            num_buffers: miniblocks.data.len() as u64,
3354            data: data_buffer,
3355            metadata: metadata_buffer,
3356        }
3357    }
3358
3359    /// Compresses a buffer of levels into chunks
3360    ///
3361    /// If these are repetition levels then we also calculate the repetition index here (that
3362    /// is the third return value)
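    ///
    /// For example (illustrative): with `max_rep == 1` and two chunks of rep levels
    /// `[1, 0, 0, 1]` and `[0, 0, 1, 0]`, the first chunk finishes 1 row and leaves 1
    /// leftover level (entry `[1, 1]`) while the last chunk finishes 2 rows (counting the
    /// final row) with 0 leftovers (entry `[2, 0]`).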
3363    fn compress_levels(
3364        mut levels: RepDefSlicer<'_>,
3365        num_elements: u64,
3366        compression_strategy: &dyn CompressionStrategy,
3367        chunks: &[MiniBlockChunk],
3368        // This will be 0 if we are compressing def levels
3369        max_rep: u16,
3370    ) -> Result<CompressedLevels> {
3371        let mut rep_index = if max_rep > 0 {
3372            Vec::with_capacity(chunks.len())
3373        } else {
3374            vec![]
3375        };
3376        // Make the levels into a FixedWidth data block
3377        let num_levels = levels.num_levels() as u64;
3378        let mut levels_buf = levels.all_levels().try_clone().unwrap();
3379        let levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3380            data: levels_buf.borrow_and_clone(),
3381            bits_per_value: 16,
3382            num_values: num_levels,
3383            block_info: BlockInfo::new(),
3384        });
3385        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
3386        // Pick a block compressor
3387        let (compressor, compressor_desc) =
3388            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
3389        // Compress blocks of levels (sized according to the chunks)
3390        let mut level_chunks = Vec::with_capacity(chunks.len());
3391        let mut values_counter = 0;
3392        for (chunk_idx, chunk) in chunks.iter().enumerate() {
3393            let chunk_num_values = chunk.num_values(values_counter, num_elements);
3394            values_counter += chunk_num_values;
3395            let mut chunk_levels = if chunk_idx < chunks.len() - 1 {
3396                levels.slice_next(chunk_num_values as usize)
3397            } else {
3398                levels.slice_rest()
3399            };
3400            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
3401            if max_rep > 0 {
3402                // If max_rep > 0 then we are working with rep levels and we need
3403                // to calculate the repetition index.  The repetition index for a
3404                // chunk is currently 2 values (in the future it may be more).
3405                //
3406                // The first value is the number of rows that _finish_ in the
3407                // chunk.
3408                //
3409                // The second value is the number of "leftovers" after the last
3410                // finished row in the chunk.
3411                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
3412                let rep_values = rep_values.as_ref();
3413
3414                // We skip 1 here because a max_rep at spot 0 doesn't count as a finished list (we
3415                // will count it in the previous chunk)
3416                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
3417                let num_leftovers = if chunk_idx < chunks.len() - 1 {
3418                    rep_values
3419                        .iter()
3420                        .rev()
3421                        .position(|v| *v == max_rep)
3422                        // # of leftovers includes the max_rep spot
3423                        .map(|pos| pos + 1)
3424                        .unwrap_or(rep_values.len())
3425                } else {
3426                    // Last chunk can't have leftovers
3427                    0
3428                };
3429
3430                if chunk_idx != 0 && rep_values[0] == max_rep {
3431                    // This chunk starts with a new row and so, if we thought we had leftovers
3432                    // in the previous chunk, we were mistaken
3433                    // TODO: Can use unchecked here
3434                    let rep_len = rep_index.len();
3435                    if rep_index[rep_len - 1] != 0 {
3436                        // We thought we had leftovers but that was actually a full row
3437                        rep_index[rep_len - 2] += 1;
3438                        rep_index[rep_len - 1] = 0;
3439                    }
3440                }
3441
3442                if chunk_idx == chunks.len() - 1 {
3443                    // The final list
3444                    num_rows += 1;
3445                }
3446                rep_index.push(num_rows as u64);
3447                rep_index.push(num_leftovers as u64);
3448            }
3449            let chunk_levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3450                data: chunk_levels,
3451                bits_per_value: 16,
3452                num_values: num_chunk_levels,
3453                block_info: BlockInfo::new(),
3454            });
3455            let compressed_levels = compressor.compress(chunk_levels_block)?;
3456            level_chunks.push(CompressedLevelsChunk {
3457                data: compressed_levels,
3458                num_levels: num_chunk_levels as u16,
3459            });
3460        }
3461        debug_assert_eq!(levels.num_levels_remaining(), 0);
3462        let rep_index = if rep_index.is_empty() {
3463            None
3464        } else {
3465            Some(LanceBuffer::reinterpret_vec(rep_index))
3466        };
3467        Ok(CompressedLevels {
3468            data: level_chunks,
3469            compression: compressor_desc,
3470            rep_index,
3471        })
3472    }
3473
3474    fn encode_simple_all_null(
3475        column_idx: u32,
3476        num_rows: u64,
3477        row_number: u64,
3478    ) -> Result<EncodedPage> {
3479        let description = ProtobufUtils::simple_all_null_layout();
3480        Ok(EncodedPage {
3481            column_idx,
3482            data: vec![],
3483            description: PageEncoding::Structural(description),
3484            num_rows,
3485            row_number,
3486        })
3487    }
3488
3489    // Encodes a page where all values are null but we have rep/def
3490    // information that we need to store (e.g. to distinguish between
3491    // different kinds of null)
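    //
    // For example (illustrative): in a list-of-int column where every leaf value is null,
    // the def levels still distinguish a row that is itself a null list from a non-null
    // list whose items are all null.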
3492    fn encode_complex_all_null(
3493        column_idx: u32,
3494        repdefs: Vec<RepDefBuilder>,
3495        row_number: u64,
3496        num_rows: u64,
3497    ) -> Result<EncodedPage> {
3498        let repdef = RepDefBuilder::serialize(repdefs);
3499
3500        // TODO: Actually compress repdef
3501        let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
3502            LanceBuffer::reinterpret_slice(rep.clone())
3503        } else {
3504            LanceBuffer::empty()
3505        };
3506
3507        let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
3508            LanceBuffer::reinterpret_slice(def.clone())
3509        } else {
3510            LanceBuffer::empty()
3511        };
3512
3513        let description = ProtobufUtils::all_null_layout(&repdef.def_meaning);
3514        Ok(EncodedPage {
3515            column_idx,
3516            data: vec![rep_bytes, def_bytes],
3517            description: PageEncoding::Structural(description),
3518            num_rows,
3519            row_number,
3520        })
3521    }
3522
3523    #[allow(clippy::too_many_arguments)]
3524    fn encode_miniblock(
3525        column_idx: u32,
3526        field: &Field,
3527        compression_strategy: &dyn CompressionStrategy,
3528        data: DataBlock,
3529        repdefs: Vec<RepDefBuilder>,
3530        row_number: u64,
3531        dictionary_data: Option<DataBlock>,
3532        num_rows: u64,
3533    ) -> Result<EncodedPage> {
3534        let repdef = RepDefBuilder::serialize(repdefs);
3535
3536        if let DataBlock::AllNull(_null_block) = data {
3537            // If we got here then all the data is null but we have rep/def information that
3538            // we need to store.
3539            todo!()
3540        }
3541
3542        let num_items = data.num_values();
3543
3544        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
3545        let (compressed_data, value_encoding) = compressor.compress(data)?;
3546
3547        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
3548
3549        let mut compressed_rep = repdef
3550            .rep_slicer()
3551            .map(|rep_slicer| {
3552                Self::compress_levels(
3553                    rep_slicer,
3554                    num_items,
3555                    compression_strategy,
3556                    &compressed_data.chunks,
3557                    max_rep,
3558                )
3559            })
3560            .transpose()?;
3561
3562        let (rep_index, rep_index_depth) =
3563            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
3564                Some(rep_index) => (Some(rep_index.borrow_and_clone()), 1),
3565                None => (None, 0),
3566            };
3567
3568        let mut compressed_def = repdef
3569            .def_slicer()
3570            .map(|def_slicer| {
3571                Self::compress_levels(
3572                    def_slicer,
3573                    num_items,
3574                    compression_strategy,
3575                    &compressed_data.chunks,
3576                    /*max_rep=*/ 0,
3577                )
3578            })
3579            .transpose()?;
3580
3581        // TODO: Parquet sparsely encodes values here.  We could do the same but
3582        // then we won't have a power-of-two number of values per chunk.  This means more metadata
3583        // and potentially more decoder asymmetry.  However, it may be worth
3584        // investigating at some point
3585
3586        let rep_data = compressed_rep
3587            .as_mut()
3588            .map(|cr| std::mem::take(&mut cr.data));
3589        let def_data = compressed_def
3590            .as_mut()
3591            .map(|cd| std::mem::take(&mut cd.data));
3592
3593        let serialized = Self::serialize_miniblocks(compressed_data, rep_data, def_data);
3594
3595        // Metadata, Data, Dictionary, (maybe) Repetition Index
3596        let mut data = Vec::with_capacity(4);
3597        data.push(serialized.metadata);
3598        data.push(serialized.data);
3599
3600        if let Some(dictionary_data) = dictionary_data {
3601            let num_dictionary_items = dictionary_data.num_values();
3602            // field in `create_block_compressor` is not used currently.
3603            let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;
3604
3605            let (compressor, dictionary_encoding) = compression_strategy
3606                .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
3607            let dictionary_buffer = compressor.compress(dictionary_data)?;
3608
3609            data.push(dictionary_buffer);
3610            if let Some(rep_index) = rep_index {
3611                data.push(rep_index);
3612            }
3613
3614            let description = ProtobufUtils::miniblock_layout(
3615                compressed_rep.map(|cr| cr.compression),
3616                compressed_def.map(|cd| cd.compression),
3617                value_encoding,
3618                rep_index_depth,
3619                serialized.num_buffers,
3620                Some((dictionary_encoding, num_dictionary_items)),
3621                &repdef.def_meaning,
3622                num_items,
3623            );
3624            Ok(EncodedPage {
3625                num_rows,
3626                column_idx,
3627                data,
3628                description: PageEncoding::Structural(description),
3629                row_number,
3630            })
3631        } else {
3632            let description = ProtobufUtils::miniblock_layout(
3633                compressed_rep.map(|cr| cr.compression),
3634                compressed_def.map(|cd| cd.compression),
3635                value_encoding,
3636                rep_index_depth,
3637                serialized.num_buffers,
3638                None,
3639                &repdef.def_meaning,
3640                num_items,
3641            );
3642
3643            if let Some(mut rep_index) = rep_index {
3644                let view = rep_index.borrow_to_typed_slice::<u64>();
3645                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
3646                debug_assert_eq!(total, num_rows);
3647
3648                data.push(rep_index);
3649            }
3650
3651            Ok(EncodedPage {
3652                num_rows,
3653                column_idx,
3654                data,
3655                description: PageEncoding::Structural(description),
3656                row_number,
3657            })
3658        }
3659    }
3660
3661    // For fixed-size data we encode < control word | data > for each value
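    //
    // For example (illustrative): with a 1-byte control word and 4-byte values a, b, c the
    // zipped buffer is | ctrl0 | a | ctrl1 | b | ctrl2 | c |; an invisible value contributes
    // only its control word.  The repetition index (if present) stores the byte offset at
    // which each row starts, plus the final buffer length.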
3662    fn serialize_full_zip_fixed(
3663        fixed: FixedWidthDataBlock,
3664        mut repdef: ControlWordIterator,
3665        num_values: u64,
3666    ) -> SerializedFullZip {
3667        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
3668        let mut zipped_data = Vec::with_capacity(len);
3669
3670        let max_rep_index_val = if repdef.has_repetition() {
3671            len as u64
3672        } else {
3673            // Setting this to 0 means we won't write a repetition index
3674            0
3675        };
3676        let mut rep_index_builder =
3677            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
3678
3679        // I suppose we can just pad to the nearest byte but I'm not sure we need to worry about this anytime soon
3680        // because it is unlikely compression of large values is going to yield a result that is not byte aligned
3681        assert_eq!(
3682            fixed.bits_per_value % 8,
3683            0,
3684            "Non-byte aligned full-zip compression not yet supported"
3685        );
3686
3687        let bytes_per_value = fixed.bits_per_value as usize / 8;
3688
3689        let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
3690        let mut offset = 0;
3691        while let Some(control) = repdef.append_next(&mut zipped_data) {
3692            if control.is_new_row {
3693                // We have finished a row
3694                debug_assert!(offset <= len);
3695                // SAFETY: We know that `offset <= len`
3696                unsafe { rep_index_builder.append(offset as u64) };
3697            }
3698            if control.is_visible {
3699                let value = data_iter.next().unwrap();
3700                zipped_data.extend_from_slice(value);
3701            }
3702            offset = zipped_data.len();
3703        }
3704
3705        debug_assert_eq!(zipped_data.len(), len);
3706        // Put the final value in the rep index
3707        // SAFETY: `zipped_data.len() == len`
3708        unsafe {
3709            rep_index_builder.append(zipped_data.len() as u64);
3710        }
3711
3712        let zipped_data = LanceBuffer::Owned(zipped_data);
3713        let rep_index = rep_index_builder.into_data();
3714        let rep_index = if rep_index.is_empty() {
3715            None
3716        } else {
3717            Some(LanceBuffer::Owned(rep_index))
3718        };
3719        SerializedFullZip {
3720            values: zipped_data,
3721            repetition_index: rep_index,
3722        }
3723    }
3724
3725    // For variable-size data we encode < control word | length | data > for each value
3726    //
3727    // In addition, we create a second buffer, the repetition index
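    //
    // For example (illustrative): with 1-byte control words and 4-byte lengths, the strings
    // "hi" and "lance" zip as | ctrl0 | 2 | 'h' 'i' | ctrl1 | 5 | 'l' 'a' 'n' 'c' 'e' |;
    // a null or invisible value contributes only its control word (no length is written).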
3728    fn serialize_full_zip_variable(
3729        mut variable: VariableWidthBlock,
3730        mut repdef: ControlWordIterator,
3731        num_items: u64,
3732    ) -> SerializedFullZip {
3733        let bytes_per_offset = variable.bits_per_offset as usize / 8;
3734        assert_eq!(
3735            variable.bits_per_offset % 8,
3736            0,
3737            "Only byte-aligned offsets supported"
3738        );
3739        let len = variable.data.len()
3740            + repdef.bytes_per_word() * num_items as usize
3741            + bytes_per_offset * variable.num_values as usize;
3742        let mut buf = Vec::with_capacity(len);
3743
3744        let max_rep_index_val = len as u64;
3745        let mut rep_index_builder =
3746            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
3747
3748        // TODO: byte pack the item lengths with varint encoding
3749        match bytes_per_offset {
3750            4 => {
3751                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
3752                let mut rep_offset = 0;
3753                let mut windows_iter = offs.as_ref().windows(2);
3754                while let Some(control) = repdef.append_next(&mut buf) {
3755                    if control.is_new_row {
3756                        // We have finished a row
3757                        debug_assert!(rep_offset <= len);
3758                        // SAFETY: We know that `rep_offset <= len`
3759                        unsafe { rep_index_builder.append(rep_offset as u64) };
3760                    }
3761                    if control.is_visible {
3762                        let window = windows_iter.next().unwrap();
3763                        if control.is_valid_item {
3764                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
3765                            buf.extend_from_slice(
3766                                &variable.data[window[0] as usize..window[1] as usize],
3767                            );
3768                        }
3769                    }
3770                    rep_offset = buf.len();
3771                }
3772            }
3773            8 => {
3774                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
3775                let mut rep_offset = 0;
3776                let mut windows_iter = offs.as_ref().windows(2);
3777                while let Some(control) = repdef.append_next(&mut buf) {
3778                    if control.is_new_row {
3779                        // We have finished a row
3780                        debug_assert!(rep_offset <= len);
3781                        // SAFETY: We know that `rep_offset <= len`
3782                        unsafe { rep_index_builder.append(rep_offset as u64) };
3783                    }
3784                    if control.is_visible {
3785                        let window = windows_iter.next().unwrap();
3786                        if control.is_valid_item {
3787                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
3788                            buf.extend_from_slice(
3789                                &variable.data[window[0] as usize..window[1] as usize],
3790                            );
3791                        }
3792                    }
3793                    rep_offset = buf.len();
3794                }
3795            }
3796            _ => panic!("Unsupported offset size"),
3797        }
3798
3799        // We might have saved a few bytes by not copying lengths for null or invisible values.  However,
3800        // if we are over `len` then we have a bug.
3801        debug_assert!(buf.len() <= len);
3802        // Put the final value in the rep index
3803        // SAFETY: `buf.len() <= len`
3804        unsafe {
3805            rep_index_builder.append(buf.len() as u64);
3806        }
3807
3808        let zipped_data = LanceBuffer::Owned(buf);
3809        let rep_index = rep_index_builder.into_data();
3810        debug_assert!(!rep_index.is_empty());
3811        let rep_index = Some(LanceBuffer::Owned(rep_index));
3812        SerializedFullZip {
3813            values: zipped_data,
3814            repetition_index: rep_index,
3815        }
3816    }
3817
3818    /// Serializes data into a single buffer according to the full-zip format which zips
3819    /// together the repetition, definition, and value data into a single buffer.
3820    fn serialize_full_zip(
3821        compressed_data: PerValueDataBlock,
3822        repdef: ControlWordIterator,
3823        num_items: u64,
3824    ) -> SerializedFullZip {
3825        match compressed_data {
3826            PerValueDataBlock::Fixed(fixed) => {
3827                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
3828            }
3829            PerValueDataBlock::Variable(var) => {
3830                Self::serialize_full_zip_variable(var, repdef, num_items)
3831            }
3832        }
3833    }
3834
3835    fn encode_full_zip(
3836        column_idx: u32,
3837        field: &Field,
3838        compression_strategy: &dyn CompressionStrategy,
3839        data: DataBlock,
3840        repdefs: Vec<RepDefBuilder>,
3841        row_number: u64,
3842        num_lists: u64,
3843    ) -> Result<EncodedPage> {
3844        let repdef = RepDefBuilder::serialize(repdefs);
3845        let max_rep = repdef
3846            .repetition_levels
3847            .as_ref()
3848            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
3849        let max_def = repdef
3850            .definition_levels
3851            .as_ref()
3852            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
3853
3854        // To handle FSL we just flatten
3855        // let data = data.flatten();
3856
3857        let (num_items, num_visible_items) =
3858            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
3859                // If there are rep levels there may be "invisible" items and we need to encode
3860                // rep_levels.len() things which might be larger than data.num_values()
3861                (rep_levels.len() as u64, data.num_values())
3862            } else {
3863                // If there are no rep levels then we encode data.num_values() things
3864                (data.num_values(), data.num_values())
3865            };
3866
3867        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
3868
3869        let repdef_iter = build_control_word_iterator(
3870            repdef.repetition_levels.as_deref(),
3871            max_rep,
3872            repdef.definition_levels.as_deref(),
3873            max_def,
3874            max_visible_def,
3875            num_items as usize,
3876        );
3877        let bits_rep = repdef_iter.bits_rep();
3878        let bits_def = repdef_iter.bits_def();
3879
3880        let compressor = compression_strategy.create_per_value(field, &data)?;
3881        let (compressed_data, value_encoding) = compressor.compress(data)?;
3882
3883        let description = match &compressed_data {
3884            PerValueDataBlock::Fixed(fixed) => ProtobufUtils::fixed_full_zip_layout(
3885                bits_rep,
3886                bits_def,
3887                fixed.bits_per_value as u32,
3888                value_encoding,
3889                &repdef.def_meaning,
3890                num_items as u32,
3891                num_visible_items as u32,
3892            ),
3893            PerValueDataBlock::Variable(variable) => ProtobufUtils::variable_full_zip_layout(
3894                bits_rep,
3895                bits_def,
3896                variable.bits_per_offset as u32,
3897                value_encoding,
3898                &repdef.def_meaning,
3899                num_items as u32,
3900                num_visible_items as u32,
3901            ),
3902        };
3903
3904        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
3905
3906        let data = if let Some(repindex) = zipped.repetition_index {
3907            vec![zipped.values, repindex]
3908        } else {
3909            vec![zipped.values]
3910        };
3911
3912        Ok(EncodedPage {
3913            num_rows: num_lists,
3914            column_idx,
3915            data,
3916            description: PageEncoding::Structural(description),
3917            row_number,
3918        })
3919    }
3920
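    // Dictionary-encodes a data block, returning (indices, dictionary).  Each distinct value
    // is assigned the index at which it first appears; e.g. (illustrative) the values
    // ["a", "b", "a", "c"] become indices [0, 1, 0, 2] with dictionary ["a", "b", "c"].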
3921    fn dictionary_encode(mut data_block: DataBlock) -> (DataBlock, DataBlock) {
3922        let cardinality = data_block
3923            .get_stat(Stat::Cardinality)
3924            .unwrap()
3925            .as_primitive::<UInt64Type>()
3926            .value(0);
3927        match data_block {
3928            DataBlock::FixedWidth(ref mut fixed_width_data_block) => {
3929                // Currently, among FixedWidth DataBlocks, only bits_per_value == 128 carries a cardinality statistic.
3930                // TODO: a follow-up PR to support `FixedWidth DataBlock with bits_per_value == 256`.
3931                let mut map = HashMap::new();
3932                let u128_slice = fixed_width_data_block.data.borrow_to_typed_slice::<u128>();
3933                let u128_slice = u128_slice.as_ref();
3934                let mut dictionary_buffer = Vec::with_capacity(cardinality as usize);
3935                let mut indices_buffer =
3936                    Vec::with_capacity(fixed_width_data_block.num_values as usize);
3937                let mut curr_idx: i32 = 0;
3938                u128_slice.iter().for_each(|&value| {
3939                    let idx = *map.entry(value).or_insert_with(|| {
3940                        dictionary_buffer.push(value);
3941                        curr_idx += 1;
3942                        curr_idx - 1
3943                    });
3944                    indices_buffer.push(idx);
3945                });
3946                let dictionary_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3947                    data: LanceBuffer::reinterpret_vec(dictionary_buffer),
3948                    bits_per_value: 128,
3949                    num_values: curr_idx as u64,
3950                    block_info: BlockInfo::default(),
3951                });
3952                let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3953                    data: LanceBuffer::reinterpret_vec(indices_buffer),
3954                    bits_per_value: 32,
3955                    num_values: fixed_width_data_block.num_values,
3956                    block_info: BlockInfo::default(),
3957                });
3958                // Todo: if we decide to do eager statistics computing, wrap statistics computing
3959                // in DataBlock constructor.
3960                indices_data_block.compute_stat();
3961
3962                (indices_data_block, dictionary_data_block)
3963            }
3964            DataBlock::VariableWidth(ref mut variable_width_data_block) => {
3965                match variable_width_data_block.bits_per_offset {
3966                    32 => {
3967                        let mut map = HashMap::new();
3968                        let offsets = variable_width_data_block
3969                            .offsets
3970                            .borrow_to_typed_slice::<u32>();
3971                        let offsets = offsets.as_ref();
3972
3973                        let max_len = variable_width_data_block.get_stat(Stat::MaxLength).expect(
3974                            "VariableWidth DataBlock should have valid `Stat::MaxLength` statistics",
3975                        );
3976                        let max_len = max_len.as_primitive::<UInt64Type>().value(0);
3977
3978                        let mut dictionary_buffer: Vec<u8> =
3979                            Vec::with_capacity((max_len * cardinality) as usize);
3980                        let mut dictionary_offsets_buffer = vec![0];
3981                        let mut curr_idx = 0;
3982                        let mut indices_buffer =
3983                            Vec::with_capacity(variable_width_data_block.num_values as usize);
3984
3985                        offsets
3986                            .iter()
3987                            .zip(offsets.iter().skip(1))
3988                            .for_each(|(&start, &end)| {
3989                                let key =
3990                                    &variable_width_data_block.data[start as usize..end as usize];
3991                                let idx: i32 = *map.entry(U8SliceKey(key)).or_insert_with(|| {
3992                                    dictionary_buffer.extend_from_slice(key);
3993                                    dictionary_offsets_buffer.push(dictionary_buffer.len() as u32);
3994                                    curr_idx += 1;
3995                                    curr_idx - 1
3996                                });
3997                                indices_buffer.push(idx);
3998                            });
3999
4000                        let dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock {
4001                            data: LanceBuffer::reinterpret_vec(dictionary_buffer),
4002                            offsets: LanceBuffer::reinterpret_vec(dictionary_offsets_buffer),
4003                            bits_per_offset: 32,
4004                            num_values: curr_idx as u64,
4005                            block_info: BlockInfo::default(),
4006                        });
4007
4008                        let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4009                            data: LanceBuffer::reinterpret_vec(indices_buffer),
4010                            bits_per_value: 32,
4011                            num_values: variable_width_data_block.num_values,
4012                            block_info: BlockInfo::default(),
4013                        });
4014                        // TODO: if we decide to compute statistics eagerly, move this computation
4015                        // into the DataBlock constructor.
4016                        indices_data_block.compute_stat();
4017
4018                        (indices_data_block, dictionary_data_block)
4019                    }
4020                    64 => {
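                        // Mirrors the 32-bit branch above, but reads u64 offsets and produces
                        // u64 dictionary offsets and i64 indices.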
4021                        let mut map = HashMap::new();
4022                        let offsets = variable_width_data_block
4023                            .offsets
4024                            .borrow_to_typed_slice::<u64>();
4025                        let offsets = offsets.as_ref();
4026
4027                        let max_len = variable_width_data_block.get_stat(Stat::MaxLength).expect(
4028                            "VariableWidth DataBlock should have valid `Stat::MaxLength` statistics",
4029                        );
4030                        let max_len = max_len.as_primitive::<UInt64Type>().value(0);
4031
4032                        let mut dictionary_buffer: Vec<u8> =
4033                            Vec::with_capacity((max_len * cardinality) as usize);
4034                        let mut dictionary_offsets_buffer = vec![0];
4035                        let mut curr_idx = 0;
4036                        let mut indices_buffer =
4037                            Vec::with_capacity(variable_width_data_block.num_values as usize);
4038
4039                        offsets
4040                            .iter()
4041                            .zip(offsets.iter().skip(1))
4042                            .for_each(|(&start, &end)| {
4043                                let key =
4044                                    &variable_width_data_block.data[start as usize..end as usize];
4045                                let idx: i64 = *map.entry(U8SliceKey(key)).or_insert_with(|| {
4046                                    dictionary_buffer.extend_from_slice(key);
4047                                    dictionary_offsets_buffer.push(dictionary_buffer.len() as u64);
4048                                    curr_idx += 1;
4049                                    curr_idx - 1
4050                                });
4051                                indices_buffer.push(idx);
4052                            });
4053
4054                        let dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock {
4055                            data: LanceBuffer::reinterpret_vec(dictionary_buffer),
4056                            offsets: LanceBuffer::reinterpret_vec(dictionary_offsets_buffer),
4057                            bits_per_offset: 64,
4058                            num_values: curr_idx as u64,
4059                            block_info: BlockInfo::default(),
4060                        });
4061
4062                        let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4063                            data: LanceBuffer::reinterpret_vec(indices_buffer),
4064                            bits_per_value: 64,
4065                            num_values: variable_width_data_block.num_values,
4066                            block_info: BlockInfo::default(),
4067                        });
4068                        // TODO: if we decide to compute statistics eagerly, move this computation
4069                        // into the DataBlock constructor.
4070                        indices_data_block.compute_stat();
4071
4072                        (indices_data_block, dictionary_data_block)
4073                    }
4074                    _ => {
4075                        unreachable!()
4076                    }
4077                }
4078            }
4079            _ => {
4080                unreachable!("dictionary encode called with data block {:?}", data_block)
4081            }
4082        }
4083    }
4084
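    // Heuristic gate for dictionary encoding.  A block qualifies when it has at least
    // LANCE_ENCODING_DICT_TOO_SMALL values (default 100) and its estimated cardinality is below
    // min(num_values / divisor, LANCE_ENCODING_DICT_MAX_CARDINALITY), where the divisor defaults
    // to 2 and the maximum cardinality defaults to 100,000.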
4085    fn should_dictionary_encode(data_block: &DataBlock, field: &Field) -> bool {
4086        // Don't dictionary encode tiny arrays
4087        let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
4088            .ok()
4089            .and_then(|val| val.parse().ok())
4090            .unwrap_or(100);
4091        if data_block.num_values() < too_small {
4092            return false;
4093        }
4094
4095        // Somewhat arbitrary threshold rule.  Apply dictionary encoding if the number of unique
4096        // values is less than num_values / divisor (half of the values with the default divisor).
4097        let divisor: u64 = field
4098            .metadata
4099            .get(DICT_DIVISOR_META_KEY)
4100            .map(|val| val.parse().ok())
4101            .unwrap_or_else(|| {
4102                env::var("LANCE_ENCODING_DICT_DIVISOR")
4103                    .ok()
4104                    .and_then(|val| val.parse().ok())
4105            })
4106            .unwrap_or(2);
4107
4108        // Cap on cardinality.  This should be pushed into the cardinality estimation to avoid
4109        // spending too much time estimating cardinality.
4110        let max_cardinality = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY")
4111            .ok()
4112            .and_then(|val| val.parse().ok())
4113            .unwrap_or(100000);
4114
4115        let threshold = (data_block.num_values() / divisor).min(max_cardinality);
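        // For example, with 10,000 values and the default divisor of 2 the threshold is
        // min(5_000, 100_000) = 5_000, so dictionary encoding is used only when the estimated
        // cardinality is below 5,000.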
4116
4117        let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
4118            cardinality_array.as_primitive::<UInt64Type>().value(0)
4119        } else {
4120            u64::MAX
4121        };
4122
4123        cardinality < threshold
4124    }
4125
4126    // Creates an encode task, consuming all buffered data
4127    fn do_flush(
4128        &mut self,
4129        arrays: Vec<ArrayRef>,
4130        repdefs: Vec<RepDefBuilder>,
4131        row_number: u64,
4132        num_rows: u64,
4133    ) -> Result<Vec<EncodeTask>> {
4134        let column_idx = self.column_index;
4135        let compression_strategy = self.compression_strategy.clone();
4136        let field = self.field.clone();
4137        let encoding_metadata = self.encoding_metadata.clone();
4138        let task = spawn_cpu(move || {
4139            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
4140
4141            if num_values == 0 {
4142                // We should not encode empty arrays, so reaching this point means the input is all
4143                // empty lists or all null lists (or a mix of the two).  We still need to encode
4144                // the rep/def information but we can skip the data encoding.
4145                return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
4146            }
4147            let num_nulls = arrays
4148                .iter()
4149                .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
4150                .sum::<u64>();
4151
4152            if num_values == num_nulls {
4153                return if repdefs.iter().all(|rd| rd.is_simple_validity()) {
4154                    log::debug!(
4155                        "Encoding column {} with {} items using simple-null layout",
4156                        column_idx,
4157                        num_values
4158                    );
4159                    // Simple case, no rep/def and all nulls, we don't need to encode any data
4160                    Self::encode_simple_all_null(column_idx, num_values, row_number)
4161                } else {
4162                    // If we get here then we have definition levels and we need to store those
4163                    Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
4164                };
4165            }
4166
4167            let data_block = DataBlock::from_arrays(&arrays, num_values);
4168
4169            // if the `data_block` is a `StructDataBlock`, then this is a struct with packed struct encoding.
4170            if let DataBlock::Struct(ref struct_data_block) = data_block {
4171                if struct_data_block
4172                    .children
4173                    .iter()
4174                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
4175                {
4176                    panic!("packed struct encoding currently only supports fixed-width fields.")
4177                }
4178            }
4179
4180            // The top-level validity is encoded in repdef so we can remove it.
4181            let data_block = data_block.remove_outer_validity();
4182
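            // Pick the physical layout for this page: dictionary encoding (indices written as a
            // mini-block plus a separate dictionary) when cardinality is low enough, otherwise
            // mini-block for narrow data, otherwise full-zip.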
4184            if Self::should_dictionary_encode(&data_block, &field) {
4185                log::debug!(
4186                    "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
4187                    column_idx,
4188                    num_values
4189                );
4190                let (indices_data_block, dictionary_data_block) =
4191                    Self::dictionary_encode(data_block);
4192                Self::encode_miniblock(
4193                    column_idx,
4194                    &field,
4195                    compression_strategy.as_ref(),
4196                    indices_data_block,
4197                    repdefs,
4198                    row_number,
4199                    Some(dictionary_data_block),
4200                    num_rows,
4201                )
4202            } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
4203                log::debug!(
4204                    "Encoding column {} with {} items using mini-block layout",
4205                    column_idx,
4206                    num_values
4207                );
4208                Self::encode_miniblock(
4209                    column_idx,
4210                    &field,
4211                    compression_strategy.as_ref(),
4212                    data_block,
4213                    repdefs,
4214                    row_number,
4215                    None,
4216                    num_rows,
4217                )
4218            } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
4219                log::debug!(
4220                    "Encoding column {} with {} items using full-zip layout",
4221                    column_idx,
4222                    num_values
4223                );
4224                Self::encode_full_zip(
4225                    column_idx,
4226                    &field,
4227                    compression_strategy.as_ref(),
4228                    data_block,
4229                    repdefs,
4230                    row_number,
4231                    num_rows,
4232                )
4233            } else {
4234                Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}.  This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() })
4235            }
4236        })
4237        .boxed();
4238        Ok(vec![task])
4239    }
4240
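    // Records the array's top-level validity in the repdef builder: the null buffer is reused
    // when the original array is kept alive, deep-copied otherwise, and add_no_null is used when
    // there are no nulls.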
4241    fn extract_validity_buf(
4242        array: &dyn Array,
4243        repdef: &mut RepDefBuilder,
4244        keep_original_array: bool,
4245    ) {
4246        if let Some(validity) = array.nulls() {
4247            if keep_original_array {
4248                repdef.add_validity_bitmap(validity.clone());
4249            } else {
4250                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
4251            }
4252        } else {
4253            repdef.add_no_null(array.len());
4254        }
4255    }
4256
4257    fn extract_validity(array: &dyn Array, repdef: &mut RepDefBuilder, keep_original_array: bool) {
4258        match array.data_type() {
4259            DataType::Null => {
4260                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
4261            }
4262            DataType::Dictionary(_, _) => {
4263                unreachable!()
4264            }
4265            // Extract our validity buf but NOT any child validity bufs (they will be encoded
4266            // as part of the values).  Note: for FSL we do not use repdef.add_fsl because we do
4267            // NOT want to increase the repdef depth.
4268            //
4269            // This would be quite catastrophic for something like vector embeddings.  Imagine we
4270            // had thousands of vectors and some were null but no vector contained null items.  If
4271            // we treated the vectors (primitive FSL) like we treat structural FSL we would end up
4272            // with a rep/def value for every single item in the vector.
4273            _ => Self::extract_validity_buf(array, repdef, keep_original_array),
4274        }
4275    }
4276}
4277
4278impl FieldEncoder for PrimitiveStructuralEncoder {
4279    // Buffers data, if there is enough to write a page then we create an encode task
4280    fn maybe_encode(
4281        &mut self,
4282        array: ArrayRef,
4283        _external_buffers: &mut OutOfLineBuffers,
4284        mut repdef: RepDefBuilder,
4285        row_number: u64,
4286        num_rows: u64,
4287    ) -> Result<Vec<EncodeTask>> {
4288        Self::extract_validity(array.as_ref(), &mut repdef, self.keep_original_array);
4289        self.accumulated_repdefs.push(repdef);
4290
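        // The accumulation queue buffers arrays until roughly a page's worth of data has been
        // collected; insert() hands back the buffered batch (with its starting row number) once
        // that happens, and returns None otherwise.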
4291        if let Some((arrays, row_number, num_rows)) =
4292            self.accumulation_queue.insert(array, row_number, num_rows)
4293        {
4294            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4295            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4296        } else {
4297            Ok(vec![])
4298        }
4299    }
4300
4301    // If there is any data left in the buffer then create an encode task from it
4302    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
4303        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
4304            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4305            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4306        } else {
4307            Ok(vec![])
4308        }
4309    }
4310
4311    fn num_columns(&self) -> u32 {
4312        1
4313    }
4314
4315    fn finish(
4316        &mut self,
4317        _external_buffers: &mut OutOfLineBuffers,
4318    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
4319        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
4320    }
4321}
4322
4323#[cfg(test)]
4324#[allow(clippy::single_range_in_vec_init)]
4325mod tests {
4326    use std::{collections::VecDeque, sync::Arc};
4327
4328    use crate::encodings::logical::primitive::{
4329        ChunkDrainInstructions, PrimitiveStructuralEncoder,
4330    };
4331    use arrow_array::{ArrayRef, Int8Array, StringArray};
4332
4333    use super::{
4334        ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
4335        FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails,
4336        FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor, PreambleAction,
4337        StructuralPageScheduler,
4338    };
4339
4340    #[test]
4341    fn test_is_narrow() {
4342        let int8_array = Int8Array::from(vec![1, 2, 3]);
4343        let array_ref: ArrayRef = Arc::new(int8_array);
4344        let block = DataBlock::from_array(array_ref);
4345
4346        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4347
4348        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
4349        let block = DataBlock::from_array(string_array);
4350        assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4351
4352        let string_array = StringArray::from(vec![
4353            Some("hello world".repeat(100)),
4354            Some("world".to_string()),
4355        ]);
4356        let block = DataBlock::from_array(string_array);
4357        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
4358    }
4359
4360    #[test]
4361    fn test_map_range() {
4362        // Null in the middle
4363        // [[A, B, C], [D, E], NULL, [F, G, H]]
4364        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
4365        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
4366        let max_visible_def = 0;
4367        let total_items = 8;
4368        let max_rep = 1;
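        // In this example rep == 1 marks the start of a new list and def == 1 marks the null
        // list, so there are 9 rep/def levels but only 8 visible items (max_visible_def == 0).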
4369
4370        let check = |range, expected_item_range, expected_level_range| {
4371            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4372                range,
4373                rep.as_ref(),
4374                def.as_ref(),
4375                max_rep,
4376                max_visible_def,
4377                total_items,
4378                PreambleAction::Absent,
4379            );
4380            assert_eq!(item_range, expected_item_range);
4381            assert_eq!(level_range, expected_level_range);
4382        };
4383
4384        check(0..1, 0..3, 0..3);
4385        check(1..2, 3..5, 3..5);
4386        check(2..3, 5..5, 5..6);
4387        check(3..4, 5..8, 6..9);
4388        check(0..2, 0..5, 0..5);
4389        check(1..3, 3..5, 3..6);
4390        check(2..4, 5..8, 5..9);
4391        check(0..3, 0..5, 0..6);
4392        check(1..4, 3..8, 3..9);
4393        check(0..4, 0..8, 0..9);
4394
4395        // Null at start
4396        // [NULL, [A, B], [C]]
4397        let rep = Some(vec![1, 1, 0, 1]);
4398        let def = Some(vec![1, 0, 0, 0]);
4399        let max_visible_def = 0;
4400        let total_items = 3;
4401
4402        let check = |range, expected_item_range, expected_level_range| {
4403            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4404                range,
4405                rep.as_ref(),
4406                def.as_ref(),
4407                max_rep,
4408                max_visible_def,
4409                total_items,
4410                PreambleAction::Absent,
4411            );
4412            assert_eq!(item_range, expected_item_range);
4413            assert_eq!(level_range, expected_level_range);
4414        };
4415
4416        check(0..1, 0..0, 0..1);
4417        check(1..2, 0..2, 1..3);
4418        check(2..3, 2..3, 3..4);
4419        check(0..2, 0..2, 0..3);
4420        check(1..3, 0..3, 1..4);
4421        check(0..3, 0..3, 0..4);
4422
4423        // Null at end
4424        // [[A], [B, C], NULL]
4425        let rep = Some(vec![1, 1, 0, 1]);
4426        let def = Some(vec![0, 0, 0, 1]);
4427        let max_visible_def = 0;
4428        let total_items = 3;
4429
4430        let check = |range, expected_item_range, expected_level_range| {
4431            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4432                range,
4433                rep.as_ref(),
4434                def.as_ref(),
4435                max_rep,
4436                max_visible_def,
4437                total_items,
4438                PreambleAction::Absent,
4439            );
4440            assert_eq!(item_range, expected_item_range);
4441            assert_eq!(level_range, expected_level_range);
4442        };
4443
4444        check(0..1, 0..1, 0..1);
4445        check(1..2, 1..3, 1..3);
4446        check(2..3, 3..3, 3..4);
4447        check(0..2, 0..3, 0..3);
4448        check(1..3, 1..3, 1..4);
4449        check(0..3, 0..3, 0..4);
4450
4451        // No nulls, with repetition
4452        // [[A, B], [C, D], [E, F]]
4453        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
4454        let def: Option<&[u16]> = None;
4455        let max_visible_def = 0;
4456        let total_items = 6;
4457
4458        let check = |range, expected_item_range, expected_level_range| {
4459            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4460                range,
4461                rep.as_ref(),
4462                def.as_ref(),
4463                max_rep,
4464                max_visible_def,
4465                total_items,
4466                PreambleAction::Absent,
4467            );
4468            assert_eq!(item_range, expected_item_range);
4469            assert_eq!(level_range, expected_level_range);
4470        };
4471
4472        check(0..1, 0..2, 0..2);
4473        check(1..2, 2..4, 2..4);
4474        check(2..3, 4..6, 4..6);
4475        check(0..2, 0..4, 0..4);
4476        check(1..3, 2..6, 2..6);
4477        check(0..3, 0..6, 0..6);
4478
4479        // No repetition, with nulls (this case is trivial)
4480        // [A, B, NULL, C]
4481        let rep: Option<&[u16]> = None;
4482        let def = Some(vec![0, 0, 1, 0]);
4483        let max_visible_def = 1;
4484        let total_items = 4;
4485
4486        let check = |range, expected_item_range, expected_level_range| {
4487            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4488                range,
4489                rep.as_ref(),
4490                def.as_ref(),
4491                max_rep,
4492                max_visible_def,
4493                total_items,
4494                PreambleAction::Absent,
4495            );
4496            assert_eq!(item_range, expected_item_range);
4497            assert_eq!(level_range, expected_level_range);
4498        };
4499
4500        check(0..1, 0..1, 0..1);
4501        check(1..2, 1..2, 1..2);
4502        check(2..3, 2..3, 2..3);
4503        check(0..2, 0..2, 0..2);
4504        check(1..3, 1..3, 1..3);
4505        check(0..3, 0..3, 0..3);
4506
4507        // Tricky case, this chunk is a continuation and starts with a rep-index = 0
4508        // [[..., A], [B, C], NULL]
4509        //
4510        // What we do will depend on the preamble action
4511        let rep = Some(vec![0, 1, 0, 1]);
4512        let def = Some(vec![0, 0, 0, 1]);
4513        let max_visible_def = 0;
4514        let total_items = 3;
4515
4516        let check = |range, expected_item_range, expected_level_range| {
4517            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4518                range,
4519                rep.as_ref(),
4520                def.as_ref(),
4521                max_rep,
4522                max_visible_def,
4523                total_items,
4524                PreambleAction::Take,
4525            );
4526            assert_eq!(item_range, expected_item_range);
4527            assert_eq!(level_range, expected_level_range);
4528        };
4529
4530        // If we are taking the preamble then the range must start at 0
4531        check(0..1, 0..3, 0..3);
4532        check(0..2, 0..3, 0..4);
4533
4534        let check = |range, expected_item_range, expected_level_range| {
4535            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4536                range,
4537                rep.as_ref(),
4538                def.as_ref(),
4539                max_rep,
4540                max_visible_def,
4541                total_items,
4542                PreambleAction::Skip,
4543            );
4544            assert_eq!(item_range, expected_item_range);
4545            assert_eq!(level_range, expected_level_range);
4546        };
4547
4548        check(0..1, 1..3, 1..3);
4549        check(1..2, 3..3, 3..4);
4550        check(0..2, 1..3, 1..4);
4551
4552        // Another preamble case but now it doesn't end with a new list
4553        // [[..., A], NULL, [D, E]]
4554        //
4555        // What we do will depend on the preamble action
4556        let rep = Some(vec![0, 1, 1, 0]);
4557        let def = Some(vec![0, 1, 0, 0]);
4558        let max_visible_def = 0;
4559        let total_items = 4;
4560
4561        let check = |range, expected_item_range, expected_level_range| {
4562            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4563                range,
4564                rep.as_ref(),
4565                def.as_ref(),
4566                max_rep,
4567                max_visible_def,
4568                total_items,
4569                PreambleAction::Take,
4570            );
4571            assert_eq!(item_range, expected_item_range);
4572            assert_eq!(level_range, expected_level_range);
4573        };
4574
4575        // If we are taking the preamble then the range must start at 0
4576        check(0..1, 0..1, 0..2);
4577        check(0..2, 0..3, 0..4);
4578
4579        let check = |range, expected_item_range, expected_level_range| {
4580            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4581                range,
4582                rep.as_ref(),
4583                def.as_ref(),
4584                max_rep,
4585                max_visible_def,
4586                total_items,
4587                PreambleAction::Skip,
4588            );
4589            assert_eq!(item_range, expected_item_range);
4590            assert_eq!(level_range, expected_level_range);
4591        };
4592
4593        // If we are skipping the preamble then its items are excluded from the ranges
4594        check(0..1, 1..1, 1..2);
4595        check(1..2, 1..3, 2..4);
4596        check(0..2, 1..3, 1..4);
4597
4598        // Now a preamble case without any definition levels
4599        // [[..., A], [B, C], [D]]
4600        let rep = Some(vec![0, 1, 0, 1]);
4601        let def: Option<Vec<u16>> = None;
4602        let max_visible_def = 0;
4603        let total_items = 4;
4604
4605        let check = |range, expected_item_range, expected_level_range| {
4606            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4607                range,
4608                rep.as_ref(),
4609                def.as_ref(),
4610                max_rep,
4611                max_visible_def,
4612                total_items,
4613                PreambleAction::Take,
4614            );
4615            assert_eq!(item_range, expected_item_range);
4616            assert_eq!(level_range, expected_level_range);
4617        };
4618
4619        // If we are taking the preamble then the range must start at 0
4620        check(0..1, 0..3, 0..3);
4621        check(0..2, 0..4, 0..4);
4622
4623        let check = |range, expected_item_range, expected_level_range| {
4624            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4625                range,
4626                rep.as_ref(),
4627                def.as_ref(),
4628                max_rep,
4629                max_visible_def,
4630                total_items,
4631                PreambleAction::Skip,
4632            );
4633            assert_eq!(item_range, expected_item_range);
4634            assert_eq!(level_range, expected_level_range);
4635        };
4636
4637        check(0..1, 1..3, 1..3);
4638        check(1..2, 3..4, 3..4);
4639        check(0..2, 1..4, 1..4);
4640    }
4641
4642    #[test]
4643    fn test_schedule_instructions() {
4644        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
4645        let repetition_index = MiniBlockRepIndex::decode(&repetition_index);
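        // With this index, chunks 0 and 2 end with a trailer row that spills into the next
        // chunk's preamble, giving 14 user-visible rows in total (5 + 1 + 2 + 4 + 1 + 1), which
        // matches the expected instructions below.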
4646
4647        let check = |user_ranges, expected_instructions| {
4648            let instructions =
4649                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
4650            assert_eq!(instructions, expected_instructions);
4651        };
4652
4653        // The instructions we expect if we're grabbing the whole range
4654        let expected_take_all = vec![
4655            ChunkInstructions {
4656                chunk_idx: 0,
4657                preamble: PreambleAction::Absent,
4658                rows_to_skip: 0,
4659                rows_to_take: 5,
4660                take_trailer: true,
4661            },
4662            ChunkInstructions {
4663                chunk_idx: 1,
4664                preamble: PreambleAction::Take,
4665                rows_to_skip: 0,
4666                rows_to_take: 2,
4667                take_trailer: false,
4668            },
4669            ChunkInstructions {
4670                chunk_idx: 2,
4671                preamble: PreambleAction::Absent,
4672                rows_to_skip: 0,
4673                rows_to_take: 4,
4674                take_trailer: true,
4675            },
4676            ChunkInstructions {
4677                chunk_idx: 3,
4678                preamble: PreambleAction::Take,
4679                rows_to_skip: 0,
4680                rows_to_take: 1,
4681                take_trailer: false,
4682            },
4683        ];
4684
4685        // Take all as 1 range
4686        check(&[0..14], expected_take_all.clone());
4687
4688        // Take all as individual rows
4689        check(
4690            &[
4691                0..1,
4692                1..2,
4693                2..3,
4694                3..4,
4695                4..5,
4696                5..6,
4697                6..7,
4698                7..8,
4699                8..9,
4700                9..10,
4701                10..11,
4702                11..12,
4703                12..13,
4704                13..14,
4705            ],
4706            expected_take_all,
4707        );
4708
4709        // Test some partial takes
4710
4711        // 2 rows in the same chunk but not contiguous
4712        check(
4713            &[0..1, 3..4],
4714            vec![
4715                ChunkInstructions {
4716                    chunk_idx: 0,
4717                    preamble: PreambleAction::Absent,
4718                    rows_to_skip: 0,
4719                    rows_to_take: 1,
4720                    take_trailer: false,
4721                },
4722                ChunkInstructions {
4723                    chunk_idx: 0,
4724                    preamble: PreambleAction::Absent,
4725                    rows_to_skip: 3,
4726                    rows_to_take: 1,
4727                    take_trailer: false,
4728                },
4729            ],
4730        );
4731
4732        // Taking just a trailer/preamble
4733        check(
4734            &[5..6],
4735            vec![
4736                ChunkInstructions {
4737                    chunk_idx: 0,
4738                    preamble: PreambleAction::Absent,
4739                    rows_to_skip: 5,
4740                    rows_to_take: 0,
4741                    take_trailer: true,
4742                },
4743                ChunkInstructions {
4744                    chunk_idx: 1,
4745                    preamble: PreambleAction::Take,
4746                    rows_to_skip: 0,
4747                    rows_to_take: 0,
4748                    take_trailer: false,
4749                },
4750            ],
4751        );
4752
4753        // Skipping an entire chunk
4754        check(
4755            &[7..10],
4756            vec![
4757                ChunkInstructions {
4758                    chunk_idx: 1,
4759                    preamble: PreambleAction::Skip,
4760                    rows_to_skip: 1,
4761                    rows_to_take: 1,
4762                    take_trailer: false,
4763                },
4764                ChunkInstructions {
4765                    chunk_idx: 2,
4766                    preamble: PreambleAction::Absent,
4767                    rows_to_skip: 0,
4768                    rows_to_take: 2,
4769                    take_trailer: false,
4770                },
4771            ],
4772        );
4773    }
4774
4775    #[test]
4776    fn test_drain_instructions() {
4777        fn drain_from_instructions(
4778            instructions: &mut VecDeque<ChunkInstructions>,
4779            mut rows_desired: u64,
4780            need_preamble: &mut bool,
4781            skip_in_chunk: &mut u64,
4782        ) -> Vec<ChunkDrainInstructions> {
4783            // Note: instructions.len() is an upper bound, we typically take much fewer
4784            let mut drain_instructions = Vec::with_capacity(instructions.len());
4785            while rows_desired > 0 || *need_preamble {
4786                let (next_instructions, consumed_chunk) = instructions
4787                    .front()
4788                    .unwrap()
4789                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
4790                if consumed_chunk {
4791                    instructions.pop_front();
4792                }
4793                drain_instructions.push(next_instructions);
4794            }
4795            drain_instructions
4796        }
4797
4798        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
4799        let repetition_index = MiniBlockRepIndex::decode(&repetition_index);
4800        let user_ranges = vec![1..7, 10..14];
4801
4802        // First, schedule the ranges
4803        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
4804
4805        let mut to_drain = VecDeque::from(scheduled.clone());
4806
4807        // Now we drain in batches of 4
4808
4809        let mut need_preamble = false;
4810        let mut skip_in_chunk = 0;
4811
4812        let next_batch =
4813            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
4814
4815        assert!(!need_preamble);
4816        assert_eq!(skip_in_chunk, 4);
4817        assert_eq!(
4818            next_batch,
4819            vec![ChunkDrainInstructions {
4820                chunk_instructions: scheduled[0].clone(),
4821                rows_to_take: 4,
4822                rows_to_skip: 0,
4823                preamble_action: PreambleAction::Absent,
4824            }]
4825        );
4826
4827        let next_batch =
4828            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
4829
4830        assert!(!need_preamble);
4831        assert_eq!(skip_in_chunk, 2);
4832
4833        assert_eq!(
4834            next_batch,
4835            vec![
4836                ChunkDrainInstructions {
4837                    chunk_instructions: scheduled[0].clone(),
4838                    rows_to_take: 1,
4839                    rows_to_skip: 4,
4840                    preamble_action: PreambleAction::Absent,
4841                },
4842                ChunkDrainInstructions {
4843                    chunk_instructions: scheduled[1].clone(),
4844                    rows_to_take: 1,
4845                    rows_to_skip: 0,
4846                    preamble_action: PreambleAction::Take,
4847                },
4848                ChunkDrainInstructions {
4849                    chunk_instructions: scheduled[2].clone(),
4850                    rows_to_take: 2,
4851                    rows_to_skip: 0,
4852                    preamble_action: PreambleAction::Absent,
4853                }
4854            ]
4855        );
4856
4857        let next_batch =
4858            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
4859
4860        assert!(!need_preamble);
4861        assert_eq!(skip_in_chunk, 0);
4862
4863        assert_eq!(
4864            next_batch,
4865            vec![
4866                ChunkDrainInstructions {
4867                    chunk_instructions: scheduled[2].clone(),
4868                    rows_to_take: 1,
4869                    rows_to_skip: 2,
4870                    preamble_action: PreambleAction::Absent,
4871                },
4872                ChunkDrainInstructions {
4873                    chunk_instructions: scheduled[3].clone(),
4874                    rows_to_take: 1,
4875                    rows_to_skip: 0,
4876                    preamble_action: PreambleAction::Take,
4877                },
4878            ]
4879        );
4880
4881        // Regression case.  Need a chunk with preamble, rows, and trailer (the middle chunk here)
4882        let repetition_index = vec![vec![5, 2], vec![3, 3], vec![20, 0]];
4883        let repetition_index = MiniBlockRepIndex::decode(&repetition_index);
4884        let user_ranges = vec![0..28];
4885
4886        // First, schedule the ranges
4887        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
4888
4889        let mut to_drain = VecDeque::from(scheduled.clone());
4890
4891        // Drain first chunk and some of second chunk
4892
4893        let mut need_preamble = false;
4894        let mut skip_in_chunk = 0;
4895
4896        let next_batch =
4897            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
4898
4899        assert_eq!(
4900            next_batch,
4901            vec![
4902                ChunkDrainInstructions {
4903                    chunk_instructions: scheduled[0].clone(),
4904                    rows_to_take: 6,
4905                    rows_to_skip: 0,
4906                    preamble_action: PreambleAction::Absent,
4907                },
4908                ChunkDrainInstructions {
4909                    chunk_instructions: scheduled[1].clone(),
4910                    rows_to_take: 1,
4911                    rows_to_skip: 0,
4912                    preamble_action: PreambleAction::Take,
4913                },
4914            ]
4915        );
4916
4917        assert!(!need_preamble);
4918        assert_eq!(skip_in_chunk, 1);
4919
4920        // Now, the tricky part.  We drain the second chunk, including the trailer, and need to make sure
4921        // we get a drain task to take the preamble of the third chunk (and nothing else)
4922        let next_batch =
4923            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
4924
4925        assert_eq!(
4926            next_batch,
4927            vec![
4928                ChunkDrainInstructions {
4929                    chunk_instructions: scheduled[1].clone(),
4930                    rows_to_take: 2,
4931                    rows_to_skip: 1,
4932                    preamble_action: PreambleAction::Skip,
4933                },
4934                ChunkDrainInstructions {
4935                    chunk_instructions: scheduled[2].clone(),
4936                    rows_to_take: 0,
4937                    rows_to_skip: 0,
4938                    preamble_action: PreambleAction::Take,
4939                },
4940            ]
4941        );
4942
4943        assert!(!need_preamble);
4944        assert_eq!(skip_in_chunk, 0);
4945    }
4946
4947    #[tokio::test]
4948    async fn test_fullzip_repetition_index_caching() {
4949        use crate::testing::SimulatedScheduler;
4950        use lance_core::cache::LanceCache;
4951
4952        // Simplified FixedPerValueDecompressor for testing
4953        #[derive(Debug)]
4954        struct TestFixedDecompressor;
4955
4956        impl FixedPerValueDecompressor for TestFixedDecompressor {
4957            fn decompress(
4958                &self,
4959                _data: FixedWidthDataBlock,
4960                _num_rows: u64,
4961            ) -> crate::Result<DataBlock> {
4962                unimplemented!("Test decompressor")
4963            }
4964
4965            fn bits_per_value(&self) -> u64 {
4966                32
4967            }
4968        }
4969
4970        // Create test repetition index data
4971        let rows_in_page = 100u64;
4972        let bytes_per_value = 4u64;
4973        let _rep_index_size = (rows_in_page + 1) * bytes_per_value;
4974
4975        // Create mock repetition index data
4976        let mut rep_index_data = Vec::new();
4977        for i in 0..=rows_in_page {
4978            let offset = (i * 100) as u32; // Each row starts at i * 100 bytes
4979            rep_index_data.extend_from_slice(&offset.to_le_bytes());
4980        }
4981
4982        // Simulate storage with the repetition index at position 1000
4983        let mut full_data = vec![0u8; 1000];
4984        full_data.extend_from_slice(&rep_index_data);
4985        full_data.extend_from_slice(&vec![0u8; 10000]); // Add some data after
4986
4987        let data = bytes::Bytes::from(full_data);
4988        let io = Arc::new(SimulatedScheduler::new(data));
4989        let _cache = Arc::new(LanceCache::with_capacity(1024 * 1024));
4990
4991        // Create FullZipScheduler with repetition index
4992        let mut scheduler = FullZipScheduler {
4993            data_buf_position: 0,
4994            rep_index: Some(FullZipRepIndexDetails {
4995                buf_position: 1000,
4996                bytes_per_value,
4997            }),
4998            priority: 0,
4999            rows_in_page,
5000            bits_per_offset: 32,
5001            details: Arc::new(FullZipDecodeDetails {
5002                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
5003                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
5004                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
5005                max_rep: 0,
5006                max_visible_def: 0,
5007            }),
5008            cached_state: None,
5009            enable_cache: true, // Enable caching for test
5010        };
5011
5012        // First initialization should load and cache the repetition index
5013        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
5014        let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap();
5015
5016        // Verify that we got a FullZipCacheableState (not NoCachedPageData)
5017        let is_cached = cached_data1
5018            .clone()
5019            .as_arc_any()
5020            .downcast::<FullZipCacheableState>()
5021            .is_ok();
5022        assert!(
5023            is_cached,
5024            "Expected FullZipCacheableState, got NoCachedPageData"
5025        );
5026
5027        // Load the cached data into the scheduler
5028        scheduler.load(&cached_data1);
5029
5030        // Verify that cached_state is now populated
5031        assert!(
5032            scheduler.cached_state.is_some(),
5033            "cached_state should be populated after load"
5034        );
5035
5036        // Verify the cached data contains the repetition index
5037        let cached_state = scheduler.cached_state.as_ref().unwrap();
5038
5039        // Test that schedule_ranges_rep uses the cached data
5040        let ranges = vec![0..10, 20..30];
5041        let result = scheduler.schedule_ranges_rep(
5042            &ranges,
5043            &io_dyn,
5044            FullZipRepIndexDetails {
5045                buf_position: 1000,
5046                bytes_per_value,
5047            },
5048        );
5049
5050        // The result should be OK (not an error)
5051        assert!(
5052            result.is_ok(),
5053            "schedule_ranges_rep should succeed with cached data"
5054        );
5055
5056        // Second scheduler instance should be able to use the cached data
5057        let mut scheduler2 = FullZipScheduler {
5058            data_buf_position: 0,
5059            rep_index: Some(FullZipRepIndexDetails {
5060                buf_position: 1000,
5061                bytes_per_value,
5062            }),
5063            priority: 0,
5064            rows_in_page,
5065            bits_per_offset: 32,
5066            details: scheduler.details.clone(),
5067            cached_state: None,
5068            enable_cache: true, // Enable caching for test
5069        };
5070
5071        // Load cached data from the first scheduler
5072        scheduler2.load(&cached_data1);
5073        assert!(
5074            scheduler2.cached_state.is_some(),
5075            "Second scheduler should have cached_state after load"
5076        );
5077
5078        // Verify that both schedulers have the same cached data
5079        let cached_state2 = scheduler2.cached_state.as_ref().unwrap();
5080        assert!(
5081            Arc::ptr_eq(cached_state, cached_state2),
5082            "Both schedulers should share the same cached data"
5083        );
5084    }
5085
5086    #[tokio::test]
5087    async fn test_fullzip_cache_config_controls_caching() {
5088        use crate::testing::SimulatedScheduler;
5089
5090        // Simplified FixedPerValueDecompressor for testing
5091        #[derive(Debug)]
5092        struct TestFixedDecompressor;
5093
5094        impl FixedPerValueDecompressor for TestFixedDecompressor {
5095            fn decompress(
5096                &self,
5097                _data: FixedWidthDataBlock,
5098                _num_rows: u64,
5099            ) -> crate::Result<DataBlock> {
5100                unimplemented!("Test decompressor")
5101            }
5102
5103            fn bits_per_value(&self) -> u64 {
5104                32
5105            }
5106        }
5107
5108        // Test that enable_cache flag actually controls caching behavior
5109        let rows_in_page = 1000_u64;
5110        let bytes_per_value = 4_u64;
5111
5112        // Create simulated data
5113        let rep_index_data = vec![0u8; ((rows_in_page + 1) * bytes_per_value) as usize];
5114        let value_data = vec![0u8; 4000]; // Dummy value data
5115        let mut full_data = vec![0u8; 1000]; // Padding before rep index
5116        full_data.extend_from_slice(&rep_index_data);
5117        full_data.extend_from_slice(&value_data);
5118
5119        let data = bytes::Bytes::from(full_data);
5120        let io = Arc::new(SimulatedScheduler::new(data));
5121
5122        // Test 1: With caching disabled
5123        let mut scheduler_no_cache = FullZipScheduler {
5124            data_buf_position: 0,
5125            rep_index: Some(FullZipRepIndexDetails {
5126                buf_position: 1000,
5127                bytes_per_value,
5128            }),
5129            priority: 0,
5130            rows_in_page,
5131            bits_per_offset: 32,
5132            details: Arc::new(FullZipDecodeDetails {
5133                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
5134                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
5135                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
5136                max_rep: 0,
5137                max_visible_def: 0,
5138            }),
5139            cached_state: None,
5140            enable_cache: false, // Caching disabled
5141        };
5142
5143        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
5144        let cached_data = scheduler_no_cache.initialize(&io_dyn).await.unwrap();
5145
5146        // Should return NoCachedPageData when caching is disabled
5147        assert!(
5148            cached_data
5149                .as_arc_any()
5150                .downcast_ref::<super::NoCachedPageData>()
5151                .is_some(),
5152            "With enable_cache=false, should return NoCachedPageData"
5153        );
5154
5155        // Test 2: With caching enabled
5156        let mut scheduler_with_cache = FullZipScheduler {
5157            data_buf_position: 0,
5158            rep_index: Some(FullZipRepIndexDetails {
5159                buf_position: 1000,
5160                bytes_per_value,
5161            }),
5162            priority: 0,
5163            rows_in_page,
5164            bits_per_offset: 32,
5165            details: Arc::new(FullZipDecodeDetails {
5166                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
5167                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
5168                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
5169                max_rep: 0,
5170                max_visible_def: 0,
5171            }),
5172            cached_state: None,
5173            enable_cache: true, // Caching enabled
5174        };
5175
5176        let cached_data2 = scheduler_with_cache.initialize(&io_dyn).await.unwrap();
5177
5178        // Should return FullZipCacheableState when caching is enabled
5179        assert!(
5180            cached_data2
5181                .as_arc_any()
5182                .downcast_ref::<super::FullZipCacheableState>()
5183                .is_some(),
5184            "With enable_cache=true, should return FullZipCacheableState"
5185        );
5186    }
5187}